Agents¶
An agent is an async function registered on the mesh. It receives typed input, does work, and returns typed output.
In practice, any async function can be registered as an agent, not just LLM-driven code: deterministic tools, data transformers, event publishers, or anything else that fits the function shape. The library was designed primarily for multi-agent systems, so the "agent" name stuck in the API even though the abstraction is more general.
Registering an Agent¶
Define an AgentSpec with the agent's metadata, then apply @mesh.agent. This first example uses a non-streaming handler (the most common shape): take input, return one output.
from pydantic import BaseModel
from openagentmesh import AgentMesh, AgentSpec
mesh = AgentMesh()
class EchoInput(BaseModel):
message: str
class EchoOutput(BaseModel):
reply: str
spec = AgentSpec(name="echo", description="Echoes a message back.")
@mesh.agent(spec)
async def echo(req: EchoInput) -> EchoOutput:
return EchoOutput(reply=f"Echo: {req.message}")
The decorator:
- Inspects the handler shape to determine capabilities
- Builds an
AgentContractfrom the spec and handler type hints - On entering the context manager, subscribes to a NATS queue group at
mesh.agent.{channel}.{name} - Deserializes and validates input via Pydantic v2
- Calls your handler function
- Serializes the response and writes the contract to the registry
Handler Shapes¶
The SDK infers two things from the handler's signature at decoration time:
- Invocable. Does the handler promise a response to a caller? If so, the SDK subscribes it to a NATS request subject (with queue group for load balancing). Callers reach it via
mesh.call()ormesh.stream(). A handler promises a response when it accepts input (has a request parameter) or returns a typed result (has an output model without streaming). - Streaming. Is the handler an async generator (
yield)? If so, the response uses the streaming wire protocol instead of a single reply.
Handlers that are neither invocable nor streaming run as background tasks: the SDK launches them at startup and cancels them at shutdown. They never receive requests; they do their own work (watching KV, polling external systems, etc.).
These two properties combine into five common patterns:
| Pattern | Handler shape | Invocable | Streaming |
|---|---|---|---|
| Responder | async def f(req: In) -> Out: return ... |
Yes (has input) | No |
| Streamer | async def f(req: In) -> Chunk: yield ... |
Yes (has input) | Yes |
| Trigger | async def f() -> Out: return ... |
Yes (has output) | No |
| Publisher | async def f() -> Event: yield ... |
No | Yes |
| Watcher | async def f(): ... |
No | No |
No explicit type or capability flags. The handler shape is the source of truth.
Responder¶
The most common pattern: accept typed input, return typed output. Shown in the registration example above.
Streamer¶
class SummarizeChunk(BaseModel):
delta: str
spec = AgentSpec(name="nlp.summarizer", description="Streams a summary.")
@mesh.agent(spec)
async def summarize(req: SummarizeInput) -> SummarizeChunk:
async for token in call_llm_stream(req.text):
yield SummarizeChunk(delta=token)
Trigger¶
No input parameter, but returns a typed result. The call itself is the signal. Because the handler declares an output model, the SDK knows a caller is waiting for a response and subscribes it to a request subject.
class RefreshResult(BaseModel):
keys_refreshed: int
duration_ms: float
spec = AgentSpec(name="ops.refresh-cache", description="Flushes and rebuilds the cache. Returns refresh stats.")
@mesh.agent(spec)
async def refresh_cache() -> RefreshResult:
stats = await rebuild_cache()
return RefreshResult(keys_refreshed=stats.count, duration_ms=stats.elapsed)
Called without payload:
Publisher¶
class PriceEvent(BaseModel):
symbol: str
price: float
spec = AgentSpec(name="finance.price-feed", description="Emits price events.")
@mesh.agent(spec)
async def monitor_prices() -> PriceEvent:
while True:
yield PriceEvent(symbol="AAPL", price=await fetch_price())
await asyncio.sleep(1)
Watcher¶
No input, no output, no yield. Runs as a background task. The handler body typically contains a KV watch loop or other long-running coordination logic.
spec = AgentSpec(name="pipeline.extract", description="Watches for raw documents and extracts entities.")
@mesh.agent(spec)
async def extract():
async for value in mesh.kv.watch("pipeline.*.raw"):
doc = Document.model_validate_json(value)
extracted = do_extraction(doc)
await mesh.kv.put(f"pipeline.{doc.id}.extracted", extracted.model_dump_json())
All agents (including publishers and watchers) are visible in the catalog and participate in liveness tracking. Use mesh.catalog(invocable=True) to select only agents that accept calls.
Scaling background agents
Publishers and watchers run as background tasks and do not use queue groups. Every instance receives every KV update or emits its own event stream. For expensive processing in a watcher, delegate to an invocable agent via mesh.call(), which scales via queue groups. The watcher becomes a thin routing layer; the processing agent scales independently.
Type Hints¶
Handler type hints can be any type that Pydantic v2 can validate, not just BaseModel subclasses. Use scalar types, generics, or standard library types when a full model would be unnecessary ceremony.
Scalar types¶
spec = AgentSpec(name="greet", description="Greets by name.")
@mesh.agent(spec)
async def greet(name: str) -> str:
return f"Hello, {name}"
The contract schema reflects the scalar type:
Generic containers¶
spec = AgentSpec(name="split", description="Splits text into words.")
@mesh.agent(spec)
async def split(text: str) -> list[str]:
return text.split()
Supported types¶
Any type Pydantic's TypeAdapter can handle:
- Scalars:
str,int,float,bool - Standard library:
datetime,date,UUID,Path,Decimal,Enumsubclasses - Generics:
list[X],dict[str, X],set[X],tuple[X, ...] - Optionals and unions:
X | None,Optional[X],Union[X, Y] - Literals:
Literal["a", "b"] - Pydantic models:
BaseModelsubclasses
Types that cannot produce a JSON Schema (callables, IO objects) raise an error at decoration time.
Use BaseModel when your payload has multiple fields or when you want named, self-documenting schemas in the contract. Use scalar or generic types when the payload is a single value.
Lifecycle¶
Agents follow a predictable lifecycle:
- Start.
mesh.run()(blocking) orasync with mesh:(non-blocking context manager) - Register. Invocable agents subscribe to a NATS request subject. Publishers and watchers launch as background tasks. All agents write their contract to the registry.
- Serve. Invocable agents handle requests via queue group. Publishers emit events. Watchers react to state changes.
- Stop. Exiting the context manager: cancel background tasks, unsubscribe, drain, deregister, disconnect.
Queue Groups¶
Every invocable agent subscribes via a NATS queue group. This means multiple instances of the same agent automatically load-balance with no configuration changes. Deploy three replicas of summarizer and NATS distributes requests across them.