deepagent-temporal

Temporal integration for Deep Agents — durable execution for AI agent workflows.

If your Deep Agent process crashes mid-task, all progress is lost. Sub-agents are ephemeral. Human-in-the-loop approval blocks a running process. deepagent-temporal solves these problems by running your Deep Agent as a Temporal Workflow:

  • Durable execution — survives process crashes, restarts, and deployments
  • Sub-agent dispatch — sub-agents run as independent Temporal Child Workflows
  • Worker affinity — sticky task queues keep file operations on the same machine, sidestepping the need for NFS or shared storage
  • Zero-resource HITL — workflow pauses consume no compute while waiting for approval

This project is experimental; use at your own risk.

Installation

pip install deepagent-temporal

Requires Python 3.10+, langgraph-temporal >= 0.1.0, and a running Temporal server.

Quick Start

Before: vanilla Deep Agent

from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage

agent = create_deep_agent(
    model=ChatAnthropic(model="claude-sonnet-4-20250514"),
    tools=[read_file, write_file, execute],
    system_prompt="You are a helpful coding assistant.",
    backend=FilesystemBackend(root_dir="/workspace"),
)

# No durability — if the process crashes, all progress is lost.
# Sub-agents run in-process. HITL blocks a live process.
result = await agent.ainvoke(
    {"messages": [HumanMessage(content="Fix the bug in main.py")]},
    config={"configurable": {"thread_id": "task-123"}},
)

After: Temporal-backed Deep Agent

from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
from temporalio.client import Client

from deepagent_temporal import TemporalDeepAgent

# 1. Create your agent exactly as before
agent = create_deep_agent(
    model=ChatAnthropic(model="claude-sonnet-4-20250514"),
    tools=[read_file, write_file, execute],
    system_prompt="You are a helpful coding assistant.",
    backend=FilesystemBackend(root_dir="/workspace"),
)

# 2. Connect to Temporal and wrap the agent
client = await Client.connect("localhost:7233")
temporal_agent = TemporalDeepAgent(
    agent,
    client,
    task_queue="coding-agents",
    use_worker_affinity=True,  # automatic worker pinning
)

# 3. Same API — now with durable execution
result = await temporal_agent.ainvoke(
    {"messages": [HumanMessage(content="Fix the bug in main.py")]},
    config={"configurable": {"thread_id": "task-123"}},
)

The ainvoke, astream, get_state, and resume APIs are identical. Only three lines of your existing code change.

Running a Worker

The agent graph executes on a Temporal Worker. Run this in a separate process (or on a dedicated machine for filesystem affinity):

import asyncio

from deepagents import create_deep_agent
from temporalio.client import Client
from temporalio.worker import UnsandboxedWorkflowRunner

from deepagent_temporal import TemporalDeepAgent

async def main():
    agent = create_deep_agent(...)  # same setup as above

    client = await Client.connect("localhost:7233")
    temporal_agent = TemporalDeepAgent(
        agent, client,
        task_queue="coding-agents",
        use_worker_affinity=True,
    )

    # create_worker() returns a WorkerGroup with two internal workers:
    # one on the shared queue (Workflows + discovery) and one on a
    # unique queue (node Activities).
    worker = temporal_agent.create_worker(
        workflow_runner=UnsandboxedWorkflowRunner(),
    )
    async with worker:
        print("Worker running. Ctrl+C to stop.")
        await asyncio.Future()  # run forever

asyncio.run(main())

Worker Affinity via Worker-Specific Task Queues

Deep Agents often use FilesystemBackend — tools read and write files on the local disk. All Activities for an agent must run on the same worker to keep the filesystem consistent.

Enable use_worker_affinity=True and the framework handles this automatically, following Temporal's worker-specific task queues pattern:

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="coding-agents",
    use_worker_affinity=True,  # transparent to the client
)

How it works:

  1. create_worker() generates a unique queue name per worker process and starts two internal workers: one on the shared queue (Workflows + discovery Activity), one on its unique queue (node Activities)
  2. When a Workflow starts, it calls a get_available_task_queue Activity on the shared queue — whichever worker picks it up returns its unique queue
  3. All subsequent node Activities are dispatched to that discovered queue
  4. The discovered queue survives continue-as-new — the same worker stays pinned across workflow runs
  5. HITL waits have no timeout concern — the queue persists independently

The client never needs to know queue names. Workers self-register.
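The discovery handshake above can be sketched in plain Python. This is an illustrative stand-in, not the library's actual internals: `make_worker_specific_queue` and `DiscoveryActivity` are hypothetical names, and the real version runs as a Temporal Activity on the shared queue.

```python
import uuid

def make_worker_specific_queue(shared_queue: str) -> str:
    """Each worker process derives a unique queue name from the shared one."""
    return f"{shared_queue}-{uuid.uuid4().hex}"

class DiscoveryActivity:
    """Stands in for the get_available_task_queue Activity: whichever
    worker polls the shared queue answers with its own unique queue."""

    def __init__(self, worker_specific_queue: str):
        self.worker_specific_queue = worker_specific_queue

    def get_available_task_queue(self) -> str:
        return self.worker_specific_queue

# A Workflow calls the discovery Activity once on the shared queue,
# then routes every subsequent node Activity to the returned queue.
shared = "coding-agents"
worker_queue = make_worker_specific_queue(shared)
discovery = DiscoveryActivity(worker_queue)
pinned_queue = discovery.get_available_task_queue()
```

Because the unique queue name is generated per worker process, only the worker that answered the discovery call polls it — which is what pins all of a workflow's Activities (and its filesystem state) to one machine.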

Sub-Agents as Child Workflows

Deep Agents can spawn sub-agents via the task tool. With deepagent-temporal, each sub-agent runs as an independent Temporal Child Workflow with its own durability, timeout, and observability.

Setting up the middleware

TemporalSubAgentMiddleware intercepts task tool calls and dispatches them as Child Workflows instead of running them in-process. Inject it before graph compilation:

from deepagent_temporal import TemporalSubAgentMiddleware

middleware = TemporalSubAgentMiddleware(
    subagent_specs={
        "researcher": "subagent:researcher",
        "coder": "subagent:coder",
    },
)

agent = create_deep_agent(
    model=model,
    tools=tools,
    middleware=[middleware],  # inject before compilation
    # ... other params
)

Configuring sub-agent execution

from datetime import timedelta

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="main-agents",
    subagent_task_queue="sub-agents",           # separate queue for sub-agents
    subagent_execution_timeout=timedelta(minutes=15),  # per sub-agent timeout
)

When the LLM invokes the task tool, the middleware stores a SubAgentRequest in a context variable. The Activity collects it, and the Workflow dispatches a Child Workflow. The result flows back as a ToolMessage to the parent agent — exactly matching the behavior of the in-process SubAgentMiddleware.
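The context-variable handoff can be sketched as follows. This is a simplified illustration of the mechanism described above; `SubAgentRequest`'s real fields and the tool's real signature may differ.

```python
import contextvars
from dataclasses import dataclass
from typing import Optional

@dataclass
class SubAgentRequest:
    subagent_type: str
    description: str

_pending_request: contextvars.ContextVar[Optional[SubAgentRequest]] = \
    contextvars.ContextVar("pending_subagent_request", default=None)

def task_tool(subagent_type: str, description: str) -> str:
    """The intercepted task tool: instead of running the sub-agent
    in-process, record the request for the surrounding Activity."""
    _pending_request.set(SubAgentRequest(subagent_type, description))
    return "dispatching as Child Workflow"

def collect_pending_request() -> Optional[SubAgentRequest]:
    """The Activity collects (and clears) the request after the tool runs;
    the Workflow then starts a Child Workflow from it."""
    request = _pending_request.get()
    _pending_request.set(None)
    return request

task_tool("researcher", "Survey recent papers on durable execution")
req = collect_pending_request()
```

A context variable is used (rather than a return value) so the tool's signature stays whatever the LLM expects, while the Activity can still observe the dispatch request out of band.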

Human-in-the-Loop

Deep Agents' interrupt() works out of the box. The workflow pauses with zero resource consumption and resumes when you send a Signal:

# Start the agent (non-blocking)
handle = await temporal_agent.astart(
    {"messages": [HumanMessage(content="Refactor auth module")]},
    config={"configurable": {"thread_id": "task-456"}},
)

# ... later, check if it's waiting for approval
state = await temporal_agent.get_state(
    {"configurable": {"thread_id": "task-456"}}
)
if state["status"] == "interrupted":
    print("Pending approval:", state["interrupts"])

    # Approve and resume
    await temporal_agent.resume(
        {"configurable": {"thread_id": "task-456"}},
        "approved",
    )

Token Streaming

Token-level LLM streaming is supported via callback-based capture. Enable it with enable_token_streaming=True:

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="coding-agents",
    enable_token_streaming=True,  # capture LLM tokens via callbacks
)

# Stream tokens (delivered after each Activity completes)
async for token_event in temporal_agent.astream(
    {"messages": [HumanMessage(content="Explain quantum computing")]},
    config={"configurable": {"thread_id": "task-789"}},
    stream_mode="tokens",
):
    print(token_event["token"], end="", flush=True)

For real-time token delivery (~10-50ms latency), add a Redis Streams sidecar:

from deepagent_temporal import TemporalDeepAgent, RedisStreamBackend

redis_backend = RedisStreamBackend(redis_url="redis://localhost:6379")

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="coding-agents",
    enable_token_streaming=True,
    redis_stream_backend=redis_backend,  # real-time via Redis
)

# Tokens arrive in real-time as the LLM generates them
async for token_event in temporal_agent.astream(
    {"messages": [HumanMessage(content="Explain quantum computing")]},
    config={"configurable": {"thread_id": "task-789"}},
    stream_mode="tokens",
):
    print(token_event["token"], end="", flush=True)

How it works: A StreamingNodeWrapper wraps each graph node's runnable and injects a TokenCapturingHandler (LangChain callback) into the config before ainvoke(). The handler intercepts on_llm_new_token events from the chat model. With Redis, tokens are published via XADD in real-time; without Redis, tokens are buffered in the Activity result. See docs/streaming-design.md for the full architecture.
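The handler's two delivery paths can be sketched with a minimal stand-in. `TokenBuffer` below is hypothetical — the real `TokenCapturingHandler` is a LangChain callback handler — but the shape is the same: every `on_llm_new_token` call is buffered into the Activity result, and optionally pushed to a publisher (Redis `XADD` in the real design).

```python
from typing import Callable, List, Optional

class TokenBuffer:
    """Simplified stand-in for the token-capturing callback handler."""

    def __init__(self, publish: Optional[Callable[[str], None]] = None):
        self.tokens: List[str] = []
        self._publish = publish

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.tokens.append(token)      # buffered path: returned with the Activity result
        if self._publish is not None:
            self._publish(token)       # real-time path: pushed to the Redis sidecar

# Simulate a streamed completion with a list standing in for the Redis stream.
published: List[str] = []
handler = TokenBuffer(publish=published.append)
for tok in ["Quantum ", "computing ", "is ", "hard."]:
    handler.on_llm_new_token(tok)
```

This is why the Redis sidecar is optional: the buffered path always works (tokens arrive when the Activity completes), and the publisher only adds the low-latency delivery channel on top.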

Limitations

This project is experimental. Before adopting, understand these constraints:

  • Token streaming requires opt-in configuration. By default, astream emits Activity-level events (node started, node completed), not individual LLM tokens. Enable enable_token_streaming=True for token-level capture via LangChain callback injection. For real-time delivery (~10-50ms latency), add a RedisStreamBackend. Without Redis, tokens are buffered and delivered after each Activity completes. See docs/streaming-design.md for the architecture and docs/comparison.md for how this compares to LangGraph Platform.

  • UnsandboxedWorkflowRunner is required. LangGraph's imports trigger Temporal's workflow sandbox restrictions. All non-deterministic code runs in Activities (not the workflow function), so the practical risk is low. See docs/sandbox-tradeoffs.md for details.

  • Payload size limits apply. Temporal has a ~2 MB per-event payload limit. Long conversations with hundreds of messages can exceed this. Use SummarizationMiddleware to compact history, or the claim-check pattern for large state. See docs/serialization.md.

  • Double-retry risk with LLM calls. LLM SDKs retry internally. Temporal also retries failed Activities. Without configuration, both layers retry — causing duplicate API costs. Use TemporalDeepAgent.recommended_retry_policies() to disable Temporal-level retries for LLM nodes. See docs/retry-semantics.md.
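The claim-check pattern mentioned in the payload-size limitation can be sketched as follows. This is a hedged illustration, not the library's implementation: the in-memory dict stands in for external blob storage (S3, GCS, etc.), and the function names are invented.

```python
import uuid
from typing import Dict

PAYLOAD_LIMIT = 2 * 1024 * 1024  # Temporal's ~2 MB per-event payload limit

_store: Dict[str, bytes] = {}  # stand-in for external blob storage

def check_in(payload: bytes) -> dict:
    """Replace an oversized payload with a claim-check key; pass small
    payloads through inline."""
    if len(payload) <= PAYLOAD_LIMIT:
        return {"inline": payload}
    key = f"claim-{uuid.uuid4().hex}"
    _store[key] = payload
    return {"claim_check": key}

def check_out(envelope: dict) -> bytes:
    """Resolve an envelope back to the original payload."""
    if "inline" in envelope:
        return envelope["inline"]
    return _store[envelope["claim_check"]]

big = b"x" * (3 * 1024 * 1024)   # would exceed the per-event limit
envelope = check_in(big)          # only the small key crosses Temporal
```

Only the key travels through workflow history, so event payloads stay small regardless of how large the agent's state grows.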
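The double-retry mitigation boils down to a per-node policy decision, sketched below. The node names and `RetryConfig` type are illustrative (the real helper is `TemporalDeepAgent.recommended_retry_policies()`), but the principle holds: LLM-calling Activities get a single Temporal attempt so only the SDK's internal retries fire.

```python
from dataclasses import dataclass

@dataclass
class RetryConfig:
    maximum_attempts: int

# Nodes assumed to call the LLM API (names are illustrative).
LLM_NODES = {"model", "call_model"}

def retry_policy_for(node_name: str) -> RetryConfig:
    if node_name in LLM_NODES:
        # The LLM SDK already retries internally; a second Temporal-level
        # retry layer would duplicate API spend on every transient failure.
        return RetryConfig(maximum_attempts=1)
    # Deterministic tool nodes are cheap and safe to retry at the
    # Temporal layer.
    return RetryConfig(maximum_attempts=3)
```

In a real worker, the same decision would be expressed with `temporalio.common.RetryPolicy` when scheduling each node Activity.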

Local Development

For testing without a Temporal server deployment:

temporal_agent = await TemporalDeepAgent.local(agent)
result = await temporal_agent.ainvoke({"messages": ["hello"]})

This starts an in-process Temporal test server automatically.

Testing

# Unit + integration tests (uses in-process Temporal test server)
make test

# Integration tests only
make test_integration

# Integration tests against Dockerized Temporal
make test_integration_docker

License

MIT
