v1.1.1 — Production Ready

MAPLE Documentation

MAPLE (Multi Agent Protocol Language Engine) is a production-ready multi-agent runtime that combines autonomous agent orchestration with enterprise-grade infrastructure — resource-aware messaging, type-safe error handling, distributed state, and cryptographic security — all built into the protocol itself.

High-Throughput Broker

Priority-queue message routing with backpressure control and adaptive load balancing.

Result<T,E>

Rust-style typed error handling. Zero silent failures across every API surface.

AES-256-GCM Links

Cryptographically verified agent-to-agent communication channels.

Resource-Aware

Embed CPU, memory, GPU, and bandwidth requirements directly in messages.

10 Protocol Adapters

Native interop with A2A, MCP, AutoGen, CrewAI, LangGraph, n8n, and more.

818 Tests, 80%+ Coverage

Production-validated with a full pytest suite and CI pipeline.

Why MAPLE

Most agent frameworks solve orchestration ergonomics. They leave reliability, security, and resource control for "your platform team." MAPLE's thesis: production infrastructure should be first-class, not bolted on later.

The Gap

Teams building multi-agent systems with LangGraph, CrewAI, or AutoGen inevitably re-implement the same primitives in production:

  • Structured error propagation (not raw exceptions)
  • Resource budgeting across agent tasks
  • Encrypted, verified agent-to-agent channels
  • Distributed state that survives restarts
  • Circuit breakers and adaptive retry
  • Priority-based scheduling under load

MAPLE ships all of this as the default — so your team ships product, not infrastructure.

MAPLE vs Alternatives

CapabilityMAPLELangGraphCrewAIAutoGenGoogle A2A
Resource-aware messaging Built-in
Result<T,E> type safety Every API
AES-256-GCM secure links Built-in~ OAuth
Distributed state store Built-in~ External~ External
Circuit breakers Built-in
Priority scheduling Built-in
Protocol adapters 10 adapters~ 2–3
Autonomous ReAct agents Built-in~
Production test coverage 80%+~

Architecture

MAPLE is organised into three coherent layers. Each layer is independently usable but designed to compose seamlessly.

Layer 3: Autonomy SDK (ReAct Loop · Tool Use · Memory · Orchestrator · Observability)
Layer 2: Runtime (Broker · State Store · Scheduler · Registry · Health · Circuit Breakers)
Layer 1: Protocol (Message Schema · Result<T,E> · Resource Spec · Secure Links · Serialization)

Module Map

Package | Module | Responsibility
maple/core/ | Result, Message, types | Protocol primitives, serialization
maple/agent/ | Agent, Config | Agent base class, lifecycle management
maple/broker/ | MessageBroker, Router | Message routing, priority queues
maple/security/ | Auth, Links, Audit | Authn/authz, AES-256-GCM channels
maple/state/ | StateStore, Sync | Distributed state (memory/file/SQLite)
maple/resources/ | ResourceManager | Resource negotiation and tracking
maple/task_management/ | TaskQueue, Scheduler | Priority scheduling, fault tolerance
maple/error/ | CircuitBreaker, Recovery | Circuit breaking, adaptive retry
maple/llm/ | LLMProvider, registry | OpenAI / Anthropic provider abstraction
maple/autonomy/ | AutonomousAgent, Tools | ReAct loop, tool use, memory
maple/discovery/ | AgentRegistry, Health | Service discovery, failure detection

Installation

MAPLE is available on PyPI. Install the core package or add optional extras for LLM support, high-performance brokers, and security primitives.

Core

bash
pip install maple-oss

Full Stack

bash
# Full production stack
pip install "maple-oss[llm,security,performance,nats]"

# Verify
python -c "import maple; print(maple.__version__)"

Extras

Extra | Packages | When to use
[llm] | openai, anthropic | LLM provider support
[security] | cryptography, pyjwt | AES-256-GCM links, JWT auth
[performance] | uvloop, orjson | Faster event loop + JSON
[nats] | nats-py | NATS.io distributed broker
[all] | All of the above | Full enterprise deployment

Python 3.8+

MAPLE requires Python 3.8 or higher. Python 3.11+ is recommended for best performance.

Quick Start

Get two agents talking in about 40 lines of Python, with resource-aware messaging and Result<T,E> error handling.

python
import asyncio
from maple import Agent, Message, Priority, Config
from maple.resources import ResourceRequest, ResourceRange

async def main():
    # 1. Configure agents
    sender   = Agent(Config(agent_id="sender",   broker_url="memory://localhost"))
    receiver = Agent(Config(agent_id="receiver", broker_url="memory://localhost"))
    await sender.start()
    await receiver.start()

    # 2. Register a handler
    @receiver.on_message("TASK")
    async def handle(msg: Message):
        print(f"[receiver] got: {msg.payload.get('job')}")

    # 3. Build a resource-aware message
    msg = Message(
        message_type="TASK",
        receiver="receiver",
        priority=Priority.HIGH,
        payload={
            "job": "run_analysis",
            "resources": ResourceRequest(
                memory=ResourceRange(min="2GB", preferred="4GB", max="8GB"),
                compute=ResourceRange(min=2, preferred=4, max=8),
            ).to_dict(),
        },
    )

    # 4. Send with Result<T,E>
    result = await sender.send(msg)
    if result.is_ok():
        print(f"[sender] sent: {result.unwrap()}")
    else:
        err = result.unwrap_err()
        print(f"[sender] failed: {err['message']}")
        if err.get("recoverable"):
            print(f"  suggestion: {err.get('suggestion')}")

    await asyncio.gather(sender.stop(), receiver.stop())

asyncio.run(main())
What you just saw

Resource requirements embedded in the message payload; type-safe Result with structured recovery hints — no raw exceptions, no silent failures.

Configuration

Every MAPLE agent is created from a Config object.

python
from maple import Config
from maple.security import SecurityConfig

config = Config(
    agent_id        = "my-agent",
    broker_url      = "memory://localhost",    # or "nats://localhost:4222"
    security_config = SecurityConfig(
        "jwt",                                 # auth_type (positional)
        {"token": "Bearer <your-token>"},     # credentials (positional)
    ),
    timeout         = 30.0,
    max_retries     = 3,
    enable_audit    = True,
)

Field | Type | Default | Description
agent_id | str | required | Unique identifier for this agent
broker_url | str | "memory://localhost" | Broker connection string
security_config | SecurityConfig (optional) | None | Auth and encryption settings
timeout | float | 30.0 | Default request timeout in seconds
max_retries | int | 3 | Max retry attempts on transient failures
enable_audit | bool | False | Enable structured audit log output
state_backend | str | "memory" | "memory", "file", or "sqlite"

SecurityConfig argument order

SecurityConfig takes auth_type and credentials as positional arguments. Passing them as kwargs will raise a TypeError.

Messages & Types

A Message is the fundamental unit of communication. Every field is typed; serialization is automatic.

python
from maple import Message, Priority

msg = Message(
    message_type   = "ANALYSIS_REQUEST",
    sender         = "coordinator",
    receiver       = "worker-1",
    priority       = Priority.HIGH,
    payload        = {"data": [1, 2, 3], "model": "gpt-4o"},
    metadata       = {"trace_id": "abc123", "tenant": "acme"},
    correlation_id = "req-001",
)

Field | Type | Description
message_id | str | Auto-generated UUID
message_type | str (required) | Application-defined type string
sender | str | Source agent ID (auto-filled on send)
receiver | str (required) | Target agent ID
priority | Priority | Routing priority
payload | dict | Application-defined message body
metadata | dict | Envelope metadata (tracing, tenant, etc.)
correlation_id | str | For matching requests to responses
timestamp | float | Unix timestamp (auto-set)
link_id | str | Secure link ID (see Secure Links)

Result<T,E>

Every MAPLE API returns a Result — either Ok(value) or Err(error_dict). Silent failures are impossible by design.

python
from maple.core import Result, Ok, Err

ok_result  = Ok("message-id-123")
err_result = Err({"code": "TIMEOUT", "message": "Agent unreachable", "recoverable": True})

# Checking
if ok_result.is_ok():
    value = ok_result.unwrap()         # "message-id-123"

if err_result.is_err():
    error = err_result.unwrap_err()    # {"code": "TIMEOUT", ...}

# Chaining
result = ok_result.and_then(lambda v: Ok(v.upper()))   # Ok("MESSAGE-ID-123")
result = ok_result.map(lambda v: {"id": v})            # Ok({"id": "message-id-123"})
safe   = err_result.unwrap_or("default")               # "default"

Error Structure

python
{
    "code":        "RESOURCE_UNAVAILABLE",
    "message":     "No agent with GPU capacity",
    "recoverable": True,
    "suggestion": {
        "action":             "retry_with_backoff",
        "delay_s":            5,
        "fallback_receiver":  "worker-cpu-pool",
    },
    "context": {
        "requested_gpu_gb": 16,
        "available_gpu_gb": 0,
    }
}

Method | Returns | Description
.is_ok() | bool | True if Ok result
.is_err() | bool | True if Err result
.unwrap() | T | Get Ok value; raises if Err
.unwrap_err() | E | Get Err dict; raises if Ok
.unwrap_or(default) | T | Get Ok value or a default
.map(fn) | Result | Transform Ok value; pass Err through
.and_then(fn) | Result | Chain Ok to another Result-returning fn
.map_err(fn) | Result | Transform Err; pass Ok through
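
To make the method semantics concrete, here is a minimal, self-contained sketch of a Result-like type. It illustrates the behaviour listed in the table and is not MAPLE's actual implementation: the class name SketchResult and its internals are assumptions, and the real maple.core.Result may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SketchResult:
    """Hypothetical stand-in for maple.core.Result (illustration only)."""
    ok: bool
    payload: Any

    def is_ok(self) -> bool:
        return self.ok

    def is_err(self) -> bool:
        return not self.ok

    def unwrap(self) -> Any:
        if not self.ok:
            raise ValueError(f"unwrap() called on Err: {self.payload!r}")
        return self.payload

    def unwrap_err(self) -> Any:
        if self.ok:
            raise ValueError(f"unwrap_err() called on Ok: {self.payload!r}")
        return self.payload

    def unwrap_or(self, default: Any) -> Any:
        return self.payload if self.ok else default

    def map(self, fn: Callable[[Any], Any]) -> "SketchResult":
        # Transform the Ok value; an Err passes through untouched.
        return SketchResult(True, fn(self.payload)) if self.ok else self

    def map_err(self, fn: Callable[[Any], Any]) -> "SketchResult":
        # Transform the Err value; an Ok passes through untouched.
        return self if self.ok else SketchResult(False, fn(self.payload))

    def and_then(self, fn: Callable[[Any], "SketchResult"]) -> "SketchResult":
        # fn must itself return a result, so failures can surface mid-chain.
        return fn(self.payload) if self.ok else self


def Ok(value: Any) -> SketchResult:
    return SketchResult(True, value)


def Err(error: Any) -> SketchResult:
    return SketchResult(False, error)


ok = Ok("message-id-123")
err = Err({"code": "TIMEOUT", "message": "Agent unreachable", "recoverable": True})
upper = ok.and_then(lambda v: Ok(v.upper()))
skipped = err.map(lambda v: v.upper())  # fn is never called on an Err
```

The key property is that map and and_then never invoke the callback on an Err, which is what lets a chain of steps stop at the first failure without any explicit branching.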

Priority System

MAPLE's broker routes messages through priority queues. Higher-priority messages are dequeued first regardless of arrival order.

python
from maple import Priority

Priority.CRITICAL   # = 0  — emergency alerts, system-wide events
Priority.HIGH       # = 1  — time-sensitive tasks
Priority.NORMAL     # = 2  — default
Priority.LOW        # = 3  — background batch work
Priority.BACKGROUND # = 4  — best-effort housekeeping
Backpressure

When queue depth exceeds the configured limit, BACKGROUND messages are shed first. CRITICAL messages are never dropped.
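
The dequeue order and shedding rule can be sketched with a small heap-based queue. This is an illustration under assumptions, not the broker's real code: the class SheddingQueue, the max_depth parameter, and the choice to shed the newest entry of the lowest-priority level present are all hypothetical. The numeric ranks follow the comments in the listing above.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    # Ranks mirror the comments in the listing above (lower = more urgent).
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3
    BACKGROUND = 4


class SheddingQueue:
    """Toy queue: most urgent first, FIFO within a priority level."""

    def __init__(self, max_depth):
        self.max_depth = max_depth
        self._heap = []
        self._seq = itertools.count()  # tie-breaker that preserves arrival order

    def push(self, priority, item):
        """Enqueue item; return the shed item if the queue overflowed, else None."""
        heapq.heappush(self._heap, (priority, next(self._seq), item))
        if len(self._heap) <= self.max_depth:
            return None
        # Over the limit: pick the least urgent, most recently queued entry.
        victim = max(self._heap)
        if victim[0] == Priority.CRITICAL:
            return None  # only CRITICAL entries remain, and those are never dropped
        self._heap.remove(victim)
        heapq.heapify(self._heap)
        return victim[2]

    def pop(self):
        return heapq.heappop(self._heap)[2]


q = SheddingQueue(max_depth=3)
q.push(Priority.NORMAL, "n1")
q.push(Priority.BACKGROUND, "b1")
q.push(Priority.HIGH, "h1")
shed = q.push(Priority.CRITICAL, "c1")  # overflow: the BACKGROUND entry is shed
order = [q.pop(), q.pop(), q.pop()]
```

The sequence counter keeps ordering stable within a level, so two messages of equal priority are still delivered in arrival order.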

Resource Specification

Among the protocols compared in this documentation, MAPLE is the only one that carries resource requirements as a first-class protocol field. Agents negotiate resources before committing to a task.

python
from maple import Message
from maple.resources import ResourceRequest, ResourceRange

req = ResourceRequest(
    memory            = ResourceRange(min="4GB",    preferred="8GB",  max="16GB"),
    compute           = ResourceRange(min=4,         preferred=8,      max=16),
    gpu_memory        = ResourceRange(min="8GB",    preferred="16GB"),
    network_bandwidth = ResourceRange(min="100Mbps", preferred="1Gbps"),
    deadline          = "2025-12-01T09:00:00Z",
)

msg = Message(
    message_type = "ML_INFERENCE",
    receiver     = "gpu-worker",
    payload      = {"model": "llama-3-70b", "resources": req.to_dict()},
)

The Resource Manager reads these fields during routing and selects the best available agent. If no agent satisfies min constraints, send() returns Err({"code": "RESOURCE_UNAVAILABLE", "recoverable": True, ...}).
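
As a rough sketch of the selection step described above, the function below filters agents by the min constraint and prefers those that can also grant preferred. The function name pick_agent, the tuple return shape, and the tie-breaking policy (most free memory wins) are assumptions for illustration; the real Resource Manager's policy is not documented here.

```python
GIB = 1024 ** 3


def pick_agent(free_memory, min_bytes, preferred_bytes):
    """Return (agent_id, None) on success or (None, error_dict) on failure.

    free_memory maps agent id -> free bytes. Hypothetical policy: agents that
    can grant `preferred` beat those that only meet `min`; ties go to the
    agent with the most free memory.
    """
    eligible = {a: free for a, free in free_memory.items() if free >= min_bytes}
    if not eligible:
        return None, {"code": "RESOURCE_UNAVAILABLE", "recoverable": True}
    best = max(eligible, key=lambda a: (eligible[a] >= preferred_bytes, eligible[a]))
    return best, None


pool = {"worker-a": 2 * GIB, "worker-b": 6 * GIB, "worker-c": 12 * GIB}
chosen, error = pick_agent(pool, min_bytes=4 * GIB, preferred_bytes=8 * GIB)
nobody, failure = pick_agent(pool, min_bytes=32 * GIB, preferred_bytes=64 * GIB)
```

Here worker-a is excluded by min, worker-b is eligible, and worker-c wins because it also satisfies preferred. The second call shows the failure path: no agent meets min, so the caller gets a recoverable error rather than a silent drop.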

Message Broker

The MessageBroker is a singleton managing all message routing. It supports in-memory (development) and NATS (production) backends.

python
from maple.broker import MessageBroker

broker = MessageBroker.get_instance()

stats = broker.get_stats()
# {
#   "queued": 42,
#   "delivered": 10_847,
#   "failed": 3,
#   "agents": 12,
#   "throughput_per_sec": 33_000,
# }
Singleton reset in tests

Between test cases, reset the singleton with MessageBroker._instance = None to avoid state leakage.
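
The reason the reset matters can be shown with a stand-in singleton. SketchBroker below is a hypothetical class that only mimics the get_instance() pattern; it is not MessageBroker. In a pytest suite the reset would typically live in an autouse fixture so it runs around every test.

```python
class SketchBroker:
    """Hypothetical stand-in for a get_instance()-style singleton."""

    _instance = None

    def __init__(self):
        self.queued = []

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance


def reset_broker():
    """What a per-test teardown would do: drop the cached instance."""
    SketchBroker._instance = None


first = SketchBroker.get_instance()
first.queued.append("leftover message from test 1")
same = SketchBroker.get_instance()   # without a reset, state carries over
reset_broker()
fresh = SketchBroker.get_instance()  # after the reset, a clean instance
```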

State Store

Agents share persistent state through StateStore, backed by memory, file, or SQLite.

python
from maple.state import StateStore, ConsistencyLevel

store = StateStore(backend="sqlite", path="./state.db")

await store.set("job:123:status", "running", ttl=3600)

result = await store.get("job:123:status")
if result.is_ok():
    status = result.unwrap()    # "running"

await store.compare_and_set(
    key         = "job:123:status",
    expected    = "running",
    new_value   = "complete",
    consistency = ConsistencyLevel.STRONG,
)

async def on_change(key, old_val, new_val):
    print(f"{key}: {old_val} -> {new_val}")

await store.subscribe("job:*", on_change)
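
The compare_and_set call above follows the usual compare-and-swap contract: the write only lands if the stored value still equals expected. Below is a minimal in-memory sketch of that contract. SketchStore and its dict-shaped return values are assumptions for illustration; the real StateStore returns Result objects and supports persistent backends.

```python
import asyncio


class SketchStore:
    """Hypothetical in-memory store demonstrating compare-and-set semantics."""

    def __init__(self):
        self._data = {}
        self._lock = asyncio.Lock()

    async def set(self, key, value):
        async with self._lock:
            self._data[key] = value

    async def compare_and_set(self, key, expected, new_value):
        """Write new_value only if the current value equals expected."""
        async with self._lock:
            current = self._data.get(key)
            if current != expected:
                # Someone else changed the key first; report the conflict.
                return {"ok": False, "code": "STATE_CONFLICT", "current": current}
            self._data[key] = new_value
            return {"ok": True, "value": new_value}


async def demo():
    store = SketchStore()
    await store.set("job:123:status", "running")
    won = await store.compare_and_set("job:123:status", "running", "complete")
    lost = await store.compare_and_set("job:123:status", "running", "failed")
    return won, lost


won, lost = asyncio.run(demo())
```

The second caller loses because the value is no longer "running". The usual response to a conflict is to re-read the key and decide whether the update still applies.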

Task Scheduler

The task scheduler queues work, assigns it to available agents based on resource requirements, and tracks completion with configurable aggregation strategies.

python
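from maple import Priority
from maple.task_management import TaskQueue, Task, AggregationStrategy

# msg is a Message built as in Quick Start; task_a, task_b, and task_c are Task objects built like `task` below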

queue = TaskQueue()

task = Task(
    task_id     = "batch-001",
    message     = msg,
    priority    = Priority.HIGH,
    max_retries = 3,
    timeout     = 60.0,
)

result = await queue.submit(task)

# Fan-out to multiple workers
results = await queue.fan_out(
    tasks    = [task_a, task_b, task_c],
    strategy = AggregationStrategy.FIRST_SUCCESS,
    timeout  = 30.0,
)
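
One plausible reading of FIRST_SUCCESS is "return the first successful result and cancel the remaining work". The sketch below implements that reading with plain asyncio. The helper first_success, the (ok, value) tuple convention, and the worker coroutine are hypothetical; MAPLE's own aggregation logic may behave differently.

```python
import asyncio


async def first_success(coros, timeout):
    """Return (True, value) for the first success, else (False, [errors])."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    errors = []
    try:
        # as_completed yields awaitables in completion order.
        for finished in asyncio.as_completed(tasks, timeout=timeout):
            ok, value = await finished
            if ok:
                return True, value
            errors.append(value)  # this worker failed; wait for the next one
        return False, errors
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished


async def worker(name, delay, ok):
    # Stand-in for a task dispatched to an agent.
    await asyncio.sleep(delay)
    return ok, name


async def demo():
    return await first_success(
        [
            worker("slow-ok", 0.2, True),
            worker("fast-fail", 0.01, False),
            worker("fast-ok", 0.03, True),
        ],
        timeout=1.0,
    )


winner = asyncio.run(demo())
```

The failing worker finishes first but is skipped, so the result comes from the quickest worker that actually succeeded. If the timeout expires before any success, asyncio raises a timeout error from the await, which a real implementation would convert into an Err.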

Agent Registry

Dynamic service discovery — find agents by capability, resource availability, or custom metadata.

python
from maple.discovery import AgentRegistry, CapabilityMatcher

registry = AgentRegistry.get_instance()

await registry.register(
    agent_id     = "ml-worker-1",
    capabilities = ["inference", "embedding"],
    resources    = {"gpu_gb": 24, "memory_gb": 64},
    metadata     = {"model": "llama-3-70b", "region": "us-east"},
)

agents  = await registry.find_by_capability("inference")
# req is the ResourceRequest built in the Resource Specification section
matched = await CapabilityMatcher().match(req, agents, strategy="least_loaded")

Health Monitor

Periodic health checks with state-transition callbacks for automated failover.

python
from maple.discovery import AgentRegistry, HealthMonitor

registry = AgentRegistry.get_instance()

monitor = HealthMonitor(check_interval=10.0)

@monitor.on_agent_unhealthy
async def handle_unhealthy(agent_id: str, reason: str):
    print(f"[health] {agent_id} unhealthy: {reason}")
    await registry.deregister(agent_id)

await monitor.start()

Authentication

MAPLE supports JWT, API key, and no-auth strategies via SecurityConfig.

python
from maple.security import SecurityConfig

# JWT
jwt_cfg = SecurityConfig("jwt", {"token": "Bearer eyJhbGci..."})

# API key
key_cfg = SecurityConfig("api_key", {"key": "sk-your-key", "header": "X-API-Key"})

# Development only
dev_cfg = SecurityConfig("none", {})
Production security

Always use JWT or API key in production. "none" disables all identity verification.

Audit Logging

Structured audit records for every message send, link establishment, auth event, and state change.

python
from maple.security import AuditLogger

AuditLogger.configure(
    output = "file",
    path   = "/var/log/maple/audit.jsonl",
    level  = "INFO",
    sign   = True,    # HMAC-SHA256 signature per record
)
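
Per-record HMAC-SHA256 signing, as enabled by sign = True above, can be sketched with the standard library. The canonical-JSON encoding, the "signature" field name, and the helper names below are assumptions for illustration; the actual record layout used by AuditLogger is not specified in this document.

```python
import hashlib
import hmac
import json


def sign_record(record, key):
    """Return a copy of record with an HMAC-SHA256 signature over its canonical JSON."""
    body = json.dumps(record, sort_keys=True, separators=(",", ":")).encode("utf-8")
    signature = hmac.new(key, body, hashlib.sha256).hexdigest()
    return {**record, "signature": signature}


def verify_record(signed, key):
    """Recompute the signature over every field except "signature" and compare."""
    claimed = signed.get("signature", "")
    unsigned = {k: v for k, v in signed.items() if k != "signature"}
    expected = sign_record(unsigned, key)["signature"]
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(claimed, expected)


key = b"demo-key-not-for-production"
signed = sign_record({"event": "message_send", "agent": "sender", "ts": 1700000000}, key)
tampered = {**signed, "agent": "intruder"}
```

Sorting keys before hashing matters: without a canonical encoding, two semantically identical records could serialize differently and fail verification.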

Circuit Breakers

MAPLE's CircuitBreaker opens automatically on repeated failures, preventing cascading failures across your agent graph.

python
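from maple import Message
from maple.error import CircuitBreaker
from maple.core import Err

# `agent` is assumed to be an already-started maple Agent (see Quick Start)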

cb = CircuitBreaker(
    failure_threshold = 5,
    recovery_timeout  = 30.0,
    success_threshold = 2,
)

async def call_worker(payload):
    if not cb.should_allow():
        return Err({
            "code":        "CIRCUIT_OPEN",
            "message":     f"Worker unavailable, retry in {cb.retry_after:.0f}s",
            "recoverable": True,
        })

    result = await agent.send(Message(
        message_type = "WORK",
        payload      = payload,
        receiver     = "worker",
    ))

    if result.is_ok():
        cb.record_success()
    else:
        cb.record_failure()

    return result

State machine: CLOSED → (threshold failures) → OPEN → (recovery timeout) → HALF_OPEN → (success threshold) → CLOSED.
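
The state machine above can be sketched in a few lines. SketchBreaker is a hypothetical class that reuses the parameter and method names from the example, with an injectable clock so the transitions can be exercised without waiting. Re-opening on any failure while HALF_OPEN is a common convention and an assumption here, not something this document states.

```python
import time


class SketchBreaker:
    """Hypothetical circuit breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._clock = clock
        self.state = "CLOSED"
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def should_allow(self):
        # After the recovery timeout, let probe traffic through (HALF_OPEN).
        if self.state == "OPEN" and self._clock() - self._opened_at >= self.recovery_timeout:
            self.state = "HALF_OPEN"
            self._successes = 0
        return self.state != "OPEN"

    def record_success(self):
        if self.state == "HALF_OPEN":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "CLOSED"
                self._failures = 0
        else:
            self._failures = 0  # count consecutive failures only

    def record_failure(self):
        self._failures += 1
        if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
            self.state = "OPEN"
            self._opened_at = self._clock()


now = [0.0]  # fake clock so the demo does not sleep
cb = SketchBreaker(failure_threshold=2, recovery_timeout=30.0,
                   success_threshold=2, clock=lambda: now[0])
trace = []
cb.record_failure()
cb.record_failure()
trace.append(cb.state)            # threshold reached
blocked = not cb.should_allow()   # calls are rejected while OPEN
now[0] = 31.0
cb.should_allow()
trace.append(cb.state)            # recovery timeout elapsed
cb.record_success()
cb.record_success()
trace.append(cb.state)            # enough probe successes
```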

Error Recovery

python
from maple.error import RecoveryManager, BackoffStrategy

recovery = RecoveryManager(
    strategy     = BackoffStrategy.EXPONENTIAL_JITTER,
    base_delay   = 1.0,
    max_delay    = 60.0,
    max_attempts = 5,
)

result = await recovery.execute(
    fn       = lambda: agent.send(msg),
    fallback = lambda err: agent.send(msg.with_receiver("worker-fallback")),
)
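
The delay schedule implied by EXPONENTIAL_JITTER with these parameters can be sketched as capped exponential backoff with "full jitter", where each delay is drawn uniformly between zero and the current ceiling. Which jitter variant RecoveryManager actually uses is not stated here, so treat the formula and the helper name backoff_delays as assumptions.

```python
import random


def backoff_delays(base_delay, max_delay, max_attempts, rng):
    """Yield one sleep duration per retry: uniform in [0, min(max_delay, base * 2**n)]."""
    for attempt in range(max_attempts):
        ceiling = min(max_delay, base_delay * (2 ** attempt))
        yield rng.uniform(0.0, ceiling)


# Ceilings for base_delay=1.0, max_delay=60.0, max_attempts=5
ceilings = [min(60.0, 1.0 * 2 ** n) for n in range(5)]
delays = list(backoff_delays(1.0, 60.0, 5, random.Random(42)))
```

Randomising the delay spreads retries from many agents over time, so a recovering worker is not hit by a synchronised burst the moment it comes back.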

Fault Tolerance

python
from maple.task_management import FaultToleranceManager

ft = FaultToleranceManager(
    circuit_breaker  = cb,
    recovery_manager = recovery,
    health_monitor   = monitor,
)

result = await ft.execute_with_resilience(
    task       = task,
    agent_pool = ["worker-1", "worker-2", "worker-3"],
    strategy   = "fail_over",
)

Use Case Gallery

Six reference architectures showing how MAPLE's production primitives — resource-aware messaging, Result<T,E>, circuit breakers, distributed state — compose into real systems.

1. Distributed ML Pipeline

Coordinating data ingestion, model inference, and result aggregation across specialised agents — each with explicit GPU/memory requirements negotiated at the protocol level.

json
{
  "taskType": "ML_PIPELINE",
  "stages": {
    "dataIngestion": {
      "source": "kafka_stream",
      "validation": { "schema": "avro_schema_v1", "constraints": ["non_null","range_check"] },
      "errorHandling": { "strategy": "dead_letter_queue", "retry": { "maxAttempts": 3, "backoff": "exponential" } }
    },
    "modelInference": {
      "model": { "type": "tensorflow", "version": "2.4",
                 "resources": { "gpu": "required", "memory": "16GB" } },
      "loadBalancing": { "strategy": "least_loaded", "healthCheck": "every_30s" }
    },
    "resultAggregation": {
      "window": "5m", "consistency": "exactly_once", "outputFormat": "parquet"
    }
  }
}
  • Resource-aware routing — GPU requirement embedded in the message; the broker routes to whichever worker has capacity
  • Dead-letter queue — failed ingestion messages are preserved, not silently dropped
  • Exactly-once aggregation — Result<T,E> ensures every partial result is accounted for before emitting the window

2. IoT Sensor Network

Thousands of sensors with priority-based routing — emergency readings bypass batch queues and reach the control plane immediately.

json
{
  "deviceRegistry": {
    "type": "SENSOR_NETWORK",
    "protocol": { "discovery": "automatic", "authentication": "mutual_tls", "compression": "enabled" },
    "messageHandling": {
      "priority": { "emergency": "immediate", "routine": "batch" },
      "batching": { "size": "1000", "timeout": "1s" }
    },
    "stateManagement": { "sync": "eventual", "storage": "distributed_cache" }
  }
}

3. Financial Trading System

Ultra-low-latency trade execution with atomic consistency guarantees and automatic failover when latency breaches the SLA threshold.

json
{
  "tradingSystem": {
    "latencyRequirements": { "maxLatency": "50ms", "priority": "CRITICAL" },
    "transactions": { "type": "ATOMIC", "consistency": "STRONG", "isolation": "SERIALIZABLE" },
    "monitoring": {
      "metrics": ["latency", "throughput", "error_rate"],
      "alerting": { "threshold": "latency > 45ms", "action": "FAILOVER" }
    }
  }
}

4. Microservices Orchestration

Complex service graphs with dynamic discovery, weighted routing, and circuit-breaker protection across service boundaries.

json
{
  "serviceOrchestration": {
    "discovery": { "method": "dynamic", "healthCheck": "tcp+http", "interval": "10s" },
    "circuitBreaker": { "threshold": "5_failures", "timeout": "30s", "fallback": "cached_response" },
    "routing": { "strategy": "weighted_round_robin", "filters": ["version","region","load"] }
  }
}

5. Real-time Analytics Pipeline

Sliding window stream processing with RocksDB state backend, exactly-once checkpointing, and auto-scaling on backpressure signals.

json
{
  "analyticsEngine": {
    "streaming": { "windowType": "sliding", "windowSize": "5m", "watermark": "10s" },
    "processing": {
      "operators": ["filter","aggregate","join"],
      "stateBackend": "rocksdb",
      "checkpointing": { "interval": "1m", "mode": "exactly_once" }
    },
    "scaling": { "autoScale": true, "metrics": ["backpressure","lag"], "limits": {"min":1,"max":10} }
  }
}

6. Autonomous Robotics System

Consensus-based multi-robot coordination with distributed path planning, guaranteed delivery, and dynamic obstacle avoidance.

json
{
  "roboticSystem": {
    "coordination": { "protocol": "consensus_based", "synchronization": "time_bounded" },
    "pathPlanning": {
      "algorithm": "distributed_rrt",
      "constraints": { "collision_avoidance": true, "dynamic_obstacles": true }
    },
    "communication": { "mesh_network": true, "latency_bound": "100ms", "reliability": "guaranteed_delivery" }
  }
}

Industry Applications

Production-grade MAPLE message schemas for six vertical industries. Each example shows the exact JSON envelope MAPLE agents exchange, including priority, error recovery, and resource fields.

Healthcare — Patient Monitoring

Coordinating medical devices, patient data streams, and clinical notifications with threshold-triggered alerts.

json
{
  "messageType": "PATIENT_MONITORING",
  "priority": "HIGH",
  "payload": {
    "patientId": "P123456",
    "deviceData": {
      "heartRate": { "value": 85, "unit": "bpm", "timestamp": "2024-12-12T10:15:00Z", "deviceId": "HR_MONITOR_001" },
      "bloodPressure": { "systolic": 120, "diastolic": 80, "unit": "mmHg" },
      "oxygenSaturation": { "value": 98, "unit": "percentage" }
    },
    "alerts": [
      { "type": "THRESHOLD_EXCEEDED", "severity": "MEDIUM", "metric": "heartRate", "threshold": 80, "action": "NOTIFY_NURSE" }
    ],
    "metadata": { "ward": "ICU", "floor": 3, "attending": "DR_SMITH" }
  }
}
json — Error recovery
{
  "messageType": "DEVICE_ERROR",
  "payload": {
    "errorType": "DEVICE_DISCONNECTED",
    "deviceId": "HR_MONITOR_001",
    "recovery": { "action": "FAILOVER", "backupDevice": "HR_MONITOR_002", "dataContinuity": "maintained" }
  }
}

Finance — Algorithmic Trading

High-frequency order execution with risk pre-checks, VWAP strategy splitting, and atomic settlement.

json
{
  "messageType": "TRADE_EXECUTION",
  "priority": "CRITICAL",
  "payload": {
    "orderId": "ORD789",
    "instrument": { "symbol": "AAPL", "type": "EQUITY", "market": "NASDAQ" },
    "action": { "type": "BUY", "quantity": 1000, "orderType": "LIMIT", "price": 150.25, "timeInForce": "IOC" },
    "riskChecks": { "position": "VERIFIED", "margin": "SUFFICIENT", "limits": "WITHIN_BOUNDS" },
    "execution": { "strategy": "VWAP", "slippage": 0.02, "childOrders": ["ORD789_1","ORD789_2"] }
  }
}

Manufacturing — Industry 4.0

Smart factory automation: robotic assembly coordination, quality metrics, and conveyor sensor telemetry in one message.

json
{
  "messageType": "PRODUCTION_CONTROL",
  "payload": {
    "productionLine": "ASSEMBLY_A",
    "status": { "state": "ACTIVE", "efficiency": 94.5, "currentBatch": "BATCH_2024_123" },
    "robotics": {
      "arm_1": { "position": [23.5, 45.2, 12.1], "load": 75, "nextAction": "PICK", "target": "COMPONENT_XYZ" },
      "conveyor": { "speed": 0.5, "loadFactor": 0.8, "sensors": { "proximity": "CLEAR", "temperature": 35.2 } }
    },
    "qualityMetrics": { "defectRate": 0.1, "accuracy": 99.8, "calibrationStatus": "IN_SPEC" }
  }
}

Smart City — Traffic Control

Real-time intersection management with emergency vehicle priority override and congestion-aware signal timing.

json
{
  "messageType": "TRAFFIC_CONTROL",
  "payload": {
    "intersection": { "id": "INT_456", "location": { "lat": 37.7749, "lng": -122.4194 },
                      "status": { "congestion": "HIGH", "weather": "RAIN" } },
    "signalControl": {
      "currentPhase": 2,
      "timing": { "green": 45, "yellow": 5, "red": 60 },
      "override": { "active": true, "reason": "EMERGENCY_VEHICLE", "direction": "NORTH_SOUTH" }
    },
    "sensorData": { "vehicleCount": 45, "averageSpeed": 28.5, "queueLength": 12 }
  }
}

Energy Grid — Load Balancing

Distributed energy resource coordination with demand-response activation and supply-side optimisation.

json
{
  "messageType": "GRID_MANAGEMENT",
  "payload": {
    "gridSegment": "SECTOR_7",
    "powerMetrics": {
      "demand": { "current": 1250, "predicted": 1400, "unit": "kW" },
      "supply": { "solar": 450, "wind": 300, "conventional": 600, "unit": "kW" }
    },
    "loadBalance": {
      "status": "OPTIMIZING",
      "actions": [{ "type": "DEMAND_RESPONSE", "target": "COMMERCIAL", "reduction": 100, "duration": "1h" }]
    }
  }
}

Logistics — Warehouse Automation

AGV fleet coordination: pick task assignment, battery monitoring, payload tracking, and inventory delta in one message.

json
{
  "messageType": "WAREHOUSE_OPERATIONS",
  "payload": {
    "facility": "WH_NORTH",
    "operations": {
      "pickingTasks": [
        { "taskId": "PICK_123", "priority": "HIGH", "location": "AISLE_5_RACK_3",
          "item": { "sku": "ITEM_789", "quantity": 5, "weight": 2.3 }, "assignedTo": "AGV_001" }
      ],
      "agvFleet": {
        "AGV_001": { "status": "MOVING", "battery": 85,
                     "location": { "x": 123.5, "y": 456.7 },
                     "payload": { "current": 15.5, "capacity": 50 } }
      },
      "inventory": { "updates": [{ "sku": "ITEM_789", "quantity": -5, "type": "OUTBOUND" }] }
    }
  }
}

Protocol Comparison

A factual comparison of MAPLE against the major agent communication protocols and frameworks. Use this to guide technology selection for your project.

Feature Matrix

CapabilityMAPLEGoogle A2AFIPA ACLMCPAGENTCY / ACP
Resource-aware messaging First-class field
Result<T,E> type safety Every API exceptions legacy
AES-256-GCM secure links Built-in~ OAuth only none~ platform
Distributed state store Built-in~ external req. context only
Circuit breakers Built-in
Priority scheduling 5 levels~ platform~ basic
Protocol adapters / interop 10 adapters~ 1–2
Autonomous ReAct agents Built-in~
Open source AGPL-3.0 proprietary
Production test coverage 80%+ Google SLA~ legacy research

When to Choose Each

Protocol | Best for | Avoid when
MAPLE | Production multi-agent systems that need resource management, typed errors, security, and distributed state out of the box | Simple single-agent LLM tool calling with no reliability requirements
Google A2A | Agents deeply integrated into Google Cloud (Vertex AI, Cloud Run, etc.) | Multi-cloud or on-prem deployments; anything requiring resource negotiation
FIPA ACL | Academic research; legacy enterprise systems already speaking FIPA; standards compliance requirements | Any greenfield project: the 1990s design shows in performance and tooling
MCP | Exposing or consuming tools for a single LLM, where it is a great tool layer | Multi-agent coordination, parallel execution, or anything requiring state
AGENTCY / ACP | Academic research and protocol theory exploration | Any production deployment

They compose, not compete

MAPLE ships adapters for all of the above. You can use MAPLE as the reliability and routing layer while still speaking A2A, MCP, or FIPA ACL to existing agents — no rewrite required.

Type System

MAPLE's type system makes error conditions first-class citizens. Every operation that can fail returns a structured type rather than raising an exception or returning a bare status code, so static type checkers and the runtime can enforce correct error handling.

Primitive Types

python
from maple.core.types import (
    Boolean, Integer, Float, String,
    Timestamp, UUID, Size, Duration,
)

# Size and Duration parse human-readable strings
memory_size = Size.validate("16GB")    # → 17_179_869_184  (bytes)
timeout     = Duration.validate("30s") # → 30.0  (seconds)
agent_id    = UUID.validate("550e8400-e29b-41d4-a716-446655440000")
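
As a rough illustration of what parsing human-readable sizes and durations involves, here is a standalone sketch. The helpers parse_size and parse_duration and the accepted unit sets are hypothetical, not the Size.validate or Duration.validate implementations. Binary units (1GB = 1024^3 bytes) are assumed because that matches the 17_179_869_184 value shown above.

```python
import re

# Binary multiples, consistent with "16GB" -> 17_179_869_184 above.
_SIZE_UNITS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3, "TB": 1024 ** 4}
_DURATION_UNITS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}


def parse_size(text):
    """Convert strings such as "16GB" to a byte count."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([KMGT]?B)\s*", text, flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"unrecognised size: {text!r}")
    return int(float(match.group(1)) * _SIZE_UNITS[match.group(2).upper()])


def parse_duration(text):
    """Convert strings such as "30s" or "5m" to seconds."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(ms|s|m|h)\s*", text)
    if match is None:
        raise ValueError(f"unrecognised duration: {text!r}")
    return float(match.group(1)) * _DURATION_UNITS[match.group(2)]


memory_bytes = parse_size("16GB")
timeout_seconds = parse_duration("30s")
window_seconds = parse_duration("5m")
```

Rejecting unknown units with an explicit error, instead of guessing, keeps a typo such as "16G" from silently becoming a wrong resource request.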

Collection Types

python
from maple.core.types import Array, Map, Set, Option

AgentList    = Array(String)
ResourceMap  = Map(String, Integer)
CapabilitySet = Set(String)

agents    = AgentList.validate(["agent-1", "agent-2", "agent-3"])
resources = ResourceMap.validate({"cpu": 8, "memory": 16, "gpu": 2})

Priority Enum

python
from enum import Enum

# Reference definition; in application code use: from maple.core.types import Priority
class Priority(Enum):
    CRITICAL   = "CRITICAL"   # Life-critical, emergency override
    HIGH       = "HIGH"       # Time-sensitive tasks
    NORMAL     = "NORMAL"     # Standard (default)
    LOW        = "LOW"        # Background tasks
    BACKGROUND = "BACKGROUND" # Best-effort housekeeping

Error Type Hierarchy

python
from enum import Enum

# Reference definition; in application code use: from maple.error.types import ErrorType, Severity
class ErrorType(Enum):
    # Communication
    NETWORK_ERROR            = "NETWORK_ERROR"
    TIMEOUT                  = "TIMEOUT"
    ROUTING_ERROR            = "ROUTING_ERROR"
    MESSAGE_VALIDATION_ERROR = "MESSAGE_VALIDATION_ERROR"

    # Resources
    RESOURCE_UNAVAILABLE         = "RESOURCE_UNAVAILABLE"
    RESOURCE_EXHAUSTED           = "RESOURCE_EXHAUSTED"
    RESOURCE_NEGOTIATION_FAILED  = "RESOURCE_NEGOTIATION_FAILED"

    # Security
    AUTHENTICATION_ERROR     = "AUTHENTICATION_ERROR"
    AUTHORIZATION_ERROR      = "AUTHORIZATION_ERROR"
    LINK_VERIFICATION_FAILED = "LINK_VERIFICATION_FAILED"
    ENCRYPTION_ERROR         = "ENCRYPTION_ERROR"

    # State
    STATE_CONFLICT               = "STATE_CONFLICT"
    STATE_SYNCHRONIZATION_FAILED = "STATE_SYNCHRONIZATION_FAILED"
    CONSISTENCY_VIOLATION        = "CONSISTENCY_VIOLATION"

ConsistencyLevel

python
from enum import Enum

# Reference definition; in application code use: from maple.state import ConsistencyLevel
class ConsistencyLevel(Enum):
    STRONG   = "STRONG"    # All replicas consistent immediately
    CAUSAL   = "CAUSAL"    # Causally consistent ordering
    EVENTUAL = "EVENTUAL"  # Eventually consistent with conflict resolution
    WEAK     = "WEAK"      # Best-effort, highest throughput

Error Types Reference

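MAPLE error codes with recoverability guidance. Codes defined in ErrorType but absent from this table (NETWORK_ERROR, RESOURCE_NEGOTIATION_FAILED, ENCRYPTION_ERROR, STATE_SYNCHRONIZATION_FAILED) have no documented guidance here.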

Error Code | Category | Recoverable? | Typical cause & fix
RESOURCE_UNAVAILABLE | Resource | Yes | No agent meets min constraints. Lower min or add workers.
RESOURCE_EXHAUSTED | Resource | Yes | Agent ran out of quota mid-task. Add capacity or reduce batch size.
CIRCUIT_OPEN | Reliability | After timeout | Circuit tripped. Wait for retry_after seconds, then probe again.
TIMEOUT | Communication | Yes | Increase Config.timeout or use send() instead of request().
ROUTING_ERROR | Communication | Yes | Agent not registered. Check AgentRegistry and call start().
MESSAGE_VALIDATION_ERROR | Protocol | No | Invalid message schema. Fix message_type or missing required fields.
AUTHENTICATION_ERROR | Security | No | Invalid or expired token. Refresh credentials in SecurityConfig.
AUTHORIZATION_ERROR | Security | No | Agent lacks permission for the requested operation.
LINK_VERIFICATION_FAILED | Security | ~ Re-establish | Secure link expired or tampered. Call establish_link() again.
STATE_CONFLICT | State | Yes | Compare-and-swap conflict. Re-read state and retry the update.
CONSISTENCY_VIOLATION | State | ~ Config | Read returned stale data under STRONG consistency. Investigate replicas.

Result<T,E> In Depth

The Result<T,E> type has its roots in functional programming (Rust's Result<T, E>, Haskell's Either, Swift's Result, Scala's Either). MAPLE adapts this pattern specifically for agent communication, adding protocol-specific semantics for coordination, resource management, and structured recovery.

Why not exceptions?

Approach | Problem
Exception / raise | Not visible in type signatures; callers can miss them entirely
Error codes (int) | Limited context; callers must look up meaning in docs
Optional / None | Indicates success/failure but carries no error detail
Status flag + data | Mixes success and error info in one structure; still no type enforcement
Result<T,E> | Type-safe, carries full error context, chainable, forces explicit handling

Chaining pipelines

python
from maple.core import Ok, Err

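# load_data, validate_schema, and the other step functions are application-defined placeholders.
# and_then expects a Result-returning function; map wraps a plain return value in Ok.
# The first Err short-circuits the rest of the chain automatically.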
result = (
    load_data(source)
    .and_then(lambda data:     validate_schema(data))
    .map(lambda valid:         process_ai_analysis(valid))
    .and_then(lambda analysis: generate_insights(analysis))
    .map(lambda insights:      format_output(insights))
)

if result.is_ok():
    print("Pipeline complete:", result.unwrap())
else:
    err = result.unwrap_err()
    print(f"Failed at step: {err['code']} — {err['message']}")
    if err.get("recoverable"):
        apply_recovery_strategy(err["suggestion"])

Result<T,E> in message payloads

MAPLE also uses the Result pattern in message payloads for agent-to-agent responses:

json — Success
{
  "messageType": "TASK_RESULT",
  "payload": {
    "taskId": "task_123",
    "result": {
      "status": "Ok",
      "value": {
        "processedItems": 100,
        "analysis": { "patterns": ["pattern1","pattern2"], "confidence": 0.95 }
      }
    }
  }
}
json — Error with recovery hints
{
  "messageType": "TASK_RESULT",
  "payload": {
    "taskId": "task_123",
    "result": {
      "status": "Err",
      "error": {
        "type": "DATA_VALIDATION_ERROR",
        "message": "Invalid time series format",
        "details": { "expectedFormat": "ISO8601", "receivedFormat": "MM/DD/YYYY", "affectedRecords": [12,15,18] },
        "recoverable": true
      }
    }
  }
}
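
A receiving agent might branch on the `status` field of such a payload as sketched below. The helper `summarise_task_result` is hypothetical and uses only the field names shown in the two JSON examples; how MAPLE itself decodes these payloads is not shown here.

```python
import json


def summarise_task_result(raw):
    """Decode a TASK_RESULT message and reduce it to the fields a caller acts on."""
    message = json.loads(raw)
    if message.get("messageType") != "TASK_RESULT":
        raise ValueError("not a TASK_RESULT message")
    payload = message["payload"]
    result = payload["result"]
    if result["status"] == "Ok":
        return {"taskId": payload["taskId"], "ok": True, "value": result["value"]}
    error = result["error"]
    return {
        "taskId": payload["taskId"],
        "ok": False,
        "errorType": error["type"],
        # Treat a missing flag as "not recoverable" (an assumption of this sketch)
        "retry": bool(error.get("recoverable", False)),
    }


success_raw = json.dumps({
    "messageType": "TASK_RESULT",
    "payload": {"taskId": "task_123",
                "result": {"status": "Ok", "value": {"processedItems": 100}}},
})
error_raw = json.dumps({
    "messageType": "TASK_RESULT",
    "payload": {"taskId": "task_123",
                "result": {"status": "Err",
                           "error": {"type": "DATA_VALIDATION_ERROR",
                                     "message": "Invalid time series format",
                                     "recoverable": True}}},
})

print(summarise_task_result(success_raw))
print(summarise_task_result(error_raw))
```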

Resource allocation — partial failure example

json
{
  "messageType": "RESOURCE_RESPONSE",
  "payload": {
    "requestId": "req_456",
    "result": {
      "status": "Err",
      "error": {
        "type": "PARTIAL_ALLOCATION_FAILURE",
        "message": "Could not allocate all requested resources",
        "details": {
          "allocated": { "cpuCores": 4, "memory": "16GB", "gpuMemory": "4GB" },
          "missing":   { "gpuMemory": "4GB" }
        },
        "alternatives": [
          { "option": "WAIT_FOR_RESOURCES", "estimatedWaitTime": "5m" },
          { "option": "PROCEED_WITH_PARTIAL", "expectedPerformanceImpact": "moderate" }
        ]
      }
    }
  }
}
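
One way a requester could act on the `alternatives` list is sketched below. The selection policy (wait if the estimated wait fits a budget, otherwise proceed with the partial allocation) and the helper names are assumptions for illustration, as is the duration format, which is inferred only from the `"5m"` value in the example.

```python
import re

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_duration(text):
    """Convert strings such as '5m' into seconds (format assumed from the example)."""
    match = re.fullmatch(r"(\d+)([smh])", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def choose_alternative(error, max_wait_seconds):
    """Pick one of the offered options, or None when nothing usable is offered."""
    options = {alt["option"]: alt for alt in error.get("alternatives", [])}
    wait = options.get("WAIT_FOR_RESOURCES")
    if wait is not None and parse_duration(wait["estimatedWaitTime"]) <= max_wait_seconds:
        return "WAIT_FOR_RESOURCES"
    if "PROCEED_WITH_PARTIAL" in options:
        return "PROCEED_WITH_PARTIAL"
    return None


error = {
    "type": "PARTIAL_ALLOCATION_FAILURE",
    "alternatives": [
        {"option": "WAIT_FOR_RESOURCES", "estimatedWaitTime": "5m"},
        {"option": "PROCEED_WITH_PARTIAL", "expectedPerformanceImpact": "moderate"},
    ],
}

print(choose_alternative(error, max_wait_seconds=600))
print(choose_alternative(error, max_wait_seconds=60))
```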

Google A2A Integration

Bridge Google's Agent-to-Agent protocol, letting MAPLE agents participate in A2A networks.

python
from maple.adapters.a2a import A2AAdapter

adapter = A2AAdapter(
    maple_agent  = my_agent,
    a2a_endpoint = "https://a2a.googleapis.com/v1",
    credentials  = {"token": "Bearer ..."},
)

await adapter.bridge_inbound()

result = await adapter.send_to_a2a(
    a2a_agent_id = "projects/my-proj/agents/classifier",
    payload      = {"text": "Classify this document"},
)

Resource enrichment

A2A messages have no native resource fields. The adapter automatically wraps them in MAPLE's ResourceRequest envelope so resource-aware routing still applies.

Model Context Protocol (MCP)

Use MAPLE agents as MCP tool servers, or call external MCP tool servers from within an autonomous MAPLE agent.

python
from maple import Config
from maple.adapters.mcp import MCPAdapter
from maple.autonomy import AutonomousAgent

agent = AutonomousAgent(
    config    = Config(agent_id="researcher", broker_url="memory://localhost"),
    llm_model = "gpt-4o",
    tools     = [
        MCPAdapter.from_server("npx -y @modelcontextprotocol/server-brave-search"),
        MCPAdapter.from_server("npx -y @modelcontextprotocol/server-filesystem"),
    ],
)

result = await agent.run("Search and summarise the latest MAPLE documentation.")

Expose MAPLE as MCP Server

python
from maple.adapters.mcp import MCPServer

server = MCPServer(agent=my_agent, port=3000, expose=["send", "request", "get_state"])
await server.start()

FIPA ACL Integration

Bridge legacy FIPA ACL agents into a MAPLE network with automatic performative mapping.

python
from maple.adapters.fipa import FIPAAdapter

adapter    = FIPAAdapter(maple_agent=my_agent)
maple_msg  = adapter.from_fipa({
    "performative": "REQUEST",
    "sender":       "legacy-agent@platform:5678",
    "content":      "(action (agent1 (process data)))",
})

await adapter.send_fipa(
    receiver     = "legacy-agent@platform:5678",
    performative = "INFORM",
    content      = "(result ok)",
)

AutoGen Integration

Wrap AutoGen's ConversableAgent as a MAPLE agent for production reliability.

python
from maple import Config, Message
from maple.adapters.autogen import AutoGenAdapter
import autogen

autogen_agent = autogen.AssistantAgent("assistant", llm_config={...})

maple_wrapper = AutoGenAdapter.wrap(
    autogen_agent = autogen_agent,
    maple_config  = Config(agent_id="autogen-assistant", broker_url="memory://localhost"),
)

await maple_wrapper.start()

# `sender` stands for any other MAPLE Agent that has already been started
result = await sender.send(Message(
    message_type = "CHAT",
    receiver     = "autogen-assistant",
    payload      = {"content": "Explain MAPLE's circuit breaker pattern."},
))

CrewAI Integration

Add MAPLE's production infrastructure — circuit breakers, state, resource routing — to CrewAI crews.

python
from maple.adapters.crewai import MAPLECrewAdapter
from crewai import Crew, Agent, Task

crew       = Crew(agents=[researcher, analyst], tasks=[...])
maple_crew = MAPLECrewAdapter(
    crew                   = crew,
    broker_url             = "nats://localhost:4222",
    circuit_breaker_config = {"failure_threshold": 3},
)

result = await maple_crew.kickoff_async(inputs={"topic": "AI trends 2025"})

LangGraph Integration

Use MAPLE as the message transport and state backend for LangGraph StateGraph workflows.

python
from maple.adapters.langgraph import MAPLELangGraphAdapter
from langgraph.graph import StateGraph

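graph = StateGraph(dict)
graph.add_node("fetch",   fetch_node)      # fetch_node and process_node are your node functions
graph.add_node("process", process_node)
graph.set_entry_point("fetch")             # LangGraph needs an entry point before the graph can be compiled
graph.add_edge("fetch", "process")
graph.set_finish_point("process")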

adapter = MAPLELangGraphAdapter(
    graph       = graph,
    state_store = StateStore(backend="sqlite", path="./lg_state.db"),
    broker_url  = "memory://localhost",
)

result = await adapter.invoke({"query": "Analyse MAPLE architecture"})

OpenAI SDK Integration

Use OpenAI's models as the LLM backend for MAPLE autonomous agents.

python
from maple import Config
from maple.autonomy import AutonomousAgent

agent = AutonomousAgent(
    config     = Config(agent_id="gpt-worker", broker_url="memory://localhost"),
    llm_model  = "gpt-4o",
    llm_config = {"api_key": "sk-...", "temperature": 0.2, "max_tokens": 2048},
)

result = await agent.run("Summarise the last 10 git commits in this repo.")

IBM ACP Integration

Bridge IBM's Agent Communication Protocol into MAPLE networks for enterprise system integrations.

python
from maple.adapters.ibm_acp import IBMACPAdapter

adapter = IBMACPAdapter(
    maple_agent  = my_agent,
    acp_endpoint = "https://acp.ibm.example.com/v1",
    api_key      = "ibm-api-key-...",
)

await adapter.start_bridge()

result = await adapter.send_to_acp(
    acp_agent_id = "watson-orchestrator",
    payload      = {"action": "process_claim", "claim_id": "CLM-001"},
)

S2.dev Durable Streaming (new in v1.1.1)

S2.dev provides durable, ordered, high-throughput streams. MAPLE's S2 integration gives agents persistent message history and replay capability.

python
from maple.adapters.s2dev import S2Adapter

adapter = S2Adapter(
    maple_agent  = my_agent,
    s2_endpoint  = "https://s2.dev",
    access_token = "s2-token-...",
    stream_name  = "maple-prod-stream",
    retention    = "7d",
)

await adapter.publish(msg)

await adapter.subscribe(
    from_seq   = 1_000,
    on_message = lambda m: agent.handle_message(m),
)

When to use S2.dev

Use S2 when you need message replay for audit, recovery after agent restarts, or exactly-once processing guarantees across distributed deployments.
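
The replay idea can be sketched with an in-memory ordered log. This does not use S2's API: `ReplayableLog` and `Consumer` are hypothetical, and skipping already-seen sequence numbers is one assumed way for a consumer to avoid double-processing after a replay.

```python
class ReplayableLog:
    """Append-only log; the record at list index i has sequence number i + 1."""

    def __init__(self):
        self._records = []

    def append(self, payload):
        self._records.append(payload)
        return len(self._records)  # sequence number of the new record

    def read_from(self, from_seq):
        # Yield (seq, payload) pairs in order, starting at from_seq
        for seq in range(max(from_seq, 1), len(self._records) + 1):
            yield seq, self._records[seq - 1]


class Consumer:
    """Remembers the last processed sequence number so replays are harmless."""

    def __init__(self):
        self.last_seq = 0
        self.processed = []

    def handle(self, seq, payload):
        if seq <= self.last_seq:
            return  # already processed before the restart
        self.processed.append(payload)
        self.last_seq = seq


log = ReplayableLog()
for n in range(1, 6):
    log.append(f"event-{n}")

consumer = Consumer()
for seq, payload in log.read_from(1):
    if seq > 3:
        break  # simulate a crash after three events
    consumer.handle(seq, payload)

# After the "restart", replay from an earlier sequence number than strictly needed
for seq, payload in log.read_from(2):
    consumer.handle(seq, payload)

print(consumer.processed)
```

Replaying from sequence 2 re-delivers events 2 and 3, but the consumer ignores them and only processes 4 and 5, so each event takes effect once.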

n8n Integration

Three native n8n nodes let non-technical users build MAPLE workflows in the n8n visual editor.

  • MAPLE Agent Node: Send tasks to any registered MAPLE agent from an n8n workflow
  • Coordinator Node: Fan-out to multiple agents and aggregate results with configurable strategies
  • Resource Manager Node: Inspect and allocate resources across your agent pool from n8n

bash
cd ~/.n8n
npm install n8n-nodes-maple

After installing, restart n8n and search for MAPLE in the node palette. Configure the broker URL and credentials in n8n's credential manager — no Python required.

LLM Providers

MAPLE's LLMProvider abstraction supports OpenAI and Anthropic, with a pluggable interface for custom backends.

python
from maple.llm import get_provider

# Auto-selects based on model prefix
openai_prov    = get_provider("gpt-4o",             api_key="sk-...")
anthropic_prov = get_provider("claude-sonnet-4-6",  api_key="sk-ant-...")

response = await openai_prov.complete(
    messages    = [{"role": "user", "content": "Hello, MAPLE!"}],
    temperature = 0.2,
)
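
Prefix-based selection of this kind can be sketched as a simple lookup. The table below is an assumption built from the two model names in the example; the real matching rules inside `get_provider` are not documented here, and `provider_name_for` is a hypothetical helper.

```python
# Assumed mapping, derived only from the two model names used in the example
PROVIDER_PREFIXES = {
    "gpt-": "openai",
    "claude-": "anthropic",
}


def provider_name_for(model):
    """Return the provider whose prefix matches the model name."""
    for prefix, provider in PROVIDER_PREFIXES.items():
        if model.startswith(prefix):
            return provider
    raise ValueError(f"no provider registered for model {model!r}")


print(provider_name_for("gpt-4o"))
print(provider_name_for("claude-sonnet-4-6"))
```

Raising on an unknown prefix, rather than falling back to a default provider, keeps a typo in a model name from silently routing requests to the wrong backend.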

Autonomous Agents

MAPLE's AutonomousAgent runs a ReAct (Reasoning + Acting) loop — reason about the goal, select tools, execute them, observe results, iterate until done.

python
from maple.autonomy import AutonomousAgent, Tool
from maple import Config

async def search_web(query: str) -> str:
    return f"Results for: {query}"   # your implementation

agent = AutonomousAgent(
    config         = Config(agent_id="auto-1", broker_url="memory://localhost"),
    llm_model      = "claude-sonnet-4-6",
    tools          = [Tool(name="search_web", fn=search_web,
                          description="Search the web for information")],
    max_iterations = 10,
    memory_enabled = True,
)

result = await agent.run(
    "Research the top 5 multi-agent frameworks and compare their resource management features."
)
print(result.output)
print(result.tool_calls_made)
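
The control flow of a ReAct loop can be sketched without an LLM by scripting the reasoning step. Everything below is illustrative: `react_loop` and `scripted_decide` are hypothetical, the loop is synchronous for brevity, and the step format (`action`, `input`, `answer`) is an assumption, not AutonomousAgent's internal representation.

```python
def react_loop(goal, decide, tools, max_iterations=10):
    """Reason, act, observe, and repeat until the policy finishes or the budget runs out."""
    observations = []
    tool_calls = []
    for _ in range(max_iterations):
        step = decide(goal, observations)         # reason about what to do next
        if step["action"] == "finish":
            return {"output": step["answer"], "tool_calls": tool_calls}
        tool = tools[step["action"]]              # select the tool
        observations.append(tool(step["input"]))  # act and observe the result
        tool_calls.append(step["action"])
    # Iteration budget exhausted without a final answer
    return {"output": None, "tool_calls": tool_calls}


def search_web(query):
    return f"Results for: {query}"


def scripted_decide(goal, observations):
    # Stand-in for the LLM: search once, then answer from the observation
    if not observations:
        return {"action": "search_web", "input": goal}
    return {"action": "finish", "answer": f"Summary of: {observations[-1]}"}


result = react_loop("multi-agent frameworks", scripted_decide, {"search_web": search_web})
print(result["output"])
print(result["tool_calls"])
```

The `max_iterations` cap matters in practice: a policy that never emits a finish step would otherwise loop forever, which is why the sketch returns an empty output once the budget is spent.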

Tools & Memory

MAPLE agents have short-term (in-run) and long-term (persistent) memory, plus a flexible tool registry.

python
from maple.autonomy import AgentMemory, AutonomousAgent, MemoryStore

memory = AgentMemory(
    store       = MemoryStore(backend="sqlite", path="./agent_mem.db"),
    max_items   = 1000,
    embed_model = "text-embedding-3-small",
)

# cfg is a Config instance and my_tools a list of Tool objects defined elsewhere
agent = AutonomousAgent(config=cfg, llm_model="gpt-4o", memory=memory, tools=my_tools)

await memory.remember("User prefers concise answers", importance=0.9)
results = await memory.recall("user preferences", top_k=5)

API: Agent

Agent(config: Config)
Create an agent. Does not connect until start() is called.
| Method | Signature | Description |
|---|---|---|
| start() | Awaitable[None] | Connect to broker and register agent |
| stop() | Awaitable[None] | Gracefully disconnect and deregister |
| send(msg) | Result[str, dict] | Fire-and-forget; returns message ID or error |
| request(msg, timeout) | Result[Message, dict] | Request/response; awaits reply message |
| on_message(type)(handler) | Decorator | Register async handler for a message type |
| establish_link(target, ...) | Result[str, dict] | Create AES-256-GCM secure channel |
| get_state(key) | Result[Any, dict] | Read from the agent's state store |
| set_state(key, value) | Result[None, dict] | Write to the agent's state store |

API: Message

Message(message_type, receiver, *, priority=Priority.NORMAL, payload={}, ...)
Immutable message envelope. All fields are validated on construction.
| Method | Returns | Description |
|---|---|---|
| .with_link(link_id) | Message | Return copy with secure link set |
| .with_receiver(agent_id) | Message | Return copy with new receiver |
| .to_dict() | dict | Serialise to JSON-compatible dict |
| Message.from_dict(d) | Message | Deserialise from dict |
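
The "return a copy" behaviour of an immutable envelope can be sketched with a frozen dataclass. `Envelope` is a hypothetical stand-in with a reduced field set, not the Message class itself, and it performs no validation.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass(frozen=True)
class Envelope:
    message_type: str
    receiver: str
    payload: dict = field(default_factory=dict)
    link_id: Optional[str] = None

    def with_receiver(self, agent_id):
        # replace() builds a new instance; the original is left untouched
        return replace(self, receiver=agent_id)

    def with_link(self, link_id):
        return replace(self, link_id=link_id)


original = Envelope("TASK", "worker-1", {"n": 1})
rerouted = original.with_receiver("worker-2").with_link("link-42")

print(original.receiver, original.link_id)
print(rerouted.receiver, rerouted.link_id)
```

Note that `replace` makes a shallow copy, so both envelopes in this sketch share the same `payload` dict; a stricter design would copy or freeze the payload as well.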

API: Config

Config(agent_id: str, *, broker_url: str = "memory://localhost", ...)
Agent configuration. All fields have sensible defaults except agent_id.

API: Result<T,E>

Ok(value: T) → Result[T, E]
Err(error: E) → Result[T, E]
Construct Ok or Err variants. Import from maple.core.

API: ResourceRequest

ResourceRequest(memory: ResourceRange = None, compute: ResourceRange = None, ...)
ResourceRange(min, preferred, max = None)
Specify memory, compute, GPU, and bandwidth requirements. Call .to_dict() to embed in a message payload.

API: CircuitBreaker

CircuitBreaker(failure_threshold: int = 5, recovery_timeout: float = 30.0, success_threshold: int = 2)
| Method / Property | Returns | Description |
|---|---|---|
| .should_allow() | bool | True if call should proceed; transitions OPEN→HALF_OPEN |
| .record_success() | None | Increment success counter; may close circuit |
| .record_failure() | None | Increment failure counter; may open circuit |
| .state | CircuitState | CLOSED, OPEN, or HALF_OPEN |
| .retry_after | float | Seconds until next half-open probe |
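
The state machine behind this interface can be sketched as follows. This is an illustrative re-implementation under stated assumptions, not MAPLE's source: it counts consecutive failures, reopens on any failure while HALF_OPEN, does not limit the number of concurrent HALF_OPEN probes, and accepts a hypothetical `clock` parameter so the timing can be tested without sleeping.

```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self._clock = clock
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    @property
    def retry_after(self):
        # Seconds until the next probe is allowed; 0.0 unless the circuit is open
        if self.state is not CircuitState.OPEN:
            return 0.0
        return max(0.0, self._opened_at + self.recovery_timeout - self._clock())

    def should_allow(self):
        if self.state is CircuitState.OPEN:
            if self.retry_after > 0:
                return False
            self.state = CircuitState.HALF_OPEN  # timeout elapsed: start probing
            self._successes = 0
        return True

    def record_success(self):
        if self.state is CircuitState.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self._failures = 0
        else:
            self._failures = 0  # a success breaks the failure streak

    def record_failure(self):
        if self.state is CircuitState.HALF_OPEN:
            self._trip()  # a failed probe reopens the circuit immediately
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self):
        self.state = CircuitState.OPEN
        self._opened_at = self._clock()
        self._failures = 0


now = [0.0]  # fake clock so the example runs instantly
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=30.0,
                         success_threshold=2, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
print(breaker.state, breaker.should_allow(), breaker.retry_after)
```

A caller would wrap each downstream call in `should_allow()`, then report the outcome with `record_success()` or `record_failure()`.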

Deployment: Memory Broker

The in-memory broker requires no external services — ideal for single-process deployments, tests, and local development.

python
config = Config(agent_id="my-agent", broker_url="memory://localhost")
# No extra setup needed; the broker starts automatically

Not for multi-process production

The memory broker does not persist across restarts and is not shared between separate Python processes. Use NATS for distributed deployments.

Deployment: NATS Production

For distributed multi-process deployments, MAPLE uses NATS.io as the broker backend.

bash
pip install "maple-oss[nats]"
docker run -p 4222:4222 nats:latest --jetstream

python
config = Config(
    agent_id   = "prod-agent",
    broker_url = "nats://localhost:4222",
)

Deployment: Docker & Kubernetes

yaml
version: "3.9"
services:
  nats:
    image: nats:latest
    command: ["--jetstream", "--http_port", "8222"]
    ports: ["4222:4222", "8222:8222"]

  maple-worker:
    image: python:3.11-slim
    environment:
      MAPLE_BROKER_URL: "nats://nats:4222"
      MAPLE_AGENT_ID:   "worker-1"
      OPENAI_API_KEY:   "${OPENAI_API_KEY}"
    command: ["python", "worker.py"]
    depends_on: [nats]
    deploy:
      replicas: 3

Best Practices

Always handle Result

Never call .unwrap() without first checking .is_ok(). Prefer .unwrap_or(default) for non-critical paths.

Size ResourceRequests conservatively

Set min to the genuine minimum your task can function with. Inflated values cause unnecessary routing failures under resource pressure.
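
The effect of an inflated `min` can be illustrated with a toy matcher. The `ResourceRange` dataclass below only mirrors the documented field names (min, preferred, max); `candidates` and the rule "a worker qualifies when its free capacity is at least min" are assumptions for this sketch, not MAPLE's routing logic.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ResourceRange:
    min: float
    preferred: float
    max: Optional[float] = None


def candidates(requested, free_by_worker):
    """Return the workers whose free capacity covers the requested minimum."""
    return sorted(
        name for name, free in free_by_worker.items() if free >= requested.min
    )


free_memory_gb = {"w1": 4.0, "w2": 8.0, "w3": 16.0}

genuine = ResourceRange(min=4.0, preferred=8.0)
inflated = ResourceRange(min=32.0, preferred=32.0)

print(candidates(genuine, free_memory_gb))
print(candidates(inflated, free_memory_gb))
```

With the genuine minimum every worker qualifies; with the inflated one none does, which in this model is exactly the situation that surfaces as a routing failure.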

Use Priority deliberately

If everything is CRITICAL, nothing is. Reserve CRITICAL for true emergencies. Default to NORMAL.
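
Why overusing CRITICAL defeats the purpose can be seen in a small priority-queue sketch. `PriorityMailbox` is hypothetical and built on the standard library's `heapq`; the HIGH and LOW levels and the numeric ordering are assumptions, since only CRITICAL and NORMAL are named in this documentation.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    CRITICAL = 0  # lower number is served first
    HIGH = 1
    NORMAL = 2
    LOW = 3


class PriorityMailbox:
    def __init__(self):
        self._heap = []
        self._arrival = itertools.count()  # tie-breaker keeps FIFO order per level

    def put(self, payload, priority=Priority.NORMAL):
        heapq.heappush(self._heap, (priority, next(self._arrival), payload))

    def get(self):
        _, _, payload = heapq.heappop(self._heap)
        return payload

    def drain(self):
        while self._heap:
            yield self.get()


mixed = PriorityMailbox()
mixed.put("report-a")
mixed.put("report-b")
mixed.put("disk-full-alert", Priority.CRITICAL)
mixed.put("cleanup", Priority.LOW)
mixed_order = list(mixed.drain())

all_critical = PriorityMailbox()
for name in ["report-a", "report-b", "disk-full-alert"]:
    all_critical.put(name, Priority.CRITICAL)
flat_order = list(all_critical.drain())

print(mixed_order)
print(flat_order)
```

When only the alert is CRITICAL it jumps the queue; when every message is CRITICAL the queue degrades to plain arrival order and the alert waits behind the reports.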

Circuit breaker per downstream service

Create one CircuitBreaker per downstream agent or service — not one global breaker. This prevents one flaky service from blocking all others.

Prefer request() for synchronous flows

Use agent.request(msg, timeout=10.0) when you need the response before proceeding. Use agent.send() for fire-and-forget tasks.

Enable audit in production

Set enable_audit=True in production configs. Audit records are essential for debugging distributed failures and meeting compliance requirements.

Troubleshooting

RESOURCE_UNAVAILABLE errors

Cause: No registered agent satisfies the min constraints in your ResourceRequest. Check err['context'] for available vs requested values.

Fix: Lower the min constraints, ensure workers are registered with AgentRegistry, or add more worker agents.

CIRCUIT_OPEN errors

The circuit breaker tripped after repeated failures. Once recovery_timeout seconds have passed, it moves to HALF_OPEN and lets a single probe request through; read .retry_after to see how long remains before that probe.

TypeError: SecurityConfig takes positional args

python
# Wrong
SecurityConfig(auth_type="jwt", credentials={"token": "..."})

# Correct
SecurityConfig("jwt", {"token": "..."})

MessageBroker state leaking between tests

python
@pytest.fixture(autouse=True)
def reset_broker():
    yield
    from maple.broker import MessageBroker
    MessageBroker._instance = None

NATS connection refused

Ensure NATS is running: docker run -p 4222:4222 nats:latest --jetstream. Verify the [nats] extra is installed: pip install "maple-oss[nats]".

Changelog

v1.1.1 — 2025-Q4

  • S2.dev durable streaming integration — persistent replay, sequence-numbered consumption
  • Fixed flaky health monitor test on Windows
  • Fixed publish/release/dependencies workflows; added security extras
  • Fixed CI test failures: install security extras for JWT tests

v1.1.0

  • Autonomous agents — ReAct loop, tool registry, short/long-term memory
  • LLM providers — OpenAI and Anthropic adapters
  • MCP tool server and client integration
  • n8n community nodes (Agent, Coordinator, Resource Manager)
  • Automated release pipeline: tag push triggers PyPI publish

v1.0.0

  • Initial production release
  • Core protocol: Message, Result<T,E>, Priority, ResourceRequest
  • Runtime: MessageBroker, StateStore, TaskScheduler, AgentRegistry, HealthMonitor
  • Security: JWT auth, AES-256-GCM secure links, audit logging
  • Reliability: CircuitBreaker, RecoveryManager, FaultToleranceManager
  • Adapters: A2A, MCP, FIPA ACL, AutoGen, CrewAI, LangGraph, OpenAI SDK, IBM ACP
  • 818 tests passing, ~80% coverage