Add Redis list-based queue support with async hook #63937

Open
tardunge wants to merge 2 commits into apache:main from tardunge:feature/redis-list-queue-support

@tardunge tardunge commented Mar 19, 2026

Adds durable list-based messaging to the Redis provider using LPUSH/BRPOP, complementing the existing pub/sub support.

Motivation: Redis pub/sub is fire-and-forget — messages are lost if no subscriber is listening. List-based queues (LPUSH + BRPOP) provide message persistence until consumed, exactly-once delivery via atomic BRPOP, and priority queue patterns via ordered multi-list BRPOP.

New components:

  • RedisLPushOperator — push messages to Redis lists (FIFO queue producer)
  • AwaitMessageFromListTrigger — async trigger using BRPOP with priority queue support (multiple lists checked in order)
  • RedisListMessageQueueProvider — common-messaging integration with redis+list:// scheme
  • RedisHook.get_async_conn() — async context manager using redis.asyncio for truly non-blocking trigger operations (the existing hook is sync-only, which requires sync_to_async wrapping in triggers)

No new dependencies: redis.asyncio is part of redis>=4.2.0, and the provider already requires redis>=4.5.2.
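The ordering behaviour described above (LPUSH at the head, BRPOP from the tail, multiple lists checked in the order given) can be sketched without a Redis server. This is only an illustrative model: `FakeListStore`, `rpop_first`, and the `jobs:high` / `jobs:low` key names are hypothetical and are not part of this PR or of redis-py.

```python
from collections import deque


class FakeListStore:
    """In-memory stand-in for Redis lists (hypothetical, illustration only)."""

    def __init__(self):
        self.lists = {}

    def lpush(self, key, *values):
        # LPUSH inserts each value at the head (left) of the list.
        q = self.lists.setdefault(key, deque())
        for value in values:
            q.appendleft(value)
        return len(q)

    def rpop_first(self, keys):
        # Non-blocking model of BRPOP: scan keys in the order given and pop
        # from the tail (right) of the first non-empty list.
        for key in keys:
            q = self.lists.get(key)
            if q:
                return key, q.pop()
        return None  # a real BRPOP would block here until its timeout


store = FakeListStore()
store.lpush("jobs:low", "low-1")
store.lpush("jobs:high", "high-1")
store.lpush("jobs:high", "high-2")

# Listing the high-priority key first means it is always drained first.
keys = ["jobs:high", "jobs:low"]
drained = []
while (item := store.rpop_first(keys)) is not None:
    drained.append(item)
print(drained)
```

Within a single list the messages come out in the order they were pushed (FIFO), and the low-priority list is only read once the high-priority list is empty.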


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4.6)

Generated-by: Claude Code (Opus 4.6) following the guidelines

Member

potiuk commented Mar 20, 2026

@tardunge This PR has a few issues that need to be addressed before it can be reviewed — please see our Pull Request quality criteria.

Issues found:

  • mypy (type checking): Failing: CI image checks / MyPy checks (mypy-providers). Run prek --stage manual mypy-providers --all-files locally to reproduce. You need breeze ci-image build --python 3.10 for Docker-based mypy. See mypy (type checking) docs.

What to do next:

  • The comment informs you what you need to do.
  • Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
  • There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates.
  • Maintainers will then proceed with a normal review.

If you have questions, feel free to ask on the Airflow Slack.

TARDUNGE and others added 2 commits March 21, 2026 03:07
Adds durable list-based messaging to the Redis provider using LPUSH/BRPOP,
complementing the existing pub/sub support. List-based queues provide
message persistence until consumed and exactly-once delivery semantics
via atomic BRPOP, which pub/sub cannot guarantee.

New components:
- RedisLPushOperator: push messages to Redis lists
- AwaitMessageFromListTrigger: async trigger using BRPOP with
  priority queue support (multiple lists checked in order)
- RedisListMessageQueueProvider: common-messaging integration
  with redis+list:// scheme
- RedisHook.get_async_conn(): async context manager using
  redis.asyncio for truly non-blocking trigger operations
- Provide defaults for host/port/db to satisfy AsyncRedis constructor
  type requirements (str, int, int instead of Optional types)
- Use close() instead of aclose() for redis-py compatibility
@tardunge tardunge force-pushed the feature/redis-list-queue-support branch from e476eb6 to aeaa9c0 Compare March 20, 2026 19:07
@kaxil kaxil requested a review from Copilot April 2, 2026 00:44
Contributor

Copilot AI left a comment

Pull request overview

Adds Redis list-backed queue support to the Redis provider (durable consumption via LPUSH + BRPOP) and integrates it with the Common Messaging MessageQueueTrigger via a new redis+list scheme. It also introduces an async Redis hook helper to avoid sync_to_async in triggers.

Changes:

  • Added RedisLPushOperator for producing messages to Redis lists.
  • Added AwaitMessageFromListTrigger using async BRPOP (supports prioritization via multiple lists).
  • Added RedisHook.get_async_conn() async context manager plus provider registration updates and unit tests.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| providers/redis/src/airflow/providers/redis/hooks/redis.py | Adds get_async_conn() to create/cleanup an async Redis client for triggers. |
| providers/redis/src/airflow/providers/redis/triggers/redis_list_await_message.py | New async trigger that blocks on BRPOP across one or more lists. |
| providers/redis/src/airflow/providers/redis/operators/redis_lpush.py | New operator to LPUSH messages to a Redis list. |
| providers/redis/src/airflow/providers/redis/queues/redis_list.py | Common-messaging provider wiring for the redis+list scheme. |
| providers/redis/src/airflow/providers/redis/get_provider_info.py | Registers the new operator/trigger/queue provider in generated provider info. |
| providers/redis/provider.yaml | Registers the new operator/trigger/queue provider for the Redis provider package. |
| providers/redis/tests/unit/redis/triggers/test_redis_list_await_message.py | Unit tests for trigger serialization and run() behavior (string/bytes, multi-list). |
| providers/redis/tests/unit/redis/operators/test_redis_lpush.py | Unit tests for RedisLPushOperator.execute(). |
| providers/redis/tests/unit/redis/queues/test_redis_list.py | Unit tests for scheme matching and MessageQueueTrigger integration. |


Unlike pub/sub, list-based messaging provides:
- **Durability**: messages persist in the list until consumed
- **Exactly-once delivery**: BRPOP atomically removes and returns the message

Copilot AI Apr 2, 2026

The docstring claims “Exactly-once delivery” via BRPOP, but Redis lists don’t provide end-to-end exactly-once semantics: BRPOP removes the item before user processing, so a consumer crash after pop can lose the message (at-most-once). Consider rewording to “atomic consumption” / “at-most-once delivery”, or mention a safer pattern (e.g., BRPOPLPUSH/RPOPLPUSH to a processing list) if you want crash-safe processing.

Suggested change
- **Exactly-once delivery**: BRPOP atomically removes and returns the message
- **At-most-once delivery semantics**: BRPOP atomically removes and returns the
message from the list, so each item is delivered at most once to this consumer
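The "processing list" pattern mentioned in the comment can be modelled as follows. This is a minimal sketch against an in-memory stand-in, not code from the PR: `FakeListStore`, `consume`, `requeue_unacked`, and the key names are hypothetical. The `rpoplpush` and `lrem` methods only imitate the Redis commands of the same name (newer Redis versions offer LMOVE as the replacement for RPOPLPUSH).

```python
from collections import deque


class FakeListStore:
    """In-memory stand-in for Redis lists (hypothetical, illustration only)."""

    def __init__(self):
        self.lists = {}

    def lpush(self, key, value):
        self.lists.setdefault(key, deque()).appendleft(value)

    def rpoplpush(self, source, destination):
        # Models RPOPLPUSH: pop the tail of `source` and push it onto the
        # head of `destination` as one step.
        src = self.lists.get(source)
        if not src:
            return None
        value = src.pop()
        self.lpush(destination, value)
        return value

    def lrem(self, key, value):
        # Models LREM key 1 value: remove one occurrence, return count removed.
        q = self.lists.get(key)
        if q and value in q:
            q.remove(value)
            return 1
        return 0


def consume(store, queue, processing, handler):
    """Move one message to the processing list, handle it, then acknowledge."""
    message = store.rpoplpush(queue, processing)
    if message is None:
        return None
    handler(message)  # if this raises, the message stays in `processing`
    store.lrem(processing, message)  # acknowledge only after success
    return message


def requeue_unacked(store, queue, processing):
    """Recovery step: return unacknowledged messages to the main queue."""
    moved = 0
    while store.rpoplpush(processing, queue) is not None:
        moved += 1
    return moved


def crash(message):
    raise RuntimeError("simulated consumer crash")


store = FakeListStore()
store.lpush("jobs", "a")
store.lpush("jobs", "b")

try:
    consume(store, "jobs", "jobs:processing", crash)
except RuntimeError:
    pass

in_flight = list(store.lists["jobs:processing"])
recovered = requeue_unacked(store, "jobs", "jobs:processing")
remaining = list(store.lists["jobs"])
```

With this pattern a crashed consumer leaves the message in the processing list instead of losing it, so delivery becomes at-least-once: a recovered message may be handled twice, and handlers need to tolerate that.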

Comment on lines +35 to +36
Combined with BRPOP on the consumer side, this provides a FIFO queue pattern
with durable, exactly-once delivery semantics.

Copilot AI Apr 2, 2026

This operator docstring states “exactly-once delivery semantics”, but LPUSH+BRPOP only guarantees atomic removal from the list; it does not guarantee end-to-end exactly-once processing (messages can be lost if the consumer crashes after BRPOP). Please adjust the wording to avoid overstating delivery guarantees.

Suggested change
Combined with BRPOP on the consumer side, this provides a FIFO queue pattern
with durable, exactly-once delivery semantics.
Combined with BRPOP on the consumer side, this can be used to implement a simple
FIFO queue pattern backed by Redis; end-to-end delivery guarantees depend on the
consumer and processing logic.

Comment on lines +39 to +40
* **Durability**: Messages persist in the list until consumed (no message loss).
* **Exactly-once delivery**: BRPOP atomically removes and returns the message.

Copilot AI Apr 2, 2026

The provider description claims “Exactly-once delivery” for BRPOP. Redis list pop operations are atomic, but do not provide end-to-end exactly-once processing guarantees. Please reword to “atomic consumption” / “at-most-once delivery” or describe an ack/retry pattern if needed.

Suggested change
* **Durability**: Messages persist in the list until consumed (no message loss).
* **Exactly-once delivery**: BRPOP atomically removes and returns the message.
* **Durability**: Messages persist in the list until consumed.
* **Atomic consumption / at-most-once delivery**: ``BRPOP`` atomically removes and returns messages.

            async with hook.get_async_conn() as redis_conn:
                await redis_conn.brpop("my_list", timeout=10)
        """
        from redis.asyncio import Redis as AsyncRedis

Copilot AI Apr 2, 2026

There’s a from redis.asyncio import Redis as AsyncRedis import inside this method body. Airflow generally avoids in-function imports unless needed for circular-import avoidance or lazy loading for isolation. Since redis>=4.5.2 is already required, consider moving this import to module scope (or add a short comment explaining why lazy import is required here).

Comment on lines +103 to +147
    @asynccontextmanager
    async def get_async_conn(self) -> AsyncIterator[Any]:
        """
        Return an async Redis connection using ``redis.asyncio``.

        This is an async context manager that creates and cleans up an async Redis connection.
        Useful for triggers and other async components that need non-blocking Redis access.

        Usage::

            async with hook.get_async_conn() as redis_conn:
                await redis_conn.brpop("my_list", timeout=10)
        """
        from redis.asyncio import Redis as AsyncRedis

        conn = self.get_connection(self.redis_conn_id)
        host = conn.host
        port = conn.port
        username = conn.login
        password = None if str(conn.password).lower() in ["none", "false", ""] else conn.password
        db = conn.extra_dejson.get("db")

        ssl_arg_names = [
            "ssl",
            "ssl_cert_reqs",
            "ssl_ca_certs",
            "ssl_keyfile",
            "ssl_certfile",
            "ssl_check_hostname",
        ]
        ssl_args = {name: val for name, val in conn.extra_dejson.items() if name in ssl_arg_names}

        async_redis = AsyncRedis(
            host=host or "localhost",
            port=port or 6379,
            username=username,
            password=password,
            db=db or 0,
            decode_responses=True,
            **ssl_args,
        )
        try:
            yield async_redis
        finally:
            await async_redis.close()

Copilot AI Apr 2, 2026

RedisHook.get_async_conn() is a new public method but there are no direct unit tests covering its behavior (e.g., that it builds the async client with expected connection/SSL params and that close() is awaited in the finally). Since get_conn() already has unit coverage, please add analogous tests for this async method.
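One possible shape for such a test is sketched below. To stay self-contained it does not import Airflow or redis-py: the module-level `get_async_conn` here is a hypothetical stand-in that only mirrors the create / yield / close-in-`finally` structure of the method under review, with the client constructor injected as `client_factory`. A real test would patch the async client class used by the hook instead.

```python
import asyncio
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock


@asynccontextmanager
async def get_async_conn(client_factory, **conn_kwargs):
    # Hypothetical stand-in mirroring the shape of RedisHook.get_async_conn():
    # build a client, yield it, and always close it afterwards.
    client = client_factory(**conn_kwargs)
    try:
        yield client
    finally:
        await client.close()


async def exercise():
    fake_client = MagicMock()
    fake_client.close = AsyncMock()
    fake_client.brpop = AsyncMock(return_value=("my_list", "payload"))
    factory = MagicMock(return_value=fake_client)

    result = None
    try:
        async with get_async_conn(factory, host="localhost", port=6379, db=0) as conn:
            result = await conn.brpop("my_list", timeout=10)
            # Fail inside the block to check that cleanup still runs.
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass

    return factory, fake_client, result


factory, fake_client, result = asyncio.run(exercise())

# The client was built with the expected connection parameters ...
factory.assert_called_once_with(host="localhost", port=6379, db=0)
# ... and close() was awaited even though the body raised.
fake_client.close.assert_awaited_once()
```

The same two assertions (constructor arguments and an awaited `close()`) are what the review comment asks for; SSL arguments could be checked the same way by adding them to the expected keyword arguments.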

Copilot generated this review using guidance from repository custom instructions.
Comment on lines +30 to +34
    def setup_method(self):
        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
        self.dag = DAG("test_dag_id", schedule=None, default_args=args)
        self.mock_context = MagicMock()

Copilot AI Apr 2, 2026

self.mock_context = MagicMock() is an unspecced mock, which can hide attribute errors and lead to brittle tests. Prefer using a real dict for context (when possible) or MagicMock(spec=...)/create_autospec(...) to constrain allowed attributes.
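The difference between an unspecced and a specced mock can be seen in a few lines. In this sketch `TaskContext` and its `get_value` method are hypothetical placeholders for whatever real type the test would spec against; only `MagicMock(spec=...)` itself is standard `unittest.mock` behaviour.

```python
from unittest.mock import MagicMock


class TaskContext:
    """Hypothetical stand-in for the object a test would spec against."""

    def get_value(self, key):
        return None


loose = MagicMock()
strict = MagicMock(spec=TaskContext)

# An unspecced mock silently accepts a misspelled attribute.
loose_result = loose.get_valeu("ds")

# A specced mock rejects attributes the real class does not define.
try:
    strict.get_valeu("ds")
    typo_caught = False
except AttributeError:
    typo_caught = True

# Attributes that do exist on the spec still work as usual.
strict.get_value.return_value = "2026-04-02"
value = strict.get_value("ds")
```

With the spec in place, a typo in the code under test fails the test immediately instead of returning another mock.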

Comment on lines +59 to +65
        mock_redis = AsyncMock()
        mock_redis.brpop.return_value = ("test_list", "test_data")
        mock_redis.aclose = AsyncMock()

        mock_async_conn.return_value.__aenter__ = AsyncMock(return_value=mock_redis)
        mock_async_conn.return_value.__aexit__ = AsyncMock(return_value=False)

Copilot AI Apr 2, 2026

These tests set mock_redis.aclose = AsyncMock(), but the production code closes the client via await async_redis.close() in RedisHook.get_async_conn(). Since aclose is never used/asserted here, it can be misleading and won’t catch regressions around cleanup. Consider removing it, or asserting that the async context manager’s __aexit__ (and ultimately close()) is invoked.
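A sketch of the second option, asserting that the context manager is actually exited, might look like this. `pop_one` is a hypothetical consumer that stands in for the trigger's use of the hook, and `hook` is a plain `MagicMock` rather than a real `RedisHook`; the unused `aclose` mock is simply dropped.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def pop_one(hook, keys):
    # Hypothetical consumer mirroring how a trigger would use the hook.
    async with hook.get_async_conn() as conn:
        return await conn.brpop(keys, timeout=10)


mock_redis = AsyncMock()
mock_redis.brpop.return_value = ("test_list", "test_data")

hook = MagicMock()
hook.get_async_conn.return_value.__aenter__ = AsyncMock(return_value=mock_redis)
hook.get_async_conn.return_value.__aexit__ = AsyncMock(return_value=False)

popped = asyncio.run(pop_one(hook, ["test_list"]))

# The pop went through the connection yielded by the context manager ...
mock_redis.brpop.assert_awaited_once_with(["test_list"], timeout=10)
# ... and the context manager was exited, which is where cleanup happens.
hook.get_async_conn.return_value.__aexit__.assert_awaited_once()
```

Because the hook is mocked, this only proves that the consumer leaves the `async with` block; that `close()` is awaited on exit belongs in a direct test of the hook method.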

3 participants