From c4b4483d558b47fa8148cc2c7eb5bae9ddada9e4 Mon Sep 17 00:00:00 2001 From: s-zx <2575376715@qq.com> Date: Sun, 8 Mar 2026 23:29:39 +0100 Subject: [PATCH] fix(streaming): skip empty SSE data and move error check outside thread.* block Two related bugs in Stream.__stream__ and AsyncStream.__stream__: 1. JSONDecodeError on meta-only SSE events (fixes #2722) The SSE specification allows events with no data field (e.g. standalone 'retry:' or 'id:' directives). The SDK's SSE parser correctly sets data='' for these but __stream__ called sse.json() unconditionally, raising JSONDecodeError: Expecting value. Fix: skip events whose data is empty or whitespace before any JSON parsing. 2. Unreachable sse.event == 'error' check (fixes #2796) The error-event guard was nested inside the startswith('thread.') branch, making it logically impossible to trigger because 'error' != 'thread.*'. This was a regression from commit abc25966 ('fix(streaming): correct indentation') where the check was accidentally moved inside the block. Fix: move the error-event handling to the else branch (non-thread events), which is the correct location for standalone 'error' SSE events. --- src/openai/_streaming.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 45c13cc11d..33b35ea7a2 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -63,10 +63,20 @@ def __stream__(self) -> Iterator[_T]: if sse.data.startswith("[DONE]"): break + # Skip SSE meta-only events that carry no data (e.g. standalone + # `retry:` or `id:` directives). Per the SSE spec these are valid + # but contain an empty data field; calling sse.json() on them + # raises JSONDecodeError. 
+ if not sse.data or not sse.data.strip(): + continue + # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data if sse.event and sse.event.startswith("thread."): data = sse.json() - + yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) + else: + data = sse.json() + # Handle error events that carry an "error" event type (outside thread.* scope) if sse.event == "error" and is_mapping(data) and data.get("error"): message = None error = data.get("error") @@ -81,9 +91,6 @@ def __stream__(self) -> Iterator[_T]: body=data["error"], ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) - else: - data = sse.json() if is_mapping(data) and data.get("error"): message = None error = data.get("error") @@ -173,10 +180,20 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.data.startswith("[DONE]"): break + # Skip SSE meta-only events that carry no data (e.g. standalone + # `retry:` or `id:` directives). Per the SSE spec these are valid + # but contain an empty data field; calling sse.json() on them + # raises JSONDecodeError. 
+ if not sse.data or not sse.data.strip(): + continue + # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data if sse.event and sse.event.startswith("thread."): data = sse.json() - + yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) + else: + data = sse.json() + # Handle error events that carry an "error" event type (outside thread.* scope) if sse.event == "error" and is_mapping(data) and data.get("error"): message = None error = data.get("error") @@ -191,9 +208,6 @@ async def __stream__(self) -> AsyncIterator[_T]: body=data["error"], ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) - else: - data = sse.json() if is_mapping(data) and data.get("error"): message = None error = data.get("error")