diff --git a/CHANGELOG.md b/CHANGELOG.md index a4aa2b9..f421803 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.47 (2026-06-07) + +### Added (messaging — @message_listener retry + dead-letter routing) + +`@message_listener` gained resilience options (Spring Kafka `@RetryableTopic` / +`DefaultErrorHandler` parity), applied adapter-agnostically (Kafka, RabbitMQ, in-memory): + +- **`retries`** — re-invoke the handler N times on failure, with linear `retry_delay` + backoff (attempt N waits `retry_delay * N`). +- **`dead_letter_topic`** — a message still failing after `retries` is re-published there + with `x-original-topic` / `x-exception` headers, instead of propagating. + +With no options set, the handler is wired unchanged (zero overhead). The wrapper lives in +`pyfly.messaging.error_handling.wrap_listener` and is applied during listener wiring. + ## v26.06.46 (2026-06-07) ### Added (container — SESSION bean scope) diff --git a/README.md b/README.md index 6077ad0..18e8462 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Firefly Framework Python 3.12+ License: Apache 2.0 - Version: 26.06.46 + Version: 26.06.47 Type Checked: mypy strict Code Style: Ruff Async First diff --git a/pyproject.toml b/pyproject.toml index c801c44..206bb52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.46" +version = "26.6.47" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index 2c93c30..43c8b96 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. """PyFly — Enterprise Python Framework.""" -__version__ = "26.06.46" +__version__ = "26.06.47" diff --git a/src/pyfly/context/application_context.py b/src/pyfly/context/application_context.py index 0a7ff6c..546abc9 100644 --- a/src/pyfly/context/application_context.py +++ b/src/pyfly/context/application_context.py @@ -728,8 +728,18 @@ def _wire_message_listeners(self) -> None: return topic = getattr(method, "__pyfly_listener_topic__", "") group = getattr(method, "__pyfly_listener_group__", None) + # Apply retry + dead-letter handling (adapter-agnostic) when configured. + from pyfly.messaging.error_handling import wrap_listener + + handler = wrap_listener( + method, + broker, + retries=getattr(method, "__pyfly_listener_retries__", 0), + retry_delay=getattr(method, "__pyfly_listener_retry_delay__", 0.0), + dead_letter_topic=getattr(method, "__pyfly_listener_dlq__", None), + ) # MessageBrokerPort.subscribe is async; defer via create_task - task = asyncio.get_running_loop().create_task(broker.subscribe(topic, method, group=group)) + task = asyncio.get_running_loop().create_task(broker.subscribe(topic, handler, group=group)) self._background_tasks.append(task) count += 1 self._wiring_counts["message_listeners"] = count diff --git a/src/pyfly/messaging/decorators.py b/src/pyfly/messaging/decorators.py index 50e3d02..48738ef 100644 --- a/src/pyfly/messaging/decorators.py +++ b/src/pyfly/messaging/decorators.py @@ -21,13 +21,32 @@ F = TypeVar("F", bound=Callable[..., Any]) -def message_listener(topic: str, group: str | None = None) -> Callable[[F], F]: - """Mark a method as a message listener for the given topic.""" +def message_listener( + topic: str, + group: str | None = None, + *, + retries: int = 0, + retry_delay: float = 0.0, + dead_letter_topic: str | None = None, +) -> Callable[[F], F]: + """Mark a method as a message listener for the given topic. + + Args: + topic: Topic to subscribe to. + group: Optional consumer group. + retries: Times to re-invoke the handler on failure (linear ``retry_delay`` backoff). + retry_delay: Base delay (seconds) between retries; attempt N waits ``retry_delay * N``. + dead_letter_topic: When set, a message still failing after *retries* is re-published + here (with ``x-original-topic`` / ``x-exception`` headers) instead of propagating. + """ def decorator(func: F) -> F: func.__pyfly_message_listener__ = True # type: ignore[attr-defined] func.__pyfly_listener_topic__ = topic # type: ignore[attr-defined] func.__pyfly_listener_group__ = group # type: ignore[attr-defined] + func.__pyfly_listener_retries__ = retries # type: ignore[attr-defined] + func.__pyfly_listener_retry_delay__ = retry_delay # type: ignore[attr-defined] + func.__pyfly_listener_dlq__ = dead_letter_topic # type: ignore[attr-defined] return func return decorator diff --git a/src/pyfly/messaging/error_handling.py b/src/pyfly/messaging/error_handling.py new file mode 100644 index 0000000..72e2553 --- /dev/null +++ b/src/pyfly/messaging/error_handling.py @@ -0,0 +1,84 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Retry + dead-letter handling for ``@message_listener`` handlers. + +Adapter-agnostic: the handler is wrapped once at wiring time, so retry/DLQ behaves +identically across the Kafka, RabbitMQ, and in-memory brokers. Mirrors Spring Kafka's +``@RetryableTopic`` / ``DefaultErrorHandler`` dead-letter routing. +""" + +from __future__ import annotations + +import asyncio +import functools +import logging + +from pyfly.messaging.ports.outbound import MessageBrokerPort, MessageHandler +from pyfly.messaging.types import Message + +logger = logging.getLogger(__name__) + + +def wrap_listener( + handler: MessageHandler, + broker: MessageBrokerPort, + *, + retries: int = 0, + retry_delay: float = 0.0, + dead_letter_topic: str | None = None, +) -> MessageHandler: + """Wrap *handler* so a failing message is retried up to *retries* times (linear + ``retry_delay`` backoff) and, if still failing and *dead_letter_topic* is set, + re-published there with diagnostic headers. With no retries and no DLQ, *handler* + is returned unchanged (zero overhead).""" + if retries <= 0 and dead_letter_topic is None: + return handler + + @functools.wraps(handler) + async def wrapped(message: Message) -> None: + attempt = 0 + while True: + try: + await handler(message) + return + except Exception as exc: # noqa: BLE001 - the listener contract is to handle/route, not crash the consumer + if attempt < retries: + attempt += 1 + if retry_delay > 0: + await asyncio.sleep(retry_delay * attempt) + logger.warning( + "message_listener retry %d/%d for topic %s: %s", attempt, retries, message.topic, exc + ) + continue + if dead_letter_topic is not None: + await broker.publish( + dead_letter_topic, + message.value, + key=message.key, + headers={ + **message.headers, + "x-original-topic": message.topic, + "x-exception": type(exc).__name__, + }, + ) + logger.error( + "message_listener exhausted %d retries; routed topic %s -> DLQ %s", + retries, + message.topic, + dead_letter_topic, + ) + return + raise + + return wrapped diff --git a/tests/messaging/test_listener_retry_dlq.py b/tests/messaging/test_listener_retry_dlq.py new file mode 100644 index 0000000..3f2a459 --- /dev/null +++ b/tests/messaging/test_listener_retry_dlq.py @@ -0,0 +1,87 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""@message_listener retry + dead-letter routing (v26.06.47).""" + +from __future__ import annotations + +import pytest + +from pyfly.messaging.error_handling import wrap_listener +from pyfly.messaging.types import Message + + +class _FakeBroker: + def __init__(self) -> None: + self.published: list[tuple[str, bytes, bytes | None, dict[str, str] | None]] = [] + + async def publish( + self, topic: str, value: bytes, *, key: bytes | None = None, headers: dict[str, str] | None = None + ) -> None: + self.published.append((topic, value, key, headers)) + + +def _msg() -> Message: + return Message(topic="orders", value=b"data", key=b"k", headers={"h": "1"}) + + +@pytest.mark.asyncio +async def test_no_config_returns_handler_unchanged() -> None: + async def handler(_m: Message) -> None: ... + + assert wrap_listener(handler, _FakeBroker()) is handler # type: ignore[arg-type] + + +@pytest.mark.asyncio +async def test_retries_then_succeeds() -> None: + calls = {"n": 0} + + async def handler(_m: Message) -> None: + calls["n"] += 1 + if calls["n"] < 3: + raise ValueError("boom") + + broker = _FakeBroker() + wrapped = wrap_listener(handler, broker, retries=3) # type: ignore[arg-type] + await wrapped(_msg()) + assert calls["n"] == 3 + assert broker.published == [] # succeeded before DLQ + + +@pytest.mark.asyncio +async def test_exhausted_retries_routes_to_dlq() -> None: + async def handler(_m: Message) -> None: + raise ValueError("boom") + + broker = _FakeBroker() + wrapped = wrap_listener(handler, broker, retries=2, dead_letter_topic="orders.DLT") # type: ignore[arg-type] + await wrapped(_msg()) # must not raise + + assert len(broker.published) == 1 + topic, value, key, headers = broker.published[0] + assert topic == "orders.DLT" + assert value == b"data" + assert key == b"k" + assert headers is not None + assert headers["x-original-topic"] == "orders" + assert headers["x-exception"] == "ValueError" + + +@pytest.mark.asyncio +async def test_exhausted_retries_without_dlq_reraises() -> None: + async def handler(_m: Message) -> None: + raise ValueError("boom") + + wrapped = wrap_listener(handler, _FakeBroker(), retries=1) # type: ignore[arg-type] + with pytest.raises(ValueError, match="boom"): + await wrapped(_msg()) diff --git a/uv.lock b/uv.lock index 7992b2d..eefba81 100644 --- a/uv.lock +++ b/uv.lock @@ -1981,7 +1981,7 @@ wheels = [ [[package]] name = "pyfly" -version = "26.6.46" +version = "26.6.47" source = { editable = "." } dependencies = [ { name = "pydantic" },