Temporal integration for Deep Agents — durable execution for AI agent workflows.
If your Deep Agent process crashes mid-task, all progress is lost. Sub-agents are ephemeral. Human-in-the-loop approval blocks a running process. `deepagent-temporal` solves these problems by running your Deep Agent as a Temporal Workflow:
- Durable execution — survives process crashes, restarts, and deployments
- Sub-agent dispatch — sub-agents run as independent Temporal Child Workflows
- Worker affinity — sticky task queues keep file operations on the same machine, sidestepping the need for NFS or shared storage
- Zero-resource HITL — workflow pauses consume no compute while waiting for approval
This project is experimental; use at your own risk.
```shell
pip install deepagent-temporal
```

Requires Python 3.10+, `langgraph-temporal >= 0.1.0`, and a running Temporal server.
```python
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage

agent = create_deep_agent(
    model=ChatAnthropic(model="claude-sonnet-4-20250514"),
    tools=[read_file, write_file, execute],
    system_prompt="You are a helpful coding assistant.",
    backend=FilesystemBackend(root_dir="/workspace"),
)

# No durability — if the process crashes, all progress is lost.
# Sub-agents run in-process. HITL blocks a live process.
result = await agent.ainvoke(
    {"messages": [HumanMessage(content="Fix the bug in main.py")]},
    config={"configurable": {"thread_id": "task-123"}},
)
```
```python
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from temporalio.client import Client

from deepagent_temporal import TemporalDeepAgent

# 1. Create your agent exactly as before
agent = create_deep_agent(
    model=ChatAnthropic(model="claude-sonnet-4-20250514"),
    tools=[read_file, write_file, execute],
    system_prompt="You are a helpful coding assistant.",
    backend=FilesystemBackend(root_dir="/workspace"),
)

# 2. Connect to Temporal and wrap the agent
client = await Client.connect("localhost:7233")
temporal_agent = TemporalDeepAgent(
    agent,
    client,
    task_queue="coding-agents",
    use_worker_affinity=True,  # automatic worker pinning
)

# 3. Same API — now with durable execution
result = await temporal_agent.ainvoke(
    {"messages": [HumanMessage(content="Fix the bug in main.py")]},
    config={"configurable": {"thread_id": "task-123"}},
)
```

The `ainvoke`, `astream`, `get_state`, and `resume` APIs are identical. Your existing code changes by three lines.
The agent graph executes on a Temporal Worker. Run this in a separate process (or on a dedicated machine for filesystem affinity):
```python
import asyncio

from temporalio.client import Client
from temporalio.worker import UnsandboxedWorkflowRunner

from deepagent_temporal import TemporalDeepAgent

async def main():
    agent = create_deep_agent(...)  # same setup as above
    client = await Client.connect("localhost:7233")
    temporal_agent = TemporalDeepAgent(
        agent, client,
        task_queue="coding-agents",
        use_worker_affinity=True,
    )
    # create_worker() returns a WorkerGroup with two internal workers:
    # one on the shared queue (Workflows + discovery) and one on a
    # unique queue (node Activities).
    worker = temporal_agent.create_worker(
        workflow_runner=UnsandboxedWorkflowRunner(),
    )
    async with worker:
        print("Worker running. Ctrl+C to stop.")
        await asyncio.Future()  # run forever

asyncio.run(main())
```

Deep Agents often use `FilesystemBackend` — tools read and write files on the local disk. All Activities for an agent must run on the same worker to keep the filesystem consistent.
Enable `use_worker_affinity=True` and the framework handles this automatically, following Temporal's worker-specific task queues pattern:
```python
temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="coding-agents",
    use_worker_affinity=True,  # transparent to the client
)
```

How it works:
- `create_worker()` generates a unique queue name per worker process and starts two internal workers: one on the shared queue (Workflows + the discovery Activity), one on its unique queue (node Activities)
- When a Workflow starts, it calls a `get_available_task_queue` Activity on the shared queue — whichever worker picks it up returns its unique queue
- All subsequent node Activities are dispatched to that discovered queue
- The discovered queue survives `continue-as-new` — the same worker stays pinned across workflow runs
- HITL waits have no timeout concern — the queue persists independently
The client never needs to know queue names. Workers self-register.
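The discovery handshake can be sketched in plain Python (no Temporal API calls here; the `registry` dict and `Worker` class are illustrative stand-ins for workers polling the shared task queue):

```python
import uuid

SHARED_QUEUE = "coding-agents"                  # shared task queue name (illustrative)
registry: dict[str, list] = {SHARED_QUEUE: []}  # stand-in for Temporal's queue polling

class Worker:
    def __init__(self) -> None:
        # Each worker process generates its own unique queue name...
        self.unique_queue = f"{SHARED_QUEUE}-{uuid.uuid4().hex[:8]}"
        registry[SHARED_QUEUE].append(self)  # ...and also polls the shared queue

    def get_available_task_queue(self) -> str:
        # The discovery Activity simply reports this worker's unique queue.
        return self.unique_queue

def start_workflow() -> str:
    # Whichever worker picks up the discovery Activity wins the pin;
    # every later node Activity is dispatched to its unique queue.
    winner = registry[SHARED_QUEUE][0]
    return winner.get_available_task_queue()

w1, w2 = Worker(), Worker()
pinned = start_workflow()
assert pinned == w1.unique_queue and pinned != w2.unique_queue
```

Because the pinned queue name is just data, it can be carried across `continue-as-new` runs, which is why the same worker stays pinned.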
Deep Agents can spawn sub-agents via the `task` tool. With `deepagent-temporal`, each sub-agent runs as an independent Temporal Child Workflow with its own durability, timeout, and observability.
`TemporalSubAgentMiddleware` intercepts `task` tool calls and dispatches them as Child Workflows instead of running them in-process. Inject it before graph compilation:
```python
from datetime import timedelta

from deepagent_temporal import TemporalDeepAgent, TemporalSubAgentMiddleware

middleware = TemporalSubAgentMiddleware(
    subagent_specs={
        "researcher": "subagent:researcher",
        "coder": "subagent:coder",
    },
)

agent = create_deep_agent(
    model=model,
    tools=tools,
    middleware=[middleware],  # inject before compilation
    # ... other params
)

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="main-agents",
    subagent_task_queue="sub-agents",  # separate queue for sub-agents
    subagent_execution_timeout=timedelta(minutes=15),  # per sub-agent timeout
)
```

When the LLM invokes the `task` tool, the middleware stores a `SubAgentRequest` in a context variable. The Activity collects it, and the Workflow dispatches a Child Workflow. The result flows back as a `ToolMessage` to the parent agent — exactly matching the behavior of the in-process `SubAgentMiddleware`.
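The context-variable hand-off described above can be sketched with the stdlib `contextvars` module (the `SubAgentRequest` shape and helper names here are illustrative, not the library's actual internals):

```python
import contextvars
from dataclasses import dataclass

@dataclass
class SubAgentRequest:
    subagent: str
    prompt: str

# Context variable the middleware writes when the `task` tool fires.
_pending: contextvars.ContextVar = contextvars.ContextVar(
    "pending_subagent", default=None
)

def task_tool(subagent: str, prompt: str) -> str:
    # Instead of running the sub-agent in-process, record the request;
    # the tool returns a placeholder the Workflow later replaces.
    _pending.set(SubAgentRequest(subagent, prompt))
    return "<dispatched>"

def collect_request():
    # After the node runs, the Activity collects the request so the
    # Workflow can start a Child Workflow and deliver a ToolMessage.
    req = _pending.get()
    _pending.set(None)
    return req

task_tool("researcher", "Summarize recent papers")
req = collect_request()
assert req == SubAgentRequest("researcher", "Summarize recent papers")
```

Using a context variable keeps the tool synchronous from the LLM's point of view while deferring the actual execution to the Workflow layer.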
Deep Agents' `interrupt()` works out of the box. The workflow pauses with zero resource consumption and resumes when you send a Signal:
```python
# Start the agent (non-blocking)
handle = await temporal_agent.astart(
    {"messages": [HumanMessage(content="Refactor auth module")]},
    config={"configurable": {"thread_id": "task-456"}},
)

# ... later, check if it's waiting for approval
state = await temporal_agent.get_state(
    {"configurable": {"thread_id": "task-456"}}
)
if state["status"] == "interrupted":
    print("Pending approval:", state["interrupts"])

# Approve and resume
await temporal_agent.resume(
    {"configurable": {"thread_id": "task-456"}},
    "approved",
)
```

Token-level LLM streaming is supported via callback-based capture. Enable it with `enable_token_streaming=True`:
```python
temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="coding-agents",
    enable_token_streaming=True,  # capture LLM tokens via callbacks
)

# Stream tokens (they arrive after each Activity completes)
async for token_event in temporal_agent.astream(
    {"messages": [HumanMessage(content="Explain quantum computing")]},
    config={"configurable": {"thread_id": "task-789"}},
    stream_mode="tokens",
):
    print(token_event["token"], end="", flush=True)
```

For real-time token delivery (~10-50ms latency), add a Redis Streams sidecar:
```python
from deepagent_temporal import TemporalDeepAgent, RedisStreamBackend

redis_backend = RedisStreamBackend(redis_url="redis://localhost:6379")

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="coding-agents",
    enable_token_streaming=True,
    redis_stream_backend=redis_backend,  # real-time via Redis
)

# Tokens arrive in real time as the LLM generates them
async for token_event in temporal_agent.astream(
    {"messages": [HumanMessage(content="Explain quantum computing")]},
    config={"configurable": {"thread_id": "task-789"}},
    stream_mode="tokens",
):
    print(token_event["token"], end="", flush=True)
```

How it works: A `StreamingNodeWrapper` wraps each graph node's runnable and injects a `TokenCapturingHandler` (a LangChain callback) into the config before `ainvoke()`. The handler intercepts `on_llm_new_token` events from the chat model. With Redis, tokens are published via `XADD` in real time; without Redis, tokens are buffered in the Activity result. See docs/streaming-design.md for the full architecture.
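The capture path can be sketched with a minimal handler that mirrors LangChain's `on_llm_new_token` callback hook (everything except that hook name is illustrative; the real `TokenCapturingHandler` lives inside the library):

```python
import asyncio

class TokenCapturingHandler:
    """Minimal stand-in for a LangChain async callback handler that
    captures tokens from on_llm_new_token events."""

    def __init__(self, publish=None):
        self.buffer: list[str] = []
        self.publish = publish  # e.g. an async Redis XADD wrapper (real-time path)

    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        if self.publish is not None:
            await self.publish(token)  # push immediately (Redis Streams mode)
        else:
            self.buffer.append(token)  # buffer into the Activity result

async def fake_chat_model(handler: TokenCapturingHandler, text: str) -> None:
    # Stand-in for a chat model firing the callback once per token.
    for tok in text.split():
        await handler.on_llm_new_token(tok + " ")

handler = TokenCapturingHandler()
asyncio.run(fake_chat_model(handler, "quantum computing explained"))
assert "".join(handler.buffer).strip() == "quantum computing explained"
```

The same handler serves both modes: with a `publish` function attached, tokens leave the worker immediately; without one, they ride along in the Activity's return value.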
This project is experimental. Before adopting, understand these constraints:
- **Token streaming requires opt-in configuration.** By default, `astream` emits Activity-level events (node started, node completed), not individual LLM tokens. Enable `enable_token_streaming=True` for token-level capture via LangChain callback injection. For real-time delivery (~10-50ms latency), add a `RedisStreamBackend`. Without Redis, tokens are buffered and delivered after each Activity completes. See docs/streaming-design.md for the architecture and docs/comparison.md for how this compares to LangGraph Platform.
- **`UnsandboxedWorkflowRunner` is required.** LangGraph's imports trigger Temporal's workflow sandbox restrictions. All non-deterministic code runs in Activities (not the workflow function), so the practical risk is low. See docs/sandbox-tradeoffs.md for details.
- **Payload size limits apply.** Temporal has a ~2 MB per-event payload limit. Long conversations with hundreds of messages can exceed this. Use `SummarizationMiddleware` to compact history, or the claim-check pattern for large state. See docs/serialization.md.
- **Double-retry risk with LLM calls.** LLM SDKs retry internally. Temporal also retries failed Activities. Without configuration, both layers retry — causing duplicate API costs. Use `TemporalDeepAgent.recommended_retry_policies()` to disable Temporal-level retries for LLM nodes. See docs/retry-semantics.md.
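The double-retry multiplication is easy to see in numbers (the retry counts below are illustrative, not measured defaults of any particular SDK):

```python
# Worst case for a persistently failing LLM call: every Temporal Activity
# attempt re-enters the SDK, which performs its own internal retries.
sdk_attempts = 3       # illustrative: HTTP retry budget inside the LLM SDK
temporal_attempts = 3  # illustrative: Activity RetryPolicy maximum_attempts

worst_case_api_calls = sdk_attempts * temporal_attempts
assert worst_case_api_calls == 9  # each failure is billed up to nine times

# Capping Temporal retries at 1 for LLM nodes leaves a single retry layer:
assert sdk_attempts * 1 == 3
```

This is why retries should live in exactly one layer; for LLM nodes, the SDK layer usually has the better backoff behavior for rate limits.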
For testing without a Temporal server deployment:
```python
temporal_agent = await TemporalDeepAgent.local(agent)
result = await temporal_agent.ainvoke({"messages": ["hello"]})
```

This starts an in-process Temporal test server automatically.
```shell
# Unit + integration tests (uses in-process Temporal test server)
make test

# Integration tests only
make test_integration

# Integration tests against Dockerized Temporal
make test_integration_docker
```

MIT