Add Redis list-based queue support with async hook#63937
Add Redis list-based queue support with async hook#63937tardunge wants to merge 2 commits intoapache:mainfrom
Conversation
|
@tardunge This PR has a few issues that need to be addressed before it can be reviewed — please see our Pull Request quality criteria. Issues found:
What to do next:
There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack. |
Adds durable list-based messaging to the Redis provider using LPUSH/BRPOP, complementing the existing pub/sub support. List-based queues provide message persistence until consumed and exactly-once delivery semantics via atomic BRPOP, which pub/sub cannot guarantee. New components: - RedisLPushOperator: push messages to Redis lists - AwaitMessageFromListTrigger: async trigger using BRPOP with priority queue support (multiple lists checked in order) - RedisListMessageQueueProvider: common-messaging integration with redis+list:// scheme - RedisHook.get_async_conn(): async context manager using redis.asyncio for truly non-blocking trigger operations
- Provide defaults for host/port/db to satisfy AsyncRedis constructor type requirements (str, int, int instead of Optional types) - Use close() instead of aclose() for redis-py compatibility
e476eb6 to
aeaa9c0
Compare
There was a problem hiding this comment.
Pull request overview
Adds Redis list-backed queue support to the Redis provider (durable consumption via LPUSH + BRPOP) and integrates it with the Common Messaging MessageQueueTrigger via a new redis+list scheme. It also introduces an async Redis hook helper to avoid sync_to_async in triggers.
Changes:
- Added
RedisLPushOperatorfor producing messages to Redis lists. - Added
AwaitMessageFromListTriggerusing asyncBRPOP(supports prioritization via multiple lists). - Added
RedisHook.get_async_conn()async context manager plus provider registration updates and unit tests.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| providers/redis/src/airflow/providers/redis/hooks/redis.py | Adds get_async_conn() to create/cleanup an async Redis client for triggers. |
| providers/redis/src/airflow/providers/redis/triggers/redis_list_await_message.py | New async trigger that blocks on BRPOP across one or more lists. |
| providers/redis/src/airflow/providers/redis/operators/redis_lpush.py | New operator to LPUSH messages to a Redis list. |
| providers/redis/src/airflow/providers/redis/queues/redis_list.py | Common-messaging provider wiring for the redis+list scheme. |
| providers/redis/src/airflow/providers/redis/get_provider_info.py | Registers the new operator/trigger/queue provider in generated provider info. |
| providers/redis/provider.yaml | Registers the new operator/trigger/queue provider for the Redis provider package. |
| providers/redis/tests/unit/redis/triggers/test_redis_list_await_message.py | Unit tests for trigger serialization and run() behavior (string/bytes, multi-list). |
| providers/redis/tests/unit/redis/operators/test_redis_lpush.py | Unit tests for RedisLPushOperator.execute(). |
| providers/redis/tests/unit/redis/queues/test_redis_list.py | Unit tests for scheme matching and MessageQueueTrigger integration. |
|
|
||
| Unlike pub/sub, list-based messaging provides: | ||
| - **Durability**: messages persist in the list until consumed | ||
| - **Exactly-once delivery**: BRPOP atomically removes and returns the message |
There was a problem hiding this comment.
The docstring claims “Exactly-once delivery” via BRPOP, but Redis lists don’t provide end-to-end exactly-once semantics: BRPOP removes the item before user processing, so a consumer crash after pop can lose the message (at-most-once). Consider rewording to “atomic consumption” / “at-most-once delivery”, or mention a safer pattern (e.g., BRPOPLPUSH/RPOPLPUSH to a processing list) if you want crash-safe processing.
| - **Exactly-once delivery**: BRPOP atomically removes and returns the message | |
| - **At-most-once delivery semantics**: BRPOP atomically removes and returns the | |
| message from the list, so each item is delivered at most once to this consumer |
| Combined with BRPOP on the consumer side, this provides a FIFO queue pattern | ||
| with durable, exactly-once delivery semantics. |
There was a problem hiding this comment.
This operator docstring states “exactly-once delivery semantics”, but LPUSH+BRPOP only guarantees atomic removal from the list; it does not guarantee end-to-end exactly-once processing (messages can be lost if the consumer crashes after BRPOP). Please adjust the wording to avoid overstating delivery guarantees.
| Combined with BRPOP on the consumer side, this provides a FIFO queue pattern | |
| with durable, exactly-once delivery semantics. | |
| Combined with BRPOP on the consumer side, this can be used to implement a simple | |
| FIFO queue pattern backed by Redis; end-to-end delivery guarantees depend on the | |
| consumer and processing logic. |
| * **Durability**: Messages persist in the list until consumed (no message loss). | ||
| * **Exactly-once delivery**: BRPOP atomically removes and returns the message. |
There was a problem hiding this comment.
The provider description claims “Exactly-once delivery” for BRPOP. Redis list pop operations are atomic, but do not provide end-to-end exactly-once processing guarantees. Please reword to “atomic consumption” / “at-most-once delivery” or describe an ack/retry pattern if needed.
| * **Durability**: Messages persist in the list until consumed (no message loss). | |
| * **Exactly-once delivery**: BRPOP atomically removes and returns the message. | |
| * **Durability**: Messages persist in the list until consumed. | |
| * **Atomic consumption / at-most-once delivery**: ``BRPOP`` atomically removes and returns messages. |
| async with hook.get_async_conn() as redis_conn: | ||
| await redis_conn.brpop("my_list", timeout=10) | ||
| """ | ||
| from redis.asyncio import Redis as AsyncRedis |
There was a problem hiding this comment.
There’s a from redis.asyncio import Redis as AsyncRedis import inside this method body. Airflow generally avoids in-function imports unless needed for circular-import avoidance or lazy loading for isolation. Since redis>=4.5.2 is already required, consider moving this import to module scope (or add a short comment explaining why lazy import is required here).
| @asynccontextmanager | ||
| async def get_async_conn(self) -> AsyncIterator[Any]: | ||
| """ | ||
| Return an async Redis connection using ``redis.asyncio``. | ||
|
|
||
| This is an async context manager that creates and cleans up an async Redis connection. | ||
| Useful for triggers and other async components that need non-blocking Redis access. | ||
|
|
||
| Usage:: | ||
|
|
||
| async with hook.get_async_conn() as redis_conn: | ||
| await redis_conn.brpop("my_list", timeout=10) | ||
| """ | ||
| from redis.asyncio import Redis as AsyncRedis | ||
|
|
||
| conn = self.get_connection(self.redis_conn_id) | ||
| host = conn.host | ||
| port = conn.port | ||
| username = conn.login | ||
| password = None if str(conn.password).lower() in ["none", "false", ""] else conn.password | ||
| db = conn.extra_dejson.get("db") | ||
|
|
||
| ssl_arg_names = [ | ||
| "ssl", | ||
| "ssl_cert_reqs", | ||
| "ssl_ca_certs", | ||
| "ssl_keyfile", | ||
| "ssl_certfile", | ||
| "ssl_check_hostname", | ||
| ] | ||
| ssl_args = {name: val for name, val in conn.extra_dejson.items() if name in ssl_arg_names} | ||
|
|
||
| async_redis = AsyncRedis( | ||
| host=host or "localhost", | ||
| port=port or 6379, | ||
| username=username, | ||
| password=password, | ||
| db=db or 0, | ||
| decode_responses=True, | ||
| **ssl_args, | ||
| ) | ||
| try: | ||
| yield async_redis | ||
| finally: | ||
| await async_redis.close() |
There was a problem hiding this comment.
RedisHook.get_async_conn() is a new public method but there are no direct unit tests covering its behavior (e.g., that it builds the async client with expected connection/SSL params and that close() is awaited in the finally). Since get_conn() already has unit coverage, please add analogous tests for this async method.
| def setup_method(self): | ||
| args = {"owner": "airflow", "start_date": DEFAULT_DATE} | ||
| self.dag = DAG("test_dag_id", schedule=None, default_args=args) | ||
| self.mock_context = MagicMock() | ||
|
|
There was a problem hiding this comment.
self.mock_context = MagicMock() is an unspecced mock, which can hide attribute errors and lead to brittle tests. Prefer using a real dict for context (when possible) or MagicMock(spec=...)/create_autospec(...) to constrain allowed attributes.
| mock_redis = AsyncMock() | ||
| mock_redis.brpop.return_value = ("test_list", "test_data") | ||
| mock_redis.aclose = AsyncMock() | ||
|
|
||
| mock_async_conn.return_value.__aenter__ = AsyncMock(return_value=mock_redis) | ||
| mock_async_conn.return_value.__aexit__ = AsyncMock(return_value=False) | ||
|
|
There was a problem hiding this comment.
These tests set mock_redis.aclose = AsyncMock(), but the production code closes the client via await async_redis.close() in RedisHook.get_async_conn(). Since aclose is never used/asserted here, it can be misleading and won’t catch regressions around cleanup. Consider removing it, or asserting that the async context manager’s __aexit__ (and ultimately close()) is invoked.
Adds durable list-based messaging to the Redis provider using LPUSH/BRPOP, complementing the existing pub/sub support.
Motivation: Redis pub/sub is fire-and-forget — messages are lost if no subscriber is listening. List-based queues (LPUSH + BRPOP) provide message persistence until consumed, exactly-once delivery via atomic BRPOP, and priority queue patterns via ordered multi-list BRPOP.
New components:
RedisLPushOperator— push messages to Redis lists (FIFO queue producer)AwaitMessageFromListTrigger— async trigger using BRPOP with priority queue support (multiple lists checked in order)RedisListMessageQueueProvider— common-messaging integration withredis+list://schemeRedisHook.get_async_conn()— async context manager usingredis.asynciofor truly non-blocking trigger operations (the existing hook is sync-only, which requiressync_to_asyncwrapping in triggers)No new dependencies —
redis.asynciois part ofredis>=4.2.0, and the provider already requiresredis>=4.5.2.Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.6) following the guidelines