Skip to content

Commit b5a5e57

Browse files
Mlaz-codeclaude
andcommitted
fix(stream): honor server retry: hint, fall back to exp backoff
Reconnect previously used pure exponential backoff (2/4/8/16/30s) and ignored the server's `retry: 3000` SSE field. Brief network hiccups got 30s of needless downtime before the 5th reconnect.

Hybrid policy:
- Honor the server's `retry:` hint for the first 3 reconnect attempts (default 3 s, updated whenever the server emits a new value mid-stream).
- After 3 sustained failures the issue is probably structural (server down, auth changed, etc.), so switch to exponential backoff anchored on the hint, capped at 30 s.

Also reset reconnect_attempts to 0 after a successful stream so a graceful server close doesn't carry forward stale failure counts.

_parse_sse now surfaces `retry:` lines as a synthetic ("__retry__", int_ms) tuple — _stream_loop intercepts them, updates self._retry_ms, and continues without dispatching to handlers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e6e1000 commit b5a5e57

1 file changed

Lines changed: 41 additions & 4 deletions

File tree

src/sharpapi/streaming.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def __init__(
4444
headers: dict[str, str],
4545
timeout: float = 90.0,
4646
max_reconnects: int = 5,
47+
default_retry_ms: int = 3000,
4748
):
4849
self._url = url
4950
self._headers = headers
@@ -52,6 +53,11 @@ def __init__(
5253
self._handlers: dict[str, list[EventHandler]] = {}
5354
self._running = False
5455
self._last_event_id: str | None = None
56+
# SSE protocol: server sends `retry: <ms>` to advise reconnect delay.
57+
# SharpAPI emits `retry: 3000`. We honour it for the first few attempts,
58+
# then switch to exponential backoff capped at 30s if reconnects keep
59+
# failing — see connect().
60+
self._retry_ms = default_retry_ms
5561

5662
def on(self, event_type: str, handler: EventHandler | None = None):
5763
"""Register a handler for an event type. Can be used as a decorator.
@@ -103,26 +109,44 @@ def _emit(self, event_type: str, data: Any) -> None:
103109
def connect(self) -> None:
104110
"""Connect and block, processing events until disconnect() or error.
105111
106-
Automatically reconnects with exponential backoff on connection loss.
112+
Reconnect policy is hybrid: the first ``HONOR_HINT_FOR`` attempts use
113+
the server's ``retry:`` hint (default 3 s, updated whenever the server
114+
sends a new value mid-stream). After that we switch to exponential
115+
backoff capped at 30 s so we don't hammer a persistently broken server.
107116
"""
108117
self._running = True
109118
reconnect_attempts = 0
119+
# Honour the server's retry hint for the first N attempts; after that
120+
# the failure is probably structural (server down, auth changed, etc.)
121+
# and the gentler exponential backoff kicks in.
122+
HONOR_HINT_FOR = 3
110123

111124
while self._running and reconnect_attempts <= self._max_reconnects:
112125
try:
113126
self._stream_loop()
114127
if not self._running:
115128
break
129+
# Clean server close — reset backoff so a graceful reconnect
130+
# doesn't carry forward old failure counts.
131+
reconnect_attempts = 0
116132
except (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError) as e:
117133
reconnect_attempts += 1
118134
if reconnect_attempts > self._max_reconnects:
119135
raise StreamError(
120136
f"Max reconnection attempts ({self._max_reconnects}) reached",
121137
code="max_reconnects",
122138
) from e # noqa: B904
123-
delay = min(2**reconnect_attempts, 30)
139+
140+
hint_seconds = self._retry_ms / 1000.0
141+
if reconnect_attempts <= HONOR_HINT_FOR:
142+
delay = hint_seconds
143+
else:
144+
# Exponential ramp anchored on the server's hint, capped at 30 s.
145+
excess = reconnect_attempts - HONOR_HINT_FOR
146+
delay = min(hint_seconds * (2 ** excess), 30.0)
147+
124148
logger.warning(
125-
"Connection lost, reconnecting in %ds (attempt %d/%d)",
149+
"Connection lost, reconnecting in %.1fs (attempt %d/%d)",
126150
delay,
127151
reconnect_attempts,
128152
self._max_reconnects,
@@ -149,6 +173,10 @@ def _stream_loop(self) -> None:
149173
for event_type, data in _parse_sse(response.iter_lines()):
150174
if not self._running:
151175
break
176+
if event_type == "__retry__":
177+
# Server advised a new reconnect delay — store but don't dispatch.
178+
self._retry_ms = data
179+
continue
152180
self._emit(event_type, data)
153181

154182
def disconnect(self) -> None:
@@ -179,7 +207,11 @@ def iter_events(self) -> Iterator[tuple[str, Any]]:
179207

180208

181209
def _parse_sse(lines: Iterator[str]) -> Iterator[tuple[str, Any]]:
182-
"""Parse SSE text stream into (event_type, parsed_data) tuples."""
210+
"""Parse SSE text stream into (event_type, parsed_data) tuples.
211+
212+
Yields a synthetic ``("__retry__", int_ms)`` tuple when the server emits a
213+
``retry:`` field so the caller can update its reconnect delay.
214+
"""
183215
event_type = "message"
184216
data_lines: list[str] = []
185217

@@ -190,6 +222,11 @@ def _parse_sse(lines: Iterator[str]) -> Iterator[tuple[str, Any]]:
190222
data_lines.append(line[5:].strip())
191223
elif line.startswith("id:"):
192224
pass # Tracked by httpx/EventSource
225+
elif line.startswith("retry:"):
226+
try:
227+
yield "__retry__", int(line[6:].strip())
228+
except ValueError:
229+
pass # Malformed retry: line — ignore.
193230
elif line == "" and data_lines:
194231
# End of event
195232
raw = "\n".join(data_lines)

0 commit comments

Comments
 (0)