Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## v26.06.47 (2026-06-07)

### Added (messaging — @message_listener retry + dead-letter routing)

`@message_listener` gained resilience options (Spring Kafka `@RetryableTopic` /
`DefaultErrorHandler` parity), applied adapter-agnostically (Kafka, RabbitMQ, in-memory):

- **`retries`** — re-invoke the handler N times on failure, with linear `retry_delay`
backoff (attempt N waits `retry_delay * N`).
- **`dead_letter_topic`** — a message still failing after `retries` is re-published there
with `x-original-topic` / `x-exception` headers, instead of propagating.

With no options set, the handler is wired unchanged (zero overhead). The wrapper lives in
`pyfly.messaging.error_handling.wrap_listener` and is applied during listener wiring.

## v26.06.46 (2026-06-07)

### Added (container — SESSION bean scope)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<a href="https://github.com/fireflyframework"><img src="https://img.shields.io/badge/Firefly_Framework-official-ff6600?logo=data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCI+PHBhdGggZmlsbD0id2hpdGUiIGQ9Ik0xMiAyQzYuNDggMiAyIDYuNDggMiAxMnM0LjQ4IDEwIDEwIDEwIDEwLTQuNDggMTAtMTBTMTcuNTIgMiAxMiAyeiIvPjwvc3ZnPg==" alt="Firefly Framework"></a>
<a href="https://www.python.org/"><img src="https://img.shields.io/badge/python-3.12%2B-blue?logo=python&logoColor=white" alt="Python 3.12+"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-green" alt="License: Apache 2.0"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.46-brightgreen" alt="Version: 26.06.46"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.47-brightgreen" alt="Version: 26.06.47"></a>
<a href="#"><img src="https://img.shields.io/badge/type--checked-mypy%20strict-blue?logo=python&logoColor=white" alt="Type Checked: mypy strict"></a>
<a href="#"><img src="https://img.shields.io/badge/code%20style-ruff-purple?logo=ruff&logoColor=white" alt="Code Style: Ruff"></a>
<a href="#"><img src="https://img.shields.io/badge/async-first-brightgreen" alt="Async First"></a>
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "pyfly"
# CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4);
# git tag, GitHub release and human-readable display use leading-zero form
# (v26.05.04) to match the Java/.NET/Go siblings.
version = "26.6.46"
version = "26.6.47"
description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more."
readme = "README.md"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion src/pyfly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
"""PyFly — Enterprise Python Framework."""

__version__ = "26.06.46"
__version__ = "26.06.47"
12 changes: 11 additions & 1 deletion src/pyfly/context/application_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,18 @@ def _wire_message_listeners(self) -> None:
return
topic = getattr(method, "__pyfly_listener_topic__", "")
group = getattr(method, "__pyfly_listener_group__", None)
# Apply retry + dead-letter handling (adapter-agnostic) when configured.
from pyfly.messaging.error_handling import wrap_listener

handler = wrap_listener(
method,
broker,
retries=getattr(method, "__pyfly_listener_retries__", 0),
retry_delay=getattr(method, "__pyfly_listener_retry_delay__", 0.0),
dead_letter_topic=getattr(method, "__pyfly_listener_dlq__", None),
)
# MessageBrokerPort.subscribe is async; defer via create_task
task = asyncio.get_running_loop().create_task(broker.subscribe(topic, method, group=group))
task = asyncio.get_running_loop().create_task(broker.subscribe(topic, handler, group=group))
self._background_tasks.append(task)
count += 1
self._wiring_counts["message_listeners"] = count
Expand Down
23 changes: 21 additions & 2 deletions src/pyfly/messaging/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,32 @@
F = TypeVar("F", bound=Callable[..., Any])


def message_listener(topic: str, group: str | None = None) -> Callable[[F], F]:
"""Mark a method as a message listener for the given topic."""
def message_listener(
topic: str,
group: str | None = None,
*,
retries: int = 0,
retry_delay: float = 0.0,
dead_letter_topic: str | None = None,
) -> Callable[[F], F]:
"""Mark a method as a message listener for the given topic.

Args:
topic: Topic to subscribe to.
group: Optional consumer group.
retries: Times to re-invoke the handler on failure (linear ``retry_delay`` backoff).
retry_delay: Base delay (seconds) between retries; attempt N waits ``retry_delay * N``.
dead_letter_topic: When set, a message still failing after *retries* is re-published
here (with ``x-original-topic`` / ``x-exception`` headers) instead of propagating.
"""

def decorator(func: F) -> F:
func.__pyfly_message_listener__ = True # type: ignore[attr-defined]
func.__pyfly_listener_topic__ = topic # type: ignore[attr-defined]
func.__pyfly_listener_group__ = group # type: ignore[attr-defined]
func.__pyfly_listener_retries__ = retries # type: ignore[attr-defined]
func.__pyfly_listener_retry_delay__ = retry_delay # type: ignore[attr-defined]
func.__pyfly_listener_dlq__ = dead_letter_topic # type: ignore[attr-defined]
return func

return decorator
84 changes: 84 additions & 0 deletions src/pyfly/messaging/error_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Retry + dead-letter handling for ``@message_listener`` handlers.

Adapter-agnostic: the handler is wrapped once at wiring time, so retry/DLQ behaves
identically across the Kafka, RabbitMQ, and in-memory brokers. Mirrors Spring Kafka's
``@RetryableTopic`` / ``DefaultErrorHandler`` dead-letter routing.
"""

from __future__ import annotations

import asyncio
import functools
import logging

from pyfly.messaging.ports.outbound import MessageBrokerPort, MessageHandler
from pyfly.messaging.types import Message

logger = logging.getLogger(__name__)


def wrap_listener(
handler: MessageHandler,
broker: MessageBrokerPort,
*,
retries: int = 0,
retry_delay: float = 0.0,
dead_letter_topic: str | None = None,
) -> MessageHandler:
"""Wrap *handler* so a failing message is retried up to *retries* times (linear
``retry_delay`` backoff) and, if still failing and *dead_letter_topic* is set,
re-published there with diagnostic headers. With no retries and no DLQ, *handler*
is returned unchanged (zero overhead)."""
if retries <= 0 and dead_letter_topic is None:
return handler

@functools.wraps(handler)
async def wrapped(message: Message) -> None:
attempt = 0
while True:
try:
await handler(message)
return
except Exception as exc: # noqa: BLE001 - the listener contract is to handle/route, not crash the consumer
if attempt < retries:
attempt += 1
if retry_delay > 0:
await asyncio.sleep(retry_delay * attempt)
logger.warning(
"message_listener retry %d/%d for topic %s: %s", attempt, retries, message.topic, exc
)
continue
if dead_letter_topic is not None:
await broker.publish(
dead_letter_topic,
message.value,
key=message.key,
headers={
**message.headers,
"x-original-topic": message.topic,
"x-exception": type(exc).__name__,
},
)
logger.error(
"message_listener exhausted %d retries; routed topic %s -> DLQ %s",
retries,
message.topic,
dead_letter_topic,
)
return
raise

return wrapped
87 changes: 87 additions & 0 deletions tests/messaging/test_listener_retry_dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""@message_listener retry + dead-letter routing (v26.06.47)."""

from __future__ import annotations

import pytest

from pyfly.messaging.error_handling import wrap_listener
from pyfly.messaging.types import Message


class _FakeBroker:
def __init__(self) -> None:
self.published: list[tuple[str, bytes, bytes | None, dict[str, str] | None]] = []

async def publish(
self, topic: str, value: bytes, *, key: bytes | None = None, headers: dict[str, str] | None = None
) -> None:
self.published.append((topic, value, key, headers))


def _msg() -> Message:
return Message(topic="orders", value=b"data", key=b"k", headers={"h": "1"})


@pytest.mark.asyncio
async def test_no_config_returns_handler_unchanged() -> None:
async def handler(_m: Message) -> None: ...

assert wrap_listener(handler, _FakeBroker()) is handler # type: ignore[arg-type]


@pytest.mark.asyncio
async def test_retries_then_succeeds() -> None:
calls = {"n": 0}

async def handler(_m: Message) -> None:
calls["n"] += 1
if calls["n"] < 3:
raise ValueError("boom")

broker = _FakeBroker()
wrapped = wrap_listener(handler, broker, retries=3) # type: ignore[arg-type]
await wrapped(_msg())
assert calls["n"] == 3
assert broker.published == [] # succeeded before DLQ


@pytest.mark.asyncio
async def test_exhausted_retries_routes_to_dlq() -> None:
async def handler(_m: Message) -> None:
raise ValueError("boom")

broker = _FakeBroker()
wrapped = wrap_listener(handler, broker, retries=2, dead_letter_topic="orders.DLT") # type: ignore[arg-type]
await wrapped(_msg()) # must not raise

assert len(broker.published) == 1
topic, value, key, headers = broker.published[0]
assert topic == "orders.DLT"
assert value == b"data"
assert key == b"k"
assert headers is not None
assert headers["x-original-topic"] == "orders"
assert headers["x-exception"] == "ValueError"


@pytest.mark.asyncio
async def test_exhausted_retries_without_dlq_reraises() -> None:
async def handler(_m: Message) -> None:
raise ValueError("boom")

wrapped = wrap_listener(handler, _FakeBroker(), retries=1) # type: ignore[arg-type]
with pytest.raises(ValueError, match="boom"):
await wrapped(_msg())
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading