Quickstart
Installation
As a dependency (projects using OAM agents):
As a CLI tool (using oam commands directly):
For development:
Hello World
Start a local development server:
oam mesh up
Register an agent (agent.py):
from pydantic import BaseModel
from openagentmesh import AgentMesh, AgentSpec

mesh = AgentMesh()  # connects to localhost:4222

class EchoInput(BaseModel):
    message: str

class EchoOutput(BaseModel):
    reply: str

@mesh.agent(AgentSpec(name="echo", description="Echoes a message back."))
async def echo(req: EchoInput) -> EchoOutput:
    return EchoOutput(reply=f"Echo: {req.message}")

mesh.run()  # blocks, like uvicorn.run()
Now discover and call it from the terminal:
oam mesh catalog # list registered agents
oam agent contract echo # view the full contract and input schema
oam agent call echo '{"message": "hi"}' # invoke it
What just happened?
oam mesh up started a local development server with JetStream and KV buckets. @mesh.agent registered echo with a typed contract derived from the function signature. mesh.run() connected to the bus, published the contract, and now serves requests until interrupted. The CLI discovered echo from the catalog without knowing its address.
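To make "a typed contract derived from the function signature" concrete, here is a minimal sketch of the idea using only the standard library. It is not OAM's implementation: derive_contract and the dictionary layout are hypothetical, and dataclasses stand in for the Pydantic models so the example runs without extra dependencies.

```python
from dataclasses import dataclass
from typing import get_type_hints


@dataclass
class EchoInput:
    message: str


@dataclass
class EchoOutput:
    reply: str


async def echo(req: EchoInput) -> EchoOutput:
    return EchoOutput(reply=f"Echo: {req.message}")


def derive_contract(fn, name: str, description: str) -> dict:
    """Hypothetical helper: read input/output types from a handler's annotations."""
    hints = get_type_hints(fn)
    output_type = hints.pop("return")
    # Assumes the handler takes exactly one annotated parameter (the request).
    (input_type,) = hints.values()
    return {
        "name": name,
        "description": description,
        "input": {k: t.__name__ for k, t in get_type_hints(input_type).items()},
        "output": {k: t.__name__ for k, t in get_type_hints(output_type).items()},
    }


contract = derive_contract(echo, "echo", "Echoes a message back.")
print(contract["input"], contract["output"])
```

The point of the sketch is that the handler's annotations alone are enough to describe what an agent accepts and returns, so no separate schema file is needed.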
Agent-to-Agent Calls
The fabric's value shows when agents call each other. One file, two agents: editor calls writer by name, with no import and no address.
from pydantic import BaseModel
from openagentmesh import AgentMesh, AgentSpec

mesh = AgentMesh()

class DraftInput(BaseModel):
    topic: str

class DraftOutput(BaseModel):
    text: str

class EditInput(BaseModel):
    topic: str

class EditOutput(BaseModel):
    polished: str

@mesh.agent(AgentSpec(name="writer", description="Drafts text on a given topic."))
async def writer(req: DraftInput) -> DraftOutput:
    return DraftOutput(text=f"A draft about {req.topic}.")

@mesh.agent(AgentSpec(name="editor", description="Polishes a draft by calling writer."))
async def editor(req: EditInput) -> EditOutput:
    draft = await mesh.call("writer", {"topic": req.topic})
    return EditOutput(polished=draft["text"].upper())

mesh.run()
editor discovers and calls writer through the mesh. The same code works whether writer runs in the same process or on a different machine.
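The name-based lookup described above can be illustrated with a toy, in-process model. This sketch replaces the message bus with a plain dictionary, so it shows only the calling pattern, not how OAM routes requests. The registry, agent, and call names are hypothetical.

```python
import asyncio

# Toy in-process registry: maps agent names to async handlers.
registry = {}


def agent(name: str):
    """Hypothetical decorator that registers a handler under a name."""
    def decorator(fn):
        registry[name] = fn
        return fn
    return decorator


async def call(name: str, payload: dict) -> dict:
    """Look the handler up by name at call time and await its reply."""
    return await registry[name](payload)


@agent("writer")
async def writer(payload: dict) -> dict:
    return {"text": f"A draft about {payload['topic']}."}


@agent("editor")
async def editor(payload: dict) -> dict:
    # The editor only knows the string "writer", not the function object.
    draft = await call("writer", {"topic": payload["topic"]})
    return {"polished": draft["text"].upper()}


result = asyncio.run(call("editor", {"topic": "meshes"}))
print(result["polished"])  # A DRAFT ABOUT MESHES.
```

Because the caller holds only a name, the handler behind that name can move to another process without changing the calling code.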
Two Separate Processes
The more realistic deployment: each agent runs independently.
writer.py:
from pydantic import BaseModel
from openagentmesh import AgentMesh, AgentSpec

mesh = AgentMesh()

class SummarizeInput(BaseModel):
    text: str
    max_length: int = 200

class SummarizeOutput(BaseModel):
    summary: str

@mesh.agent(AgentSpec(
    name="nlp.summarizer",
    description="Summarizes text to a target length. Input: raw text. Not for structured data.",
))
async def summarize(req: SummarizeInput) -> SummarizeOutput:
    return SummarizeOutput(summary=req.text[: req.max_length])

mesh.run()
consumer.py discovers the agent and calls it. async with mesh: is the idiom for interacting with the mesh without registering agents of your own. It suits scripts, notebooks, and orchestrators that only discover and call.
import asyncio
from openagentmesh import AgentMesh

async def main():
    mesh = AgentMesh()
    async with mesh:
        catalog = await mesh.catalog()
        for entry in catalog:
            print(entry.name, "-", entry.description)
        result = await mesh.call(
            "nlp.summarizer",
            {"text": "OpenAgentMesh makes coding multi-agent systems as easy as writing a REST endpoint", "max_length": 50},
        )
        print(result["summary"])

asyncio.run(main())
Channels
Channels are the leading dot-segments of an agent's name. They group agents by domain or team.
spec = AgentSpec(
    name="finance.risk.scorer",
    description="Scores credit risk from a company profile.",
)

@mesh.agent(spec)
async def score(req: ScoreInput) -> ScoreOutput:
    ...
Call by full dotted name:
Discover by channel prefix:
agents = await mesh.catalog(channel="finance.risk") # exact tier and sub-tiers
agents = await mesh.catalog(channel="finance") # all of finance.*
Agents without a dot in the name register at the root level.
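The prefix rule can be sketched as a pure function. This assumes matching happens on whole dot-segments, so that finance does not match a name starting with financial; the text above states only "exact tier and sub-tiers", so treat that boundary behavior as an assumption. channel_of and in_channel are hypothetical helpers, and every name except finance.risk.scorer and echo is made up for illustration.

```python
def channel_of(name: str) -> str:
    """Return the leading dot-segments of an agent name ('' for root-level agents)."""
    head, _, _ = name.rpartition(".")
    return head


def in_channel(name: str, channel: str) -> bool:
    """True if the agent sits in `channel` itself or in one of its sub-tiers."""
    own = channel_of(name)
    return own == channel or own.startswith(channel + ".")


names = ["finance.risk.scorer", "finance.ledger.poster", "financial.news", "echo"]
print([n for n in names if in_channel(n, "finance")])
print([n for n in names if in_channel(n, "finance.risk")])
```

Under these assumptions the first filter keeps both finance agents and the second keeps only finance.risk.scorer, while echo has an empty channel and matches neither.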
Connect to Shared NATS
Replace the default localhost connection with a connection string. Agent code is unchanged.
Note:
AgentMesh() connects to nats://localhost:4222 by default (your oam mesh up server). In staging and production, pass the connection string for your shared NATS deployment.
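One common way to keep agent code unchanged across environments is to read the connection string from the environment and fall back to the local default. The sketch below assumes an environment variable named OAM_NATS_URL. That name is hypothetical and not something OAM is known to read, so the value is resolved explicitly and would be passed to AgentMesh(url).

```python
import os

DEFAULT_NATS_URL = "nats://localhost:4222"  # the default stated above


def resolve_nats_url(env=None) -> str:
    """Return the URL from OAM_NATS_URL (hypothetical name) or the local default."""
    env = os.environ if env is None else env
    return env.get("OAM_NATS_URL", DEFAULT_NATS_URL)


url = resolve_nats_url()
# mesh = AgentMesh(url)  # pass the resolved URL to the constructor
print(url)
```

Taking the environment as a parameter keeps the helper easy to test without touching the real process environment.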
Embed in an Existing App
Use async with mesh: instead of mesh.run() to embed the mesh in an existing async application.
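The shape of an embedded setup can be sketched without a NATS server by using a stand-in object. StubMesh below is hypothetical and only mimics the connect-on-enter, disconnect-on-exit behavior described in the Reference. In a real application, an AgentMesh instance would take its place.

```python
import asyncio


class StubMesh:
    """Hypothetical stand-in for AgentMesh so the sketch runs without NATS."""

    def __init__(self):
        self.connected = False

    async def __aenter__(self):
        self.connected = True  # the real mesh would connect and subscribe agents here
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.connected = False  # the real mesh would disconnect here


async def app_main(mesh) -> list:
    """An existing application's entry point that keeps the mesh open while it works."""
    seen = []
    async with mesh:
        # The application's own tasks run here; the mesh stays up for the whole block.
        for _ in range(3):
            await asyncio.sleep(0)
            seen.append(mesh.connected)
    return seen


mesh = StubMesh()
states = asyncio.run(app_main(mesh))
print(states, mesh.connected)
```

The application keeps control of the event loop, and the mesh lives exactly as long as the async with block.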
Async Callback Invocation
For non-blocking invocations, use mesh.send() with a callback:
from openagentmesh import MeshError

async def on_summary(result: dict):
    print(result["summary"])

async def on_error(err: MeshError):
    print(f"Failed: {err.message}")

await mesh.send(
    "summarizer",
    {"text": long_doc, "max_length": 500},
    on_reply=on_summary,
    on_error=on_error,
    timeout=30.0,
)
# Continues immediately. Callback fires when the agent responds.
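The control flow of a callback-style send can be modeled with plain asyncio. This is a toy sketch, not how mesh.send() is implemented: the send function here takes a local handler instead of an agent name, and it reports timeouts through on_error as a generic exception rather than a MeshError.

```python
import asyncio


async def send(handler, payload, on_reply, on_error, timeout=30.0):
    """Toy stand-in for a callback-style send: returns at once, reports later."""

    async def deliver():
        try:
            result = await asyncio.wait_for(handler(payload), timeout)
        except Exception as exc:  # includes the timeout raised by wait_for
            await on_error(exc)
        else:
            await on_reply(result)

    return asyncio.create_task(deliver())


async def summarizer(payload):
    await asyncio.sleep(0.01)
    return {"summary": payload["text"][: payload["max_length"]]}


async def stalled(payload):
    await asyncio.sleep(60)  # never answers within the timeout


async def demo():
    replies, errors = [], []

    async def on_summary(result):
        replies.append(result["summary"])

    async def on_error(err):
        errors.append(err)

    payload = {"text": "hello mesh", "max_length": 5}
    t1 = await send(summarizer, payload, on_summary, on_error)
    t2 = await send(stalled, payload, on_summary, on_error, timeout=0.05)
    # Both sends have already returned; the callbacks fire as the tasks finish.
    await asyncio.gather(t1, t2)
    return replies, errors


replies, errors = asyncio.run(demo())
print(replies, [type(e).__name__ for e in errors])
```

Exactly one of the two callbacks fires per send, which is the property a caller relies on when it continues without waiting.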
LLM Tool Definitions
Fetch an agent's full contract and use its schemas for LLM tool injection.
contract = await mesh.contract("nlp.summarizer")
# Access schemas directly
contract.input_schema # JSON Schema dict for the input model
contract.output_schema # JSON Schema dict for the output model
contract.description # LLM-consumable description
Build tool definitions for your LLM provider:
# Example: construct an Anthropic tool definition
tool = {
    "name": contract.name,
    "description": contract.description,
    "input_schema": contract.input_schema,
}
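When exposing several agents at once, it helps to build the tool list and a lookup table together. Some providers restrict tool names to letters, digits, underscores, and hyphens, which would rule out dotted names such as nlp.summarizer, so check your provider's documentation. The sketch below replaces dots with underscores and keeps a map back to the agent name. ContractStub, to_tool, and build_tools are hypothetical, and the schema shown is an illustrative example rather than output captured from OAM.

```python
from dataclasses import dataclass


@dataclass
class ContractStub:
    """Hypothetical stand-in exposing the three fields used above."""
    name: str
    description: str
    input_schema: dict


def to_tool(contract) -> dict:
    """Build a provider tool definition, replacing dots in the agent name."""
    return {
        "name": contract.name.replace(".", "_"),
        "description": contract.description,
        "input_schema": contract.input_schema,
    }


def build_tools(contracts):
    """Return (tools, routes), where routes maps tool name back to agent name."""
    tools = [to_tool(c) for c in contracts]
    routes = {t["name"]: c.name for t, c in zip(tools, contracts)}
    return tools, routes


stub = ContractStub(
    name="nlp.summarizer",
    description="Summarizes text to a target length.",
    input_schema={
        "type": "object",
        "properties": {"text": {"type": "string"}, "max_length": {"type": "integer"}},
        "required": ["text"],
    },
)
tools, routes = build_tools([stub])
print(tools[0]["name"], "->", routes[tools[0]["name"]])
```

When the model requests a tool, routes gives the agent name to pass to mesh.call. Note that this simple renaming can collide (a.b_c and a_b.c map to the same tool name), so a real setup would need to detect duplicates.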
Reference
AgentMesh
| Method | Description |
|---|---|
| AgentMesh(url) | Connect to an existing NATS server |
| AgentMesh() | Connect to localhost:4222 (default) |
| async with AgentMesh.local() as mesh: | Embedded NATS for tests and demos |
| mesh.run() | Start event loop, block until interrupted |
| async with mesh: | Connect, subscribe agents, and serve. Disconnects on exit. |
Registration
| Method | Description |
|---|---|
| @mesh.agent(spec) | Register an async function as a mesh agent. spec is an AgentSpec instance. |
Invocation
| Method | Description |
|---|---|
| await mesh.call(name, payload, timeout=30.0) | Synchronous request/reply. Returns dict. |
| async for chunk in mesh.stream(name, payload) | Streaming request. Yields dict chunks. |
| await mesh.send(name, payload, on_reply=cb, on_error=err_cb) | Managed async callback |
| await mesh.send(name, payload, reply_to=subject) | Manual reply subject |
| await mesh.send(name, payload) | Fire-and-forget |
Subscription
| Method | Description |
|---|---|
| async for msg in mesh.subscribe(agent=name) | Subscribe to an agent's event stream |
| async for msg in mesh.subscribe(channel=prefix) | Subscribe to all events in a channel (wildcard) |
| async for msg in mesh.subscribe(subject=raw) | Subscribe to a raw NATS subject |
Discovery
| Method | Description |
|---|---|
| await mesh.catalog(channel=None, tags=None, streaming=None, invocable=None) | Returns list[CatalogEntry] (channel is a prefix filter) |
| await mesh.discover(channel=None) | Full AgentContract objects |
| await mesh.contract(name) | Single agent's full contract (authoritative) |
AgentSpec
| Field | Type | Default | Description |
|---|---|---|---|
| name | str | required | Dotted identifier (e.g. finance.risk.scorer or echo) |
| description | str | required | LLM-consumable description |
| tags | list[str] | [] | Searchable tags |
| version | str | "0.1.0" | Semver version |
CLI
| Command | Description |
|---|---|
| oam mesh up | Start local development server with JetStream and pre-created KV buckets |
| oam mesh catalog | List registered agents |