feat(agent): add token-by-token streaming to tool_calling_agent #1595
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Walkthrough: agent_node signatures were extended to accept a RunnableConfig; tool calling now streams via astream(stream_mode="messages").
Sequence Diagram
sequenceDiagram
participant Caller
participant Agent as ToolCallAgentGraph.agent_node
participant Graph as Graph.astream
participant State as state/messages
Caller->>Agent: call agent_node(state, config)
Agent->>Agent: merge/augment config (inject __pregel_runtime)
Agent->>Graph: call astream(..., config=run_config, stream_mode="messages")
loop per streamed chunk
Graph-->>Agent: AIMessageChunk (agent-directed) content
Agent->>Agent: accumulate chunk into response
end
Agent->>Agent: validate response non-empty
Agent->>State: append final response to state.messages
Agent->>Caller: return updated state
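For orientation, here is a minimal sketch of what the streamed agent_node in this diagram could look like. It assumes the names from this PR (ToolCallAgentGraphState, DEFAULT_RUNTIME, the "__pregel_runtime" key) and uses simplified stand-ins for them; it is not the merged implementation in agent.py.

```python
# Illustrative sketch only: ToolCallAgentGraph, ToolCallAgentGraphState and DEFAULT_RUNTIME
# are simplified stand-ins for the real classes/objects referenced in this PR.
from dataclasses import dataclass, field

from langchain_core.messages import AIMessageChunk, BaseMessage
from langchain_core.runnables import RunnableConfig

DEFAULT_RUNTIME = object()  # placeholder for the runtime object LangGraph expects


@dataclass
class ToolCallAgentGraphState:  # simplified stand-in for the real state model
    messages: list[BaseMessage] = field(default_factory=list)


class ToolCallAgentGraph:  # simplified stand-in for the real agent graph class
    def __init__(self, agent):
        self.agent = agent  # a tool-bound chat model (Runnable)

    async def agent_node(self, state: ToolCallAgentGraphState,
                         config: RunnableConfig) -> ToolCallAgentGraphState:
        # Copy the config before injecting the runtime key so the caller's dict is untouched.
        run_config: RunnableConfig = {
            **config,
            "configurable": {**(config.get("configurable") or {}),
                             "__pregel_runtime": DEFAULT_RUNTIME},
        }

        # Stream the LLM so LangGraph's "messages" stream mode can observe individual
        # tokens, while still accumulating the chunks into one final message.
        response: AIMessageChunk | None = None
        async for chunk in self.agent.astream(state.messages, config=run_config):
            response = chunk if response is None else response + chunk

        if response is None or not response.content:
            raise RuntimeError("The agent LLM returned an empty response")

        state.messages.append(response)  # AIMessageChunk is a subclass of AIMessage
        return state
```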
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (2 passed)
Branch force-pushed from 9f827f9 to 973f223
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py (1)
126-166: ⚠️ Potential issue | 🟠 Major
Add tests for the `_stream_fn` streaming path.
The `_stream_fn` function in `register.py` (lines 107–136) implements token-by-token streaming but has no test coverage. Add tests to verify:
- Yields only `AIMessageChunk` content from the "agent" node
- Skips tool-call chunks (`msg.tool_call_chunks`)
- Error handling and exception propagation
A sketch of such a test follows below.
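A self-contained sketch of what such a test could look like. collect_agent_tokens is a hypothetical stand-in written from this PR's description of _stream_fn, and the fake (message, metadata) stream mimics graph.astream(stream_mode="messages"); the real test would drive register.py's _stream_fn through the project's workflow fixtures instead.

```python
# Hypothetical test sketch; collect_agent_tokens and fake_message_stream are illustrative
# stand-ins, not code from register.py or test_tool_calling.py.
import asyncio

from langchain_core.messages import AIMessageChunk, ToolMessage


async def collect_agent_tokens(message_stream):
    """Mimic the filter _stream_fn applies: agent-node AIMessageChunks only, no tool calls."""
    tokens = []
    async for msg, metadata in message_stream:
        if metadata.get("langgraph_node") != "agent":
            continue
        if isinstance(msg, AIMessageChunk) and msg.content and not msg.tool_call_chunks:
            tokens.append(msg.content)
    return tokens


async def fake_message_stream():
    """Shaped like graph.astream(..., stream_mode="messages"): (message, metadata) pairs."""
    yield AIMessageChunk(content="Hel"), {"langgraph_node": "agent"}
    yield AIMessageChunk(content="lo"), {"langgraph_node": "agent"}
    # A tool-call chunk from the agent node and a tool-node message should both be skipped.
    yield (AIMessageChunk(content="",
                          tool_call_chunks=[{"name": "search", "args": "", "id": "call-1",
                                             "index": 0, "type": "tool_call_chunk"}]),
           {"langgraph_node": "agent"})
    yield ToolMessage(content="tool output", tool_call_id="call-1"), {"langgraph_node": "tool"}


async def test_stream_yields_only_agent_token_chunks():
    assert await collect_agent_tokens(fake_message_stream()) == ["Hel", "lo"]


if __name__ == "__main__":
    asyncio.run(test_stream_yields_only_agent_token_chunks())
```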
🧹 Nitpick comments (2)
packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/agent.py (1)
103-103: In-place mutation of the caller's `config` dict.
`config["configurable"] = ...` mutates the dict passed by the caller. In the LangGraph runtime this is typically safe since configs are created per invocation, but direct callers (including tests) could be surprised if they reuse the same dict. Consider working with a shallow copy to be defensive.
♻️ Defensive copy
- config["configurable"] = {**(config.get("configurable") or {}), "__pregel_runtime": DEFAULT_RUNTIME} + config = {**config, "configurable": {**(config.get("configurable") or {}), "__pregel_runtime": DEFAULT_RUNTIME}}packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/register.py (1)
126-162: Duplicated state-initialization logic with `_response_fn`.
Lines 138–148 are nearly identical to lines 100–110 in `_response_fn` (convert input, `trim_messages`, build `ToolCallAgentGraphState`). Extract a shared helper to reduce drift risk between the two paths.
♻️ Extract shared helper
+ def _build_initial_state(chat_request_or_message: ChatRequestOrMessage) -> ToolCallAgentGraphState:
+     chat_request = GlobalTypeConverter.get().convert(chat_request_or_message, to_type=ChatRequest)
+     messages: list[BaseMessage] = trim_messages(
+         messages=[m.model_dump() for m in chat_request.messages],
+         max_tokens=config.max_history,
+         strategy="last",
+         token_counter=len,
+         start_on="human",
+         include_system=True,
+     )
+     return ToolCallAgentGraphState(messages=messages)
+
  async def _response_fn(chat_request_or_message: ChatRequestOrMessage) -> str:
      ...
      try:
-         message = GlobalTypeConverter.get().convert(chat_request_or_message, to_type=ChatRequest)
-
-         # initialize the starting state with the user query
-         messages: list[BaseMessage] = trim_messages(messages=[m.model_dump() for m in message.messages],
-                                                     max_tokens=config.max_history,
-                                                     strategy="last",
-                                                     token_counter=len,
-                                                     start_on="human",
-                                                     include_system=True)
-         state = ToolCallAgentGraphState(messages=messages)
+         state = _build_initial_state(chat_request_or_message)
          ...

Apply the same replacement inside `_stream_fn`.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/register.py (1)
170-170: ⚠️ Potential issue | 🟡 Minor
Stale log message references "react_agent" instead of "tool_calling_agent".
This appears to be a copy-paste artifact. The cleanup log should reference the correct workflow name.
Proposed fix
- logger.debug("%s Cleaning up react_agent workflow.", AGENT_LOG_PREFIX)
+ logger.debug("%s Cleaning up tool_calling_agent workflow.", AGENT_LOG_PREFIX)
🤖 Fix all issues with AI agents
In `@packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/agent.py`:
- Line 98: The public async method agent_node is missing a return type annotation. Open the async def agent_node(self, state: ToolCallAgentGraphState, config: RunnableConfig) implementation, determine the actual value it returns (or confirm that it returns nothing), and add an explicit return type hint. For an async def the annotation names the awaited value, e.g. -> None if it returns nothing or -> ToolCallAgentGraphState if it returns the updated state; update typing imports as needed and make sure the annotation references the real return type rather than omitting it. See the sketch after this list.
- Line 103: The code currently mutates the caller's config dict in place by assigning config["configurable"] = ..., which can leak state between calls. Instead, create a shallow copy of the config (or of the configurable sub-dict) and set the "__pregel_runtime" key there, or call a merge_configs / patch_config helper with DEFAULT_RUNTIME to produce a new config object. Avoid modifying the passed-in config variable, and update the call site to use the new (merged) config so the original config, DEFAULT_RUNTIME, and the "__pregel_runtime" key are handled without in-place mutation. See the sketch after this list.
🧹 Nitpick comments (1)
packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/register.py (1)
126-162: Duplicated message-conversion and state-initialization logic.
Lines 138–148 are nearly identical to lines 100–110 in `_response_fn`. Consider extracting a small helper (e.g., `_prepare_state(chat_request_or_message)`) to DRY this up, which also ensures any future changes to trimming or conversion are applied consistently.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/react_agent/agent.py`:
- Line 158: The agent_node method currently accepts a config: RunnableConfig but never uses it. Fix this by merging/forwarding the incoming config into the RunnableConfig instances built inside agent_node (instead of constructing fresh ones), so runtime callbacks (e.g., streaming callbacks from LangGraph) propagate into the ReAct path. Locate the two spots where new RunnableConfig objects are created inside agent_node and replace them with merged configs (e.g., new_config = merge_configs(existing_runnable_config, config), or whichever merge helper the tool-calling agent uses), so that callbacks and other runtime options from the incoming config are preserved. See the sketch after this list.
In `@packages/nvidia_nat_langchain/tests/agent/test_react.py`:
- Line 110: The test line calling mock_react_agent_no_raise.agent_node is longer than the 120-character yapf limit. Split the call across multiple lines to respect column_limit, e.g. by creating the ReActGraphState and/or config as separate variables, or by formatting the await call with each argument on its own line, so that mock_react_agent_no_raise.agent_node(...), ReActGraphState(messages=[HumanMessage('hi')]) and config={"configurable": {}} are wrapped across lines. Apply the same treatment to the other long lines around the calls to agent_node (lines 1066, 1087, 1097). See the sketch after this list.
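A sketch covering both items, assuming the project's async pytest setup; the import path for ReActGraphState is inferred from the file layout shown above, and the test name and assertion are illustrative only.

```python
# Hypothetical sketch; adapt the import path, fixture usage and assertion to the real
# test module. mock_react_agent_no_raise is the fixture named in the comments above.
import pytest
from langchain_core.messages import HumanMessage

from nat.plugins.langchain.agent.react_agent.agent import ReActGraphState  # assumed path


@pytest.mark.asyncio
async def test_agent_node_wrapped_call(mock_react_agent_no_raise):
    # Each argument on its own line keeps the call under yapf's 120-column limit.
    state = ReActGraphState(messages=[HumanMessage("hi")])
    response = await mock_react_agent_no_raise.agent_node(
        state,
        config={"configurable": {}},
    )
    assert response is not None


# Inside react_agent's agent_node, the incoming config would be merged into the locally
# built RunnableConfig (e.g. with langchain_core's merge_configs) rather than replaced,
# so LangGraph's streaming callbacks propagate through the ReAct path as well.
```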
🧹 Nitpick comments (1)
packages/nvidia_nat_langchain/tests/agent/test_react.py (1)
105-105: Consider extracting the repeated `config` dict into a fixture or module-level constant.
The literal `config={"configurable": {}}` is repeated across ~20 test call sites. A shared constant or fixture would reduce duplication and make it easier to update if the default config shape changes.
Example
# At module level or as a fixture:
EMPTY_CONFIG = {"configurable": {}}

# Then in tests:
await mock_react_agent.agent_node(state, config=EMPTY_CONFIG)

Also applies to: 110-110, 121-121
Branch force-pushed from 7d42c02 to 38bb137
Enable real-time SSE streaming for the tool_calling_agent via the OpenAI-compatible /v1/chat/completions endpoint.

Changes:
- Add _stream_fn using graph.astream(stream_mode="messages") to yield individual LLM tokens, registered via FunctionInfo.create()
- Switch agent_node from ainvoke to astream so LangGraph's message streaming hooks can observe individual tokens from the LLM
- Fix ChatResponseChunk.from_string() to use finish_reason=None (OpenAI spec: only the final chunk should have finish_reason="stop")
- Add final stop chunk and data: [DONE] sentinel to SSE stream
- Add test for graph.astream message streaming path

Signed-off-by: Myles Shannon <mshannon@nvidia.com>
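For context, this is roughly the OpenAI-compatible SSE framing the commit message refers to. The helper below only illustrates the convention (finish_reason=None on token chunks, "stop" on the final chunk, then a [DONE] sentinel); it is not NAT's ChatResponseChunk API, and to_sse is a hypothetical name.

```python
# Illustrative only: shows the OpenAI chat-completion streaming convention, not NAT's API.
import json
from collections.abc import AsyncIterator


async def to_sse(token_stream: AsyncIterator[str],
                 model: str = "example-model") -> AsyncIterator[str]:
    async for token in token_stream:
        # Intermediate token chunks carry the delta content and finish_reason=None.
        chunk = {"object": "chat.completion.chunk",
                 "model": model,
                 "choices": [{"index": 0, "delta": {"content": token}, "finish_reason": None}]}
        yield f"data: {json.dumps(chunk)}\n\n"

    # Only the final chunk carries finish_reason="stop", followed by the [DONE] sentinel.
    final = {"object": "chat.completion.chunk",
             "model": model,
             "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}
    yield f"data: {json.dumps(final)}\n\n"
    yield "data: [DONE]\n\n"
```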
Branch force-pushed from 38bb137 to 154b826
/ok to test 154b826
willkill07 left a comment
Minor changes, but otherwise LGTM!
- Add optional finish_reason param to ChatResponseChunk.from_string() with FINISH_REASONS validation
- Move AIMessageChunk import to top of tool_calling_agent_workflow
- Remove unnecessary try/except/finally around FunctionInfo.create yield

Signed-off-by: Myles Shannon <mshannon@nvidia.com>
/ok to test 8869d68

/merge
Description
This PR enables token-by-token streaming for the Tool Calling Agent when `stream=true` is set on chat completion requests. Previously, streaming requests returned the full response in a single Server-Sent Event; responses are now streamed incrementally as LLM tokens are produced.

Changes:
- `register.py`: Added a streaming entry point `_stream_fn` that uses LangGraph `astream(stream_mode="messages")` to consume message chunks from the graph and yield only `AIMessageChunk` content from the agent node (excluding tool-call chunks). The Tool Calling Agent is now registered with `FunctionInfo.create(single_fn=_response_fn, stream_fn=_stream_fn, ...)` so both single-call and streaming are supported.
- `agent.py`: The agent node now accepts `RunnableConfig`, forwards it to the underlying LLM, and uses `self.agent.astream()` instead of `ainvoke()` so the LLM streams. Chunks are accumulated into the final `AIMessage` for non-streaming paths, and config is merged so streaming callbacks propagate correctly.
- `test_tool_calling.py`: Updated so `agent_node` is called with a mock `config` argument to match the new signature.

No new user-facing documentation was added; existing docstrings and inline comments were updated to describe the streaming behavior.
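A condensed sketch of the streaming path described above, with the compiled graph, initial state and config passed in as parameters for illustration; the real `_stream_fn` in register.py also converts the incoming request, trims history and handles errors.

```python
# Simplified stand-in for register.py's _stream_fn; stream_agent_tokens is a hypothetical
# name, and the real function takes a ChatRequestOrMessage rather than graph/state/config.
from collections.abc import AsyncIterator

from langchain_core.messages import AIMessageChunk


async def stream_agent_tokens(graph, state, config) -> AsyncIterator[str]:
    # stream_mode="messages" yields (message chunk, metadata) pairs as the LLM produces tokens.
    async for msg, metadata in graph.astream(state, config=config, stream_mode="messages"):
        if metadata.get("langgraph_node") != "agent":
            continue  # ignore output emitted by tool nodes
        if isinstance(msg, AIMessageChunk) and msg.content and not msg.tool_call_chunks:
            yield msg.content  # plain token text; tool-call chunks are skipped


# Both entry points are then registered together, as described above:
# yield FunctionInfo.create(single_fn=_response_fn, stream_fn=_stream_fn, ...)
```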
By Submitting this PR I confirm:
Summary by CodeRabbit
New Features
Refactor
Bug Fixes
Tests