Skip to content

fix(bedrock): parse streaming tool args from contentBlockDelta events#6207

Open
JYeswak wants to merge 1 commit into
crewAIInc:mainfrom
JYeswak:fix/bedrock-streaming-tool-args
Open

fix(bedrock): parse streaming tool args from contentBlockDelta events#6207
JYeswak wants to merge 1 commit into
crewAIInc:mainfrom
JYeswak:fix/bedrock-streaming-tool-args

Conversation

@JYeswak

@JYeswak JYeswak commented Jun 17, 2026

Copy link
Copy Markdown

Problem

Every Bedrock streaming tool call silently receives empty arguments ({}), causing pydantic Field required validation failures on any tool with required parameters. This makes native tool calling unusable for all Bedrock models when streaming is enabled — the streaming counterpart of #4972 (fixed for non-streaming in #5415).

Before (broken):

contentBlockStart → toolUse: {name: "get_weather", toolUseId: "abc"}     # no "input"
contentBlockDelta → toolUse: {input: '{"city":'}                          # fragment 1
contentBlockDelta → toolUse: {input: ' "Paris"}'}                         # fragment 2
contentBlockStop  → function_args = current_tool_use.get("input", {})     # ← always {}
                  → tool receives: {}
                  → Field required [type=missing, input_value={}, input_type=dict]

After (fixed):

contentBlockStop  → function_args = json.loads('{"city": "Paris"}')       # accumulated string
                  → current_tool_use["input"] = {"city": "Paris"}         # backfilled for history
                  → tool receives: {"city": "Paris"}                      # ✓

Root cause

Both _handle_streaming_converse (sync) and _ahandle_streaming_converse (async) accumulate tool input deltas into accumulated_tool_input during contentBlockDelta events — but at contentBlockStop, they read from current_tool_use.get("input", {}) instead. current_tool_use is set at contentBlockStart, which only carries name and toolUseId. The Converse streaming API never puts input on the start block; it delivers arguments as JSON string fragments across delta events.

The non-streaming path works because Bedrock's converse() response includes the complete input dict on the toolUse block.

This is an instance of a general pattern in streaming APIs (SSE, WebSocket, Converse): payloads arrive fragmented, so any handler that reads from the initial block instead of the accumulated buffer will silently drop content.

Fix

At contentBlockStop, parse accumulated_tool_input via json.loads() when it's non-empty. Fall back to current_tool_use.get("input", {}) only when no deltas were received (defensive). Write the parsed result back to current_tool_use["input"] so the assistant message appended to conversation history also carries the real arguments.

Applied identically to both sync and async streaming handlers. The change is 13 lines per handler (26 total production lines).

Tests

4 new tests in test_bedrock_streaming_tool_args.py, each constructing a synthetic Converse stream:

Test What it verifies
test_streaming_tool_args_parsed_from_deltas Args split across 2 chunks → tool receives {"city": "Paris"}
test_streaming_tool_args_single_chunk Full JSON in one delta → multi-key args parsed correctly
test_streaming_tool_args_many_chunks Args split across 10 small fragments → reassembled correctly
test_streaming_tool_use_input_set_on_current_tool_use current_tool_use["input"] populated in conversation history

Results: 4/4 new tests pass. 35/36 pre-existing Bedrock tests pass (1 pre-existing failure in test_bedrock_raises_error_when_model_not_found is unrelated — fails on main too).

Impact

This unblocks all Bedrock streaming tool calls in CrewAI. Any crew using bedrock/... models with stream=True and tools with required parameters was hitting silent argument loss → validation errors → max-iteration exhaustion.

Related

At contentBlockStop, both _handle_streaming_converse and
_ahandle_streaming_converse read current_tool_use.get('input', {})
which is always empty because the Converse streaming API delivers
tool arguments as JSON string fragments via contentBlockDelta events,
not on the initial contentBlockStart block.

Parse accumulated_tool_input (the concatenated JSON string) at
contentBlockStop and write the result back to current_tool_use['input']
so both the tool executor and the assistant message carry the real args.

Fixes crewAIInc#6149

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

In both the synchronous and asynchronous Bedrock streaming handlers, the contentBlockStop branch now JSON-parses accumulated_tool_input into function_args (with a fallback to {} and a warning on decode failure) and writes the result back to current_tool_use["input"]. A new test module adds four regression tests covering multi-chunk, single-chunk, and many-small-delta reassembly, plus follow-up converse payload validation.

Changes

Bedrock Streaming Tool-Arg Parsing

Layer / File(s) Summary
contentBlockStop arg parsing (sync + async)
lib/crewai/src/crewai/llms/providers/bedrock/completion.py
Both _handle_streaming_converse (line 1037–1049) and _ahandle_streaming_converse (line 1645–1657) now JSON-parse accumulated_tool_input into function_args when present, fall back to {} with a logged warning on JSONDecodeError, and fall back to the existing current_tool_use["input"] when no accumulated input exists. The resolved args are written back to current_tool_use["input"] before proceeding to structured-output handling or tool execution.
Regression tests
lib/crewai/tests/llms/bedrock/test_bedrock_streaming_tool_args.py
New test module with an autouse fixture that patches AWS credentials and mocks the Bedrock Session client. A _build_stream_events helper constructs synthetic Converse streaming event sequences. Four tests assert correct behavior: multi-chunk delta reassembly and parse, single-chunk parse, many-small-delta reassembly, and verification that the follow-up non-streaming converse call receives an assistant message with the fully parsed toolUse["input"] dict.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: fixing Bedrock streaming tool argument parsing from contentBlockDelta events.
Linked Issues check ✅ Passed The PR fully addresses issue #6149 by parsing accumulated tool input as JSON at contentBlockStop and falling back defensively, with comprehensive test coverage for sync/async streaming scenarios.
Out of Scope Changes check ✅ Passed All changes are directly scoped to fixing Bedrock streaming tool argument parsing; no unrelated modifications detected.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/crewai/src/crewai/llms/providers/bedrock/completion.py`:
- Around line 1037-1049: The json.loads call in the try block parsing
accumulated_tool_input may return non-dict JSON types like lists or strings,
which will cause a TypeError when downstream code attempts to execute tool
functions with fn(**function_args). After the json.loads call where
function_args is assigned, add validation to ensure the parsed result is
actually a dictionary, and if it is not, set function_args to an empty dict and
log a warning. Apply this same validation to both occurrences mentioned in the
diff (around line 1039 in the accumulated_tool_input parsing and at line 1647 in
the similar code block).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 5af3120b-07c6-4ef9-8f8d-d73e1ab6784b

📥 Commits

Reviewing files that changed from the base of the PR and between 0a577b7 and e76dd52.

📒 Files selected for processing (2)
  • lib/crewai/src/crewai/llms/providers/bedrock/completion.py
  • lib/crewai/tests/llms/bedrock/test_bedrock_streaming_tool_args.py

Comment on lines +1037 to +1049
if accumulated_tool_input:
try:
function_args = json.loads(accumulated_tool_input)
except json.JSONDecodeError:
function_args = {}
logging.warning(
f"Failed to parse accumulated tool input: {accumulated_tool_input[:200]}"
)
else:
function_args = cast(
dict[str, Any], current_tool_use.get("input", {})
)
current_tool_use["input"] = function_args

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Enforce object-shaped tool args after JSON parsing.

At Line [1039] and Line [1647], json.loads may return non-dict JSON (e.g., list/string/number). That breaks the downstream contract where tool execution does fn(**function_args), causing a runtime TypeError. Please validate parsed output is a dict before assigning to current_tool_use["input"].

Suggested patch
-                            if accumulated_tool_input:
-                                try:
-                                    function_args = json.loads(accumulated_tool_input)
-                                except json.JSONDecodeError:
-                                    function_args = {}
-                                    logging.warning(
-                                        f"Failed to parse accumulated tool input: {accumulated_tool_input[:200]}"
-                                    )
+                            if accumulated_tool_input:
+                                try:
+                                    parsed_input = json.loads(accumulated_tool_input)
+                                    if isinstance(parsed_input, dict):
+                                        function_args = parsed_input
+                                    else:
+                                        function_args = {}
+                                        logging.warning(
+                                            "Parsed tool input is not a JSON object; "
+                                            f"type={type(parsed_input).__name__}. "
+                                            f"Raw input prefix: {accumulated_tool_input[:200]}"
+                                        )
+                                except json.JSONDecodeError:
+                                    function_args = {}
+                                    logging.warning(
+                                        f"Failed to parse accumulated tool input: {accumulated_tool_input[:200]}"
+                                    )

Also applies to: 1645-1657

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/llms/providers/bedrock/completion.py` around lines 1037
- 1049, The json.loads call in the try block parsing accumulated_tool_input may
return non-dict JSON types like lists or strings, which will cause a TypeError
when downstream code attempts to execute tool functions with
fn(**function_args). After the json.loads call where function_args is assigned,
add validation to ensure the parsed result is actually a dictionary, and if it
is not, set function_args to an empty dict and log a warning. Apply this same
validation to both occurrences mentioned in the diff (around line 1039 in the
accumulated_tool_input parsing and at line 1647 in the similar code block).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant