Skip to content

Commit 19fece9

Browse files
committed
fix: CPU busy-loop on streamable HTTP / SSE client shutdown
Exiting the streamable_http_client or sse_client context with pending requests causes AnyIO to spin in _deliver_cancellation because transport tasks are stuck on zero-buffer stream send() calls that can't be cancelled cooperatively. Close streams before cancelling the task group so blocked operations raise ClosedResourceError and exit on their own. Guard all read_stream_writer.send() calls against ClosedResourceError and BrokenResourceError during teardown. Bump stream buffer from 0 to 1 to reduce blocking probability. Same fix applied to BaseSession.__aexit__ for consistency. Github-Issue: #1805
1 parent 0fe16dd commit 19fece9

File tree

5 files changed

+156
-18
lines changed

5 files changed

+156
-18
lines changed

src/mcp/client/sse.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ async def sse_client(
5757
write_stream: MemoryObjectSendStream[SessionMessage]
5858
write_stream_reader: MemoryObjectReceiveStream[SessionMessage]
5959

60-
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
61-
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
60+
read_stream_writer, read_stream = anyio.create_memory_object_stream(1)
61+
write_stream, write_stream_reader = anyio.create_memory_object_stream(1)
6262

6363
async with anyio.create_task_group() as tg:
6464
try:
@@ -113,19 +113,34 @@ async def sse_reader(task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED):
113113
logger.debug(f"Received server message: {message}")
114114
except Exception as exc: # pragma: no cover
115115
logger.exception("Error parsing server message") # pragma: no cover
116-
await read_stream_writer.send(exc) # pragma: no cover
116+
try: # pragma: no cover
117+
await read_stream_writer.send(exc) # pragma: no cover
118+
except ( # pragma: no cover
119+
anyio.ClosedResourceError,
120+
anyio.BrokenResourceError,
121+
):
122+
return # pragma: no cover
117123
continue # pragma: no cover
118124

119125
session_message = SessionMessage(message)
120-
await read_stream_writer.send(session_message)
126+
try:
127+
await read_stream_writer.send(session_message)
128+
except (
129+
anyio.ClosedResourceError,
130+
anyio.BrokenResourceError,
131+
): # pragma: no cover
132+
return # pragma: no cover
121133
case _: # pragma: no cover
122134
logger.warning(f"Unknown SSE event: {sse.event}") # pragma: no cover
123135
except SSEError as sse_exc: # pragma: lax no cover
124136
logger.exception("Encountered SSE exception")
125137
raise sse_exc
126138
except Exception as exc: # pragma: lax no cover
127139
logger.exception("Error in sse_reader")
128-
await read_stream_writer.send(exc)
140+
try:
141+
await read_stream_writer.send(exc)
142+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
143+
pass
129144
finally:
130145
await read_stream_writer.aclose()
131146

@@ -156,6 +171,8 @@ async def post_writer(endpoint_url: str):
156171
try:
157172
yield read_stream, write_stream
158173
finally:
174+
await read_stream_writer.aclose()
175+
await write_stream.aclose()
159176
tg.cancel_scope.cancel()
160177
finally:
161178
await read_stream_writer.aclose()

src/mcp/client/streamable_http.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,11 @@ async def _handle_sse_event(
155155
message.id = original_request_id
156156

157157
session_message = SessionMessage(message)
158-
await read_stream_writer.send(session_message)
158+
try:
159+
await read_stream_writer.send(session_message)
160+
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
161+
logger.debug("Read stream closed, stopping SSE event handling")
162+
return True
159163

160164
# Call resumption token callback if we have an ID
161165
if sse.id and resumption_callback:
@@ -170,9 +174,15 @@ async def _handle_sse_event(
170174
if original_request_id is not None:
171175
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse SSE message: {exc}")
172176
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
173-
await read_stream_writer.send(error_msg)
177+
try:
178+
await read_stream_writer.send(error_msg)
179+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
180+
pass
174181
return True
175-
await read_stream_writer.send(exc)
182+
try:
183+
await read_stream_writer.send(exc)
184+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
185+
pass
176186
return False
177187
else: # pragma: no cover
178188
logger.warning(f"Unknown SSE event: {sse.event}")
@@ -271,14 +281,20 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
271281
if isinstance(message, JSONRPCRequest): # pragma: no branch
272282
error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated")
273283
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
274-
await ctx.read_stream_writer.send(session_message)
284+
try:
285+
await ctx.read_stream_writer.send(session_message)
286+
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
287+
pass
275288
return
276289

277290
if response.status_code >= 400:
278291
if isinstance(message, JSONRPCRequest):
279292
error_data = ErrorData(code=INTERNAL_ERROR, message="Server returned an error response")
280293
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
281-
await ctx.read_stream_writer.send(session_message)
294+
try:
295+
await ctx.read_stream_writer.send(session_message)
296+
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
297+
pass
282298
return
283299

284300
if is_initialization:
@@ -298,7 +314,10 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
298314
logger.error(f"Unexpected content type: {content_type}")
299315
error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}")
300316
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
301-
await ctx.read_stream_writer.send(error_msg)
317+
try:
318+
await ctx.read_stream_writer.send(error_msg)
319+
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
320+
pass
302321

303322
async def _handle_json_response(
304323
self,
@@ -318,12 +337,18 @@ async def _handle_json_response(
318337
self._maybe_extract_protocol_version_from_message(message)
319338

320339
session_message = SessionMessage(message)
321-
await read_stream_writer.send(session_message)
340+
try:
341+
await read_stream_writer.send(session_message)
342+
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
343+
return
322344
except (httpx.StreamError, ValidationError) as exc:
323345
logger.exception("Error parsing JSON response")
324346
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse JSON response: {exc}")
325347
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=request_id, error=error_data))
326-
await read_stream_writer.send(error_msg)
348+
try:
349+
await read_stream_writer.send(error_msg)
350+
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
351+
return
327352

328353
async def _handle_sse_response(
329354
self,
@@ -533,8 +558,8 @@ async def streamable_http_client(
533558
Example:
534559
See examples/snippets/clients/ for usage patterns.
535560
"""
536-
read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0)
537-
write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0)
561+
read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](1)
562+
write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](1)
538563

539564
# Determine if we need to create and manage the client
540565
client_provided = http_client is not None
@@ -573,6 +598,10 @@ def start_get_stream() -> None:
573598
finally:
574599
if transport.session_id and terminate_on_close:
575600
await transport.terminate_session(client)
601+
# Close streams before cancelling to unblock tasks
602+
# waiting on stream send/receive during shutdown.
603+
await read_stream_writer.aclose()
604+
await write_stream.aclose()
576605
tg.cancel_scope.cancel()
577606
finally:
578607
await read_stream_writer.aclose()

src/mcp/shared/session.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,10 @@ async def __aexit__(
224224
exc_tb: TracebackType | None,
225225
) -> bool | None:
226226
await self._exit_stack.aclose()
227-
# Using BaseSession as a context manager should not block on exit (this
228-
# would be very surprising behavior), so make sure to cancel the tasks
229-
# in the task group.
227+
# Close streams first so _receive_loop exits cooperatively,
228+
# then cancel the task group as a fallback.
229+
await self._read_stream.aclose()
230+
await self._write_stream.aclose()
230231
self._task_group.cancel_scope.cancel()
231232
return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
232233

tests/shared/test_session.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,3 +416,45 @@ async def make_request(client_session: ClientSession):
416416
# Pending request completed successfully
417417
assert len(result_holder) == 1
418418
assert isinstance(result_holder[0], EmptyResult)
419+
420+
421+
@pytest.mark.anyio
422+
async def test_session_exit_closes_streams_before_cancel():
423+
"""Verify BaseSession.__aexit__ closes streams before cancelling task group.
424+
425+
The receive loop should exit via ClosedResourceError on the read stream,
426+
not via forced task group cancellation. This prevents AnyIO cancellation
427+
busy-loops when tasks are blocked on stream operations.
428+
"""
429+
async with create_client_server_memory_streams() as (client_streams, server_streams):
430+
client_read, client_write = client_streams
431+
server_read, _server_write = server_streams
432+
433+
async def slow_server():
434+
"""Read a request but never respond, keeping the session busy."""
435+
try:
436+
await server_read.receive()
437+
# Hold the connection open
438+
await anyio.sleep(60)
439+
except (anyio.ClosedResourceError, anyio.get_cancelled_exc_class()):
440+
pass
441+
442+
async with anyio.create_task_group() as outer_tg:
443+
outer_tg.start_soon(slow_server)
444+
445+
with anyio.fail_after(5): # pragma: no branch
446+
async with ClientSession(read_stream=client_read, write_stream=client_write) as client_session:
447+
# Fire a request in a background task (will never get a response)
448+
async with anyio.create_task_group() as inner_tg: # pragma: no branch
449+
450+
async def send_and_ignore():
451+
try:
452+
await client_session.send_ping()
453+
except (MCPError, anyio.get_cancelled_exc_class()):
454+
pass
455+
456+
inner_tg.start_soon(send_and_ignore)
457+
await anyio.sleep(0.1)
458+
inner_tg.cancel_scope.cancel()
459+
460+
outer_tg.cancel_scope.cancel()

tests/shared/test_streamable_http.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2247,3 +2247,52 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(
22472247

22482248
assert "content-type" in headers_data
22492249
assert headers_data["content-type"] == "application/json"
2250+
2251+
2252+
@pytest.mark.anyio
2253+
async def test_streamable_http_client_exit_with_pending_requests(basic_server: None, basic_server_url: str):
2254+
"""Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/1805.
2255+
2256+
Sends tool calls to a server-side handler that blocks indefinitely (lock
2257+
never released), then exits the client context while responses are still
2258+
pending. Verifies that shutdown completes within the timeout and does not
2259+
hang or busy-loop in AnyIO cancellation delivery.
2260+
"""
2261+
with anyio.fail_after(10): # pragma: no branch
2262+
async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream):
2263+
async with ClientSession(read_stream, write_stream) as session:
2264+
await session.initialize()
2265+
2266+
async with anyio.create_task_group() as tg: # pragma: no branch
2267+
2268+
async def call_blocked_tool():
2269+
try:
2270+
await session.call_tool("wait_for_lock_with_notification", {})
2271+
except (MCPError, anyio.get_cancelled_exc_class()):
2272+
pass
2273+
2274+
# Fire off multiple requests that will block server-side
2275+
for _ in range(3):
2276+
tg.start_soon(call_blocked_tool)
2277+
2278+
# Give the server a moment to receive them, then bail out
2279+
await anyio.sleep(0.2)
2280+
tg.cancel_scope.cancel()
2281+
2282+
# If we reach here, shutdown completed without hanging.
2283+
await anyio.sleep(0.1)
2284+
2285+
2286+
@pytest.mark.anyio
2287+
async def test_streamable_http_client_rapid_connect_disconnect(basic_server: None, basic_server_url: str):
2288+
"""Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/1805.
2289+
2290+
Rapidly connects, initializes, and disconnects multiple times. Verifies no
2291+
resource leak or cancellation busy-loop across iterations.
2292+
"""
2293+
for _ in range(5):
2294+
with anyio.fail_after(10): # pragma: no branch
2295+
async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream):
2296+
async with ClientSession(read_stream, write_stream) as session:
2297+
await session.initialize()
2298+
await anyio.sleep(0.1)

0 commit comments

Comments
 (0)