Open Source · AGPL-3.0
v1.1.1 · S2.dev Durable Streaming
MAPLE is the multi-agent runtime that ships with reliability, security, and resource management built in — not bolted on later by your platform team.
Teams love the demos — then spend months rebuilding the production infrastructure that should have been there from day one.
A clean separation of concerns — wire protocol to autonomous agents — lets you adopt incrementally and swap components independently.
Adopt L1 as a wire protocol spec. Add L2 for production runtime. Use L3 for full autonomous agents. Each layer is independently useful.
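To make the layering concrete, here is a sketch of what an L1-style envelope could look like as transport-agnostic JSON. The field names mirror the `Message` examples later on this page, but this is an illustration, not MAPLE's normative wire schema:

```python
import json

# Illustrative L1-style envelope — field names mirror the Message examples
# on this page, but this is a sketch, not MAPLE's normative schema.
envelope = {
    "message_type": "PROCESS_DATA",
    "receiver": "worker_agent",
    "priority": "HIGH",
    "payload": {"task": "analyze", "dataset_id": "q4-2025"},
}

# An L1 message is just structured data: any transport (NATS, HTTP, a file)
# can carry it, and any runtime can parse it back — which is what makes the
# layer independently useful without the L2 runtime.
wire = json.dumps(envelope)
decoded = json.loads(wire)
assert decoded == envelope
```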
Rust-inspired typed error handling eliminates silent failures. Every MAPLE operation returns a discriminated Result — errors are values you handle explicitly, not exceptions that propagate silently through your agent graph.
Protocol Primitive · Zero Silent Failures

Declare CPU, memory, GPU, and network bandwidth requirements directly in protocol message fields. The scheduler routes work to agents that can actually satisfy those requirements — no more OOM kills or runaway GPU jobs.
Protocol Level · Scheduler-Enforced

AES-256-GCM encrypted, cryptographically verified communication channels. Establish a Link ID between two agents — all subsequent messages are authenticated against that channel, preventing impersonation and MITM.
AES-256-GCM · Channel Verified

Shared circuit breaker infrastructure with configurable thresholds, automatic HALF_OPEN probing, and recovery. Cascading failures are stopped at the boundary — not after taking down your entire fleet.
Shared · CLOSED→OPEN→HALF_OPEN

Pluggable state backends — memory, file, or SQLite — with consistency models and sync primitives. Agents share state with well-defined semantics, not "figure out Redis yourself and hope for the best."
Memory · File · SQLite Backends

Interop without rewrites: A2A, MCP, FIPA ACL, AutoGen, CrewAI, LangGraph, OpenAI SDK, IBM ACP, S2.dev, n8n. Bridge MAPLE agents to external systems, or wrap existing agents in MAPLE's reliability layer.
10 Adapters · No Ecosystem Lock-In

Other frameworks excel at orchestration ergonomics. MAPLE adds the production infrastructure layer they leave out.
| Feature | MAPLE | LangGraph | CrewAI | AutoGen | Google A2A |
|---|---|---|---|---|---|
| Production infra built-in | ✓ | — | — | — | partial |
| Result<T,E> typed errors | ✓ | — | — | — | — |
| Resource-aware messaging | ✓ | — | — | — | — |
| Secure link identification | ✓ | — | — | — | — |
| Built-in circuit breakers | ✓ | — | — | partial | — |
| Distributed state store | ✓ | partial | — | — | — |
| Priority queue scheduler | ✓ | — | — | — | — |
| Multi-protocol adapters | 10 adapters | — | — | — | 1 protocol |
| n8n no-code integration | 3 nodes | — | — | — | — |
| Orchestration ergonomics | ✓ | ✓ | ✓ | ✓ | partial |
MAPLE's APIs are explicit, typed, and designed for production — not just demos.
```python
from maple import Agent, Message, Priority, Config
import asyncio

async def main():
    # 1. Configure — memory:// broker for dev, nats:// for production
    config = Config(
        agent_id="my_agent",
        broker_url="memory://localhost"
    )
    agent = Agent(config)
    await agent.start()

    # 2. Build a typed, priority-routed message
    msg = Message(
        message_type="PROCESS_DATA",
        receiver="worker_agent",
        priority=Priority.HIGH,
        payload={"task": "analyze", "dataset_id": "q4-2025"}
    )

    # 3. Send — always returns Result[str, Error], never raises
    result = agent.send(msg)
    if result.is_ok():
        msg_id = result.unwrap()  # str: delivered message ID
        print(f"✓ Delivered: {msg_id}")
    else:
        err = result.unwrap_err()  # dict with code, message, recoverable
        print(f"✗ [{err['code']}]: {err['message']}")
        if err.get('recoverable'):
            print(f"  Suggestion: {err.get('suggestion')}")

    await agent.stop()

asyncio.run(main())
```
```python
from maple import Agent, Message, Priority, Config
from maple.core import Result
import asyncio

# Every MAPLE operation returns Result[T, Error].
# Errors are values — no exceptions in your agent graph.

async def main():
    config = Config(agent_id="my_agent", broker_url="memory://localhost")
    agent = Agent(config)
    await agent.start()

    msg = Message(message_type="TASK", receiver="worker", priority=Priority.HIGH)
    result = agent.send(msg)

    # ── Pattern 1: Explicit discriminated check (most readable)
    if result.is_ok():
        msg_id = result.unwrap()  # safe: won't raise
        print(f"✓ Delivered: {msg_id}")
    else:
        err = result.unwrap_err()
        code = err.get('code', 'UNKNOWN')
        txt = err.get('message', '')
        if err.get('recoverable'):
            print(f"⚠ Recoverable [{code}]: {txt}")
        else:
            print(f"✗ Fatal [{code}]: {txt}")

    # ── Pattern 2: Functional map/map_err chaining
    final = (agent.send(msg)
             .map(lambda mid: f"sent:{mid}")            # transform Ok value
             .map_err(lambda e: {**e, "logged": True})  # enrich Err value
             .unwrap_or("fallback"))                    # safe default on Err

    # ── Pattern 3: Short-circuit with a fallback function
    # (attempt_recovery is your own error handler)
    value = agent.send(msg).unwrap_or_else(
        lambda err: attempt_recovery(err)  # called only on Err
    )

    # ── Pattern 4: Force-unwrap — raises ResultError on Err.
    # Use only when you are certain send() cannot fail (e.g. in tests).
    msg_id = agent.send(msg).unwrap()

    await agent.stop()

asyncio.run(main())
```
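If the Result pattern is new to you, this minimal self-contained sketch (not MAPLE's actual implementation) shows the semantics the patterns above rely on — `map` touches only Ok, `map_err` touches only Err, and `unwrap_or` always produces a value:

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class Ok:
    value: Any
    def is_ok(self): return True
    def unwrap(self): return self.value
    def map(self, f: Callable): return Ok(f(self.value))   # transform Ok
    def map_err(self, f: Callable): return self            # no-op on Ok
    def unwrap_or(self, default): return self.value

@dataclass
class Err:
    error: Any
    def is_ok(self): return False
    def unwrap_err(self): return self.error
    def map(self, f: Callable): return self                # no-op on Err
    def map_err(self, f: Callable): return Err(f(self.error))
    def unwrap_or(self, default): return default           # safe fallback

assert Ok(7).map(lambda x: x + 1).unwrap_or(0) == 8
assert Err("boom").map(lambda x: x + 1).unwrap_or(0) == 0
assert Err("boom").map_err(str.upper).unwrap_err() == "BOOM"
```

Errors never escape as exceptions; the chain short-circuits through Err and the final `unwrap_or` makes the whole expression total.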
```python
from maple import Message, Priority
from maple.resources import ResourceRequest, ResourceRange

# Declare exactly what your workload needs — at the protocol level.
# The scheduler routes to agents that can satisfy the requirements.
# Requests that can't be satisfied return Err — no silent overloads.
heavy_job = Message(
    message_type="ML_INFERENCE",
    receiver="gpu_worker_pool",
    priority=Priority.HIGH,
    payload={
        "model": "llama3-70b",
        "batch_size": 32,
        "resources": ResourceRequest(
            memory=ResourceRange(
                min="16GB", preferred="32GB", max="64GB"
            ),
            compute=ResourceRange(  # CPU cores
                min=8, preferred=16, max=32
            ),
            gpu_memory=ResourceRange(
                min="16GB", preferred="48GB"
            ),
            network_bandwidth=ResourceRange(
                min="1Gbps", preferred="10Gbps"
            ),
            deadline="2025-12-01T18:00:00Z"
        ).to_dict()
    }
)

# If no agent in the pool can satisfy the request, send() returns Err
# instead of silently queuing an unrunnable job.
# (Assumes `agent` is a started Agent, as in the quickstart.)
result = agent.send(heavy_job)
```
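The routing decision can be pictured with a small self-contained sketch. This is illustrative scheduler logic, not MAPLE's implementation: an agent qualifies only if its free capacity covers every `min` in the request.

```python
def gb(s: str) -> float:
    # "16GB" -> 16.0 (illustrative parser; real unit handling needs more care)
    return float(s.rstrip("GBgb"))

# Minimum requirements, as in the heavy_job message above
request_min = {"memory": gb("16GB"), "gpu_memory": gb("16GB"), "compute": 8}

# Hypothetical pool capacities reported by workers
agents = {
    "gpu_worker_1": {"memory": 64.0, "gpu_memory": 24.0, "compute": 16},
    "gpu_worker_2": {"memory": 32.0, "gpu_memory": 8.0, "compute": 32},  # GPU too small
}

def can_satisfy(capacity: dict, needs: dict) -> bool:
    # Every declared minimum must fit in the agent's free capacity
    return all(capacity.get(k, 0) >= v for k, v in needs.items())

eligible = [name for name, cap in agents.items() if can_satisfy(cap, request_min)]
assert eligible == ["gpu_worker_1"]  # gpu_worker_2 is filtered out, not OOM-killed
```

An empty `eligible` list is what surfaces as an Err from `send()` — the unsatisfiable job is rejected up front rather than queued and killed later.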
```python
from maple import Agent, Message, Config
from maple.security import SecurityConfig
import asyncio

async def main():
    # 1. Configure with security
    config = Config(
        agent_id="compliance_agent",
        security=SecurityConfig(
            auth_type="JWT",
            credentials={"secret": "your-256-bit-secret"},
            encryption="AES-256-GCM"
        )
    )
    agent = Agent(config)
    await agent.start()

    # 2. Establish a cryptographically verified channel.
    #    The Link ID proves identity on both ends of the connection.
    link_result = await agent.establish_link(
        target_agent="data_processor",
        security_level="MAXIMUM"
    )

    if link_result.is_ok():
        link_id = link_result.unwrap()  # cryptographic channel token

        # 3. All messages over this link are authenticated against
        #    the Link ID. Forged or expired IDs → Err on delivery.
        sensitive_msg = Message(
            message_type="CONFIDENTIAL_PAYLOAD",
            payload={
                "record_id": "PATIENT-00421",
                "data": "...encrypted payload..."
            }
        ).with_link(link_id)  # MAPLE-exclusive: link-bound message

        result = agent.send(sensitive_msg)
        # Tampering, replay, or MITM breaks channel verification → Err
        if result.is_ok():
            print(f"✓ Secure delivery confirmed: {result.unwrap()}")
        else:
            err = result.unwrap_err()
            print(f"✗ Link verification failed: {err['message']}")
    else:
        print(f"✗ Could not establish link: {link_result.unwrap_err()['message']}")

    await agent.stop()

asyncio.run(main())
```
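The channel-verification idea — every message carries proof it was produced by the holder of the link secret, so any modification breaks verification — can be illustrated with Python's stdlib `hmac`. This is a conceptual sketch, not MAPLE's wire format or key exchange:

```python
import hmac, hashlib, json

# Illustrative: stands in for the secret both ends derive at link setup
link_secret = b"shared-secret-established-at-link-setup"

def sign(payload: dict) -> str:
    body = json.dumps(payload, sort_keys=True).encode()
    return hmac.new(link_secret, body, hashlib.sha256).hexdigest()

def verify(payload: dict, tag: str) -> bool:
    # constant-time comparison avoids timing side channels
    return hmac.compare_digest(sign(payload), tag)

msg = {"record_id": "PATIENT-00421", "data": "...encrypted payload..."}
tag = sign(msg)
assert verify(msg, tag)           # untampered message: accepted

tampered = {**msg, "record_id": "PATIENT-00999"}
assert not verify(tampered, tag)  # any modification: rejected → Err
```

A forged sender cannot produce a valid tag without the link secret, which is the property the Link ID gives MAPLE messages at the protocol level.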
```python
from maple.error import CircuitBreaker, CircuitState
from maple.core import Err

# MAPLE's CircuitBreaker is shared infrastructure.
# Configure once, protect your entire agent mesh from cascading failures.
cb = CircuitBreaker(
    failure_threshold=5,    # open after 5 consecutive failures
    recovery_timeout=30.0,  # probe after 30s in OPEN state
    half_open_max_calls=3   # max test calls in HALF_OPEN
)

# State machine: CLOSED → OPEN      (threshold hit)
#                       → HALF_OPEN (timeout elapsed)
#                       → CLOSED    (probes succeed) | OPEN (probes fail)

async def call_external_service(payload):
    if not cb.should_allow():
        # Circuit is OPEN — fail fast, protect the downstream
        return Err({"code": "CIRCUIT_OPEN", "message": "Service unavailable"})

    # external_api is your own Result-returning client
    result = await external_api.call(payload)
    if result.is_ok():
        cb.record_success()  # may transition HALF_OPEN → CLOSED
    else:
        cb.record_failure()  # may transition CLOSED → OPEN
    return result

# Inspect state for dashboards / alerting
state = cb.get_state()
print(f"State: {state.state}")  # CLOSED | OPEN | HALF_OPEN
print(f"Failures: {state.failure_count}")
print(f"Since: {state.last_failure_time}")
```
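To see the state machine in motion without a live downstream, here is a from-scratch minimal breaker — a sketch of the same CLOSED→OPEN→HALF_OPEN cycle, not `maple.error.CircuitBreaker` itself:

```python
import time

class MiniBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = None

    def should_allow(self) -> bool:
        if self.state == "OPEN":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"  # timeout elapsed: allow a probe
                return True
            return False                  # fail fast while OPEN
        return True                       # CLOSED or HALF_OPEN

    def record_success(self):
        self.state = "CLOSED"             # probe succeeded (or normal call)
        self.failures = 0

    def record_failure(self):
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = time.monotonic()

# Walk the full cycle with tiny thresholds for demonstration
cb = MiniBreaker(failure_threshold=2, recovery_timeout=0.01)
cb.record_failure(); cb.record_failure()
assert cb.state == "OPEN" and not cb.should_allow()   # fail fast
time.sleep(0.02)
assert cb.should_allow() and cb.state == "HALF_OPEN"  # probe allowed
cb.record_success()
assert cb.state == "CLOSED"                           # recovered
```

The fail-fast branch is the point: while OPEN, callers get an immediate Err instead of piling load onto a struggling service.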
MAPLE speaks your existing ecosystem's language. Integrate incrementally — no big-bang migrations required.
n8n ships with 3 MAPLE node types: MAPLE Agent, MAPLE Coordinator, and MAPLE Resource Manager — giving non-engineers a visual on-ramp into your agent infrastructure.
Production scenarios where "just use an agent framework" isn't sufficient.
50ms max-latency SLAs, atomic transactions, SERIALIZABLE isolation, automatic failover on degradation. MAPLE's circuit breakers and CRITICAL priority queues stop cascading failures in high-frequency environments.
Secure Link IDs for PHI channels, AuditLog for every tool call, AES-256-GCM encryption, Result<T,E> error chains for compliance traceability. Designed for environments where a silent failure is a regulatory event.
Resource-aware job dispatch — GPU memory, compute cores, and deadlines travel with every inference request. No more OOM kills or misconfigured queues silently eating your GPU budget at 3am.
Thousands of agents, real-time coordination, guaranteed delivery, bounded-latency mesh networking. EMERGENCY signals routed ahead of routine telemetry — priority separation that actually matters at the edge.
Dynamic discovery, weighted routing, distributed circuit breakers, health monitoring. Drop MAPLE in as the agent coordination layer without replacing your existing infrastructure — the 10 adapters make it seamless.
Multi-tenant isolation, per-agent auth policies, shared state with consistency guarantees. Each team owns their agents independently while sharing a single production runtime — with audit trails for every operation.
Install core, then add extras you need.
Start with in-memory broker for development, switch to NATS for production.
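One common way to flip between the two without code changes is an environment variable. The variable name `MAPLE_BROKER_URL` here is an assumption for illustration, not a documented MAPLE convention:

```python
import os
from urllib.parse import urlparse

# MAPLE_BROKER_URL is an illustrative env var name, not a documented default.
broker_url = os.environ.get("MAPLE_BROKER_URL", "memory://localhost")

scheme = urlparse(broker_url).scheme
# "memory" in dev; "nats" when MAPLE_BROKER_URL=nats://broker:4222 is set.
# Config(agent_id="starter_agent", broker_url=broker_url) then works unchanged.
assert scheme in ("memory", "nats")
```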
Every send returns a Result. Handle errors the right way from day one.
```python
from maple import Agent, Message, Priority, Config
from maple.resources import ResourceRequest, ResourceRange
import asyncio

async def main():
    # Configure
    config = Config(
        agent_id="starter_agent",
        broker_url="memory://localhost"
    )
    agent = Agent(config)
    await agent.start()

    # Build a resource-aware, priority-routed message
    msg = Message(
        message_type="PROCESS",
        receiver="worker",
        priority=Priority.NORMAL,
        payload={
            "job": "summarize",
            "resources": ResourceRequest(
                memory=ResourceRange("1GB", "4GB", "8GB"),
                compute=ResourceRange(2, 4, 8)
            ).to_dict()
        }
    )

    # Send — always returns Result, never throws
    result = agent.send(msg)
    if result.is_ok():
        print(f"✓ Sent: {result.unwrap()}")
    else:
        err = result.unwrap_err()
        print(f"✗ {err['code']}: {err['message']}")
        if err.get('recoverable'):
            print(f"  Suggestion: {err.get('suggestion')}")

    await agent.stop()

asyncio.run(main())
```
MAPLE is open source, AGPL-3.0 licensed. Star the repo, file issues, contribute — or just use it to build something that lasts.