Skip to content

AgentMesh

The central class. Manages NATS connection, agent subscriptions, and lifecycle.

Construction

from openagentmesh import AgentMesh

# Connect to an existing NATS server
mesh = AgentMesh("nats://localhost:4222")

# Connect using default localhost URL
mesh = AgentMesh()

AgentMesh(url: str = "nats://localhost:4222")

Connect to a running NATS server. Defaults to nats://localhost:4222 when no URL is provided.

Parameter Type Default Description
url str "nats://localhost:4222" NATS connection URL

AgentMesh.local()

Async context manager that starts an embedded NATS subprocess with JetStream and pre-created KV buckets. For tests and quick demos only. The NATS process stops when the context exits.

async with AgentMesh.local() as mesh:
    # embedded NATS starts, KV buckets created
    result = await mesh.call("echo", {"message": "hello"})
    # NATS stops when context exits

Lifecycle

Two lifecycle models, one for each participation pattern.

mesh.run()

Blocking lifecycle for providers and hybrids. Connects to NATS, subscribes all registered agents, and blocks until interrupted. Similar to uvicorn.run().

mesh.run()

This is the standard entry point for any process that registers agents with @mesh.agent.

async with mesh:

Scoped lifecycle for consumers. Connects on entry, disconnects on exit. No agent registration; used by scripts, notebooks, and orchestrators that only discover and call agents.

async with mesh:
    result = await mesh.call("echo", {"message": "hello"})

Embedding in an existing async application (e.g. FastAPI lifespan):

async def lifespan(app):
    async with mesh:
        yield

Registration

@mesh.agent(spec)

Decorator to register an async function as a mesh agent. Takes a single AgentSpec instance.

from openagentmesh import AgentMesh, AgentSpec

mesh = AgentMesh()

spec = AgentSpec(
    name="nlp.summarizer",
    description="Summarizes text to a target length.",
    tags=["text", "summarization"],
)

@mesh.agent(spec)
async def summarize(req: SummarizeInput) -> SummarizeOutput:
    ...

Capabilities are inferred from the handler shape at decoration time. AgentSpec carries only human-authored metadata; invocable and streaming are never declared manually.

Handler shape invocable streaming Consumer API
async def f(req) -> Out: return ... True False mesh.call()
async def f(req) -> Chunk: yield ... True True mesh.stream()
async def f() -> Out: return ... True False mesh.call()
async def f() -> Event: yield ... False True mesh.subscribe()
async def f(): ... False False (background task)

Invocation

Four interaction modes. See Invocation for patterns and semantics.

await mesh.call(name, payload, *, timeout=30.0)

Synchronous request/reply. Blocks until the agent responds or times out.

Parameter Type Default Description
name str required Agent name
payload Any None Input payload (dict, Pydantic model, or any JSON-serializable value)
timeout float 30.0 Timeout in seconds

Returns: dict with the deserialized response payload.

async for chunk in mesh.stream(name, payload, *, timeout=60.0)

Streaming request. Yields response chunks as dicts.

Parameter Type Default Description
name str required Agent name
payload Any None Input payload (dict, Pydantic model, or any JSON-serializable value)
timeout float 60.0 Total stream timeout in seconds

Yields: dict chunks.

await mesh.send(name, payload, *, on_reply, on_error, reply_to, timeout)

Async callback invocation. Three modes: fire-and-forget, managed callback, or manual reply subject.

Parameter Type Default Description
name str required Agent name
payload Any None Input payload (dict, Pydantic model, or any JSON-serializable value)
on_reply Callable[[dict], Awaitable[None]] \| None None Callback for each reply message
on_error Callable[[MeshError], Awaitable[None]] \| None None Callback for timeout or error
reply_to str \| None None Manual reply subject (mutually exclusive with on_reply)
timeout float 60.0 Inactivity timeout (only applies with on_reply)
# Managed callback
await mesh.send("summarizer", payload, on_reply=handle, on_error=on_err, timeout=30.0)

# Fire-and-forget
await mesh.send("summarizer", payload)

# Manual reply subject
await mesh.send("summarizer", payload, reply_to="mesh.results.abc")

Subscription

async for msg in mesh.subscribe(*, agent, channel, subject, timeout)

Subscribe to events on a subject, agent, or channel. Returns an async generator yielding dicts.

Parameter Type Default Description
agent str \| None None Agent's dotted name (resolves to its event subject)
channel str \| None None Channel prefix (subscribes to mesh.agent.{channel}.>)
subject str \| None None Raw NATS subject
timeout float \| None None Inactivity timeout in seconds

At least one of agent, channel, or subject must be provided. agent and subject are mutually exclusive.

# Subscribe to an agent's event stream
async for event in mesh.subscribe(agent="price-feed"):
    print(event["symbol"], event["price"])

# Subscribe to all events in a channel
async for event in mesh.subscribe(channel="finance"):
    print(event)

# Subscribe to a raw subject
async for msg in mesh.subscribe(subject="mesh.results.abc123", timeout=30.0):
    print(msg)
    break

Discovery

await mesh.catalog(*, channel=None, tags=None, streaming=None, invocable=None)

Lightweight listing of registered agents. Returns typed CatalogEntry objects.

Parameter Type Default Description
channel str \| None None Filter by name prefix (an entry matches when its name equals channel or starts with channel + ".")
tags list[str] \| None None Filter by tags (all must match)
streaming bool \| None None Filter by streaming capability
invocable bool \| None None Filter by invocable capability

Returns: list[CatalogEntry]

catalog = await mesh.catalog(channel="nlp")

for entry in catalog:
    print(entry.name, "-", entry.description)
    # entry.invocable, entry.streaming, entry.version, entry.tags also available

await mesh.discover(*, channel=None)

Full AgentContract objects for all matching agents.

Parameter Type Default Description
channel str \| None None Filter by name prefix (same semantics as catalog(channel=...))

Returns: list[AgentContract]

await mesh.contract(name)

Fetch a single agent's full contract. This is the authoritative source.

Parameter Type Default Description
name str required Agent's dotted name

Returns: AgentContract

contract = await mesh.contract("nlp.summarizer")

contract.name             # "nlp.summarizer"
contract.description      # "Summarizes text..."
contract.input_schema     # JSON Schema dict
contract.output_schema    # JSON Schema dict
contract.invocable        # True
contract.streaming        # False

KV Store

Shared KV store for structured data exchange between agents.

await mesh.kv.put(key, value)

Store a value.

await mesh.kv.get(key)

Retrieve a value by key. Returns str.

async with mesh.kv.cas(key) as entry

Single-attempt compare-and-swap. Read entry.value, modify it, and the new value is written on exit with CAS semantics. For concurrent access, use update() instead.

await mesh.kv.update(key, fn)

CAS update with automatic retry. fn receives the current value and returns the new value. On revision conflict, the value is re-read and fn is called again.

def increment(value: str) -> str:
    return str(int(value) + 1)

await mesh.kv.update("counter", increment)

async for value in mesh.kv.watch(key)

Watch a key for changes. Yields the new value on each update.

await mesh.kv.delete(key)

Delete a key.

Workspace (Object Store)

Shared binary artifact storage backed by the NATS JetStream Object Store (mesh-artifacts bucket). Use for files, images, embeddings, or any binary payload too large for the KV store.

await mesh.workspace.put(key, data)

Store a binary artifact.

Parameter Type Description
key str Artifact key (supports / for hierarchy, e.g. docs/report.pdf)
data bytes \| str Content to store. Strings are UTF-8 encoded.

await mesh.workspace.get(key)

Retrieve an artifact by key. Returns bytes.

Parameter Type Description
key str Artifact key

Returns: bytes

await mesh.workspace.delete(key)

Delete an artifact.

Parameter Type Description
key str Artifact key
# Store and retrieve a binary artifact
await mesh.workspace.put("results/output.png", image_bytes)
data = await mesh.workspace.get("results/output.png")
await mesh.workspace.delete("results/output.png")