Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 70 additions & 58 deletions src/openai/lib/streaming/responses/_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,60 +250,69 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven
events: List[ResponseStreamEvent[TextFormatT]] = []

if event.type == "response.output_text.delta":
output = snapshot.output[event.output_index]
assert output.type == "message"
if event.output_index < len(snapshot.output):
output = snapshot.output[event.output_index]
assert output.type == "message"

content = output.content[event.content_index]
assert content.type == "output_text"
content = output.content[event.content_index]
assert content.type == "output_text"

events.append(
build(
ResponseTextDeltaEvent,
content_index=event.content_index,
delta=event.delta,
item_id=event.item_id,
output_index=event.output_index,
sequence_number=event.sequence_number,
logprobs=event.logprobs,
type="response.output_text.delta",
snapshot=content.text,
events.append(
build(
ResponseTextDeltaEvent,
content_index=event.content_index,
delta=event.delta,
item_id=event.item_id,
output_index=event.output_index,
sequence_number=event.sequence_number,
logprobs=event.logprobs,
type="response.output_text.delta",
snapshot=content.text,
)
)
)
else:
events.append(event)
elif event.type == "response.output_text.done":
output = snapshot.output[event.output_index]
assert output.type == "message"
if event.output_index < len(snapshot.output):
output = snapshot.output[event.output_index]
assert output.type == "message"

content = output.content[event.content_index]
assert content.type == "output_text"
content = output.content[event.content_index]
assert content.type == "output_text"

events.append(
build(
ResponseTextDoneEvent[TextFormatT],
content_index=event.content_index,
item_id=event.item_id,
output_index=event.output_index,
sequence_number=event.sequence_number,
logprobs=event.logprobs,
type="response.output_text.done",
text=event.text,
parsed=parse_text(event.text, text_format=self._text_format),
events.append(
build(
ResponseTextDoneEvent[TextFormatT],
content_index=event.content_index,
item_id=event.item_id,
output_index=event.output_index,
sequence_number=event.sequence_number,
logprobs=event.logprobs,
type="response.output_text.done",
text=event.text,
parsed=parse_text(event.text, text_format=self._text_format),
)
)
)
else:
events.append(event)
Comment on lines +296 to +297

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve parsed done events when output index is missing

When response.output_text.done arrives with an out-of-bounds output_index, this branch now emits the raw event object, which does not have the parsed field that responses.stream() consumers expect for response.output_text.done events. In the exact tool/resume scenarios this patch targets, user code that accesses event.parsed after checking event.type == "response.output_text.done" will still fail (now with AttributeError instead of IndexError), even though parsing can still be performed from event.text without needing snapshot state.

Useful? React with 👍 / 👎.

elif event.type == "response.function_call_arguments.delta":
output = snapshot.output[event.output_index]
assert output.type == "function_call"

events.append(
build(
ResponseFunctionCallArgumentsDeltaEvent,
delta=event.delta,
item_id=event.item_id,
output_index=event.output_index,
sequence_number=event.sequence_number,
type="response.function_call_arguments.delta",
snapshot=output.arguments,
if event.output_index < len(snapshot.output):
output = snapshot.output[event.output_index]
assert output.type == "function_call"

events.append(
build(
ResponseFunctionCallArgumentsDeltaEvent,
delta=event.delta,
item_id=event.item_id,
output_index=event.output_index,
sequence_number=event.sequence_number,
type="response.function_call_arguments.delta",
snapshot=output.arguments,
)
)
)
else:
events.append(event)

elif event.type == "response.completed":
response = self._completed_response
Expand Down Expand Up @@ -341,21 +350,24 @@ def accumulate_event(self, event: RawResponseStreamEvent) -> ParsedResponseSnaps
else:
snapshot.output.append(event.item)
elif event.type == "response.content_part.added":
output = snapshot.output[event.output_index]
if output.type == "message":
output.content.append(
construct_type_unchecked(type_=cast(Any, ParsedContent), value=event.part.to_dict())
)
if event.output_index < len(snapshot.output):
output = snapshot.output[event.output_index]
if output.type == "message":
output.content.append(
construct_type_unchecked(type_=cast(Any, ParsedContent), value=event.part.to_dict())
)
elif event.type == "response.output_text.delta":
output = snapshot.output[event.output_index]
if output.type == "message":
content = output.content[event.content_index]
assert content.type == "output_text"
content.text += event.delta
if event.output_index < len(snapshot.output):
output = snapshot.output[event.output_index]
if output.type == "message":
content = output.content[event.content_index]
assert content.type == "output_text"
content.text += event.delta
elif event.type == "response.function_call_arguments.delta":
output = snapshot.output[event.output_index]
if output.type == "function_call":
output.arguments += event.delta
if event.output_index < len(snapshot.output):
output = snapshot.output[event.output_index]
if output.type == "function_call":
output.arguments += event.delta
elif event.type == "response.completed":
self._completed_response = parse_response(
text_format=self._text_format,
Expand Down