From a4299985c5b6092951de99e587aa68497d12b9ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Sun, 7 Jun 2026 13:56:15 +0200 Subject: [PATCH] feat(resilience): @retry jitter + circuit-breaker failure-rate window & half-open tuning + bump v26.06.43 Completes v26.06.36 toward Resilience4j parity: - @retry(jitter): randomization fraction +/- jitter*wait on each backoff (anti thundering-herd). - CircuitBreaker(failure_rate_threshold, window_size): open on failure RATE over the last N calls (COUNT_BASED window) vs consecutive failures. - CircuitBreaker(half_open_max_calls): N trial calls in HALF_OPEN; that many successes close, any failure re-opens. Back-compat: consecutive-failure behavior unchanged when new options unset. Tests: tests/resilience/test_resilience_tuning.py (4). Gates: mypy --strict (619), ruff + format, full suite 3829 passed. --- CHANGELOG.md | 16 +++++ README.md | 2 +- pyproject.toml | 2 +- src/pyfly/__init__.py | 2 +- src/pyfly/resilience/circuit_breaker.py | 55 ++++++++++++++--- src/pyfly/resilience/retry.py | 9 ++- tests/resilience/test_resilience_tuning.py | 69 ++++++++++++++++++++++ uv.lock | 2 +- 8 files changed, 145 insertions(+), 12 deletions(-) create mode 100644 tests/resilience/test_resilience_tuning.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 6472e1b..76ae56c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.43 (2026-06-07) + +### Added (resilience — tuning) + +Completes the v26.06.36 resilience decorators toward Resilience4j parity: + +- **`@retry(jitter=...)`** — a randomization fraction in ``[0, 1]`` applied to each backoff + wait (``±jitter * wait``), to avoid thundering-herd retries. +- **`CircuitBreaker` failure-rate window** — set `failure_rate_threshold` (+ `window_size`) + to open on the failure *rate* over the last N calls (Resilience4j COUNT_BASED) instead of + consecutive failures. +- **`CircuitBreaker(half_open_max_calls=...)`** — admit N trial calls in HALF_OPEN; that + many successes close the circuit, any failure re-opens it. + +Existing consecutive-failure behavior is unchanged when these options are not set. + ## v26.06.42 (2026-06-07) ### Added (cache — @cacheable condition / unless) diff --git a/README.md b/README.md index 3f5c22d..570a1c9 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Firefly Framework Python 3.12+ License: Apache 2.0 - Version: 26.06.42 + Version: 26.06.43 Type Checked: mypy strict Code Style: Ruff Async First diff --git a/pyproject.toml b/pyproject.toml index d4bf699..17e1f44 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.42" +version = "26.6.43" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index dd7d9fa..3292972 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. """PyFly — Enterprise Python Framework.""" -__version__ = "26.06.42" +__version__ = "26.06.43" diff --git a/src/pyfly/resilience/circuit_breaker.py b/src/pyfly/resilience/circuit_breaker.py index 6af1549..56882ea 100644 --- a/src/pyfly/resilience/circuit_breaker.py +++ b/src/pyfly/resilience/circuit_breaker.py @@ -19,6 +19,7 @@ import inspect import threading import time +from collections import deque from collections.abc import Callable from enum import Enum from typing import Any @@ -37,9 +38,11 @@ class CircuitState(Enum): class CircuitBreaker: """A thread-safe circuit breaker. - Trips OPEN after *failure_threshold* consecutive failures; after - *recovery_timeout* seconds it moves to HALF_OPEN and allows a trial call — - success closes the circuit, failure re-opens it. + Opens either after *failure_threshold* consecutive failures (the default) or, when + *failure_rate_threshold* is set, once the failure rate over the last *window_size* + calls reaches that fraction (Resilience4j COUNT_BASED window). After *recovery_timeout* + seconds it moves to HALF_OPEN and admits up to *half_open_max_calls* trial calls — + that many successes close the circuit; any failure re-opens it. """ def __init__( @@ -47,13 +50,23 @@ def __init__( failure_threshold: int = 5, recovery_timeout: float = 30.0, expected: tuple[type[BaseException], ...] = (Exception,), + *, + failure_rate_threshold: float | None = None, + window_size: int = 10, + half_open_max_calls: int = 1, ) -> None: self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.expected = expected + self.failure_rate_threshold = failure_rate_threshold + self.window_size = window_size + self.half_open_max_calls = max(1, half_open_max_calls) self._failures = 0 + self._window: deque[bool] = deque(maxlen=window_size) # True=success, False=failure self._state = CircuitState.CLOSED self._opened_at = 0.0 + self._half_open_calls = 0 + self._half_open_successes = 0 self._lock = threading.Lock() @property @@ -65,26 +78,54 @@ def state(self) -> CircuitState: def _maybe_half_open(self) -> None: if self._state is CircuitState.OPEN and (time.monotonic() - self._opened_at) >= self.recovery_timeout: self._state = CircuitState.HALF_OPEN + self._half_open_calls = 0 + self._half_open_successes = 0 + + def _tripped(self) -> bool: + """Whether the failure threshold (consecutive count or windowed rate) is reached.""" + if self.failure_rate_threshold is not None: + if len(self._window) < self.window_size: + return False # require a full window before judging the rate + return (self._window.count(False) / len(self._window)) >= self.failure_rate_threshold + return self._failures >= self.failure_threshold def before_call(self) -> None: - """Raise :class:`CircuitBreakerException` when the circuit is open.""" + """Raise :class:`CircuitBreakerException` when the circuit is open or the half-open + probe budget is exhausted.""" with self._lock: self._maybe_half_open() if self._state is CircuitState.OPEN: raise CircuitBreakerException("Circuit breaker is open") + if self._state is CircuitState.HALF_OPEN: + if self._half_open_calls >= self.half_open_max_calls: + raise CircuitBreakerException("Circuit breaker is half-open (probe limit reached)") + self._half_open_calls += 1 def on_success(self) -> None: with self._lock: - self._failures = 0 - self._state = CircuitState.CLOSED + self._window.append(True) + if self._state is CircuitState.HALF_OPEN: + self._half_open_successes += 1 + if self._half_open_successes >= self.half_open_max_calls: + self._close() + else: + self._failures = 0 def on_failure(self) -> None: with self._lock: + self._window.append(False) self._failures += 1 - if self._state is CircuitState.HALF_OPEN or self._failures >= self.failure_threshold: + if self._state is CircuitState.HALF_OPEN or self._tripped(): self._state = CircuitState.OPEN self._opened_at = time.monotonic() + def _close(self) -> None: + self._state = CircuitState.CLOSED + self._failures = 0 + self._half_open_calls = 0 + self._half_open_successes = 0 + self._window.clear() + def circuit_breaker(breaker: CircuitBreaker) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Guard a callable with *breaker*: rejects calls while OPEN, records diff --git a/src/pyfly/resilience/retry.py b/src/pyfly/resilience/retry.py index cf21a37..a4e4f10 100644 --- a/src/pyfly/resilience/retry.py +++ b/src/pyfly/resilience/retry.py @@ -18,6 +18,7 @@ import asyncio import functools import inspect +import random import time from collections.abc import Callable from typing import Any @@ -29,6 +30,7 @@ def retry( delay: float = 0.0, backoff: float = 1.0, max_delay: float | None = None, + jitter: float = 0.0, exceptions: tuple[type[BaseException], ...] = (Exception,), ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Re-invoke the wrapped callable up to *max_attempts* times while it raises one of @@ -41,6 +43,8 @@ def retry( delay: Base delay (seconds) before the first retry. backoff: Multiplier applied to the delay each subsequent attempt. max_delay: Optional cap on the per-attempt delay. + jitter: Randomization fraction in ``[0, 1]`` applied to each wait + (``±jitter * wait``) to avoid thundering-herd retries. exceptions: Exception types that trigger a retry; others propagate immediately. """ if max_attempts < 1: @@ -48,7 +52,10 @@ def retry( def _wait(attempt: int) -> float: computed = delay * (backoff**attempt) - return min(computed, max_delay) if max_delay is not None else computed + if jitter: + computed += random.uniform(-jitter, jitter) * computed + capped = min(computed, max_delay) if max_delay is not None else computed + return max(0.0, capped) def decorator(func: Callable[..., Any]) -> Callable[..., Any]: if inspect.iscoroutinefunction(func): diff --git a/tests/resilience/test_resilience_tuning.py b/tests/resilience/test_resilience_tuning.py new file mode 100644 index 0000000..609a8d8 --- /dev/null +++ b/tests/resilience/test_resilience_tuning.py @@ -0,0 +1,69 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Resilience tuning (v26.06.43): @retry jitter, circuit-breaker failure-rate window +and half-open probe budget.""" + +from __future__ import annotations + +import pytest + +from pyfly.resilience import CircuitBreaker, CircuitState, retry + + +@pytest.mark.asyncio +async def test_retry_with_jitter_still_succeeds() -> None: + calls = {"n": 0} + + @retry(max_attempts=3, delay=0.001, jitter=0.5) + async def flaky() -> str: + calls["n"] += 1 + if calls["n"] < 3: + raise ValueError("transient") + return "ok" + + assert await flaky() == "ok" + assert calls["n"] == 3 + + +def test_failure_rate_window_opens_on_rate() -> None: + cb = CircuitBreaker(failure_rate_threshold=0.5, window_size=4) + # Partial window (3 calls) never trips, even with failures. + cb.on_success() + cb.on_failure() + cb.on_success() + assert cb.state is CircuitState.CLOSED + # 4th call completes the window [S, F, S, F] -> 50% failures -> open. + cb.on_failure() + assert cb.state is CircuitState.OPEN + + +def test_half_open_requires_configured_successes() -> None: + # Private _state is read directly: the `state` property runs _maybe_half_open, + # which would reset the half-open counters mid-sequence. + cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.0, half_open_max_calls=2) + cb.on_failure() # consecutive threshold 1 -> OPEN + cb.before_call() # recovery_timeout=0 -> HALF_OPEN, probe 1 + cb.on_success() # 1 success < 2 -> still probing + assert cb._state is CircuitState.HALF_OPEN + cb.before_call() # probe 2 + cb.on_success() # 2 successes >= 2 -> CLOSED + assert cb._state is CircuitState.CLOSED + + +def test_half_open_failure_reopens_immediately() -> None: + cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.0, half_open_max_calls=3) + cb.on_failure() # OPEN + cb.before_call() # HALF_OPEN, probe 1 + cb.on_failure() # any half-open failure -> OPEN + assert cb._state is CircuitState.OPEN diff --git a/uv.lock b/uv.lock index da64b5a..381f0c7 100644 --- a/uv.lock +++ b/uv.lock @@ -1981,7 +1981,7 @@ wheels = [ [[package]] name = "pyfly" -version = "26.6.42" +version = "26.6.43" source = { editable = "." } dependencies = [ { name = "pydantic" },