Python SDK for building AI agents over stdio_bus.
stdiobus gives you a reliable transport layer for ACP/MCP-style workflows: route requests to agent workers, keep request/response correlation stable, handle streaming updates, and work with async or sync Python code.
Use it when you want to focus on agent logic, not process wiring and message transport.
- Build ACP/MCP agents without writing transport plumbing.
- Send typed JSON-RPC requests with automatic session routing.
- Receive streamed agent output as aggregated final text.
- Use the same protocol model across Python, Node, and Rust SDKs.
- Run locally with a binary, or via Docker when needed.
- Simple client API: `AsyncStdioBus` and `StdioBus`.
- Programmatic config: define worker pools in Python or use a config file.
- Session routing by default: `clientSessionId` is injected automatically.
- Hello handshake: `stdio_bus/hello` negotiation support.
- Protocol extensions: identity, audit metadata, and `agentId` routing.
- Streaming support: `agent_message_chunk` aggregation into final response text.
- Incremental streaming: `stream_request()` yields agent output live as typed events (async-only).
- Pull-based notifications: `subscribe_notifications()` gives each consumer an independent async iterator (async-only).
- Fluent construction: `StdioBusBuilder` layered on the existing constructor.
- Predictable cancellation: in-flight requests fail with `TransportError` on shutdown or crash.
- Cross-platform: subprocess backend with Docker fallback.
- Typed API: dataclasses and type hints for IDE support.
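
To make "request/response correlation" concrete, here is a minimal sketch of the idea in plain Python, independent of the SDK. It assumes JSON-RPC 2.0 messages framed as one JSON object per line (NDJSON). The `PendingRequests` class and the top-level placement of `clientSessionId` are illustrative assumptions, not a description of the SDK's internals.

```python
import itertools
import json


class PendingRequests:
    """Hypothetical helper: matches JSON-RPC responses to requests by id."""

    def __init__(self, client_session_id: str) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[int, str] = {}  # request id -> method name
        self._session = client_session_id

    def encode_request(self, method: str, params: dict) -> str:
        """Build one NDJSON line and remember the request id."""
        request_id = next(self._ids)
        self._pending[request_id] = method
        message = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params,
            # Assumed field placement, for illustration only.
            "clientSessionId": self._session,
        }
        return json.dumps(message) + "\n"

    def resolve(self, line: str) -> tuple[str, object]:
        """Return (method, result) for a response line; KeyError on unknown ids."""
        message = json.loads(line)
        method = self._pending.pop(message["id"])
        return method, message.get("result")


pending = PendingRequests(client_session_id="session-1")
wire_line = pending.encode_request("echo", {"message": "hello"})
reply = json.dumps({"jsonrpc": "2.0", "id": 1, "result": {"message": "hello"}})
print(pending.resolve(reply))  # ('echo', {'message': 'hello'})
```

Keeping the pending map keyed by `id` is what lets responses arrive in any order and still reach the right caller.
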

```bash
pip install stdiobus
```

Requirements:

- Python 3.10+
- `stdio_bus` binary in PATH (or Docker)

Async client:

```python
import asyncio
from stdiobus import AsyncStdioBus, BusConfig, PoolConfig


async def main():
    async with AsyncStdioBus(
        config=BusConfig(
            pools=[PoolConfig(id="echo", command="python", args=["./echo_worker.py"], instances=1)]
        )
    ) as bus:
        result = await bus.request("echo", {"message": "hello"})
        print(result)


asyncio.run(main())
```

Sync client:

```python
from stdiobus import StdioBus, BusConfig, PoolConfig

with StdioBus(
    config=BusConfig(
        pools=[PoolConfig(id="echo", command="python", args=["./echo_worker.py"], instances=1)]
    )
) as bus:
    result = bus.request("echo", {"message": "hello"})
    print(result)
```

ACP-style request with handshake, identity, and audit metadata:

```python
from stdiobus import (
    AsyncStdioBus, BusConfig, PoolConfig,
    HelloParams, RequestOptions, Identity, AuditEvent,
)

# Run inside an async function.
bus = AsyncStdioBus(
    config=BusConfig(
        pools=[PoolConfig(id="acp-worker", command="python", args=["./acp_worker.py"], instances=1)]
    ),
    timeout_ms=60000,
)
await bus.start()

# Optional protocol handshake
hello = await bus.hello(HelloParams())
print("Negotiated:", hello.negotiated_protocol_version)

# Request with identity/audit metadata + agent routing
result = await bus.request(
    "session/update",
    {"input": "Summarize latest incident report"},
    options=RequestOptions(
        agent_id="agent-42",
        identity=Identity(subject_id="user-123", role="operator"),
        audit=AuditEvent(event_id="evt-1001", action="session/update"),
    ),
)

# If stream chunks were received, result["text"] contains aggregated output
print(result.get("text", result))

await bus.stop()
bus.destroy()
```

MCP-style tool listing and tool call:

```python
from stdiobus import AsyncStdioBus, BusConfig, PoolConfig

# Run inside an async function.
async with AsyncStdioBus(
    config=BusConfig(
        pools=[PoolConfig(id="mcp-tools", command="python", args=["-m", "my_tools_worker"], instances=2)]
    )
) as bus:
    tools = await bus.request("tools/list")
    print("Tools:", tools)

    output = await bus.request("tools/call", {
        "name": "search_docs",
        "arguments": {"query": "retry policy"},
    })
    print(output)
```

stream_request() sends a request and yields agent output live: zero or more
chunk events (one per agent_message_chunk, in arrival order) followed by
exactly one result event carrying the final result. The result event's
result["text"] is the same aggregated text request() would return for the
identical response, so you can render incrementally without giving up the final
value.

```python
from stdiobus import AsyncStdioBus, BusConfig, PoolConfig

# Run inside an async function.
async with AsyncStdioBus(
    config=BusConfig(
        pools=[PoolConfig(id="agent", command="python", args=["./agent_worker.py"], instances=1)]
    )
) as bus:
    async for event in bus.stream_request("session/prompt", {"input": "Tell me a story"}):
        if event.type == "chunk":
            print(event.text, end="", flush=True)  # live output
        elif event.type == "result":
            print()
            final_text = event.result.get("text", "")  # aggregated, == request()
```

Streaming is async-only: the synchronous `StdioBus` wrapper keeps its `request()` behavior and exposes no `stream_request()`.

Only one `stream_request()` may be active at a time. `agent_message_chunk` notifications carry no per-request correlation id and are broadcast to every pending request, so a second concurrent stream is rejected with `InvalidStateError`. Run streams sequentially.

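
The event ordering described above can be exercised without a running bus. The sketch below uses a stand-in `FakeEvent` dataclass and a `fake_stream()` generator, both hypothetical and only mirroring the `type`, `text`, and `result` attributes used in the example. It shows that concatenating chunk texts gives the same string as the final `result["text"]`.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class FakeEvent:
    """Stand-in for a stream event; not the SDK's StreamEvent class."""
    type: str                      # "chunk" or "result"
    text: str = ""
    result: Optional[dict] = None


async def fake_stream(chunks: list[str]) -> AsyncIterator[FakeEvent]:
    """Yield zero or more chunk events, then exactly one result event."""
    for piece in chunks:
        yield FakeEvent(type="chunk", text=piece)
    yield FakeEvent(type="result", result={"text": "".join(chunks)})


async def collect(stream: AsyncIterator[FakeEvent]) -> tuple[str, str]:
    """Return (text rebuilt from chunks, text from the final result)."""
    parts: list[str] = []
    final_text = ""
    async for event in stream:
        if event.type == "chunk":
            parts.append(event.text)
        elif event.type == "result":
            final_text = (event.result or {}).get("text", "")
    return "".join(parts), final_text


live, final = asyncio.run(collect(fake_stream(["Once ", "upon ", "a time"])))
print(live == final)  # True
```
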
subscribe_notifications() returns an independent async iterator over JSON-RPC
notifications. Each subscriber owns its own bounded queue, so multiple
subscribers can consume the same stream at their own pace. This is additive to
the existing on_notification() push callback, which keeps firing unchanged.

```python
from stdiobus import AsyncStdioBus, BusConfig, PoolConfig

# Run inside an async function.
async with AsyncStdioBus(
    config=BusConfig(
        pools=[PoolConfig(id="agent", command="python", args=["./agent_worker.py"], instances=1)]
    )
) as bus:
    async for notification in bus.subscribe_notifications(max_queue=256, overflow="drop"):
        print(notification["method"])  # treat the dict as read-only
```

When a subscriber's bounded queue is full, the overflow policy applies to that
subscriber only: "drop" (default) discards the newest notification, "close"
terminates that subscriber. The iterator ends with StopAsyncIteration when the
subscription is closed or the bus is stopped/destroyed.

Pull-based subscriptions are async-only. The synchronous `StdioBus` wrapper exposes no `subscribe_notifications()`; its `on_notification()` push callback is unaffected and remains available on both clients.

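
The two overflow policies can be illustrated with a small stand-alone model. `BoundedSubscriber` below is a hypothetical class built on `collections.deque`. It only mimics the behavior described above (`"drop"` discards the newest notification, `"close"` terminates the subscriber) and says nothing about how the SDK implements it.

```python
from collections import deque


class BoundedSubscriber:
    """Hypothetical model of one subscriber's bounded notification queue."""

    def __init__(self, max_queue: int, overflow: str = "drop") -> None:
        if overflow not in ("drop", "close"):
            raise ValueError(f"unknown overflow policy: {overflow!r}")
        self.max_queue = max_queue
        self.overflow = overflow
        self.queue: deque[dict] = deque()
        self.closed = False

    def offer(self, notification: dict) -> bool:
        """Try to enqueue; return True if the notification was accepted."""
        if self.closed:
            return False
        if len(self.queue) >= self.max_queue:
            if self.overflow == "close":
                self.closed = True  # only this subscriber ends
            return False            # in both cases the newest item is not queued
        self.queue.append(notification)
        return True


slow = BoundedSubscriber(max_queue=2, overflow="drop")
strict = BoundedSubscriber(max_queue=2, overflow="close")
for n in range(3):
    message = {"method": "session/update", "params": {"n": n}}
    slow.offer(message)
    strict.offer(message)

print(len(slow.queue), slow.closed)      # 2 False
print(len(strict.queue), strict.closed)  # 2 True
```

Because each subscriber has its own queue and flag, a slow consumer affects only itself.
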
StdioBusBuilder is a thin fluent layer over the existing constructor. Each
setter mirrors one constructor keyword and returns the builder for chaining;
build() produces an AsyncStdioBus and build_sync() produces a StdioBus.
The builder performs no validation of its own — all validation runs in the
unchanged __init__, so invalid inputs raise the same exceptions as direct
construction.

```python
from stdiobus import StdioBusBuilder, BusConfig, PoolConfig

bus = (
    StdioBusBuilder()
    .config(BusConfig(pools=[PoolConfig(id="echo", command="python", args=["./echo_worker.py"], instances=1)]))
    .backend("subprocess")
    .timeout_ms(15000)
    .build()
)

# Run inside an async function.
async with bus:
    result = await bus.request("echo", {"message": "hello"})
    print(result)
```

Programmatic configuration:

```python
from stdiobus import BusConfig, PoolConfig, LimitsConfig

config = BusConfig(
    pools=[
        PoolConfig(id="agent-a", command="python", args=["./worker_a.py"], instances=2),
        PoolConfig(id="agent-b", command="python", args=["-m", "worker_b"], instances=1),
    ],
    limits=LimitsConfig(
        max_input_buffer=2_097_152,
        max_restarts=10,
    ),
)
```

Configuration file:

```python
from stdiobus import StdioBus

bus = StdioBus(config_path="./stdio-bus-config.json")
```

`config` and `config_path` are mutually exclusive.

- `AsyncStdioBus(config=..., config_path=..., backend="auto", timeout_ms=...)`
- `StdioBus(...)`: sync wrapper
- `StdioBusBuilder()`: fluent builder over the constructor (`build()` / `build_sync()`)

| Method | Description |
|---|---|
| `start()` | Start the bus and spawn workers |
| `stop(timeout_sec=30)` | Stop gracefully, cancel in-flight requests |
| `connect(params)` | Start + optional hello handshake |
| `hello(params)` | Perform `stdio_bus/hello` handshake |
| `destroy()` | Release all resources |

| Method | Description |
|---|---|
| `request(method, params, ...)` | Send request and wait for response |
| `stream_request(method, ...)` | Async iterator of `StreamEvent` (chunks + final result); async-only |
| `notify(method, params, ...)` | Send notification (no response) |
| `send(message)` | Send raw JSON-RPC message |
| `on_message(handler)` | Register handler for all inbound messages |
| `on_notification(handler)` | Register handler for notifications only |
| `subscribe_notifications(...)` | Pull-based async iterator of notifications; async-only |

| Property | Description |
|---|---|
| `client_session_id` | Auto-generated routing session ID |
| `agent_session_id` | Agent-returned session ID (after hello) |
| `get_state()` | Current bus state |
| `get_stats()` | Runtime statistics |
| `get_backend_type()` | Active backend: `subprocess`, `native`, `docker` |
| `get_listen_mode()` | Effective external listener mode (native only; `none` otherwise) |
| `get_worker_count()` | Running workers, or `-1` if the backend cannot report it |
| `get_client_count()` | Connected clients, or `-1` if the backend cannot report it |

`-1` is a deliberate "not introspectable" sentinel. The subprocess and Docker backends have no channel to count daemon workers, so they return `-1` rather than a misleading `0`. For Docker, `get_client_count()` reports whether this SDK is connected to the container (`0`/`1`).

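
Because `-1` means "unknown" rather than "none", callers may want to branch on it before using a count in arithmetic or health checks. A minimal sketch, assuming hypothetical `normalize_count()` and `describe_count()` helpers that receive the integer returned by `get_worker_count()` or `get_client_count()`:

```python
from typing import Optional


def normalize_count(raw: int) -> Optional[int]:
    """Map the -1 "not introspectable" sentinel to None; pass real counts through."""
    return None if raw == -1 else raw


def describe_count(label: str, raw: int) -> str:
    """Render a count for logs without mistaking "unknown" for zero."""
    count = normalize_count(raw)
    return f"{label}: unknown" if count is None else f"{label}: {count}"


print(describe_count("workers", -1))  # workers: unknown
print(describe_count("clients", 1))   # clients: 1
```
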
HelloParams, HelloResult, RequestOptions, Identity, AuditEvent,
BusConfig, PoolConfig, LimitsConfig, SubprocessOptions, NativeOptions,
ListenMode, StreamEvent, StreamEventType, OverflowPolicy,
NotificationSubscription

| Exception | When |
|---|---|
| `InvalidArgumentError` | Bad parameter or config |
| `InvalidStateError` | Operation not valid in current state |
| `TimeoutError` | Request exceeded deadline |
| `TransportError` | Transport failure, shutdown, or crash |
| `PolicyDeniedError` | Operation denied by policy |
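
One way to act on these categories is to retry only the errors that can plausibly succeed on a second attempt. The sketch below illustrates a caller-side policy, not SDK behavior. The two exception classes are local stand-ins named after the SDK's `TimeoutError` and `TransportError` so that the block runs on its own, and `call_with_retry()` is a hypothetical helper.

```python
import asyncio
from typing import Awaitable, Callable


class BusTimeoutError(Exception):
    """Local stand-in for the SDK's TimeoutError."""


class BusTransportError(Exception):
    """Local stand-in for the SDK's TransportError."""


async def call_with_retry(
    send: Callable[[], Awaitable[dict]],
    attempts: int = 3,
) -> dict:
    """Retry timeouts up to `attempts` times; let transport failures propagate."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await send()
        except BusTimeoutError:
            if attempt == attempts:
                raise
        # BusTransportError is deliberately not caught: the README states there
        # is no automatic reconnect, so the caller must create a new instance.
    raise AssertionError("unreachable")


calls = {"n": 0}


async def flaky() -> dict:
    """Time out on the first call, then succeed."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise BusTimeoutError("deadline exceeded")
    return {"ok": True}


print(asyncio.run(call_with_retry(flaky)))  # {'ok': True}
```

Whether a timed-out request is safe to repeat depends on the method being idempotent, so treat the retry as an assumption to verify per method.
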

- No automatic reconnect. If the bus process exits, pending requests fail with `TransportError`. Create a new instance to reconnect.
- `stop()` cancels all in-flight requests with `TransportError` before stopping the backend.
- Streaming chunks (`agent_message_chunk`) are aggregated into `result["text"]` when the response result is a dict.
- `stream_request()` and `subscribe_notifications()` are async-only; the synchronous `StdioBus` does not expose them.
- Only one `stream_request()` is active at a time; a concurrent stream raises `InvalidStateError`.
- `stdout` from the bus process is expected to carry NDJSON protocol messages only.

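
Since a second concurrent stream is rejected, code that streams from several tasks needs to serialize them. A minimal sketch using `asyncio.Lock`: `fake_stream()` is a hypothetical stand-in for `bus.stream_request()`, and `stream_serialized()` is an illustrative helper, not part of the SDK.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(prompt: str) -> AsyncIterator[str]:
    """Stand-in for a streaming call: yields two chunks for the prompt."""
    for part in (f"{prompt}:1", f"{prompt}:2"):
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield part


async def stream_serialized(lock: asyncio.Lock, prompt: str, out: list[str]) -> None:
    """Hold the lock for the whole stream so only one is active at a time."""
    async with lock:
        async for chunk in fake_stream(prompt):
            out.append(chunk)


async def main() -> list[str]:
    lock = asyncio.Lock()
    out: list[str] = []
    # Both tasks start together, but their chunks never interleave.
    await asyncio.gather(
        stream_serialized(lock, "a", out),
        stream_serialized(lock, "b", out),
    )
    return out


print(asyncio.run(main()))  # ['a:1', 'a:2', 'b:1', 'b:2']
```
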
For most users, backend="auto" is the right choice. Details for those who need control:

| Backend | When | Config delivery |
|---|---|---|
| `subprocess` | `stdio_bus` binary in PATH (default) | `--config-fd <N>` pipe |
| `native` | `libstdio_bus.a` built with cffi | embed API (in-process) |
| `docker` | Docker available | `--config <mounted-file>` |

Auto-selection: subprocess → native → docker (Unix), subprocess → docker (Windows).
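
The auto-selection order can be written down as a small pure function. This sketch covers the documented order only. `pick_backend()` and its `available` argument are hypothetical, and how the SDK actually probes for each backend is not covered here.

```python
def pick_backend(available: set[str], is_windows: bool) -> str:
    """Return the first available backend in the documented preference order."""
    if is_windows:
        order = ("subprocess", "docker")
    else:
        order = ("subprocess", "native", "docker")
    for name in order:
        if name in available:
            return name
    raise RuntimeError("no usable backend found")


print(pick_backend({"docker", "native"}, is_windows=False))  # native
print(pick_backend({"docker", "native"}, is_windows=True))   # docker
```
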

```python
from stdiobus import StdioBus, SubprocessOptions

bus = StdioBus(
    config=config,
    backend="subprocess",
    subprocess=SubprocessOptions(
        binary_path="/usr/local/bin/stdio_bus",
        start_timeout_sec=10.0,
    ),
)
```

By default the bus runs embedded: messages flow through request()/send() and
on_message(). The native backend can instead open an external listener (TCP or
Unix socket) so that other processes connect and speak NDJSON directly.
This requires the native cffi bindings to be built
(python -m stdiobus.native.build_ffi). The subprocess and Docker backends do
not expose a user-controlled listener — passing a non-none listen_mode with
backend="subprocess" or backend="docker" raises InvalidArgumentError.

```python
from stdiobus import AsyncStdioBus, BusConfig, PoolConfig, NativeOptions, ListenMode

bus = AsyncStdioBus(
    config=BusConfig(
        pools=[PoolConfig(id="echo", command="python", args=["./echo_worker.py"], instances=1)]
    ),
    backend="native",
    native=NativeOptions(
        listen_mode=ListenMode.TCP,
        tcp_host="127.0.0.1",
        tcp_port=8765,
    ),
)
```

`NativeOptions` validates its own arguments: `ListenMode.TCP` requires `tcp_port` (in 1..65535) and `ListenMode.UNIX` requires `unix_path`.

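
The listener rules above amount to a few checks, sketched here as a stand-alone function. `check_listen_options()` is hypothetical, it raises the built-in `ValueError` where the SDK documents `InvalidArgumentError`, and the mode strings `"none"`, `"tcp"`, and `"unix"` are assumed spellings of the `ListenMode` members.

```python
from typing import Optional


def check_listen_options(
    backend: str,
    listen_mode: str = "none",
    tcp_port: Optional[int] = None,
    unix_path: Optional[str] = None,
) -> None:
    """Raise ValueError if the combination violates the documented rules."""
    if listen_mode == "none":
        return
    if backend in ("subprocess", "docker"):
        raise ValueError(f"backend {backend!r} does not expose a listener")
    if listen_mode == "tcp":
        if tcp_port is None or not 1 <= tcp_port <= 65535:
            raise ValueError("tcp mode requires tcp_port in 1..65535")
    elif listen_mode == "unix":
        if not unix_path:
            raise ValueError("unix mode requires unix_path")
    else:
        raise ValueError(f"unknown listen mode: {listen_mode!r}")


check_listen_options("native", "tcp", tcp_port=8765)  # passes silently
```
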

```bash
pip install -e ".[dev]"
pytest -v
```

License: Apache-2.0