diff --git a/CHANGELOG.md b/CHANGELOG.md
index a4aa2b9..f421803 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
---
+## v26.06.47 (2026-06-07)
+
+### Added (messaging — @message_listener retry + dead-letter routing)
+
+`@message_listener` gained resilience options (Spring Kafka `@RetryableTopic` /
+`DefaultErrorHandler` parity), applied adapter-agnostically (Kafka, RabbitMQ, in-memory):
+
+- **`retries`** — re-invoke the handler N times on failure, with linear `retry_delay`
+ backoff (attempt N waits `retry_delay * N`).
+- **`dead_letter_topic`** — a message still failing after `retries` is re-published there
+ with `x-original-topic` / `x-exception` headers, instead of propagating.
+
+With no options set, the handler is wired unchanged (zero overhead). The wrapper lives in
+`pyfly.messaging.error_handling.wrap_listener` and is applied during listener wiring.
+
## v26.06.46 (2026-06-07)
### Added (container — SESSION bean scope)
diff --git a/README.md b/README.md
index 6077ad0..18e8462 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
-
+
diff --git a/pyproject.toml b/pyproject.toml
index c801c44..206bb52 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ name = "pyfly"
# CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4);
# git tag, GitHub release and human-readable display use leading-zero form
# (v26.05.04) to match the Java/.NET/Go siblings.
-version = "26.6.46"
+version = "26.6.47"
description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more."
readme = "README.md"
license = "Apache-2.0"
diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py
index 2c93c30..43c8b96 100644
--- a/src/pyfly/__init__.py
+++ b/src/pyfly/__init__.py
@@ -13,4 +13,4 @@
# limitations under the License.
"""PyFly — Enterprise Python Framework."""
-__version__ = "26.06.46"
+__version__ = "26.06.47"
diff --git a/src/pyfly/context/application_context.py b/src/pyfly/context/application_context.py
index 0a7ff6c..546abc9 100644
--- a/src/pyfly/context/application_context.py
+++ b/src/pyfly/context/application_context.py
@@ -728,8 +728,18 @@ def _wire_message_listeners(self) -> None:
return
topic = getattr(method, "__pyfly_listener_topic__", "")
group = getattr(method, "__pyfly_listener_group__", None)
+ # Apply retry + dead-letter handling (adapter-agnostic) when configured.
+ from pyfly.messaging.error_handling import wrap_listener
+
+ handler = wrap_listener(
+ method,
+ broker,
+ retries=getattr(method, "__pyfly_listener_retries__", 0),
+ retry_delay=getattr(method, "__pyfly_listener_retry_delay__", 0.0),
+ dead_letter_topic=getattr(method, "__pyfly_listener_dlq__", None),
+ )
# MessageBrokerPort.subscribe is async; defer via create_task
- task = asyncio.get_running_loop().create_task(broker.subscribe(topic, method, group=group))
+ task = asyncio.get_running_loop().create_task(broker.subscribe(topic, handler, group=group))
self._background_tasks.append(task)
count += 1
self._wiring_counts["message_listeners"] = count
diff --git a/src/pyfly/messaging/decorators.py b/src/pyfly/messaging/decorators.py
index 50e3d02..48738ef 100644
--- a/src/pyfly/messaging/decorators.py
+++ b/src/pyfly/messaging/decorators.py
@@ -21,13 +21,32 @@
F = TypeVar("F", bound=Callable[..., Any])
-def message_listener(topic: str, group: str | None = None) -> Callable[[F], F]:
- """Mark a method as a message listener for the given topic."""
+def message_listener(
+ topic: str,
+ group: str | None = None,
+ *,
+ retries: int = 0,
+ retry_delay: float = 0.0,
+ dead_letter_topic: str | None = None,
+) -> Callable[[F], F]:
+ """Mark a method as a message listener for the given topic.
+
+ Args:
+ topic: Topic to subscribe to.
+ group: Optional consumer group.
+ retries: Times to re-invoke the handler on failure (linear ``retry_delay`` backoff).
+ retry_delay: Base delay (seconds) between retries; attempt N waits ``retry_delay * N``.
+ dead_letter_topic: When set, a message still failing after *retries* is re-published
+ here (with ``x-original-topic`` / ``x-exception`` headers) instead of propagating.
+ """
def decorator(func: F) -> F:
func.__pyfly_message_listener__ = True # type: ignore[attr-defined]
func.__pyfly_listener_topic__ = topic # type: ignore[attr-defined]
func.__pyfly_listener_group__ = group # type: ignore[attr-defined]
+ func.__pyfly_listener_retries__ = retries # type: ignore[attr-defined]
+ func.__pyfly_listener_retry_delay__ = retry_delay # type: ignore[attr-defined]
+ func.__pyfly_listener_dlq__ = dead_letter_topic # type: ignore[attr-defined]
return func
return decorator
diff --git a/src/pyfly/messaging/error_handling.py b/src/pyfly/messaging/error_handling.py
new file mode 100644
index 0000000..72e2553
--- /dev/null
+++ b/src/pyfly/messaging/error_handling.py
@@ -0,0 +1,84 @@
+# Copyright 2026 Firefly Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Retry + dead-letter handling for ``@message_listener`` handlers.
+
+Adapter-agnostic: the handler is wrapped once at wiring time, so retry/DLQ behaves
+identically across the Kafka, RabbitMQ, and in-memory brokers. Mirrors Spring Kafka's
+``@RetryableTopic`` / ``DefaultErrorHandler`` dead-letter routing.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import functools
+import logging
+
+from pyfly.messaging.ports.outbound import MessageBrokerPort, MessageHandler
+from pyfly.messaging.types import Message
+
+logger = logging.getLogger(__name__)
+
+
+def wrap_listener(
+ handler: MessageHandler,
+ broker: MessageBrokerPort,
+ *,
+ retries: int = 0,
+ retry_delay: float = 0.0,
+ dead_letter_topic: str | None = None,
+) -> MessageHandler:
+ """Wrap *handler* so a failing message is retried up to *retries* times (linear
+ ``retry_delay`` backoff) and, if still failing and *dead_letter_topic* is set,
+ re-published there with diagnostic headers. With no retries and no DLQ, *handler*
+ is returned unchanged (zero overhead)."""
+ if retries <= 0 and dead_letter_topic is None:
+ return handler
+
+ @functools.wraps(handler)
+ async def wrapped(message: Message) -> None:
+ attempt = 0
+ while True:
+ try:
+ await handler(message)
+ return
+ except Exception as exc: # noqa: BLE001 - the listener contract is to handle/route, not crash the consumer
+ if attempt < retries:
+ attempt += 1
+ if retry_delay > 0:
+ await asyncio.sleep(retry_delay * attempt)
+ logger.warning(
+ "message_listener retry %d/%d for topic %s: %s", attempt, retries, message.topic, exc
+ )
+ continue
+ if dead_letter_topic is not None:
+ await broker.publish(
+ dead_letter_topic,
+ message.value,
+ key=message.key,
+ headers={
+ **message.headers,
+ "x-original-topic": message.topic,
+ "x-exception": type(exc).__name__,
+ },
+ )
+ logger.error(
+ "message_listener exhausted %d retries; routed topic %s -> DLQ %s",
+ retries,
+ message.topic,
+ dead_letter_topic,
+ )
+ return
+ raise
+
+ return wrapped
diff --git a/tests/messaging/test_listener_retry_dlq.py b/tests/messaging/test_listener_retry_dlq.py
new file mode 100644
index 0000000..3f2a459
--- /dev/null
+++ b/tests/messaging/test_listener_retry_dlq.py
@@ -0,0 +1,87 @@
+# Copyright 2026 Firefly Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""@message_listener retry + dead-letter routing (v26.06.47)."""
+
+from __future__ import annotations
+
+import pytest
+
+from pyfly.messaging.error_handling import wrap_listener
+from pyfly.messaging.types import Message
+
+
+class _FakeBroker:
+ def __init__(self) -> None:
+ self.published: list[tuple[str, bytes, bytes | None, dict[str, str] | None]] = []
+
+ async def publish(
+ self, topic: str, value: bytes, *, key: bytes | None = None, headers: dict[str, str] | None = None
+ ) -> None:
+ self.published.append((topic, value, key, headers))
+
+
+def _msg() -> Message:
+ return Message(topic="orders", value=b"data", key=b"k", headers={"h": "1"})
+
+
+@pytest.mark.asyncio
+async def test_no_config_returns_handler_unchanged() -> None:
+ async def handler(_m: Message) -> None: ...
+
+ assert wrap_listener(handler, _FakeBroker()) is handler # type: ignore[arg-type]
+
+
+@pytest.mark.asyncio
+async def test_retries_then_succeeds() -> None:
+ calls = {"n": 0}
+
+ async def handler(_m: Message) -> None:
+ calls["n"] += 1
+ if calls["n"] < 3:
+ raise ValueError("boom")
+
+ broker = _FakeBroker()
+ wrapped = wrap_listener(handler, broker, retries=3) # type: ignore[arg-type]
+ await wrapped(_msg())
+ assert calls["n"] == 3
+ assert broker.published == [] # succeeded before DLQ
+
+
+@pytest.mark.asyncio
+async def test_exhausted_retries_routes_to_dlq() -> None:
+ async def handler(_m: Message) -> None:
+ raise ValueError("boom")
+
+ broker = _FakeBroker()
+ wrapped = wrap_listener(handler, broker, retries=2, dead_letter_topic="orders.DLT") # type: ignore[arg-type]
+ await wrapped(_msg()) # must not raise
+
+ assert len(broker.published) == 1
+ topic, value, key, headers = broker.published[0]
+ assert topic == "orders.DLT"
+ assert value == b"data"
+ assert key == b"k"
+ assert headers is not None
+ assert headers["x-original-topic"] == "orders"
+ assert headers["x-exception"] == "ValueError"
+
+
+@pytest.mark.asyncio
+async def test_exhausted_retries_without_dlq_reraises() -> None:
+ async def handler(_m: Message) -> None:
+ raise ValueError("boom")
+
+ wrapped = wrap_listener(handler, _FakeBroker(), retries=1) # type: ignore[arg-type]
+ with pytest.raises(ValueError, match="boom"):
+ await wrapped(_msg())
diff --git a/uv.lock b/uv.lock
index 7992b2d..eefba81 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1981,7 +1981,7 @@ wheels = [
[[package]]
name = "pyfly"
-version = "26.6.46"
+version = "26.6.47"
source = { editable = "." }
dependencies = [
{ name = "pydantic" },