fix(bedrock): parse streaming tool args from contentBlockDelta events#6207
fix(bedrock): parse streaming tool args from contentBlockDelta events#6207JYeswak wants to merge 1 commit into
Conversation
At contentBlockStop, both _handle_streaming_converse and
_ahandle_streaming_converse read current_tool_use.get('input', {})
which is always empty because the Converse streaming API delivers
tool arguments as JSON string fragments via contentBlockDelta events,
not on the initial contentBlockStart block.
Parse accumulated_tool_input (the concatenated JSON string) at
contentBlockStop and write the result back to current_tool_use['input']
so both the tool executor and the assistant message carry the real args.
Fixes crewAIInc#6149
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 WalkthroughWalkthroughIn both the synchronous and asynchronous Bedrock streaming handlers, the ChangesBedrock Streaming Tool-Arg Parsing
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/llms/providers/bedrock/completion.py`:
- Around line 1037-1049: The json.loads call in the try block parsing
accumulated_tool_input may return non-dict JSON types like lists or strings,
which will cause a TypeError when downstream code attempts to execute tool
functions with fn(**function_args). After the json.loads call where
function_args is assigned, add validation to ensure the parsed result is
actually a dictionary, and if it is not, set function_args to an empty dict and
log a warning. Apply this same validation to both occurrences mentioned in the
diff (around line 1039 in the accumulated_tool_input parsing and at line 1647 in
the similar code block).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 5af3120b-07c6-4ef9-8f8d-d73e1ab6784b
📒 Files selected for processing (2)
lib/crewai/src/crewai/llms/providers/bedrock/completion.pylib/crewai/tests/llms/bedrock/test_bedrock_streaming_tool_args.py
| if accumulated_tool_input: | ||
| try: | ||
| function_args = json.loads(accumulated_tool_input) | ||
| except json.JSONDecodeError: | ||
| function_args = {} | ||
| logging.warning( | ||
| f"Failed to parse accumulated tool input: {accumulated_tool_input[:200]}" | ||
| ) | ||
| else: | ||
| function_args = cast( | ||
| dict[str, Any], current_tool_use.get("input", {}) | ||
| ) | ||
| current_tool_use["input"] = function_args |
There was a problem hiding this comment.
Enforce object-shaped tool args after JSON parsing.
At Line [1039] and Line [1647], json.loads may return non-dict JSON (e.g., list/string/number). That breaks the downstream contract where tool execution does fn(**function_args), causing a runtime TypeError. Please validate parsed output is a dict before assigning to current_tool_use["input"].
Suggested patch
- if accumulated_tool_input:
- try:
- function_args = json.loads(accumulated_tool_input)
- except json.JSONDecodeError:
- function_args = {}
- logging.warning(
- f"Failed to parse accumulated tool input: {accumulated_tool_input[:200]}"
- )
+ if accumulated_tool_input:
+ try:
+ parsed_input = json.loads(accumulated_tool_input)
+ if isinstance(parsed_input, dict):
+ function_args = parsed_input
+ else:
+ function_args = {}
+ logging.warning(
+ "Parsed tool input is not a JSON object; "
+ f"type={type(parsed_input).__name__}. "
+ f"Raw input prefix: {accumulated_tool_input[:200]}"
+ )
+ except json.JSONDecodeError:
+ function_args = {}
+ logging.warning(
+ f"Failed to parse accumulated tool input: {accumulated_tool_input[:200]}"
+ )Also applies to: 1645-1657
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/llms/providers/bedrock/completion.py` around lines 1037
- 1049, The json.loads call in the try block parsing accumulated_tool_input may
return non-dict JSON types like lists or strings, which will cause a TypeError
when downstream code attempts to execute tool functions with
fn(**function_args). After the json.loads call where function_args is assigned,
add validation to ensure the parsed result is actually a dictionary, and if it
is not, set function_args to an empty dict and log a warning. Apply this same
validation to both occurrences mentioned in the diff (around line 1039 in the
accumulated_tool_input parsing and at line 1647 in the similar code block).
Problem
Every Bedrock streaming tool call silently receives empty arguments (
{}), causing pydanticField requiredvalidation failures on any tool with required parameters. This makes native tool calling unusable for all Bedrock models when streaming is enabled — the streaming counterpart of #4972 (fixed for non-streaming in #5415).Before (broken):
After (fixed):
Root cause
Both
_handle_streaming_converse(sync) and_ahandle_streaming_converse(async) accumulate tool input deltas intoaccumulated_tool_inputduringcontentBlockDeltaevents — but atcontentBlockStop, they read fromcurrent_tool_use.get("input", {})instead.current_tool_useis set atcontentBlockStart, which only carriesnameandtoolUseId. The Converse streaming API never putsinputon the start block; it delivers arguments as JSON string fragments across delta events.The non-streaming path works because Bedrock's
converse()response includes the completeinputdict on thetoolUseblock.This is an instance of a general pattern in streaming APIs (SSE, WebSocket, Converse): payloads arrive fragmented, so any handler that reads from the initial block instead of the accumulated buffer will silently drop content.
Fix
At
contentBlockStop, parseaccumulated_tool_inputviajson.loads()when it's non-empty. Fall back tocurrent_tool_use.get("input", {})only when no deltas were received (defensive). Write the parsed result back tocurrent_tool_use["input"]so the assistant message appended to conversation history also carries the real arguments.Applied identically to both sync and async streaming handlers. The change is 13 lines per handler (26 total production lines).
Tests
4 new tests in
test_bedrock_streaming_tool_args.py, each constructing a synthetic Converse stream:test_streaming_tool_args_parsed_from_deltas{"city": "Paris"}test_streaming_tool_args_single_chunktest_streaming_tool_args_many_chunkstest_streaming_tool_use_input_set_on_current_tool_usecurrent_tool_use["input"]populated in conversation historyResults: 4/4 new tests pass. 35/36 pre-existing Bedrock tests pass (1 pre-existing failure in
test_bedrock_raises_error_when_model_not_foundis unrelated — fails onmaintoo).Impact
This unblocks all Bedrock streaming tool calls in CrewAI. Any crew using
bedrock/...models withstream=Trueand tools with required parameters was hitting silent argument loss → validation errors → max-iteration exhaustion.Related