From 2ecad0fcf37d16d378cd3dfa9be9fd64c1e36d94 Mon Sep 17 00:00:00 2001 From: g97iulio1609 Date: Sat, 28 Feb 2026 05:13:14 +0100 Subject: [PATCH 1/2] perf: use deque for InMemoryTaskMessageQueue FIFO operations Replace list with collections.deque in InMemoryTaskMessageQueue so that dequeue (popleft) runs in O(1) amortised time instead of the O(n) incurred by list.pop(0), which must shift every remaining element on each call. --- src/mcp/shared/experimental/tasks/message_queue.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/mcp/shared/experimental/tasks/message_queue.py b/src/mcp/shared/experimental/tasks/message_queue.py index 018c2b7b2..e17c4a865 100644 --- a/src/mcp/shared/experimental/tasks/message_queue.py +++ b/src/mcp/shared/experimental/tasks/message_queue.py @@ -12,6 +12,7 @@ """ from abc import ABC, abstractmethod +from collections import deque from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Literal @@ -151,13 +152,13 @@ class InMemoryTaskMessageQueue(TaskMessageQueue): """ def __init__(self) -> None: - self._queues: dict[str, list[QueuedMessage]] = {} + self._queues: dict[str, deque[QueuedMessage]] = {} self._events: dict[str, anyio.Event] = {} - def _get_queue(self, task_id: str) -> list[QueuedMessage]: + def _get_queue(self, task_id: str) -> deque[QueuedMessage]: """Get or create the queue for a task.""" if task_id not in self._queues: - self._queues[task_id] = [] + self._queues[task_id] = deque() return self._queues[task_id] async def enqueue(self, task_id: str, message: QueuedMessage) -> None: @@ -172,7 +173,7 @@ async def dequeue(self, task_id: str) -> QueuedMessage | None: queue = self._get_queue(task_id) if not queue: return None - return queue.pop(0) + return queue.popleft() async def peek(self, task_id: str) -> QueuedMessage | None: """Return the next message without removing it.""" From bd11f1b2b7a0391914c2606e748ab823f640fce2 Mon Sep 17 00:00:00 2001 From: g97iulio1609 Date: Sat, 28 Feb 2026 16:25:39 +0100 Subject: [PATCH 2/2] fix: use deque in test to match internal queue type --- tests/experimental/tasks/test_message_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/experimental/tasks/test_message_queue.py b/tests/experimental/tasks/test_message_queue.py index a8517e535..eca113d5b 100644 --- a/tests/experimental/tasks/test_message_queue.py +++ b/tests/experimental/tasks/test_message_queue.py @@ -1,5 +1,6 @@ """Tests for TaskMessageQueue and InMemoryTaskMessageQueue.""" +from collections import deque from datetime import datetime, timezone import anyio @@ -270,7 +271,7 @@ async def is_empty_with_injection(tid: str) -> bool: if call_count == 2 and tid == task_id: # Before second check, inject a message - this simulates a message # arriving between event creation and the double-check - queue._queues[task_id] = [QueuedMessage(type="request", message=make_request())] + queue._queues[task_id] = deque([QueuedMessage(type="request", message=make_request())]) return await original_is_empty(tid) queue.is_empty = is_empty_with_injection # type: ignore[method-assign]