Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## v26.06.43 (2026-06-07)

### Added (resilience — tuning)

Completes the v26.06.36 resilience decorators toward Resilience4j parity:

- **`@retry(jitter=...)`** — a randomization fraction in ``[0, 1]`` applied to each backoff
wait (``±jitter * wait``), to avoid thundering-herd retries.
- **`CircuitBreaker` failure-rate window** — set `failure_rate_threshold` (+ `window_size`)
to open on the failure *rate* over the last N calls (Resilience4j COUNT_BASED) instead of
consecutive failures.
- **`CircuitBreaker(half_open_max_calls=...)`** — admit N trial calls in HALF_OPEN; that
many successes close the circuit, any failure re-opens it.

Existing consecutive-failure behavior is unchanged when these options are not set.

## v26.06.42 (2026-06-07)

### Added (cache — @cacheable condition / unless)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<a href="https://github.com/fireflyframework"><img src="https://img.shields.io/badge/Firefly_Framework-official-ff6600?logo=data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCI+PHBhdGggZmlsbD0id2hpdGUiIGQ9Ik0xMiAyQzYuNDggMiAyIDYuNDggMiAxMnM0LjQ4IDEwIDEwIDEwIDEwLTQuNDggMTAtMTBTMTcuNTIgMiAxMiAyeiIvPjwvc3ZnPg==" alt="Firefly Framework"></a>
<a href="https://www.python.org/"><img src="https://img.shields.io/badge/python-3.12%2B-blue?logo=python&logoColor=white" alt="Python 3.12+"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-green" alt="License: Apache 2.0"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.42-brightgreen" alt="Version: 26.06.42"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.43-brightgreen" alt="Version: 26.06.43"></a>
<a href="#"><img src="https://img.shields.io/badge/type--checked-mypy%20strict-blue?logo=python&logoColor=white" alt="Type Checked: mypy strict"></a>
<a href="#"><img src="https://img.shields.io/badge/code%20style-ruff-purple?logo=ruff&logoColor=white" alt="Code Style: Ruff"></a>
<a href="#"><img src="https://img.shields.io/badge/async-first-brightgreen" alt="Async First"></a>
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "pyfly"
# CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4);
# git tag, GitHub release and human-readable display use leading-zero form
# (v26.05.04) to match the Java/.NET/Go siblings.
version = "26.6.42"
version = "26.6.43"
description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more."
readme = "README.md"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion src/pyfly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
"""PyFly — Enterprise Python Framework."""

__version__ = "26.06.42"
__version__ = "26.06.43"
55 changes: 48 additions & 7 deletions src/pyfly/resilience/circuit_breaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import inspect
import threading
import time
from collections import deque
from collections.abc import Callable
from enum import Enum
from typing import Any
Expand All @@ -37,23 +38,35 @@ class CircuitState(Enum):
class CircuitBreaker:
"""A thread-safe circuit breaker.

Trips OPEN after *failure_threshold* consecutive failures; after
*recovery_timeout* seconds it moves to HALF_OPEN and allows a trial call —
success closes the circuit, failure re-opens it.
Opens either after *failure_threshold* consecutive failures (the default) or, when
*failure_rate_threshold* is set, once the failure rate over the last *window_size*
calls reaches that fraction (Resilience4j COUNT_BASED window). After *recovery_timeout*
seconds it moves to HALF_OPEN and admits up to *half_open_max_calls* trial calls —
that many successes close the circuit; any failure re-opens it.
"""

def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
expected: tuple[type[BaseException], ...] = (Exception,),
*,
failure_rate_threshold: float | None = None,
window_size: int = 10,
half_open_max_calls: int = 1,
) -> None:
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected = expected
self.failure_rate_threshold = failure_rate_threshold
self.window_size = window_size
self.half_open_max_calls = max(1, half_open_max_calls)
self._failures = 0
self._window: deque[bool] = deque(maxlen=window_size) # True=success, False=failure
self._state = CircuitState.CLOSED
self._opened_at = 0.0
self._half_open_calls = 0
self._half_open_successes = 0
self._lock = threading.Lock()

@property
Expand All @@ -65,26 +78,54 @@ def state(self) -> CircuitState:
def _maybe_half_open(self) -> None:
if self._state is CircuitState.OPEN and (time.monotonic() - self._opened_at) >= self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
self._half_open_successes = 0

def _tripped(self) -> bool:
"""Whether the failure threshold (consecutive count or windowed rate) is reached."""
if self.failure_rate_threshold is not None:
if len(self._window) < self.window_size:
return False # require a full window before judging the rate
return (self._window.count(False) / len(self._window)) >= self.failure_rate_threshold
return self._failures >= self.failure_threshold

def before_call(self) -> None:
"""Raise :class:`CircuitBreakerException` when the circuit is open."""
"""Raise :class:`CircuitBreakerException` when the circuit is open or the half-open
probe budget is exhausted."""
with self._lock:
self._maybe_half_open()
if self._state is CircuitState.OPEN:
raise CircuitBreakerException("Circuit breaker is open")
if self._state is CircuitState.HALF_OPEN:
if self._half_open_calls >= self.half_open_max_calls:
raise CircuitBreakerException("Circuit breaker is half-open (probe limit reached)")
self._half_open_calls += 1

def on_success(self) -> None:
with self._lock:
self._failures = 0
self._state = CircuitState.CLOSED
self._window.append(True)
if self._state is CircuitState.HALF_OPEN:
self._half_open_successes += 1
if self._half_open_successes >= self.half_open_max_calls:
self._close()
else:
self._failures = 0

def on_failure(self) -> None:
with self._lock:
self._window.append(False)
self._failures += 1
if self._state is CircuitState.HALF_OPEN or self._failures >= self.failure_threshold:
if self._state is CircuitState.HALF_OPEN or self._tripped():
self._state = CircuitState.OPEN
self._opened_at = time.monotonic()

def _close(self) -> None:
self._state = CircuitState.CLOSED
self._failures = 0
self._half_open_calls = 0
self._half_open_successes = 0
self._window.clear()


def circuit_breaker(breaker: CircuitBreaker) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Guard a callable with *breaker*: rejects calls while OPEN, records
Expand Down
9 changes: 8 additions & 1 deletion src/pyfly/resilience/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import asyncio
import functools
import inspect
import random
import time
from collections.abc import Callable
from typing import Any
Expand All @@ -29,6 +30,7 @@ def retry(
delay: float = 0.0,
backoff: float = 1.0,
max_delay: float | None = None,
jitter: float = 0.0,
exceptions: tuple[type[BaseException], ...] = (Exception,),
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Re-invoke the wrapped callable up to *max_attempts* times while it raises one of
Expand All @@ -41,14 +43,19 @@ def retry(
delay: Base delay (seconds) before the first retry.
backoff: Multiplier applied to the delay each subsequent attempt.
max_delay: Optional cap on the per-attempt delay.
jitter: Randomization fraction in ``[0, 1]`` applied to each wait
(``±jitter * wait``) to avoid thundering-herd retries.
exceptions: Exception types that trigger a retry; others propagate immediately.
"""
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")

def _wait(attempt: int) -> float:
computed = delay * (backoff**attempt)
return min(computed, max_delay) if max_delay is not None else computed
if jitter:
computed += random.uniform(-jitter, jitter) * computed
capped = min(computed, max_delay) if max_delay is not None else computed
return max(0.0, capped)

def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
if inspect.iscoroutinefunction(func):
Expand Down
69 changes: 69 additions & 0 deletions tests/resilience/test_resilience_tuning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Resilience tuning (v26.06.43): @retry jitter, circuit-breaker failure-rate window
and half-open probe budget."""

from __future__ import annotations

import pytest

from pyfly.resilience import CircuitBreaker, CircuitState, retry


@pytest.mark.asyncio
async def test_retry_with_jitter_still_succeeds() -> None:
calls = {"n": 0}

@retry(max_attempts=3, delay=0.001, jitter=0.5)
async def flaky() -> str:
calls["n"] += 1
if calls["n"] < 3:
raise ValueError("transient")
return "ok"

assert await flaky() == "ok"
assert calls["n"] == 3


def test_failure_rate_window_opens_on_rate() -> None:
cb = CircuitBreaker(failure_rate_threshold=0.5, window_size=4)
# Partial window (3 calls) never trips, even with failures.
cb.on_success()
cb.on_failure()
cb.on_success()
assert cb.state is CircuitState.CLOSED
# 4th call completes the window [S, F, S, F] -> 50% failures -> open.
cb.on_failure()
assert cb.state is CircuitState.OPEN


def test_half_open_requires_configured_successes() -> None:
# Private _state is read directly: the `state` property runs _maybe_half_open,
# which would reset the half-open counters mid-sequence.
cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.0, half_open_max_calls=2)
cb.on_failure() # consecutive threshold 1 -> OPEN
cb.before_call() # recovery_timeout=0 -> HALF_OPEN, probe 1
cb.on_success() # 1 success < 2 -> still probing
assert cb._state is CircuitState.HALF_OPEN
cb.before_call() # probe 2
cb.on_success() # 2 successes >= 2 -> CLOSED
assert cb._state is CircuitState.CLOSED


def test_half_open_failure_reopens_immediately() -> None:
cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.0, half_open_max_calls=3)
cb.on_failure() # OPEN
cb.before_call() # HALF_OPEN, probe 1
cb.on_failure() # any half-open failure -> OPEN
assert cb._state is CircuitState.OPEN
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading