-
Notifications
You must be signed in to change notification settings - Fork 719
Description
Checks
- I have updated to the lastest minor and patch version of Strands
- I have checked the documentation and this is not expected behavior
- I have searched ./issues and there are no duplicates of my issue
Strands Version
1.30.0
Python Version
3.14.3
Operating System
Debian (Docker container) / also reproduced on Windows 11 (MINGW64)
Installation Method
pip
Steps to Reproduce
When the Anthropic API stream terminates before sending the message_stop event (e.g. network timeout, connection reset, server error mid-stream), AnthropicModel.stream() crashes with AttributeError.
Minimal reproducible example (no API key needed):
"""
Requirements: pip install "strands-agents[anthropic]"
No API key needed — the Anthropic client is fully mocked.
"""
import asyncio
import traceback
from unittest.mock import AsyncMock, MagicMock, patch
from anthropic.types import (
Message,
RawContentBlockDeltaEvent,
RawContentBlockStartEvent,
RawMessageStartEvent,
TextBlock,
TextDelta,
Usage,
)
from anthropic.lib.streaming._types import TextEvent
def make_mock_stream():
"""Create a mock Anthropic stream that terminates before message_stop.
A normal stream ends with: ... -> content_block_stop -> message_delta -> message_stop
This mock stops after content_block_delta + TextEvent, simulating early termination
(network timeout, connection reset, server error mid-stream).
"""
events = [
RawMessageStartEvent(
type="message_start",
message=Message(
id="msg_test",
type="message",
role="assistant",
content=[],
model="claude-sonnet-4-20250514",
stop_reason=None,
stop_sequence=None,
usage=Usage(input_tokens=10, output_tokens=0, cache_creation_input_tokens=0, cache_read_input_tokens=0),
),
),
RawContentBlockStartEvent(
type="content_block_start",
index=0,
content_block=TextBlock(type="text", text=""),
),
RawContentBlockDeltaEvent(
type="content_block_delta",
index=0,
delta=TextDelta(type="text_delta", text="Hello"),
),
# TextEvent is emitted alongside content_block_delta by the parsed stream.
# If the stream terminates here (no message_stop), this becomes the last event.
TextEvent(type="text", text="Hello", snapshot="Hello"),
]
stream = AsyncMock()
stream.__aenter__ = AsyncMock(return_value=stream)
stream.__aexit__ = AsyncMock(return_value=False)
async def aiter_events():
for event in events:
yield event
stream.__aiter__ = lambda self: aiter_events()
return stream
async def reproduce():
from strands.models.anthropic import AnthropicModel
mock_stream = make_mock_stream()
with patch("anthropic.AsyncAnthropic") as MockClient:
client_instance = MockClient.return_value
client_instance.messages.stream.return_value = mock_stream
model = AnthropicModel(
client_args={"api_key": "not-needed"},
model_id="claude-sonnet-4-20250514",
max_tokens=1024,
)
model.client = client_instance
messages = [{"role": "user", "content": [{"text": "Hello"}]}]
async for _event in model.stream(messages, None, None):
pass
if __name__ == "__main__":
import sys
print("Reproducing strands-agents AttributeError on early stream termination...\n", flush=True)
try:
asyncio.run(reproduce())
print("ERROR: Bug was not reproduced (no exception raised)")
sys.exit(1)
except AttributeError as e:
print("Bug reproduced!\n", flush=True)
traceback.print_exc(file=sys.stdout)
print(f"\n--- Summary ---")
print(f"AttributeError: {e}")
print(f"The stream terminated before 'message_stop' was sent.")
print(f"The last event was a TextEvent, which has no .message attribute.")
print(f"Fix: use 'await stream.get_final_message()' instead of 'event.message'")Expected Behavior
When the Anthropic API stream terminates before sending message_stop, the AnthropicModel.stream() method should handle the missing event gracefully — either by using await stream.get_final_message() to retrieve the accumulated message, or by raising a clear error indicating the stream was incomplete.
Actual Behavior
AnthropicModel.stream() crashes with:
AttributeError: 'TextEvent' object has no attribute 'message'
at strands/models/anthropic.py line 412:
usage = event.message.usage # type: ignoreAfter the async for event in stream loop, event holds the last yielded event. Normally this is ParsedMessageStopEvent (which has .message), but on early termination it can be any event type — in this case TextEvent, which only has type, text, and snapshot.
Additional Context
This was encountered in production running strands-agents inside an RQ worker in a Docker container, where network interruptions between the worker and the Anthropic API can cause the stream to terminate before message_stop arrives.
The bug exists on the current main branch (line 411) and all released versions through 1.30.0.
The vulnerable line has never been changed since it was introduced.
Related issues that touched the same code area but did not fix this:
- [BUG] PydanticSerializationUnexpectedValue warnings when Anthropic SDK returns ParsedTextBlock in message content #1746 — PydanticSerializationUnexpectedValue warnings (same code, different symptom)
- fix(anthropic): avoid Pydantic serialization warnings for ParsedTextBlock in message_stop events #1785 — Fixed model_dump() on message_stop events but left line 412 unchanged
Possible Solution
Replace line 412's reliance on the last loop variable with the Anthropic SDK's stream.get_final_message() API, which safely returns the accumulated message snapshot:
async with self.client.messages.stream(**request) as stream:
async for event in stream:
if event.type in AnthropicModel.EVENT_TYPES:
yield self.format_chunk(event.model_dump())
final_message = await stream.get_final_message()
usage = final_message.usage
yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})