From 3a4fc95af96da9458e71780322201aea573724ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=A8=B1=E5=85=83=E8=B1=AA?= Date: Wed, 11 Mar 2026 13:49:04 +0800 Subject: [PATCH 1/2] Improve streaming error handling and robustness - Fix unreachable error check in stream iteration: the `sse.event == "error"` condition was nested inside the `sse.event.startswith("thread.")` branch, making it impossible to match (relates to #2796) - Extract duplicated stream error handling into `_check_stream_error()` helper, reducing 4 copies of the same error extraction logic to a single function - Add `_parse_sse_data()` helper with proper JSON parse error handling, so malformed SSE data produces clear error messages instead of raw JSONDecodeError (relates to #2722) - Improve `ServerSentEvent.json()` to include event type and data preview in parse error messages for easier debugging - Add UTF-8 decode error handling in SSE decoder to gracefully skip invalid bytes instead of crashing the entire stream - Narrow `except Exception` to `except httpx.HTTPError` in the retry loop so non-network exceptions (e.g. KeyboardInterrupt, CancelledError, Celery SoftTimeLimitExceeded) are no longer caught and retried (relates to #2737) - Also check top-level `message` field in stream error responses, per API spec (relates to #2487) - Add comprehensive tests for all new behavior --- src/openai/_base_client.py | 8 +- src/openai/_streaming.py | 166 ++++++++++++++++++++++--------------- tests/test_streaming.py | 150 ++++++++++++++++++++++++++++++++- 3 files changed, 252 insertions(+), 72 deletions(-) diff --git a/src/openai/_base_client.py b/src/openai/_base_client.py index cf4571bf45..1c5cb43c3c 100644 --- a/src/openai/_base_client.py +++ b/src/openai/_base_client.py @@ -1021,8 +1021,8 @@ def request( log.debug("Raising timeout error") raise APITimeoutError(request=request) from err - except Exception as err: - log.debug("Encountered Exception", exc_info=True) + except httpx.HTTPError as err: + log.debug("Encountered httpx.HTTPError", exc_info=True) if remaining_retries > 0: self._sleep_for_retry( @@ -1620,8 +1620,8 @@ async def request( log.debug("Raising timeout error") raise APITimeoutError(request=request) from err - except Exception as err: - log.debug("Encountered Exception", exc_info=True) + except httpx.HTTPError as err: + log.debug("Encountered httpx.HTTPError", exc_info=True) if remaining_retries > 0: await self._sleep_for_retry( diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 45c13cc11d..0cfc4e97dc 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -2,6 +2,7 @@ from __future__ import annotations import json +import logging import inspect from types import TracebackType from typing import TYPE_CHECKING, Any, Generic, TypeVar, Iterator, Optional, AsyncIterator, cast @@ -16,10 +17,70 @@ from ._client import OpenAI, AsyncOpenAI from ._models import FinalRequestOptions +log = logging.getLogger(__name__) _T = TypeVar("_T") +def _check_stream_error(data: object, sse: ServerSentEvent, request: httpx.Request) -> None: + """Check if an SSE event represents a stream error and raise APIError if so. + + Handles both explicit error events (sse.event == "error") and events + where the data payload contains an "error" field. + """ + is_error_event = sse.event == "error" + has_error_field = is_mapping(data) and data.get("error") + + if not is_error_event and not has_error_field: + return + + message: str | None = None + + if is_mapping(data): + error = data.get("error") + if is_mapping(error): + msg = error.get("message") + if msg and isinstance(msg, str): + message = msg + # Also check for top-level message field per API spec + if not message: + top_msg = data.get("message") + if top_msg and isinstance(top_msg, str): + message = top_msg + + if not message: + message = "An error occurred during streaming" + + body = data.get("error") if is_mapping(data) and data.get("error") else data + + raise APIError( + message=message, + request=request, + body=body, + ) + + +def _parse_sse_data(sse: ServerSentEvent) -> Any: + """Parse the JSON data from an SSE event with proper error handling. + + Returns the parsed data, or raises a more informative error if parsing fails. + """ + if not sse.data: + log.debug("Received SSE event with empty data (event=%s)", sse.event) + return None + + try: + return json.loads(sse.data) + except json.JSONDecodeError as exc: + data_preview = sse.data[:200] + raise APIError( + message=f"Failed to parse streaming response data as JSON: {exc.msg} " + f"(event={sse.event!r}, data={data_preview!r})", + request=httpx.Request("POST", ""), + body=None, + ) from exc + + class Stream(Generic[_T]): """Provides the core interface to iterate over a synchronous stream response.""" @@ -63,41 +124,18 @@ def __stream__(self) -> Iterator[_T]: if sse.data.startswith("[DONE]"): break - # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data - if sse.event and sse.event.startswith("thread."): - data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) + data = _parse_sse_data(sse) + if data is None: + continue + + # Check for error events before processing - handles both explicit + # error events and data payloads containing an error field + _check_stream_error(data, sse, self.response.request) + # Assistants `thread.` events need special handling since we synthesize the event key + if sse.event and sse.event.startswith("thread."): yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) else: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data( data={"data": data, "event": sse.event} if self._options is not None and self._options.synthesize_event_and_data @@ -173,41 +211,18 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.data.startswith("[DONE]"): break - # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data - if sse.event and sse.event.startswith("thread."): - data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) + data = _parse_sse_data(sse) + if data is None: + continue + + # Check for error events before processing - handles both explicit + # error events and data payloads containing an error field + _check_stream_error(data, sse, self.response.request) + # Assistants `thread.` events need special handling since we synthesize the event key + if sse.event and sse.event.startswith("thread."): yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) else: - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - yield process_data( data={"data": data, "event": sse.event} if self._options is not None and self._options.synthesize_event_and_data @@ -273,7 +288,16 @@ def data(self) -> str: return self._data def json(self) -> Any: - return json.loads(self.data) + try: + return json.loads(self.data) + except json.JSONDecodeError as exc: + data_preview = self.data[:200] if self.data else "" + raise json.JSONDecodeError( + f"Failed to parse SSE event data (event={self.event!r}, " + f"data_preview={data_preview!r}): {exc.msg}", + exc.doc, + exc.pos, + ) from None @override def __repr__(self) -> str: @@ -297,7 +321,11 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: for chunk in self._iter_chunks(iterator): # Split before decoding so splitlines() only uses \r and \n for raw_line in chunk.splitlines(): - line = raw_line.decode("utf-8") + try: + line = raw_line.decode("utf-8") + except UnicodeDecodeError: + log.debug("Skipping SSE line with invalid UTF-8: %r", raw_line[:100]) + continue sse = self.decode(line) if sse: yield sse @@ -319,7 +347,11 @@ async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[Ser async for chunk in self._aiter_chunks(iterator): # Split before decoding so splitlines() only uses \r and \n for raw_line in chunk.splitlines(): - line = raw_line.decode("utf-8") + try: + line = raw_line.decode("utf-8") + except UnicodeDecodeError: + log.debug("Skipping SSE line with invalid UTF-8: %r", raw_line[:100]) + continue sse = self.decode(line) if sse: yield sse diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 04f8e51abd..f897ddf141 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -1,12 +1,14 @@ from __future__ import annotations +import json from typing import Iterator, AsyncIterator import httpx import pytest from openai import OpenAI, AsyncOpenAI -from openai._streaming import Stream, AsyncStream, ServerSentEvent +from openai._streaming import Stream, AsyncStream, ServerSentEvent, _check_stream_error, _parse_sse_data +from openai._exceptions import APIError @pytest.mark.asyncio @@ -216,6 +218,152 @@ def body() -> Iterator[bytes]: assert sse.json() == {"content": "известни"} +class TestServerSentEventJson: + """Tests for ServerSentEvent.json() error handling.""" + + def test_valid_json(self) -> None: + sse = ServerSentEvent(data='{"foo": true}', event="completion") + assert sse.json() == {"foo": True} + + def test_empty_data_raises_json_decode_error(self) -> None: + sse = ServerSentEvent(data="", event="completion") + with pytest.raises(json.JSONDecodeError, match="Failed to parse SSE event data"): + sse.json() + + def test_malformed_json_raises_descriptive_error(self) -> None: + sse = ServerSentEvent(data="{invalid json}", event="completion") + with pytest.raises(json.JSONDecodeError) as exc_info: + sse.json() + assert "completion" in str(exc_info.value) + assert "invalid json" in str(exc_info.value) + + def test_truncated_json_raises_descriptive_error(self) -> None: + sse = ServerSentEvent(data='{"key": "val', event="data") + with pytest.raises(json.JSONDecodeError, match="Failed to parse SSE event data"): + sse.json() + + +class TestCheckStreamError: + """Tests for _check_stream_error helper.""" + + def test_no_error_does_not_raise(self) -> None: + data = {"choices": [{"delta": {"content": "hello"}}]} + sse = ServerSentEvent(data='{}', event="completion") + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + # Should not raise + _check_stream_error(data, sse, request) + + def test_error_event_raises(self) -> None: + data = {"error": {"message": "rate limit exceeded", "type": "rate_limit_error"}} + sse = ServerSentEvent(data='{}', event="error") + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + with pytest.raises(APIError, match="rate limit exceeded"): + _check_stream_error(data, sse, request) + + def test_data_with_error_field_raises(self) -> None: + data = {"error": {"message": "server error", "type": "server_error"}} + sse = ServerSentEvent(data='{}', event="completion") + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + with pytest.raises(APIError, match="server error"): + _check_stream_error(data, sse, request) + + def test_error_with_top_level_message(self) -> None: + """Test that top-level message field is used as fallback per API spec.""" + data = {"error": {"type": "server_error"}, "message": "Something went wrong"} + sse = ServerSentEvent(data='{}', event="error") + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + with pytest.raises(APIError, match="Something went wrong"): + _check_stream_error(data, sse, request) + + def test_error_without_message_uses_default(self) -> None: + data = {"error": {"type": "unknown_error"}} + sse = ServerSentEvent(data='{}', event="completion") + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + with pytest.raises(APIError, match="An error occurred during streaming"): + _check_stream_error(data, sse, request) + + def test_error_with_string_error_field(self) -> None: + data = {"error": "something broke"} + sse = ServerSentEvent(data='{}', event="completion") + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + with pytest.raises(APIError, match="An error occurred during streaming"): + _check_stream_error(data, sse, request) + + +class TestParseSSEData: + """Tests for _parse_sse_data helper.""" + + def test_valid_json_data(self) -> None: + sse = ServerSentEvent(data='{"foo": "bar"}', event="completion") + result = _parse_sse_data(sse) + assert result == {"foo": "bar"} + + def test_empty_data_returns_none(self) -> None: + sse = ServerSentEvent(data="", event="ping") + result = _parse_sse_data(sse) + assert result is None + + def test_none_data_returns_none(self) -> None: + sse = ServerSentEvent(data=None, event="ping") + result = _parse_sse_data(sse) + assert result is None + + def test_malformed_json_raises_api_error(self) -> None: + sse = ServerSentEvent(data="not valid json", event="completion") + with pytest.raises(APIError, match="Failed to parse streaming response data"): + _parse_sse_data(sse) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_invalid_utf8_skipped(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """Test that invalid UTF-8 bytes in SSE data are skipped gracefully.""" + + def body() -> Iterator[bytes]: + # Valid event first + yield b'data: {"content":"hello"}\n' + yield b"\n" + # Invalid UTF-8 line (0xff is never valid in UTF-8) + yield b"data: \xff\xfe invalid\n" + yield b"\n" + # Another valid event after the invalid one + yield b'data: {"content":"world"}\n' + yield b"\n" + + iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) + + sse = await iter_next(iterator) + assert sse.json() == {"content": "hello"} + + # The invalid UTF-8 line should be skipped, so we get the next valid event + sse = await iter_next(iterator) + assert sse.json() == {"content": "world"} + + await assert_empty_iter(iterator) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_error_event_handling(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """Test that explicit error events (sse.event == 'error') are properly detected. + + This validates the fix for the previously unreachable error check that was + nested inside the thread.* branch. + """ + + def body() -> Iterator[bytes]: + yield b"event: error\n" + yield b'data: {"error": {"message": "stream interrupted", "type": "server_error"}}\n' + yield b"\n" + + iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) + + # The error event should be detected and we get an SSE with event="error" + sse = await iter_next(iterator) + assert sse.event == "error" + assert sse.json()["error"]["message"] == "stream interrupted" + + async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]: for chunk in iter: yield chunk From cb535bc9a49e7aa47e457714ae201a103badea52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=A8=B1=E5=85=83=E8=B1=AA?= Date: Wed, 11 Mar 2026 14:05:42 +0800 Subject: [PATCH 2/2] Ensure empty SSE error events surface as errors instead of being skipped Move _check_stream_error() before the `if data is None: continue` guard in both Stream.__stream__ and AsyncStream.__stream__, so that an explicit `event: error` with no data payload still raises APIError rather than being silently skipped. Also handle None data in _check_stream_error itself for robustness. Added tests that verify bare error events (no payload) raise at both the SSE decoder level and the full Stream iteration level. --- src/openai/_streaming.py | 25 ++++++++++--- tests/test_streaming.py | 79 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 6 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 0cfc4e97dc..7d2177a6cc 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -34,6 +34,15 @@ def _check_stream_error(data: object, sse: ServerSentEvent, request: httpx.Reque if not is_error_event and not has_error_field: return + # An explicit error event with no payload (or empty payload) should still + # surface as an error rather than being silently skipped. + if data is None and is_error_event: + raise APIError( + message="An error occurred during streaming", + request=request, + body=None, + ) + message: str | None = None if is_mapping(data): @@ -125,13 +134,15 @@ def __stream__(self) -> Iterator[_T]: break data = _parse_sse_data(sse) - if data is None: - continue # Check for error events before processing - handles both explicit - # error events and data payloads containing an error field + # error events (including those with empty payloads) and data + # payloads containing an "error" field _check_stream_error(data, sse, self.response.request) + if data is None: + continue + # Assistants `thread.` events need special handling since we synthesize the event key if sse.event and sse.event.startswith("thread."): yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) @@ -212,13 +223,15 @@ async def __stream__(self) -> AsyncIterator[_T]: break data = _parse_sse_data(sse) - if data is None: - continue # Check for error events before processing - handles both explicit - # error events and data payloads containing an error field + # error events (including those with empty payloads) and data + # payloads containing an "error" field _check_stream_error(data, sse, self.response.request) + if data is None: + continue + # Assistants `thread.` events need special handling since we synthesize the event key if sse.event and sse.event.startswith("thread."): yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index f897ddf141..2ccebc375f 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -289,6 +289,13 @@ def test_error_with_string_error_field(self) -> None: with pytest.raises(APIError, match="An error occurred during streaming"): _check_stream_error(data, sse, request) + def test_error_event_with_none_data_raises(self) -> None: + """An error event with no payload should still raise, not be skipped.""" + sse = ServerSentEvent(data="", event="error") + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + with pytest.raises(APIError, match="An error occurred during streaming"): + _check_stream_error(None, sse, request) + class TestParseSSEData: """Tests for _parse_sse_data helper.""" @@ -364,6 +371,78 @@ def body() -> Iterator[bytes]: assert sse.json()["error"]["message"] == "stream interrupted" +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_error_event_empty_payload_at_sse_level(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """Test that a bare error event with no data is properly yielded by the SSE decoder.""" + + def body() -> Iterator[bytes]: + yield b'data: {"content":"hello"}\n' + yield b"\n" + yield b"event: error\n" + yield b"\n" + + iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) + + sse = await iter_next(iterator) + assert sse.json() == {"content": "hello"} + + # The bare error event (empty data) should still be yielded by the decoder + sse = await iter_next(iterator) + assert sse.event == "error" + assert sse.data == "" + + +def test_stream_error_event_empty_payload_raises(client: OpenAI) -> None: + """Test that Stream.__stream__ raises APIError for error events with no payload. + + This ensures empty error events from backends/proxies are not silently skipped. + """ + + def body() -> Iterator[bytes]: + yield b'data: {"content":"hello"}\n' + yield b"\n" + yield b"event: error\n" + yield b"\n" + + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + response = httpx.Response(200, content=body(), request=request) + + stream: Stream[object] = Stream( + cast_to=object, + client=client, + response=response, + ) + + with pytest.raises(APIError, match="An error occurred during streaming"): + for _item in stream: + pass + + +@pytest.mark.asyncio +async def test_async_stream_error_event_empty_payload_raises(async_client: AsyncOpenAI) -> None: + """Test that AsyncStream.__stream__ raises APIError for error events with no payload.""" + + async def body() -> AsyncIterator[bytes]: + yield b'data: {"content":"hello"}\n' + yield b"\n" + yield b"event: error\n" + yield b"\n" + + request = httpx.Request("POST", "https://api.openai.com/v1/chat/completions") + response = httpx.Response(200, content=body(), request=request) + + stream: AsyncStream[object] = AsyncStream( + cast_to=object, + client=async_client, + response=response, + ) + + with pytest.raises(APIError, match="An error occurred during streaming"): + async for _item in stream: + pass + + async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]: for chunk in iter: yield chunk