Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 19 additions & 28 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import sys
import warnings

from abc import ABC, abstractmethod
from types import TracebackType
Expand Down Expand Up @@ -48,38 +49,28 @@ def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:


class EventQueue(ABC):
"""Base class and factory for EventQueueSource.

EventQueue provides an abstraction for a queue of events that can be tapped
by multiple consumers.
EventQueue maintain main queue and source and maintain child queues in sync.
GUARANTEE: All sinks (including the default one) will receive events in the exact same order.

WARNING (Concurrency): All events from all sinks (both the default queue and any
tapped child queues) must be regularly consumed and marked as done. If any single
consumer stops processing and its queue reaches capacity, it can block the event
dispatcher and stall the entire system, causing a widespread deadlock.

WARNING (Memory Leak): Event queues spawn background tasks. To prevent memory
and task leaks, all queue objects (both source and sinks) MUST be explicitly
closed via `await queue.close()` or by using the async context manager (`async with queue:`).
Child queues are automatically closed when parent queue is closed, but you
should still close them explicitly to prevent queues from reaching capacity by
unconsumed events.

Typical usage:
queue = EventQueue()
child_queue1 = await queue.tap()
child_queue2 = await queue.tap()

async for event in child_queue1:
do_some_work(event)
child_queue1.task_done()
"""Producer-side interface passed to `AgentExecutor.execute`/`cancel`.

Exposes only `enqueue_event`. The consumer is framework-managed
and not part of the public surface.

Default request handlers construct the queue and pass it in; executors
should accept it and not construct one. To run an executor outside the
framework, write a custom subclass or use `EventQueueLegacy` (deprecated,
will be removed in a future release).
"""

def __new__(cls, *args: Any, **kwargs: Any) -> Self:
"""Redirects instantiation to EventQueueLegacy for backwards compatibility."""
"""Deprecated: redirects bare `EventQueue(...)` to `EventQueueLegacy(...)`."""
if cls is EventQueue:
warnings.warn(
'EventQueue is an abstract interface; instantiating it '
'directly is deprecated. The redirect to EventQueueLegacy '
'(and EventQueueLegacy itself) will be removed in a future '
'release.',
DeprecationWarning,
stacklevel=2,
)
instance = EventQueueLegacy.__new__(EventQueueLegacy)
EventQueueLegacy.__init__(instance, *args, **kwargs)
return cast('Self', instance)
Comment thread
ishymko marked this conversation as resolved.
Expand Down
Loading