MAPLE Documentation
MAPLE (Multi Agent Protocol Language Engine) is a production-ready multi-agent runtime that combines autonomous agent orchestration with enterprise-grade infrastructure — resource-aware messaging, type-safe error handling, distributed state, and cryptographic security — all built into the protocol itself.
High-Throughput Broker
Priority-queue message routing with backpressure control and adaptive load balancing.
Result<T,E>
Rust-style typed error handling. Zero silent failures across every API surface.
AES-256-GCM Links
Cryptographically verified agent-to-agent communication channels.
Resource-Aware
Embed CPU, memory, GPU, and bandwidth requirements directly in messages.
10 Protocol Adapters
Native interop with A2A, MCP, AutoGen, CrewAI, LangGraph, n8n, and more.
818 Tests, 80%+ Coverage
Production-validated with a full pytest suite and CI pipeline.
Why MAPLE
Most agent frameworks solve orchestration ergonomics. They leave reliability, security, and resource control for "your platform team." MAPLE's thesis: production infrastructure should be first-class, not bolted on later.
The Gap
Teams building multi-agent systems with LangGraph, CrewAI, or AutoGen inevitably re-implement the same primitives in production:
- Structured error propagation (not raw exceptions)
- Resource budgeting across agent tasks
- Encrypted, verified agent-to-agent channels
- Distributed state that survives restarts
- Circuit breakers and adaptive retry
- Priority-based scheduling under load
MAPLE ships all of this as the default — so your team ships product, not infrastructure.
MAPLE vs Alternatives
| Capability | MAPLE | LangGraph | CrewAI | AutoGen | Google A2A |
|---|---|---|---|---|---|
| Resource-aware messaging | ✔ Built-in | ✘ | ✘ | ✘ | ✘ |
| Result<T,E> type safety | ✔ Every API | ✘ | ✘ | ✘ | ✘ |
| AES-256-GCM secure links | ✔ Built-in | ✘ | ✘ | ✘ | ~ OAuth |
| Distributed state store | ✔ Built-in | ~ External | ✘ | ✘ | ~ External |
| Circuit breakers | ✔ Built-in | ✘ | ✘ | ✘ | ✘ |
| Priority scheduling | ✔ Built-in | ✘ | ✘ | ✘ | ✘ |
| Protocol adapters | ✔ 10 adapters | ✘ | ✘ | ~ 2–3 | ✘ |
| Autonomous ReAct agents | ✔ Built-in | ✔ | ✔ | ✔ | ~ |
| Production test coverage | ✔ 80%+ | ✔ | ~ | ✔ | ✔ |
Architecture
MAPLE is organised into three coherent layers. Each layer is independently usable but designed to compose seamlessly.
Module Map
| Package | Module | Responsibility |
|---|---|---|
| maple/core/ | Result, Message, types | Protocol primitives, serialization |
| maple/agent/ | Agent, Config | Agent base class, lifecycle management |
| maple/broker/ | MessageBroker, Router | Message routing, priority queues |
| maple/security/ | Auth, Links, Audit | Authn/authz, AES-256-GCM channels |
| maple/state/ | StateStore, Sync | Distributed state (memory/file/SQLite) |
| maple/resources/ | ResourceManager | Resource negotiation and tracking |
| maple/task_management/ | TaskQueue, Scheduler | Priority scheduling, fault tolerance |
| maple/error/ | CircuitBreaker, Recovery | Circuit breaking, adaptive retry |
| maple/llm/ | LLMProvider, registry | OpenAI / Anthropic provider abstraction |
| maple/autonomy/ | AutonomousAgent, Tools | ReAct loop, tool use, memory |
| maple/discovery/ | AgentRegistry, Health | Service discovery, failure detection |
Installation
MAPLE is available on PyPI. Install the core package or add optional extras for LLM support, high-performance brokers, and security primitives.
pip install maple-oss
# Full production stack
pip install "maple-oss[llm,security,performance,nats]"
# Verify
python -c "import maple; print(maple.__version__)"
| Extra | Packages | When to use |
|---|---|---|
| [llm] | openai, anthropic | LLM provider support |
| [security] | cryptography, pyjwt | AES-256-GCM links, JWT auth |
| [performance] | uvloop, orjson | Faster event loop + JSON |
| [nats] | nats-py | NATS.io distributed broker |
| [all] | All of the above | Full enterprise deployment |
MAPLE requires Python 3.8 or higher. Python 3.11+ is recommended for best performance.
Quick Start
Get two agents talking in under 40 lines, using resource-aware messaging with Result<T,E> error handling.
import asyncio
from maple import Agent, Message, Priority, Config
from maple.resources import ResourceRequest, ResourceRange
async def main():
    # 1. Configure agents
    sender = Agent(Config(agent_id="sender", broker_url="memory://localhost"))
    receiver = Agent(Config(agent_id="receiver", broker_url="memory://localhost"))
    await sender.start()
    await receiver.start()
    # 2. Register a handler
    @receiver.on_message("TASK")
    async def handle(msg: Message):
        print(f"[receiver] got: {msg.payload.get('job')}")
    # 3. Build a resource-aware message
    msg = Message(
        message_type="TASK",
        receiver="receiver",
        priority=Priority.HIGH,
        payload={
            "job": "run_analysis",
            "resources": ResourceRequest(
                memory=ResourceRange(min="2GB", preferred="4GB", max="8GB"),
                compute=ResourceRange(min=2, preferred=4, max=8),
            ).to_dict(),
        },
    )
    # 4. Send with Result<T,E>
    result = await sender.send(msg)
    if result.is_ok():
        print(f"[sender] sent: {result.unwrap()}")
    else:
        err = result.unwrap_err()
        print(f"[sender] failed: {err['message']}")
        if err.get("recoverable"):
            print(f" suggestion: {err.get('suggestion')}")
    await asyncio.gather(sender.stop(), receiver.stop())
asyncio.run(main())
This example embeds resource requirements in the message payload and handles the send outcome through a type-safe Result with structured recovery hints, so there are no raw exceptions and no silent failures.
Configuration
Every MAPLE agent is created from a Config object.
from maple import Config
from maple.security import SecurityConfig
config = Config(
    agent_id = "my-agent",
    broker_url = "memory://localhost",  # or "nats://localhost:4222"
    security_config = SecurityConfig(
        "jwt",  # auth_type (positional)
        {"token": "Bearer <your-token>"},  # credentials (positional)
    ),
    timeout = 30.0,
    max_retries = 3,
    enable_audit = True,
)
| Field | Type | Default | Description |
|---|---|---|---|
| agent_id | str (required) | (none) | Unique identifier for this agent |
| broker_url | str | "memory://localhost" | Broker connection string |
| security_config | SecurityConfig (optional) | None | Auth and encryption settings |
| timeout | float | 30.0 | Default request timeout in seconds |
| max_retries | int | 3 | Max retry attempts on transient failures |
| enable_audit | bool | False | Enable structured audit log output |
| state_backend | str | "memory" | "memory", "file", or "sqlite" |
SecurityConfig takes auth_type and credentials as positional arguments. Passing them as kwargs will raise a TypeError.
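One way a class can end up rejecting keyword arguments like this is Python's positional-only parameter marker (`/`, available from Python 3.8). The sketch below illustrates that mechanism with a stand-in class; `SketchSecurityConfig` is hypothetical, and whether MAPLE's SecurityConfig is implemented this way is an assumption.

```python
class SketchSecurityConfig:
    """Hypothetical stand-in, not MAPLE's SecurityConfig."""

    def __init__(self, auth_type, credentials, /):
        # Everything before "/" can only be passed positionally.
        self.auth_type = auth_type
        self.credentials = credentials


# Positional call: accepted.
cfg = SketchSecurityConfig("jwt", {"token": "Bearer <your-token>"})

# Keyword call: rejected by Python itself with a TypeError.
try:
    SketchSecurityConfig(auth_type="jwt", credentials={})
except TypeError as exc:
    kwargs_error = type(exc).__name__
else:
    kwargs_error = None

print(cfg.auth_type, kwargs_error)
```

If you build SecurityConfig objects from a settings dictionary, unpack the values in order rather than with `**settings`.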
Messages & Types
A Message is the fundamental unit of communication. Every field is typed; serialization is automatic.
from maple import Message, Priority
msg = Message(
    message_type = "ANALYSIS_REQUEST",
    sender = "coordinator",
    receiver = "worker-1",
    priority = Priority.HIGH,
    payload = {"data": [1, 2, 3], "model": "gpt-4o"},
    metadata = {"trace_id": "abc123", "tenant": "acme"},
    correlation_id = "req-001",
)
| Field | Type | Description |
|---|---|---|
| message_id | str | Auto-generated UUID |
| message_type | str (required) | Application-defined type string |
| sender | str | Source agent ID (auto-filled on send) |
| receiver | str (required) | Target agent ID |
| priority | Priority | Routing priority |
| payload | dict | Application-defined message body |
| metadata | dict | Envelope metadata (tracing, tenant, etc.) |
| correlation_id | str | For matching requests to responses |
| timestamp | float | Unix timestamp (auto-set) |
| link_id | str | Secure link ID (see Secure Links) |
Result<T,E>
Every MAPLE API returns a Result — either Ok(value) or Err(error_dict). Silent failures are impossible by design.
from maple.core import Result, Ok, Err
ok_result = Ok("message-id-123")
err_result = Err({"code": "TIMEOUT", "message": "Agent unreachable", "recoverable": True})
# Checking
if ok_result.is_ok():
    value = ok_result.unwrap()  # "message-id-123"
if err_result.is_err():
    error = err_result.unwrap_err()  # {"code": "TIMEOUT", ...}
# Chaining
result = ok_result.and_then(lambda v: Ok(v.upper()))  # Ok("MESSAGE-ID-123")
result = ok_result.map(lambda v: {"id": v})  # Ok({"id": "message-id-123"})
safe = err_result.unwrap_or("default")  # "default"
Error Structure
{
    "code": "RESOURCE_UNAVAILABLE",
    "message": "No agent with GPU capacity",
    "recoverable": True,
    "suggestion": {
        "action": "retry_with_backoff",
        "delay_s": 5,
        "fallback_receiver": "worker-cpu-pool",
    },
    "context": {
        "requested_gpu_gb": 16,
        "available_gpu_gb": 0,
    }
}
| Method | Returns | Description |
|---|---|---|
| .is_ok() | bool | True if Ok result |
| .is_err() | bool | True if Err result |
| .unwrap() | T | Get Ok value; raises if Err |
| .unwrap_err() | E | Get Err dict; raises if Ok |
| .unwrap_or(default) | T | Get Ok value or a default |
| .map(fn) | Result | Transform Ok value; pass Err through |
| .and_then(fn) | Result | Chain Ok to another Result-returning fn |
| .map_err(fn) | Result | Transform Err; pass Ok through |
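To make the semantics in the method table concrete, here is a minimal standard-library sketch of a Result-like pair of classes. It is not MAPLE's implementation: the class bodies, the `ValueError` raised on a wrong unwrap, and the `parse_port` helper are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Ok:
    """Success variant: wraps a value."""
    value: Any

    def is_ok(self) -> bool:
        return True

    def is_err(self) -> bool:
        return False

    def unwrap(self) -> Any:
        return self.value

    def unwrap_err(self) -> Any:
        raise ValueError("called unwrap_err() on an Ok value")

    def unwrap_or(self, default: Any) -> Any:
        return self.value

    def map(self, fn: Callable[[Any], Any]) -> "Ok":
        return Ok(fn(self.value))

    def and_then(self, fn: Callable[[Any], Any]) -> Any:
        return fn(self.value)  # fn must itself return Ok or Err

    def map_err(self, fn: Callable[[Any], Any]) -> "Ok":
        return self


@dataclass(frozen=True)
class Err:
    """Failure variant: wraps an error (a dict in the examples above)."""
    error: Any

    def is_ok(self) -> bool:
        return False

    def is_err(self) -> bool:
        return True

    def unwrap(self) -> Any:
        raise ValueError(f"called unwrap() on an Err value: {self.error!r}")

    def unwrap_err(self) -> Any:
        return self.error

    def unwrap_or(self, default: Any) -> Any:
        return default

    def map(self, fn: Callable[[Any], Any]) -> "Err":
        return self  # errors pass through untouched

    def and_then(self, fn: Callable[[Any], Any]) -> "Err":
        return self  # short-circuit: fn is never called

    def map_err(self, fn: Callable[[Any], Any]) -> "Err":
        return Err(fn(self.error))


def parse_port(text: str):
    """Hypothetical helper that returns a Result instead of raising."""
    if not text.isdigit():
        return Err({"code": "MESSAGE_VALIDATION_ERROR",
                    "message": f"not a number: {text!r}",
                    "recoverable": False})
    return Ok(int(text))


good = parse_port("4222").map(lambda p: p + 1)
bad = parse_port("nats").map(lambda p: p + 1).and_then(lambda p: Ok(str(p)))
print(good, bad.unwrap_or(0), bad.map_err(lambda e: e["code"]))
```

Because `map` and `and_then` on an Err return the Err unchanged, the first failure in a chain is the one the caller sees at the end.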
Priority System
MAPLE's broker routes messages through priority queues. Higher-priority messages are dequeued first regardless of arrival order.
from maple import Priority
Priority.CRITICAL  # = 0 — emergency alerts, system-wide events
Priority.HIGH  # = 1 — time-sensitive tasks
Priority.NORMAL  # = 2 — default
Priority.LOW  # = 3 — background batch work
Priority.BACKGROUND  # = 4 — best-effort housekeeping
When queue depth exceeds the configured limit, BACKGROUND messages are shed first. CRITICAL messages are never dropped.
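The dequeue order and shedding rule described here can be sketched with `heapq` from the standard library. This is an illustration under stated assumptions, not the broker's code: `SheddingQueue` and `max_depth` are hypothetical names, the numeric levels come from the comments above, and the sketch generalises "BACKGROUND first" to "least urgent level first".

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3
    BACKGROUND = 4


class SheddingQueue:
    """Bounded priority queue: the lowest number is dequeued first. When the
    queue is over its limit, the least urgent entry is dropped, and CRITICAL
    entries are never dropped."""

    def __init__(self, max_depth):
        self.max_depth = max_depth
        self._heap = []
        self._seq = itertools.count()  # tie-breaker: FIFO within one level

    def put(self, priority, item):
        """Enqueue an item and return the list of items shed to make room."""
        heapq.heappush(self._heap, (int(priority), next(self._seq), item))
        shed = []
        while len(self._heap) > self.max_depth:
            victim = max(self._heap)  # least urgent level, newest entry
            if victim[0] == Priority.CRITICAL:
                break  # only CRITICAL left: exceed the limit rather than drop
            self._heap.remove(victim)
            heapq.heapify(self._heap)
            shed.append(victim[2])
        return shed

    def get(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


q = SheddingQueue(max_depth=3)
q.put(Priority.BACKGROUND, "cleanup")
q.put(Priority.NORMAL, "report")
q.put(Priority.LOW, "batch")
dropped = q.put(Priority.CRITICAL, "alarm")  # queue is full, so one entry is shed
order = [q.get() for _ in range(len(q))]
print(dropped, order)
```

The sequence counter matters: without it, two messages at the same level would be compared by payload, and arrival order within a level would be lost.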
Resource Specification
Among the protocols compared in this documentation, MAPLE is the only one that carries resource requirements as a first-class protocol field. Agents negotiate resources before committing to a task.
from maple import Message
from maple.resources import ResourceRequest, ResourceRange
req = ResourceRequest(
    memory = ResourceRange(min="4GB", preferred="8GB", max="16GB"),
    compute = ResourceRange(min=4, preferred=8, max=16),
    gpu_memory = ResourceRange(min="8GB", preferred="16GB"),
    network_bandwidth = ResourceRange(min="100Mbps", preferred="1Gbps"),
    deadline = "2025-12-01T09:00:00Z",
)
msg = Message(
    message_type = "ML_INFERENCE",
    receiver = "gpu-worker",
    payload = {"model": "llama-3-70b", "resources": req.to_dict()},
)
The Resource Manager reads these fields during routing and selects the best available agent. If no agent satisfies min constraints, send() returns Err({"code": "RESOURCE_UNAVAILABLE", "recoverable": True, ...}).
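The selection rule in the paragraph above (filter by `min`, fail with RESOURCE_UNAVAILABLE if nothing qualifies) can be sketched in plain Python. Everything here is an assumption made for illustration: `parse_size`, `select_agent`, the dictionary shapes, and the tie-break that prefers agents meeting `preferred` are hypothetical and not the Resource Manager's actual algorithm.

```python
import re

# Binary units, matching the "16GB" -> 17_179_869_184 bytes convention
# used elsewhere in this documentation.
_UNITS = {"KB": 2**10, "MB": 2**20, "GB": 2**30, "TB": 2**40}


def parse_size(text):
    """Convert a string such as "16GB" to a number of bytes."""
    m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(KB|MB|GB|TB)", text.strip().upper())
    if m is None:
        raise ValueError(f"unrecognised size: {text!r}")
    return int(float(m.group(1)) * _UNITS[m.group(2)])


def select_agent(request, agents):
    """Return ("ok", agent_id) or ("err", error_dict)."""
    need_mem = parse_size(request["memory"]["min"])
    want_mem = parse_size(request["memory"]["preferred"])
    need_cpu = request["compute"]["min"]
    want_cpu = request["compute"]["preferred"]

    # Hard filter: every `min` constraint must hold.
    eligible = [
        a for a in agents
        if parse_size(a["memory"]) >= need_mem and a["compute"] >= need_cpu
    ]
    if not eligible:
        return ("err", {
            "code": "RESOURCE_UNAVAILABLE",
            "message": "No agent satisfies the min constraints",
            "recoverable": True,
        })

    # Soft preference: agents that also meet `preferred` win; max() keeps
    # the first agent among equals, so list order breaks ties.
    def meets_preferred(a):
        return parse_size(a["memory"]) >= want_mem and a["compute"] >= want_cpu

    return ("ok", max(eligible, key=meets_preferred)["agent_id"])


request = {
    "memory": {"min": "4GB", "preferred": "8GB"},
    "compute": {"min": 4, "preferred": 8},
}
agents = [
    {"agent_id": "small", "memory": "2GB", "compute": 2},
    {"agent_id": "medium", "memory": "4GB", "compute": 4},
    {"agent_id": "large", "memory": "16GB", "compute": 8},
]
chosen = select_agent(request, agents)
nothing = select_agent(request, agents[:1])
print(chosen, nothing[1]["code"])
```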
Message Broker
The MessageBroker is a singleton managing all message routing. It supports in-memory (development) and NATS (production) backends.
from maple.broker import MessageBroker
broker = MessageBroker.get_instance()
stats = broker.get_stats()
# {
#     "queued": 42,
#     "delivered": 10_847,
#     "failed": 3,
#     "agents": 12,
#     "throughput_per_sec": 33_000,
# }
Between test cases, reset the singleton with MessageBroker._instance = None to avoid state leakage.
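Why the reset matters is easiest to see with a stand-in singleton. The sketch below uses a hypothetical `SketchBroker` class that follows the common `get_instance()` / `_instance` pattern; it assumes MessageBroker caches its instance the same way, which the reset instruction above suggests but does not prove.

```python
class SketchBroker:
    """Hypothetical stand-in for a singleton broker."""

    _instance = None

    def __init__(self):
        self.queued = []

    @classmethod
    def get_instance(cls):
        # Create the instance on first use, then keep returning it.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance


# "Test 1" leaves a message behind.
first = SketchBroker.get_instance()
first.queued.append("leftover")

# Without a reset, "test 2" would see the same object and its leftovers.
same = SketchBroker.get_instance()

# Clearing the cached instance gives the next caller a fresh broker.
SketchBroker._instance = None
fresh = SketchBroker.get_instance()

print(same is first, fresh is first, fresh.queued)
```

In a pytest suite the reset line would typically live in an autouse fixture so that every test starts clean.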
State Store
Agents share persistent state through StateStore, backed by memory, file, or SQLite.
# Snippet: run inside an async function (for example under asyncio.run)
from maple.state import StateStore, ConsistencyLevel
store = StateStore(backend="sqlite", path="./state.db")
await store.set("job:123:status", "running", ttl=3600)
result = await store.get("job:123:status")
if result.is_ok():
    status = result.unwrap()  # "running"
await store.compare_and_set(
    key = "job:123:status",
    expected = "running",
    new_value = "complete",
    consistency = ConsistencyLevel.STRONG,
)
async def on_change(key, old_val, new_val):
    print(f"{key}: {old_val} -> {new_val}")
await store.subscribe("job:*", on_change)
Task Scheduler
The task scheduler queues work, assigns it to available agents based on resource requirements, and tracks completion with configurable aggregation strategies.
# Snippet: run inside an async function; msg, task_a, task_b and task_c are assumed to be defined elsewhere
from maple import Priority
from maple.task_management import TaskQueue, Task, AggregationStrategy
queue = TaskQueue()
task = Task(
    task_id = "batch-001",
    message = msg,
    priority = Priority.HIGH,
    max_retries = 3,
    timeout = 60.0,
)
result = await queue.submit(task)
# Fan-out to multiple workers
results = await queue.fan_out(
    tasks = [task_a, task_b, task_c],
    strategy = AggregationStrategy.FIRST_SUCCESS,
    timeout = 30.0,
)
Agent Registry
Dynamic service discovery — find agents by capability, resource availability, or custom metadata.
# Snippet: run inside an async function; req is the ResourceRequest from Resource Specification
from maple.discovery import AgentRegistry, CapabilityMatcher
registry = AgentRegistry.get_instance()
await registry.register(
    agent_id = "ml-worker-1",
    capabilities = ["inference", "embedding"],
    resources = {"gpu_gb": 24, "memory_gb": 64},
    metadata = {"model": "llama-3-70b", "region": "us-east"},
)
agents = await registry.find_by_capability("inference")
matched = await CapabilityMatcher().match(req, agents, strategy="least_loaded")
Health Monitor
Periodic health checks with state-transition callbacks for automated failover.
from maple.discovery import HealthMonitor
monitor = HealthMonitor(check_interval=10.0)
@monitor.on_agent_unhealthy
async def handle_unhealthy(agent_id: str, reason: str):
    print(f"[health] {agent_id} unhealthy: {reason}")
    await registry.deregister(agent_id)
await monitor.start()
Authentication
MAPLE supports JWT, API key, and no-auth strategies via SecurityConfig.
from maple.security import SecurityConfig
# JWT
jwt_cfg = SecurityConfig("jwt", {"token": "Bearer eyJhbGci..."})
# API key
key_cfg = SecurityConfig("api_key", {"key": "sk-your-key", "header": "X-API-Key"})
# Development only
dev_cfg = SecurityConfig("none", {})
Always use JWT or API key in production. "none" disables all identity verification.
Secure Links
A Secure Link is a cryptographically verified, persistent channel using AES-256-GCM encryption. All messages through a link are encrypted end-to-end.
import asyncio
from maple import Agent, Message, Config
async def main():
    agent = Agent(Config(agent_id="initiator", broker_url="memory://localhost"))
    await agent.start()
    link_result = await agent.establish_link(
        target_agent = "secure-processor",
        security_level = "MAXIMUM",
        encryption = "AES-256-GCM",
    )
    if link_result.is_ok():
        link_id = link_result.unwrap()
        secure_msg = Message(
            message_type = "CONFIDENTIAL",
            receiver = "secure-processor",  # receiver is a required Message field
            payload = {"secret": "top-secret-data"},
        ).with_link(link_id)
        result = await agent.send(secure_msg)
        print(f"Secure send: {result}")
    else:
        print(f"Link failed: {link_result.unwrap_err()}")
    await agent.stop()
asyncio.run(main())
Audit Logging
Structured audit records for every message send, link establishment, auth event, and state change.
from maple.security import AuditLogger
AuditLogger.configure(
    output = "file",
    path = "/var/log/maple/audit.jsonl",
    level = "INFO",
    sign = True,  # HMAC-SHA256 signature per record
)
Circuit Breakers
MAPLE's CircuitBreaker opens automatically on repeated failures, preventing cascading failures across your agent graph.
from maple import Message
from maple.error import CircuitBreaker
from maple.core import Err
cb = CircuitBreaker(
    failure_threshold = 5,
    recovery_timeout = 30.0,
    success_threshold = 2,
)
async def call_worker(payload):
    # `agent` is assumed to be an already started Agent
    if not cb.should_allow():
        return Err({
            "code": "CIRCUIT_OPEN",
            "message": f"Worker unavailable, retry in {cb.retry_after:.0f}s",
            "recoverable": True,
        })
    result = await agent.send(Message(
        message_type = "WORK",
        payload = payload,
        receiver = "worker",
    ))
    if result.is_ok():
        cb.record_success()
    else:
        cb.record_failure()
    return result
State machine: CLOSED → (threshold failures) → OPEN → (recovery timeout) → HALF_OPEN → (success threshold) → CLOSED.
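The state machine above can be sketched in a few lines of standard-library Python. `SketchBreaker` is a hypothetical class, not MAPLE's CircuitBreaker: it reuses the three constructor parameters shown above, but the injectable `clock`, the choice to count consecutive failures, and the rule that any failure in HALF_OPEN re-opens the circuit are assumptions.

```python
import time


class SketchBreaker:
    """Minimal CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._clock = clock  # injectable so the demo needs no real waiting
        self.state = "CLOSED"
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def should_allow(self):
        # OPEN becomes HALF_OPEN once the recovery timeout has elapsed.
        if (self.state == "OPEN"
                and self._clock() - self._opened_at >= self.recovery_timeout):
            self.state = "HALF_OPEN"
            self._successes = 0
        return self.state != "OPEN"

    def record_success(self):
        if self.state == "HALF_OPEN":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "CLOSED"
                self._failures = 0
        else:
            self._failures = 0  # a success breaks a run of failures

    def record_failure(self):
        if self.state == "HALF_OPEN":
            self._trip()  # a failed probe re-opens immediately
        else:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._trip()

    def _trip(self):
        self.state = "OPEN"
        self._opened_at = self._clock()
        self._failures = 0


# Drive the breaker with a fake clock to walk through every transition.
now = [0.0]
breaker = SketchBreaker(failure_threshold=2, recovery_timeout=30.0,
                        success_threshold=2, clock=lambda: now[0])
states = [breaker.state]
breaker.record_failure()
breaker.record_failure()
states.append(breaker.state)
blocked = not breaker.should_allow()
now[0] = 31.0  # recovery timeout has passed
breaker.should_allow()
states.append(breaker.state)
breaker.record_success()
breaker.record_success()
states.append(breaker.state)
print(states, blocked)
```

Passing the clock in as a parameter keeps the transitions testable without sleeping for the recovery timeout.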
Error Recovery
from maple.error import RecoveryManager, BackoffStrategy
recovery = RecoveryManager(
    strategy = BackoffStrategy.EXPONENTIAL_JITTER,
    base_delay = 1.0,
    max_delay = 60.0,
    max_attempts = 5,
)
result = await recovery.execute(
    fn = lambda: agent.send(msg),
    fallback = lambda err: agent.send(msg.with_receiver("worker-fallback")),
)
Fault Tolerance
from maple.task_management import FaultToleranceManager
ft = FaultToleranceManager(
    circuit_breaker = cb,
    recovery_manager = recovery,
    health_monitor = monitor,
)
result = await ft.execute_with_resilience(
    task = task,
    agent_pool = ["worker-1", "worker-2", "worker-3"],
    strategy = "fail_over",
)
Use Case Gallery
Six reference architectures showing how MAPLE's production primitives — resource-aware messaging, Result<T,E>, circuit breakers, distributed state — compose into real systems.
1. Distributed ML Pipeline
Coordinating data ingestion, model inference, and result aggregation across specialised agents — each with explicit GPU/memory requirements negotiated at the protocol level.
{
"taskType": "ML_PIPELINE",
"stages": {
"dataIngestion": {
"source": "kafka_stream",
"validation": { "schema": "avro_schema_v1", "constraints": ["non_null","range_check"] },
"errorHandling": { "strategy": "dead_letter_queue", "retry": { "maxAttempts": 3, "backoff": "exponential" } }
},
"modelInference": {
"model": { "type": "tensorflow", "version": "2.4",
"resources": { "gpu": "required", "memory": "16GB" } },
"loadBalancing": { "strategy": "least_loaded", "healthCheck": "every_30s" }
},
"resultAggregation": {
"window": "5m", "consistency": "exactly_once", "outputFormat": "parquet"
}
}
}
- Resource-aware routing — GPU requirement embedded in the message; the broker routes to whichever worker has capacity
- Dead-letter queue — failed ingestion messages are preserved, not silently dropped
- Exactly-once aggregation — Result<T,E> ensures every partial result is accounted for before emitting the window
2. IoT Sensor Network
Thousands of sensors with priority-based routing — emergency readings bypass batch queues and reach the control plane immediately.
{
"deviceRegistry": {
"type": "SENSOR_NETWORK",
"protocol": { "discovery": "automatic", "authentication": "mutual_tls", "compression": "enabled" },
"messageHandling": {
"priority": { "emergency": "immediate", "routine": "batch" },
"batching": { "size": "1000", "timeout": "1s" }
},
"stateManagement": { "sync": "eventual", "storage": "distributed_cache" }
}
}
3. Financial Trading System
Ultra-low-latency trade execution with atomic consistency guarantees and automatic failover when latency breaches the SLA threshold.
{
"tradingSystem": {
"latencyRequirements": { "maxLatency": "50ms", "priority": "CRITICAL" },
"transactions": { "type": "ATOMIC", "consistency": "STRONG", "isolation": "SERIALIZABLE" },
"monitoring": {
"metrics": ["latency", "throughput", "error_rate"],
"alerting": { "threshold": "latency > 45ms", "action": "FAILOVER" }
}
}
}
4. Microservices Orchestration
Complex service graphs with dynamic discovery, weighted routing, and circuit-breaker protection across service boundaries.
{
"serviceOrchestration": {
"discovery": { "method": "dynamic", "healthCheck": "tcp+http", "interval": "10s" },
"circuitBreaker": { "threshold": "5_failures", "timeout": "30s", "fallback": "cached_response" },
"routing": { "strategy": "weighted_round_robin", "filters": ["version","region","load"] }
}
}
5. Real-time Analytics Pipeline
Sliding window stream processing with RocksDB state backend, exactly-once checkpointing, and auto-scaling on backpressure signals.
{
"analyticsEngine": {
"streaming": { "windowType": "sliding", "windowSize": "5m", "watermark": "10s" },
"processing": {
"operators": ["filter","aggregate","join"],
"stateBackend": "rocksdb",
"checkpointing": { "interval": "1m", "mode": "exactly_once" }
},
"scaling": { "autoScale": true, "metrics": ["backpressure","lag"], "limits": {"min":1,"max":10} }
}
}
6. Autonomous Robotics System
Consensus-based multi-robot coordination with distributed path planning, guaranteed delivery, and dynamic obstacle avoidance.
{
"roboticSystem": {
"coordination": { "protocol": "consensus_based", "synchronization": "time_bounded" },
"pathPlanning": {
"algorithm": "distributed_rrt",
"constraints": { "collision_avoidance": true, "dynamic_obstacles": true }
},
"communication": { "mesh_network": true, "latency_bound": "100ms", "reliability": "guaranteed_delivery" }
}
}
Industry Applications
Production-grade MAPLE message schemas for six vertical industries. Each example shows the exact JSON envelope MAPLE agents exchange, including priority, error recovery, and resource fields.
Healthcare — Patient Monitoring
Coordinating medical devices, patient data streams, and clinical notifications with threshold-triggered alerts.
{
"messageType": "PATIENT_MONITORING",
"priority": "HIGH",
"payload": {
"patientId": "P123456",
"deviceData": {
"heartRate": { "value": 85, "unit": "bpm", "timestamp": "2024-12-12T10:15:00Z", "deviceId": "HR_MONITOR_001" },
"bloodPressure": { "systolic": 120, "diastolic": 80, "unit": "mmHg" },
"oxygenSaturation": { "value": 98, "unit": "percentage" }
},
"alerts": [
{ "type": "THRESHOLD_EXCEEDED", "severity": "MEDIUM", "metric": "heartRate", "threshold": 80, "action": "NOTIFY_NURSE" }
],
"metadata": { "ward": "ICU", "floor": 3, "attending": "DR_SMITH" }
}
}
{
"messageType": "DEVICE_ERROR",
"payload": {
"errorType": "DEVICE_DISCONNECTED",
"deviceId": "HR_MONITOR_001",
"recovery": { "action": "FAILOVER", "backupDevice": "HR_MONITOR_002", "dataContinuity": "maintained" }
}
}
Finance — Algorithmic Trading
High-frequency order execution with risk pre-checks, VWAP strategy splitting, and atomic settlement.
{
"messageType": "TRADE_EXECUTION",
"priority": "CRITICAL",
"payload": {
"orderId": "ORD789",
"instrument": { "symbol": "AAPL", "type": "EQUITY", "market": "NASDAQ" },
"action": { "type": "BUY", "quantity": 1000, "orderType": "LIMIT", "price": 150.25, "timeInForce": "IOC" },
"riskChecks": { "position": "VERIFIED", "margin": "SUFFICIENT", "limits": "WITHIN_BOUNDS" },
"execution": { "strategy": "VWAP", "slippage": 0.02, "childOrders": ["ORD789_1","ORD789_2"] }
}
}
Manufacturing — Industry 4.0
Smart factory automation: robotic assembly coordination, quality metrics, and conveyor sensor telemetry in one message.
{
"messageType": "PRODUCTION_CONTROL",
"payload": {
"productionLine": "ASSEMBLY_A",
"status": { "state": "ACTIVE", "efficiency": 94.5, "currentBatch": "BATCH_2024_123" },
"robotics": {
"arm_1": { "position": [23.5, 45.2, 12.1], "load": 75, "nextAction": "PICK", "target": "COMPONENT_XYZ" },
"conveyor": { "speed": 0.5, "loadFactor": 0.8, "sensors": { "proximity": "CLEAR", "temperature": 35.2 } }
},
"qualityMetrics": { "defectRate": 0.1, "accuracy": 99.8, "calibrationStatus": "IN_SPEC" }
}
}
Smart City — Traffic Control
Real-time intersection management with emergency vehicle priority override and congestion-aware signal timing.
{
"messageType": "TRAFFIC_CONTROL",
"payload": {
"intersection": { "id": "INT_456", "location": { "lat": 37.7749, "lng": -122.4194 },
"status": { "congestion": "HIGH", "weather": "RAIN" } },
"signalControl": {
"currentPhase": 2,
"timing": { "green": 45, "yellow": 5, "red": 60 },
"override": { "active": true, "reason": "EMERGENCY_VEHICLE", "direction": "NORTH_SOUTH" }
},
"sensorData": { "vehicleCount": 45, "averageSpeed": 28.5, "queueLength": 12 }
}
}
Energy Grid — Load Balancing
Distributed energy resource coordination with demand-response activation and supply-side optimisation.
{
"messageType": "GRID_MANAGEMENT",
"payload": {
"gridSegment": "SECTOR_7",
"powerMetrics": {
"demand": { "current": 1250, "predicted": 1400, "unit": "kW" },
"supply": { "solar": 450, "wind": 300, "conventional": 600, "unit": "kW" }
},
"loadBalance": {
"status": "OPTIMIZING",
"actions": [{ "type": "DEMAND_RESPONSE", "target": "COMMERCIAL", "reduction": 100, "duration": "1h" }]
}
}
}
Logistics — Warehouse Automation
AGV fleet coordination: pick task assignment, battery monitoring, payload tracking, and inventory delta in one message.
{
"messageType": "WAREHOUSE_OPERATIONS",
"payload": {
"facility": "WH_NORTH",
"operations": {
"pickingTasks": [
{ "taskId": "PICK_123", "priority": "HIGH", "location": "AISLE_5_RACK_3",
"item": { "sku": "ITEM_789", "quantity": 5, "weight": 2.3 }, "assignedTo": "AGV_001" }
],
"agvFleet": {
"AGV_001": { "status": "MOVING", "battery": 85,
"location": { "x": 123.5, "y": 456.7 },
"payload": { "current": 15.5, "capacity": 50 } }
},
"inventory": { "updates": [{ "sku": "ITEM_789", "quantity": -5, "type": "OUTBOUND" }] }
}
}
}
Protocol Comparison
A factual comparison of MAPLE against the major agent communication protocols and frameworks. Use this to guide technology selection for your project.
Feature Matrix
| Capability | MAPLE | Google A2A | FIPA ACL | MCP | AGENTCY / ACP |
|---|---|---|---|---|---|
| Resource-aware messaging | ✔ First-class field | ✘ | ✘ | ✘ | ✘ |
| Result<T,E> type safety | ✔ Every API | ✘ exceptions | ✘ legacy | ✘ | ✘ |
| AES-256-GCM secure links | ✔ Built-in | ~ OAuth only | ✘ none | ~ platform | ✘ |
| Distributed state store | ✔ Built-in | ~ external req. | ✘ | ✘ context only | ✘ |
| Circuit breakers | ✔ Built-in | ✘ | ✘ | ✘ | ✘ |
| Priority scheduling | ✔ 5 levels | ~ platform | ~ basic | ✘ | ✘ |
| Protocol adapters / interop | ✔ 10 adapters | ✘ | ~ 1–2 | ✘ | ✘ |
| Autonomous ReAct agents | ✔ Built-in | ~ | ✘ | ✘ | ✘ |
| Open source | ✔ AGPL-3.0 | ✔ Apache-2.0 | ✔ | ✔ | ✔ |
| Production test coverage | ✔ 80%+ | ✔ Google SLA | ~ legacy | ✔ | ✘ research |
When to Choose Each
| Protocol | Best for | Avoid when |
|---|---|---|
| MAPLE | Production multi-agent systems that need resource management, typed errors, security, and distributed state out of the box | Simple single-agent LLM tool calling with no reliability requirements |
| Google A2A | Agents deeply integrated into Google Cloud — Vertex AI, Cloud Run, etc. | Multi-cloud or on-prem deployments; anything requiring resource negotiation |
| FIPA ACL | Academic research; legacy enterprise systems already speaking FIPA; standards compliance requirements | Any greenfield project — the 1990s design shows in performance and tooling |
| MCP | Exposing or consuming tools for a single LLM — a great tool layer | Multi-agent coordination, parallel execution, or anything requiring state |
| AGENTCY / ACP | Academic research and protocol theory exploration | Any production deployment |
MAPLE ships adapters for all of the above. You can use MAPLE as the reliability and routing layer while still speaking A2A, MCP, or FIPA ACL to existing agents — no rewrite required.
Type System
MAPLE's type system makes error conditions first-class citizens. Every operation that can fail returns a structured type, not an exception or a status code, so static type checkers and the runtime can enforce correct error handling.
Primitive Types
from maple.core.types import (
    Boolean, Integer, Float, String,
    Timestamp, UUID, Size, Duration,
)
# Size and Duration parse human-readable strings
memory_size = Size.validate("16GB")  # → 17_179_869_184 (bytes)
timeout = Duration.validate("30s")  # → 30.0 (seconds)
agent_id = UUID.validate("550e8400-e29b-41d4-a716-446655440000")
Collection Types
from maple.core.types import Array, Map, Set, Option
AgentList = Array(String)
ResourceMap = Map(String, Integer)
CapabilitySet = Set(String)
agents = AgentList.validate(["agent-1", "agent-2", "agent-3"])
resources = ResourceMap.validate({"cpu": 8, "memory": 16, "gpu": 2})
Priority Enum
from maple.core.types import Priority
class Priority(Enum):
    CRITICAL = "CRITICAL"  # Life-critical, emergency override
    HIGH = "HIGH"  # Time-sensitive tasks
    NORMAL = "NORMAL"  # Standard (default)
    LOW = "LOW"  # Background tasks
    BACKGROUND = "BACKGROUND"  # Best-effort housekeeping
Error Type Hierarchy
from maple.error.types import ErrorType, Severity
class ErrorType(Enum):
    # Communication
    NETWORK_ERROR = "NETWORK_ERROR"
    TIMEOUT = "TIMEOUT"
    ROUTING_ERROR = "ROUTING_ERROR"
    MESSAGE_VALIDATION_ERROR = "MESSAGE_VALIDATION_ERROR"
    # Resources
    RESOURCE_UNAVAILABLE = "RESOURCE_UNAVAILABLE"
    RESOURCE_EXHAUSTED = "RESOURCE_EXHAUSTED"
    RESOURCE_NEGOTIATION_FAILED = "RESOURCE_NEGOTIATION_FAILED"
    # Security
    AUTHENTICATION_ERROR = "AUTHENTICATION_ERROR"
    AUTHORIZATION_ERROR = "AUTHORIZATION_ERROR"
    LINK_VERIFICATION_FAILED = "LINK_VERIFICATION_FAILED"
    ENCRYPTION_ERROR = "ENCRYPTION_ERROR"
    # State
    STATE_CONFLICT = "STATE_CONFLICT"
    STATE_SYNCHRONIZATION_FAILED = "STATE_SYNCHRONIZATION_FAILED"
    CONSISTENCY_VIOLATION = "CONSISTENCY_VIOLATION"
ConsistencyLevel
from maple.state import ConsistencyLevel
class ConsistencyLevel(Enum):
    STRONG = "STRONG"  # All replicas consistent immediately
    CAUSAL = "CAUSAL"  # Causally consistent ordering
    EVENTUAL = "EVENTUAL"  # Eventually consistent with conflict resolution
    WEAK = "WEAK"  # Best-effort, highest throughput
Error Types Reference
Complete listing of MAPLE error codes with recoverability guidance.
| Error Code | Category | Recoverable? | Typical cause & fix |
|---|---|---|---|
| RESOURCE_UNAVAILABLE | Resource | ✔ Yes | No agent meets min constraints. Lower min or add workers. |
| RESOURCE_EXHAUSTED | Resource | ✔ Yes | Agent ran out of quota mid-task. Add capacity or reduce batch size. |
| CIRCUIT_OPEN | Reliability | ✔ After timeout | Circuit tripped. Wait for retry_after seconds then probe again. |
| TIMEOUT | Communication | ✔ Yes | Increase Config.timeout or use send() instead of request(). |
| ROUTING_ERROR | Communication | ✔ Yes | Agent not registered. Check AgentRegistry and call start(). |
| MESSAGE_VALIDATION_ERROR | Protocol | ✘ No | Invalid message schema. Fix message_type or missing required fields. |
| AUTHENTICATION_ERROR | Security | ✘ No | Invalid or expired token. Refresh credentials in SecurityConfig. |
| AUTHORIZATION_ERROR | Security | ✘ No | Agent lacks permission for the requested operation. |
| LINK_VERIFICATION_FAILED | Security | ~ Re-establish | Secure link expired or tampered. Call establish_link() again. |
| STATE_CONFLICT | State | ✔ Yes | Compare-and-swap conflict. Re-read state and retry the update. |
| CONSISTENCY_VIOLATION | State | ~ Config | Read returned stale data under STRONG consistency. Investigate replicas. |
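The recoverability column lends itself to a small dispatch function on the caller's side. The sketch below encodes the table as plain sets and returns a suggested next step. The function name `next_step`, the action labels, and the `retry_after` key on the error dict are hypothetical; only the error codes and their recoverability come from the table.

```python
# Codes the table marks as recoverable by retrying.
RETRYABLE = {
    "RESOURCE_UNAVAILABLE", "RESOURCE_EXHAUSTED",
    "TIMEOUT", "ROUTING_ERROR", "STATE_CONFLICT",
}
# Codes the table marks as not recoverable without a code or config fix.
FATAL = {
    "MESSAGE_VALIDATION_ERROR", "AUTHENTICATION_ERROR", "AUTHORIZATION_ERROR",
}


def next_step(err):
    """Map an error dict to an (action, detail) pair."""
    code = err.get("code")
    if code == "CIRCUIT_OPEN":
        # Recoverable only after the breaker's timeout has elapsed.
        return ("wait", err.get("retry_after", 0))
    if code == "LINK_VERIFICATION_FAILED":
        return ("re-establish-link", None)
    if code == "CONSISTENCY_VIOLATION":
        return ("investigate", None)
    if code in RETRYABLE:
        return ("retry", None)
    if code in FATAL:
        return ("fail", None)
    # Unknown code: fall back to the error's own "recoverable" flag.
    return ("retry", None) if err.get("recoverable") else ("fail", None)


decisions = [
    next_step({"code": "TIMEOUT"}),
    next_step({"code": "CIRCUIT_OPEN", "retry_after": 12.5}),
    next_step({"code": "AUTHENTICATION_ERROR"}),
    next_step({"code": "CUSTOM_ERROR", "recoverable": True}),
]
print(decisions)
```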
Result<T,E> In Depth
The Result<T,E> type has its roots in functional programming (Rust's Result<T, E>, Haskell's Either, Swift's Result, Scala's Either). MAPLE adapts this pattern specifically for agent communication, adding protocol-specific semantics for coordination, resource management, and structured recovery.
Why not exceptions?
| Approach | Problem |
|---|---|
| Exception / raise | Not visible in type signatures — callers can miss them entirely |
| Error codes (int) | Limited context; callers must look up meaning in docs |
| Optional / None | Indicates success/failure but carries no error detail |
| Status flag + data | Mixes success and error info in one structure; still no type enforcement |
| Result<T,E> | ✔ Type-safe, carries full error context, chainable, forces explicit handling |
Chaining pipelines
from maple.core import Ok, Err
# Chain with and_then — error short-circuits the chain automatically
result = (
    load_data(source)
    .and_then(lambda data: validate_schema(data))
    .map(lambda valid: process_ai_analysis(valid))
    .and_then(lambda analysis: generate_insights(analysis))
    .map(lambda insights: format_output(insights))
)
if result.is_ok():
    print("Pipeline complete:", result.unwrap())
else:
    err = result.unwrap_err()
    print(f"Failed at step: {err['code']} — {err['message']}")
    if err.get("recoverable"):
        apply_recovery_strategy(err["suggestion"])
Result<T,E> in message payloads
MAPLE also uses the Result pattern in message payloads for agent-to-agent responses:
{
"messageType": "TASK_RESULT",
"payload": {
"taskId": "task_123",
"result": {
"status": "Ok",
"value": {
"processedItems": 100,
"analysis": { "patterns": ["pattern1","pattern2"], "confidence": 0.95 }
}
}
}
}
{
"messageType": "TASK_RESULT",
"payload": {
"taskId": "task_123",
"result": {
"status": "Err",
"error": {
"type": "DATA_VALIDATION_ERROR",
"message": "Invalid time series format",
"details": { "expectedFormat": "ISO8601", "receivedFormat": "MM/DD/YYYY", "affectedRecords": [12,15,18] },
"recoverable": true
}
}
}
}
Resource allocation — partial failure example
{
"messageType": "RESOURCE_RESPONSE",
"payload": {
"requestId": "req_456",
"result": {
"status": "Err",
"error": {
"type": "PARTIAL_ALLOCATION_FAILURE",
"message": "Could not allocate all requested resources",
"details": {
"allocated": { "cpuCores": 4, "memory": "16GB", "gpuMemory": "4GB" },
"missing": { "gpuMemory": "4GB" }
},
"alternatives": [
{ "option": "WAIT_FOR_RESOURCES", "estimatedWaitTime": "5m" },
{ "option": "PROCEED_WITH_PARTIAL", "expectedPerformanceImpact": "moderate" }
]
}
}
}
}
Google A2A Integration
Bridge Google's Agent-to-Agent protocol, letting MAPLE agents participate in A2A networks.
from maple.adapters.a2a import A2AAdapter

adapter = A2AAdapter(
    maple_agent=my_agent,
    a2a_endpoint="https://a2a.googleapis.com/v1",
    credentials={"token": "Bearer ..."},
)
await adapter.bridge_inbound()
result = await adapter.send_to_a2a(
    a2a_agent_id="projects/my-proj/agents/classifier",
    payload={"text": "Classify this document"},
)
A2A messages have no native resource fields. The adapter automatically wraps them in MAPLE's ResourceRequest envelope so resource-aware routing still applies.
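As a rough picture of what such wrapping could look like, the sketch below attaches default resource hints to an incoming payload. Everything in it is an assumption made for illustration: the envelope keys, the `A2A_INBOUND` message type, the default values, and the `wrap_a2a_message` helper are hypothetical and do not describe the adapter's actual wire format.

```python
from typing import Optional

# Hypothetical defaults applied when the A2A sender states no requirements
DEFAULT_RESOURCES = {"cpuCores": {"min": 1}, "memory": {"min": "512MB"}}


def wrap_a2a_message(a2a_payload: dict, resources: Optional[dict] = None) -> dict:
    """Wrap a resource-less A2A payload in an envelope a resource-aware router can read."""
    chosen = resources if resources is not None else DEFAULT_RESOURCES
    return {
        "messageType": "A2A_INBOUND",   # hypothetical type name
        "resources": dict(chosen),      # copy so callers cannot mutate the defaults
        "payload": dict(a2a_payload),
    }


wrapped = wrap_a2a_message({"text": "Classify this document"})
gpu_wrapped = wrap_a2a_message(
    {"text": "Embed this corpus"},
    resources={"gpuMemory": {"min": "4GB"}},
)
```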
Model Context Protocol (MCP)
Use MAPLE agents as MCP tool servers, or call external MCP tool servers from within an autonomous MAPLE agent.
from maple import Config
from maple.adapters.mcp import MCPAdapter
from maple.autonomy import AutonomousAgent

agent = AutonomousAgent(
    config=Config(agent_id="researcher", broker_url="memory://localhost"),
    llm_model="gpt-4o",
    tools=[
        MCPAdapter.from_server("npx -y @modelcontextprotocol/server-brave-search"),
        MCPAdapter.from_server("npx -y @modelcontextprotocol/server-filesystem"),
    ],
)
result = await agent.run("Search and summarise the latest MAPLE documentation.")
Expose MAPLE as MCP Server
from maple.adapters.mcp import MCPServer
server = MCPServer(agent=my_agent, port=3000, expose=["send", "request", "get_state"])
await server.start()
FIPA ACL Integration
Bridge legacy FIPA ACL agents into a MAPLE network with automatic performative mapping.
from maple.adapters.fipa import FIPAAdapter

adapter = FIPAAdapter(maple_agent=my_agent)
maple_msg = adapter.from_fipa({
    "performative": "REQUEST",
    "sender": "legacy-agent@platform:5678",
    "content": "(action (agent1 (process data)))",
})
await adapter.send_fipa(
    receiver="legacy-agent@platform:5678",
    performative="INFORM",
    content="(result ok)",
)
AutoGen Integration
Wrap AutoGen's ConversableAgent as a MAPLE agent for production reliability.
import autogen
from maple import Config, Message
from maple.adapters.autogen import AutoGenAdapter

autogen_agent = autogen.AssistantAgent("assistant", llm_config={...})
maple_wrapper = AutoGenAdapter.wrap(
    autogen_agent=autogen_agent,
    maple_config=Config(agent_id="autogen-assistant", broker_url="memory://localhost"),
)
await maple_wrapper.start()
# `sender` is any other MAPLE agent that has already been started
result = await sender.send(Message(
    message_type="CHAT",
    receiver="autogen-assistant",
    payload={"content": "Explain MAPLE's circuit breaker pattern."},
))
CrewAI Integration
Add MAPLE's production infrastructure — circuit breakers, state, resource routing — to CrewAI crews.
from crewai import Crew, Agent, Task
from maple.adapters.crewai import MAPLECrewAdapter

crew = Crew(agents=[researcher, analyst], tasks=[...])
maple_crew = MAPLECrewAdapter(
    crew=crew,
    broker_url="nats://localhost:4222",
    circuit_breaker_config={"failure_threshold": 3},
)
result = await maple_crew.kickoff_async(inputs={"topic": "AI trends 2025"})
LangGraph Integration
Use MAPLE as the message transport and state backend for LangGraph StateGraph workflows.
from langgraph.graph import StateGraph
from maple.adapters.langgraph import MAPLELangGraphAdapter

graph = StateGraph(dict)
graph.add_node("fetch", fetch_node)
graph.add_node("process", process_node)
graph.add_edge("fetch", "process")
adapter = MAPLELangGraphAdapter(
    graph=graph,
    state_store=StateStore(backend="sqlite", path="./lg_state.db"),
    broker_url="memory://localhost",
)
result = await adapter.invoke({"query": "Analyse MAPLE architecture"})
OpenAI SDK Integration
Use OpenAI's models as the LLM backend for MAPLE autonomous agents.
from maple import Config
from maple.autonomy import AutonomousAgent

agent = AutonomousAgent(
    config=Config(agent_id="gpt-worker", broker_url="memory://localhost"),
    llm_model="gpt-4o",
    llm_config={"api_key": "sk-...", "temperature": 0.2, "max_tokens": 2048},
)
result = await agent.run("Summarise the last 10 git commits in this repo.")
IBM ACP Integration
Bridge IBM's Agent Communication Protocol into MAPLE networks for enterprise system integrations.
from maple.adapters.ibm_acp import IBMACPAdapter

adapter = IBMACPAdapter(
    maple_agent=my_agent,
    acp_endpoint="https://acp.ibm.example.com/v1",
    api_key="ibm-api-key-...",
)
await adapter.start_bridge()
result = await adapter.send_to_acp(
    acp_agent_id="watson-orchestrator",
    payload={"action": "process_claim", "claim_id": "CLM-001"},
)
S2.dev Durable Streaming
S2.dev provides durable, ordered, high-throughput streams. MAPLE's S2 integration gives agents persistent message history and replay capability.
from maple.adapters.s2dev import S2Adapter

adapter = S2Adapter(
    maple_agent=my_agent,
    s2_endpoint="https://s2.dev",
    access_token="s2-token-...",
    stream_name="maple-prod-stream",
    retention="7d",
)
await adapter.publish(msg)
await adapter.subscribe(
    from_seq=1_000,
    on_message=lambda m: agent.handle_message(m),
)
Use S2 when you need message replay for audit, recovery after agent restarts, or exactly-once processing guarantees across distributed deployments.
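The recovery case rests on one idea: every record has a sequence number, and a consumer that remembers the last number it processed can resume from the next one. The sketch below shows that idea with an in-memory list. `ReplayLog` is a hypothetical class written for illustration; it is neither S2's API nor the `S2Adapter`.

```python
class ReplayLog:
    """Append-only log whose records are numbered from 1."""

    def __init__(self):
        self._records = []  # index i holds sequence number i + 1

    def append(self, message) -> int:
        self._records.append(message)
        return len(self._records)  # sequence number just assigned

    def read_from(self, from_seq: int):
        if from_seq < 1:
            raise ValueError("sequence numbers start at 1")
        for seq in range(from_seq, len(self._records) + 1):
            yield seq, self._records[seq - 1]


log = ReplayLog()
for text in ["a", "b", "c", "d"]:
    log.append(text)

# First run: the consumer stops after sequence 2, simulating a crash
last_seq = 0
seen_before_crash = []
for seq, msg in log.read_from(1):
    seen_before_crash.append(msg)
    last_seq = seq
    if seq == 2:
        break

# After restart: resume from the record following the last one processed
resumed = [msg for _, msg in log.read_from(last_seq + 1)]
```

In a real deployment `last_seq` would need to be stored durably, and it would have to be committed together with the side effects of processing for the resume point to be trustworthy.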
n8n Integration
Three native n8n nodes let non-technical users build MAPLE workflows in the n8n visual editor.
cd ~/.n8n
npm install n8n-nodes-maple
After installing, restart n8n and search for MAPLE in the node palette. Configure the broker URL and credentials in n8n's credential manager; no Python is required.
LLM Providers
MAPLE's LLMProvider abstraction supports OpenAI and Anthropic, with a pluggable interface for custom backends.
from maple.llm import get_provider

# Auto-selects based on model prefix
openai_prov = get_provider("gpt-4o", api_key="sk-...")
anthropic_prov = get_provider("claude-sonnet-4-6", api_key="sk-ant-...")
response = await openai_prov.complete(
    messages=[{"role": "user", "content": "Hello, MAPLE!"}],
    temperature=0.2,
)
Autonomous Agents
MAPLE's AutonomousAgent runs a ReAct (Reasoning + Acting) loop — reason about the goal, select tools, execute them, observe results, iterate until done.
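The control flow of such a loop can be sketched without any LLM by substituting a scripted policy for the model. This is a simplified illustration of the reason, act, observe cycle, not MAPLE's implementation; `react_loop`, `scripted_policy`, and the `"finish"` action name are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

Observation = Tuple[str, str]  # (tool name, tool output)


def react_loop(
    goal: str,
    policy: Callable[[str, List[Observation]], Tuple[str, str]],
    tools: Dict[str, Callable[[str], str]],
    max_iterations: int = 10,
) -> Tuple[Optional[str], List[Observation]]:
    observations: List[Observation] = []
    for _ in range(max_iterations):
        # Reason: the policy picks the next action from the goal and history
        action, argument = policy(goal, observations)
        if action == "finish":
            return argument, observations
        # Act, then observe: run the chosen tool and record its output
        observations.append((action, tools[action](argument)))
    return None, observations  # iteration budget spent without an answer


def search_web(query: str) -> str:
    return f"Results for: {query}"


def scripted_policy(goal: str, observations: List[Observation]) -> Tuple[str, str]:
    # Stand-in for the LLM: search once, then answer from the observation
    if not observations:
        return "search_web", goal
    return "finish", f"Answer based on: {observations[-1][1]}"


answer, trace = react_loop(
    "multi-agent frameworks", scripted_policy, {"search_web": search_web}
)
```

The `max_iterations` bound matters because a model-driven policy gives no guarantee of ever choosing to finish.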
from maple import Config
from maple.autonomy import AutonomousAgent, Tool

async def search_web(query: str) -> str:
    return f"Results for: {query}"  # your implementation

agent = AutonomousAgent(
    config=Config(agent_id="auto-1", broker_url="memory://localhost"),
    llm_model="claude-sonnet-4-6",
    tools=[
        Tool(name="search_web", fn=search_web,
             description="Search the web for information"),
    ],
    max_iterations=10,
    memory_enabled=True,
)
result = await agent.run(
    "Research the top 5 multi-agent frameworks and compare their resource management features."
)
print(result.output)
print(result.tool_calls_made)
Tools & Memory
MAPLE agents have short-term (in-run) and long-term (persistent) memory, plus a flexible tool registry.
from maple.autonomy import AgentMemory, AutonomousAgent, MemoryStore

memory = AgentMemory(
    store=MemoryStore(backend="sqlite", path="./agent_mem.db"),
    max_items=1000,
    embed_model="text-embedding-3-small",
)
# `cfg` is a Config instance and `my_tools` a list of Tool objects defined elsewhere
agent = AutonomousAgent(config=cfg, llm_model="gpt-4o", memory=memory, tools=my_tools)
await memory.remember("User prefers concise answers", importance=0.9)
results = await memory.recall("user preferences", top_k=5)
API: Agent
start() is called.
| Method | Returns | Description |
|---|---|---|
| start() | Awaitable[None] | Connect to broker and register agent |
| stop() | Awaitable[None] | Gracefully disconnect and deregister |
| send(msg) | Result[str, dict] | Fire-and-forget; returns message ID or error |
| request(msg, timeout) | Result[Message, dict] | Request/response; awaits reply message |
| on_message(type)(handler) | Decorator | Register async handler for a message type |
| establish_link(target, ...) | Result[str, dict] | Create AES-256-GCM secure channel |
| get_state(key) | Result[Any, dict] | Read from the agent's state store |
| set_state(key, value) | Result[None, dict] | Write to the agent's state store |
API: Message
| Method | Returns | Description |
|---|---|---|
| .with_link(link_id) | Message | Return copy with secure link set |
| .with_receiver(agent_id) | Message | Return copy with new receiver |
| .to_dict() | dict | Serialise to JSON-compatible dict |
| Message.from_dict(d) | Message | Deserialise from dict |
API: Config
agent_id.
API: Result<T,E>
Err(error: E) → Result[T, E]
maple.core.
API: ResourceRequest
ResourceRange(min, preferred, max=None)
.to_dict() to embed in a message payload.
API: CircuitBreaker
| Method / Property | Returns | Description |
|---|---|---|
| .should_allow() | bool | True if call should proceed; transitions OPEN→HALF_OPEN |
| .record_success() | None | Increment success counter; may close circuit |
| .record_failure() | None | Increment failure counter; may open circuit |
| .state | CircuitState | CLOSED, OPEN, or HALF_OPEN |
| .retry_after | float | Seconds until next half-open probe |
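The state machine behind these methods can be sketched in a few lines. The class below mirrors the method names in the table but is a simplified stand-in, not MAPLE's `CircuitBreaker`: the constructor arguments, the injectable `clock`, and the choice to close the circuit after a single successful probe are assumptions made for illustration.

```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


class SketchBreaker:
    def __init__(self, failure_threshold=3, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock  # injectable so tests need not sleep
        self.state = CircuitState.CLOSED
        self._failures = 0
        self._opened_at = 0.0

    @property
    def retry_after(self) -> float:
        if self.state is not CircuitState.OPEN:
            return 0.0
        return max(0.0, self._opened_at + self.recovery_timeout - self._clock())

    def should_allow(self) -> bool:
        if self.state is CircuitState.CLOSED:
            return True
        if self.state is CircuitState.OPEN and self.retry_after == 0.0:
            self.state = CircuitState.HALF_OPEN
            return True  # let exactly one probe through
        return False  # still cooling down, or a probe is already in flight

    def record_success(self) -> None:
        self._failures = 0
        self.state = CircuitState.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        if self.state is CircuitState.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self._opened_at = self._clock()


now = [0.0]  # fake clock, advanced by hand
breaker = SketchBreaker(failure_threshold=2, recovery_timeout=5.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()             # second failure opens the circuit
blocked = breaker.should_allow()     # rejected while OPEN
now[0] = 5.0                         # recovery_timeout has elapsed
probe = breaker.should_allow()       # OPEN -> HALF_OPEN, one probe allowed
second = breaker.should_allow()      # no second probe while HALF_OPEN
breaker.record_success()             # the probe succeeded, circuit closes
```

Callers are expected to wrap each downstream call: check `should_allow()`, make the call, then report the outcome with `record_success()` or `record_failure()`.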
Deployment: Memory Broker
The in-memory broker requires no external services — ideal for single-process deployments, tests, and local development.
config = Config(agent_id="my-agent", broker_url="memory://localhost")
# No extra setup needed: the broker starts automatically
The memory broker does not persist across restarts and is not shared between separate Python processes. Use NATS for distributed deployments.
Deployment: NATS Production
For distributed multi-process deployments, MAPLE uses NATS.io as the broker backend.
pip install "maple-oss[nats]"
docker run -p 4222:4222 nats:latest --jetstream
config = Config(
    agent_id="prod-agent",
    broker_url="nats://localhost:4222",
)
Deployment: Docker & Kubernetes
version: "3.9"
services:
  nats:
    image: nats:latest
    command: ["--jetstream", "--http_port", "8222"]
    ports: ["4222:4222", "8222:8222"]
  maple-worker:
    image: python:3.11-slim
    environment:
      MAPLE_BROKER_URL: "nats://nats:4222"
      MAPLE_AGENT_ID: "worker-1"
      OPENAI_API_KEY: "${OPENAI_API_KEY}"
    command: ["python", "worker.py"]
    depends_on: [nats]
    deploy:
      replicas: 3
Best Practices
Always handle Result
Never call .unwrap() without first checking .is_ok(). Prefer .unwrap_or(default) for non-critical paths.
Size ResourceRequests conservatively
Set min to the genuine minimum your task can function with. Inflated values cause unnecessary routing failures under resource pressure.
Use Priority deliberately
If everything is CRITICAL, nothing is. Reserve CRITICAL for true emergencies. Default to NORMAL.
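The effect is easy to see in a small priority queue. The sketch below uses Python's `heapq` with an arrival counter as tie-breaker; the `HIGH` and `LOW` levels and their numeric values are assumptions for illustration, and the class is not MAPLE's scheduler.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    CRITICAL = 0  # lower number is served first
    HIGH = 1
    NORMAL = 2
    LOW = 3


class PriorityQueue:
    def __init__(self):
        self._heap = []
        self._arrival = itertools.count()  # tie-breaker keeps FIFO order within a level

    def push(self, priority: Priority, item: str) -> None:
        heapq.heappush(self._heap, (priority, next(self._arrival), item))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


def drain(jobs):
    queue = PriorityQueue()
    for priority, name in jobs:
        queue.push(priority, name)
    return [queue.pop() for _ in range(len(queue))]


mixed = drain([
    (Priority.NORMAL, "report"),
    (Priority.CRITICAL, "failover"),
    (Priority.NORMAL, "cleanup"),
])
all_critical = drain([
    (Priority.CRITICAL, "report"),
    (Priority.CRITICAL, "failover"),
    (Priority.CRITICAL, "cleanup"),
])
```

With mixed priorities the failover job jumps the queue. When every job is marked CRITICAL the queue degrades to plain arrival order, and the failover waits behind the report.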
Circuit breaker per downstream service
Create one CircuitBreaker per downstream agent or service — not one global breaker. This prevents one flaky service from blocking all others.
Prefer request() for synchronous flows
Use agent.request(msg, timeout=10.0) when you need the response before proceeding. Use agent.send() for fire-and-forget tasks.
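The timeout is what keeps a request from blocking forever when the receiver is slow or gone. A minimal sketch of that behaviour using only `asyncio`: `slow_worker`, `request`, and the `TIMEOUT` error type are hypothetical stand-ins, not MAPLE's API.

```python
import asyncio


async def slow_worker(delay: float, reply: str) -> str:
    # Stand-in for a remote agent that takes `delay` seconds to answer
    await asyncio.sleep(delay)
    return reply


async def request(delay: float, timeout: float) -> dict:
    """Await a reply, returning an error value instead of hanging or raising."""
    try:
        reply = await asyncio.wait_for(slow_worker(delay, "done"), timeout=timeout)
        return {"status": "Ok", "value": reply}
    except asyncio.TimeoutError:
        return {"status": "Err", "error": {"type": "TIMEOUT", "timeout": timeout}}


async def main():
    fast = await request(delay=0.01, timeout=0.5)   # reply arrives in time
    slow = await request(delay=0.5, timeout=0.01)   # reply arrives too late
    return fast, slow


fast, slow = asyncio.run(main())
```

Returning the timeout as an error value rather than raising keeps it on the same explicit handling path as any other failure.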
Enable audit in production
Set enable_audit=True in production configs. Audit records are essential for debugging distributed failures and meeting compliance requirements.
Troubleshooting
RESOURCE_UNAVAILABLE errors
No registered agent satisfies the min constraints in your ResourceRequest. Check err['context'] for available vs requested values.
Fix: Lower min constraints, ensure workers are registered with AgentRegistry, or add more worker agents.
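To see why inflated minimums cause this error, consider a simplified matcher that rejects any worker falling short on a single dimension. The resource names, the worker table, and both helper functions are hypothetical; MAPLE's routing logic is not shown in this document.

```python
def unmet_constraints(capacity: dict, minimums: dict) -> dict:
    """Return the dimensions on which a worker falls short of the requested minimum."""
    return {
        name: {"requested": need, "available": capacity.get(name, 0)}
        for name, need in minimums.items()
        if capacity.get(name, 0) < need
    }


def eligible_workers(workers: dict, minimums: dict) -> list:
    return [
        worker_id
        for worker_id, capacity in workers.items()
        if not unmet_constraints(capacity, minimums)
    ]


workers = {
    "worker-1": {"cpu_cores": 4, "memory_gb": 16, "gpu_memory_gb": 4},
    "worker-2": {"cpu_cores": 8, "memory_gb": 32, "gpu_memory_gb": 0},
}
inflated = {"cpu_cores": 8, "memory_gb": 16, "gpu_memory_gb": 8}
realistic = {"cpu_cores": 4, "memory_gb": 16, "gpu_memory_gb": 4}

no_match = eligible_workers(workers, inflated)
match = eligible_workers(workers, realistic)
shortfall = unmet_constraints(workers["worker-2"], realistic)
```

The inflated request matches no worker even though `worker-1` could run the task at its genuine minimum. The `shortfall` dictionary is the kind of requested-versus-available detail to look for when diagnosing the error.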
CIRCUIT_OPEN errors
The circuit breaker tripped due to repeated failures. The circuit probes automatically after recovery_timeout seconds via a single HALF_OPEN request.
TypeError: SecurityConfig takes positional args
# Wrong
SecurityConfig(auth_type="jwt", credentials={"token": "..."})
# Correct
SecurityConfig("jwt", {"token": "..."})
MessageBroker state leaking between tests
import pytest

@pytest.fixture(autouse=True)
def reset_broker():
    yield
    # Drop the singleton after each test so broker state cannot leak into the next one
    from maple.broker import MessageBroker
    MessageBroker._instance = None
NATS connection refused
Ensure NATS is running: docker run -p 4222:4222 nats:latest --jetstream. Verify the [nats] extra is installed: pip install "maple-oss[nats]".
Changelog
v1.1.1 — 2025-Q4
- S2.dev durable streaming integration — persistent replay, sequence-numbered consumption
- Fixed flaky health monitor test on Windows
- Fixed publish/release/dependencies workflows; added security extras
- Fixed CI test failures: install security extras for JWT tests
v1.1.0
- Autonomous agents — ReAct loop, tool registry, short/long-term memory
- LLM providers — OpenAI and Anthropic adapters
- MCP tool server and client integration
- n8n community nodes (Agent, Coordinator, Resource Manager)
- Automated release pipeline: tag push triggers PyPI publish
v1.0.0
- Initial production release
- Core protocol: Message, Result<T,E>, Priority, ResourceRequest
- Runtime: MessageBroker, StateStore, TaskScheduler, AgentRegistry, HealthMonitor
- Security: JWT auth, AES-256-GCM secure links, audit logging
- Reliability: CircuitBreaker, RecoveryManager, FaultToleranceManager
- Adapters: A2A, MCP, FIPA ACL, AutoGen, CrewAI, LangGraph, OpenAI SDK, IBM ACP
- 818 tests passing, ~80% coverage