Skip to content

Commit b63de94

Browse files
wiggzzclaude
andcommitted
Fix stateless HTTP task accumulation causing memory leak (#756)
In stateless mode, each request spawned a `run_stateless_server` task into the manager's global `_task_group`. After `handle_request()` completed and `terminate()` was called, the task continued running inside `app.run()`, blocked on `async for message in session.incoming_messages`. These zombie tasks accumulated indefinitely, leaking memory. Replace the global task group spawn with a request-scoped task group so that server tasks are automatically cancelled when their request completes. Add a regression test that simulates the real blocking behavior of `app.run()` using `anyio.sleep_forever()` and verifies no tasks linger in the global task group after requests finish. Closes #756 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0fe16dd commit b63de94

File tree

2 files changed

+86
-9
lines changed

2 files changed

+86
-9
lines changed

src/mcp/server/streamable_http_manager.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,12 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
151151
await self._handle_stateful_request(scope, receive, send)
152152

153153
async def _handle_stateless_request(self, scope: Scope, receive: Receive, send: Send) -> None:
154-
"""Process request in stateless mode - creating a new transport for each request."""
154+
"""Process request in stateless mode - creating a new transport for each request.
155+
156+
Uses a request-scoped task group so the server task is automatically
157+
cancelled when the request completes, preventing task accumulation in
158+
the manager's global task group.
159+
"""
155160
logger.debug("Stateless mode: Creating new transport for this request")
156161
# No session ID needed in stateless mode
157162
http_transport = StreamableHTTPServerTransport(
@@ -176,16 +181,23 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA
176181
except Exception: # pragma: no cover
177182
logger.exception("Stateless session crashed")
178183

179-
# Assert task group is not None for type checking
180-
assert self._task_group is not None
181-
# Start the server task
182-
await self._task_group.start(run_stateless_server)
184+
# Use a request-scoped task group instead of the global one.
185+
# This ensures the server task is cancelled when the request
186+
# finishes, preventing zombie tasks from accumulating.
187+
# See: https://github.com/modelcontextprotocol/python-sdk/issues/756
188+
async with anyio.create_task_group() as request_tg:
183189

184-
# Handle the HTTP request and return the response
185-
await http_transport.handle_request(scope, receive, send)
190+
async def run_request_handler(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED):
191+
task_status.started()
192+
# Handle the HTTP request and return the response
193+
await http_transport.handle_request(scope, receive, send)
194+
# Terminate the transport after the request is handled
195+
await http_transport.terminate()
196+
# Cancel the request-scoped task group to stop the server task
197+
request_tg.cancel_scope.cancel()
186198

187-
# Terminate the transport after the request is handled
188-
await http_transport.terminate()
199+
await request_tg.start(run_stateless_server)
200+
await request_tg.start(run_request_handler)
189201

190202
async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: Send) -> None:
191203
"""Process request in stateful mode - maintaining session state between requests."""

tests/server/test_streamable_http_manager.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,71 @@ async def mock_receive():
268268
assert len(transport._request_streams) == 0, "Transport should have no active request streams"
269269

270270

271+
@pytest.mark.anyio
272+
async def test_stateless_requests_task_leak():
273+
"""Test that stateless request tasks don't accumulate in the global task group.
274+
275+
Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/756
276+
277+
In the buggy implementation, each stateless request spawns a
278+
``run_stateless_server`` task into the manager's global ``_task_group``.
279+
After ``handle_request()`` completes and ``terminate()`` is called, the
280+
task continues to run inside ``self.app.run()`` (blocked on
281+
``async for message in session.incoming_messages``). These zombie tasks
282+
accumulate forever, leaking memory.
283+
284+
The fix uses a request-scoped task group so that all child tasks are
285+
cancelled when the request finishes.
286+
"""
287+
app = Server("test-stateless-task-leak")
288+
manager = StreamableHTTPSessionManager(app=app, stateless=True)
289+
290+
async with manager.run():
291+
# Replace app.run with a coroutine that blocks until cancelled,
292+
# simulating the real behavior of app.run which blocks on
293+
# ``async for message in session.incoming_messages``.
294+
async def blocking_run(*args: Any, **kwargs: Any) -> None:
295+
try:
296+
await anyio.sleep_forever()
297+
except anyio.get_cancelled_exc_class():
298+
pass
299+
300+
app.run = blocking_run # type: ignore[assignment]
301+
302+
scope = {
303+
"type": "http",
304+
"method": "POST",
305+
"path": "/mcp",
306+
"headers": [
307+
(b"content-type", b"application/json"),
308+
(b"accept", b"application/json, text/event-stream"),
309+
],
310+
}
311+
312+
async def mock_receive() -> Message:
313+
return {"type": "http.request", "body": b"", "more_body": False}
314+
315+
async def mock_send(message: Message) -> None:
316+
pass
317+
318+
assert manager._task_group is not None
319+
320+
num_requests = 5
321+
for _ in range(num_requests):
322+
await manager.handle_request(scope, mock_receive, mock_send)
323+
324+
# Allow a checkpoint so cancelled tasks can be reaped
325+
await anyio.sleep(0.05)
326+
327+
# After all requests complete, there should be no lingering tasks.
328+
# Before the fix, each request left behind a zombie task, so the
329+
# count would equal ``num_requests``.
330+
assert len(manager._task_group._tasks) == 0, (
331+
f"Expected 0 lingering tasks in the global task group but found "
332+
f"{len(manager._task_group._tasks)}. Stateless request tasks are leaking."
333+
)
334+
335+
271336
@pytest.mark.anyio
272337
async def test_unknown_session_id_returns_404():
273338
"""Test that requests with unknown session IDs return HTTP 404 per MCP spec."""

0 commit comments

Comments
 (0)