Improve streaming error handling and robustness#2960
Improve streaming error handling and robustness#2960edenfunf wants to merge 2 commits intoopenai:mainfrom
Conversation
- Fix unreachable error check in stream iteration: the `sse.event == "error"`
condition was nested inside the `sse.event.startswith("thread.")` branch,
making it impossible to match (relates to openai#2796)
- Extract duplicated stream error handling into `_check_stream_error()` helper,
reducing 4 copies of the same error extraction logic to a single function
- Add `_parse_sse_data()` helper with proper JSON parse error handling, so
malformed SSE data produces clear error messages instead of raw JSONDecodeError
(relates to openai#2722)
- Improve `ServerSentEvent.json()` to include event type and data preview in
parse error messages for easier debugging
- Add UTF-8 decode error handling in SSE decoder to gracefully skip invalid
bytes instead of crashing the entire stream
- Narrow `except Exception` to `except httpx.HTTPError` in the retry loop
so non-network exceptions (e.g. KeyboardInterrupt, CancelledError,
Celery SoftTimeLimitExceeded) are no longer caught and retried (relates to openai#2737)
- Also check top-level `message` field in stream error responses, per API spec
(relates to openai#2487)
- Add comprehensive tests for all new behavior
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3a4fc95af9
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
src/openai/_streaming.py
Outdated
| if data is None: | ||
| continue |
There was a problem hiding this comment.
Surface empty error events instead of skipping them
In Stream.__stream__, events with empty data are unconditionally skipped before _check_stream_error runs, so an SSE frame like event: error with no payload gets silently ignored rather than terminating the stream with an error. This means backend/proxy failures that emit empty error events can be treated as successful stream completion, which is a regression from the previous fail-fast behavior; the same control flow exists in AsyncStream.__stream__ as well.
Useful? React with 👍 / 👎.
Move _check_stream_error() before the `if data is None: continue` guard in both Stream.__stream__ and AsyncStream.__stream__, so that an explicit `event: error` with no data payload still raises APIError rather than being silently skipped. Also handle None data in _check_stream_error itself for robustness. Added tests that verify bare error events (no payload) raise at both the SSE decoder level and the full Stream iteration level.
Summary
This PR improves error handling in the streaming implementation, fixing a few bugs and improving robustness:
Fix unreachable error check (Bug: Dead code -
sse.event == "error"check is unreachable in _streaming.py #2796): Thesse.event == "error"check was nested insideif sse.event.startswith("thread."), so it could never match. Restructured the logic so error events are caught regardless of prefix.Fix empty/malformed SSE data crashes (Streaming: JSONDecodeError when SSE events contain only meta-fields (retry, id, event) #2722): SSE events with empty or non-JSON data would crash with an unhelpful
JSONDecodeError. Added_parse_sse_data()that returnsNonefor empty data and wraps parse failures with useful context (event type, data preview).Narrow retry exception scope (OpenAI client intercepts Celery SoftTimeLimitExceeded exception if it happens during API call #2737): Changed
except Exceptiontoexcept httpx.HTTPErrorin both sync/async retry loops, so non-network exceptions likeKeyboardInterrupt,asyncio.CancelledError, or Celery'sSoftTimeLimitExceededare no longer silently caught and retried.Deduplicate stream error handling: Extracted the error extraction logic (previously copy-pasted 4 times across sync/async
__stream__methods) into a shared_check_stream_error()helper.Handle invalid UTF-8 in SSE: Added
try/except UnicodeDecodeErroraround.decode("utf-8")calls in the SSE decoder so that invalid bytes are skipped with a debug log instead of crashing the stream.Better error messages:
ServerSentEvent.json()now includes event type and data preview in the error message. Also checks top-levelmessagefield in error responses per API spec (Responses API error handling reads error.message, but spec says message is top-level #2487).Test plan
TestServerSentEventJson- tests for.json()error handling (valid JSON, empty data, malformed JSON, truncated JSON)TestCheckStreamError- tests for the error detection helper (no error, error event, error in data field, top-level message fallback, string error field, missing message)TestParseSSEData- tests for the JSON parsing helper (valid data, empty data, None data, malformed data)test_invalid_utf8_skipped- verifies invalid UTF-8 bytes are skipped and subsequent events still parsetest_error_event_handling- verifies explicit error events are detected (previously unreachable)