From c4b4483d558b47fa8148cc2c7eb5bae9ddada9e4 Mon Sep 17 00:00:00 2001 From: s-zx <2575376715@qq.com> Date: Sun, 8 Mar 2026 23:29:39 +0100 Subject: [PATCH 1/2] fix(streaming): skip empty SSE data and move error check outside the `thread.` block Two related bugs in Stream.__stream__ and AsyncStream.__stream__: 1. JSONDecodeError on meta-only SSE events (fixes #2722) The SSE specification allows events with no data field (e.g. standalone 'retry:' or 'id:' directives). The SDK's SSE parser correctly sets data='' for these but __stream__ called sse.json() unconditionally, raising JSONDecodeError: Expecting value. Fix: skip events whose data is empty or whitespace before any JSON parsing. 2. Unreachable sse.event == 'error' check (fixes #2796) The error-event guard was nested inside the startswith('thread.') branch, making it logically impossible to trigger because 'error' != 'thread.*'. This was a regression from commit abc25966 ('fix(streaming): correct indentation') where the check was accidentally moved inside the block. Fix: move the error-event handling to the else branch (non-thread events), which is the correct location for standalone 'error' SSE events. --- src/openai/_streaming.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 45c13cc11d..33b35ea7a2 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -63,10 +63,20 @@ def __stream__(self) -> Iterator[_T]: if sse.data.startswith("[DONE]"): break + # Skip SSE meta-only events that carry no data (e.g. standalone + # `retry:` or `id:` directives). Per the SSE spec these are valid + # but contain an empty data field; calling sse.json() on them + # raises JSONDecodeError.
+ if not sse.data or not sse.data.strip(): + continue + # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data if sse.event and sse.event.startswith("thread."): data = sse.json() - + yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) + else: + data = sse.json() + # Handle error events that carry an "error" event type (outside thread.* scope) if sse.event == "error" and is_mapping(data) and data.get("error"): message = None error = data.get("error") @@ -81,9 +91,6 @@ def __stream__(self) -> Iterator[_T]: body=data["error"], ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) - else: - data = sse.json() if is_mapping(data) and data.get("error"): message = None error = data.get("error") @@ -173,10 +180,20 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.data.startswith("[DONE]"): break + # Skip SSE meta-only events that carry no data (e.g. standalone + # `retry:` or `id:` directives). Per the SSE spec these are valid + # but contain an empty data field; calling sse.json() on them + # raises JSONDecodeError.
+ if not sse.data or not sse.data.strip(): + continue + # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data if sse.event and sse.event.startswith("thread."): data = sse.json() - + yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) + else: + data = sse.json() + # Handle error events that carry an "error" event type (outside thread.* scope) if sse.event == "error" and is_mapping(data) and data.get("error"): message = None error = data.get("error") @@ -191,9 +208,6 @@ async def __stream__(self) -> AsyncIterator[_T]: body=data["error"], ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) - else: - data = sse.json() if is_mapping(data) and data.get("error"): message = None error = data.get("error") From 0b82169fb6b63deea1b41ca0ef47bc6e56700f3f Mon Sep 17 00:00:00 2001 From: s-zx <2575376715@qq.com> Date: Sun, 8 Mar 2026 23:30:34 +0100 Subject: [PATCH 2/2] fix(streaming): add aclose() to AsyncStream for standard async cleanup protocol AsyncStream exposes close() but not aclose(), causing AttributeError when instrumentation libraries (e.g. Langfuse, OpenTelemetry wrappers) call the standard Python async cleanup convention: await stream.aclose() # AttributeError: 'AsyncStream' object has no attribute 'aclose' The error surfaces in production via the call chain: AsyncChatCompletionStreamManager.__aexit__ -> AsyncChatCompletionStream.close() -> self._response.aclose() <- AttributeError when _response is AsyncStream (happens when the raw stream is wrapped by instrumentation) aclose() is the standard async cleanup method used by asyncio.StreamWriter, httpx.AsyncByteStream, and async generators (PEP 525). Add it as a thin alias that delegates to close() so callers can use either name without special-casing AsyncStream.
Fixes #2853 --- src/openai/_streaming.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 33b35ea7a2..3c43821066 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -252,6 +252,17 @@ async def close(self) -> None: """ await self.response.aclose() + async def aclose(self) -> None: + """Async-convention alias for :meth:`close`. + + Follows the standard Python async cleanup protocol used by + ``asyncio.StreamWriter``, ``httpx.AsyncByteStream``, and async + generators (PEP 525), allowing callers and instrumentation libraries + to call ``await stream.aclose()`` uniformly without special-casing + this class. + """ + await self.close() + class ServerSentEvent: def __init__(