From 9bac6558e479dcb782765f33efd8f2af8aae33b2 Mon Sep 17 00:00:00 2001 From: nachatz Date: Sat, 24 Jan 2026 19:45:08 -0800 Subject: [PATCH] feat: add async redeliver supprot --- pulsar/asyncio.py | 11 +++++++++++ tests/asyncio_test.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 064e353..7c3f14d 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -399,6 +399,17 @@ async def close(self) -> None: self._consumer.close_async(functools.partial(_set_future, future, value=None)) await future + def redeliver_unacknowledged_messages(self): + """ + Redelivers all the unacknowledged messages. In failover mode, the + request is ignored if the consumer is not active for the given topic. In + shared mode, the consumer's messages to be redelivered are distributed + across all the connected consumers. This is a non-blocking call and + doesn't throw an exception. In case the connection breaks, the messages + are redelivered after reconnect. + """ + self._consumer.redeliver_unacknowledged_messages() + def topic(self) -> str: """ Return the topic this consumer is subscribed to. diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 66ff0fd..4fadde2 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -267,6 +267,44 @@ async def verify_receive(consumer: Consumer): await verify_receive(consumer) await consumer.close() + async def test_async_dead_letter_policy(self): + topic = f'asyncio-test-dlq-{time.time()}' + dlq_topic = 'dlq-' + topic + max_redeliver_count = 5 + + dlq_consumer = await self._client.subscribe(dlq_topic, "my-sub", consumer_type=pulsar.ConsumerType.Shared) + consumer = await self._client.subscribe(topic, "my-sub", consumer_type=pulsar.ConsumerType.Shared, + dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 'init-sub')) + producer = await self._client.create_producer(topic) + + # Sen num msgs. + num = 10 + for i in range(num): + await producer.send(b"hello-%d" % i) + await producer.flush() + + # Redelivery all messages maxRedeliverCountNum time. + for i in range(1, num * max_redeliver_count + num + 1): + msg = await consumer.receive() + if i % num == 0: + consumer.redeliver_unacknowledged_messages() + print(f"Start redeliver msgs '{i}'") + + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(consumer.receive(), 0.1) + + for i in range(num): + msg = await dlq_consumer.receive() + self.assertTrue(msg) + self.assertEqual(msg.data(), b"hello-%d" % i) + dlq_consumer.acknowledge(msg) + + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(dlq_consumer.receive(), 0.1) + + await consumer.close() + await dlq_consumer.close() + async def test_unsubscribe(self): topic = f'asyncio-test-unsubscribe-{time.time()}' sub = 'sub'