From f3b65a28e6083c0359012d22e8a0db89c8d0234a Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Mon, 16 Mar 2026 14:21:22 +0530 Subject: [PATCH 1/6] feat: add circuit breaker observability methods (#1176) New SDK methods: get_circuit_breaker_status, get_circuit_breaker_history, get_circuit_breaker_config, update_circuit_breaker_config. --- axonflow/__init__.py | 11 + axonflow/client.py | 204 +++++++++++++ axonflow/types.py | 96 ++++++ tests/test_circuit_breaker.py | 557 ++++++++++++++++++++++++++++++++++ 4 files changed, 868 insertions(+) create mode 100644 tests/test_circuit_breaker.py diff --git a/axonflow/__init__.py b/axonflow/__init__.py index 6991e48..d4b44e2 100644 --- a/axonflow/__init__.py +++ b/axonflow/__init__.py @@ -152,6 +152,11 @@ BudgetStatus, CacheConfig, CancelPlanResponse, + CircuitBreakerConfig, + CircuitBreakerConfigUpdate, + CircuitBreakerHistoryEntry, + CircuitBreakerHistoryResponse, + CircuitBreakerStatusResponse, ClientRequest, ClientResponse, CodeArtifact, @@ -312,6 +317,12 @@ # Audit Tool Call types (Issue #1260) "AuditToolCallRequest", "AuditToolCallResponse", + # Circuit Breaker Observability types (Issue #1176) + "CircuitBreakerStatusResponse", + "CircuitBreakerHistoryEntry", + "CircuitBreakerHistoryResponse", + "CircuitBreakerConfig", + "CircuitBreakerConfigUpdate", # Execution Replay types "ExecutionSummary", "ExecutionSnapshot", diff --git a/axonflow/client.py b/axonflow/client.py index f58ca4c..e3c3626 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -140,6 +140,11 @@ BudgetStatus, CacheConfig, CancelPlanResponse, + CircuitBreakerConfig, + CircuitBreakerConfigUpdate, + CircuitBreakerHistoryEntry, + CircuitBreakerHistoryResponse, + CircuitBreakerStatusResponse, ClientRequest, ClientResponse, ConnectorHealthStatus, @@ -1850,6 +1855,172 @@ async def audit_tool_call( timestamp=response["timestamp"], ) + # ========================================================================= + # Circuit Breaker Observability Methods + # ========================================================================= + + async def get_circuit_breaker_status(self) -> CircuitBreakerStatusResponse: + """Get all active circuit breaker circuits. + + Returns the current state of all circuit breakers, including which + circuits are open (tripped) and whether any emergency stop is active. + + Returns: + CircuitBreakerStatusResponse with active circuits and counts. + + Raises: + AxonFlowError: If the request fails. + + Example: + >>> status = await client.get_circuit_breaker_status() + >>> print(f"{status.count} active circuits") + >>> if status.emergency_stop_active: + ... print("Emergency stop is active!") + """ + if self._config.debug: + self._logger.debug("Getting circuit breaker status") + + response = await self._request("GET", "/api/v1/circuit-breaker/status") + data = response.get("data", response) + + return CircuitBreakerStatusResponse( + active_circuits=data.get("active_circuits", []), + count=data.get("count", 0), + emergency_stop_active=data.get("emergency_stop_active", False), + ) + + async def get_circuit_breaker_history( + self, + limit: int | None = None, + ) -> CircuitBreakerHistoryResponse: + """Get circuit breaker history for audit trail. + + Returns the history of circuit breaker state transitions, including + trips, resets, and auto-recovery events. + + Args: + limit: Maximum number of history entries to return. + + Returns: + CircuitBreakerHistoryResponse with history entries. + + Raises: + AxonFlowError: If the request fails. + + Example: + >>> history = await client.get_circuit_breaker_history(limit=50) + >>> for entry in history.history: + ... print(f"{entry.scope}/{entry.scope_id}: {entry.state}") + """ + if self._config.debug: + self._logger.debug( + "Getting circuit breaker history", + limit=limit, + ) + + path = "/api/v1/circuit-breaker/history" + if limit is not None: + path = f"{path}?limit={limit}" + + response = await self._request("GET", path) + data = response.get("data", response) + + history = [ + CircuitBreakerHistoryEntry(**entry) for entry in data.get("history", []) + ] + + return CircuitBreakerHistoryResponse( + history=history, + count=data.get("count", 0), + ) + + async def get_circuit_breaker_config( + self, + tenant_id: str | None = None, + ) -> CircuitBreakerConfig: + """Get circuit breaker configuration (global or tenant-specific). + + Args: + tenant_id: If provided, returns tenant-specific config with + any overrides applied. Otherwise returns global defaults. + + Returns: + CircuitBreakerConfig with thresholds and recovery settings. + + Raises: + AxonFlowError: If the request fails. + + Example: + >>> config = await client.get_circuit_breaker_config() + >>> print(f"Error threshold: {config.error_threshold}") + >>> tenant_config = await client.get_circuit_breaker_config( + ... tenant_id="tenant-123" + ... ) + """ + if self._config.debug: + self._logger.debug( + "Getting circuit breaker config", + tenant_id=tenant_id, + ) + + path = "/api/v1/circuit-breaker/config" + if tenant_id is not None: + path = f"{path}?tenant_id={tenant_id}" + + response = await self._request("GET", path) + data = response.get("data", response) + + return CircuitBreakerConfig(**data) + + async def update_circuit_breaker_config( + self, + config: CircuitBreakerConfigUpdate, + ) -> dict[str, Any]: + """Update per-tenant circuit breaker configuration. + + Sets tenant-specific overrides for circuit breaker thresholds and + recovery behavior. + + Args: + config: Configuration update with tenant_id and override values. + + Returns: + Server response confirming the update. + + Raises: + ValueError: If tenant_id is empty. + AxonFlowError: If the request fails. + + Example: + >>> from axonflow.types import CircuitBreakerConfigUpdate + >>> result = await client.update_circuit_breaker_config( + ... CircuitBreakerConfigUpdate( + ... tenant_id="tenant-123", + ... error_threshold=10, + ... violation_threshold=5, + ... ) + ... ) + """ + if not config.tenant_id or not config.tenant_id.strip(): + msg = "tenant_id is required and cannot be empty" + raise ValueError(msg) + + if self._config.debug: + self._logger.debug( + "Updating circuit breaker config", + tenant_id=config.tenant_id, + ) + + request_body = config.model_dump(by_alias=True, exclude_none=True) + + response = await self._request( + "PUT", + "/api/v1/circuit-breaker/config", + json_data=request_body, + ) + + return response.get("data", response) + # ========================================================================= # Audit Log Read Methods # ========================================================================= @@ -6271,6 +6442,39 @@ def audit_tool_call( """Record a non-LLM tool call in the audit trail.""" return self._run_sync(self._async_client.audit_tool_call(request)) + # Circuit Breaker Observability sync wrappers + + def get_circuit_breaker_status(self) -> CircuitBreakerStatusResponse: + """Get all active circuit breaker circuits.""" + return self._run_sync(self._async_client.get_circuit_breaker_status()) + + def get_circuit_breaker_history( + self, + limit: int | None = None, + ) -> CircuitBreakerHistoryResponse: + """Get circuit breaker history for audit trail.""" + return self._run_sync( + self._async_client.get_circuit_breaker_history(limit=limit) + ) + + def get_circuit_breaker_config( + self, + tenant_id: str | None = None, + ) -> CircuitBreakerConfig: + """Get circuit breaker config (global or tenant-specific).""" + return self._run_sync( + self._async_client.get_circuit_breaker_config(tenant_id=tenant_id) + ) + + def update_circuit_breaker_config( + self, + config: CircuitBreakerConfigUpdate, + ) -> dict[str, Any]: + """Update per-tenant circuit breaker config.""" + return self._run_sync( + self._async_client.update_circuit_breaker_config(config) + ) + # Policy CRUD sync wrappers def list_static_policies( diff --git a/axonflow/types.py b/axonflow/types.py index b7125f7..c641f3b 100644 --- a/axonflow/types.py +++ b/axonflow/types.py @@ -1168,3 +1168,99 @@ class AuditToolCallResponse(BaseModel): audit_id: str = Field(description="Unique ID for the audit entry") status: str = Field(description="Recording status (e.g., recorded)") timestamp: str = Field(description="Timestamp when the audit entry was recorded") + + +# ========================================================================= +# Circuit Breaker Observability Types +# ========================================================================= + + +class CircuitBreakerStatusResponse(BaseModel): + """Response from circuit breaker status endpoint.""" + + model_config = ConfigDict(populate_by_name=True) + + active_circuits: list[dict[str, Any]] = Field( + default_factory=list, description="List of active (open) circuits" + ) + count: int = Field(description="Number of active circuits") + emergency_stop_active: bool = Field(description="Whether any circuit is open") + + +class CircuitBreakerHistoryEntry(BaseModel): + """A single circuit breaker history entry.""" + + model_config = ConfigDict(populate_by_name=True) + + id: str = Field(description="Circuit ID") + org_id: str = Field(description="Organization ID") + scope: str = Field(description="Circuit scope (global, tenant, client, policy)") + scope_id: str = Field(default="", description="Scope identifier") + state: str = Field(description="Circuit state (closed, open, half_open)") + trip_reason: str | None = Field(default=None, description="Why the circuit was tripped") + tripped_by: str | None = Field(default=None, description="Who/what tripped the circuit") + tripped_at: str | None = Field(default=None, description="When the circuit was tripped") + expires_at: str | None = Field(default=None, description="When the circuit will auto-reset") + reset_by: str | None = Field(default=None, description="Who reset the circuit") + reset_at: str | None = Field(default=None, description="When the circuit was reset") + error_count: int = Field(default=0, description="Number of errors in current window") + violation_count: int = Field( + default=0, description="Number of violations in current window" + ) + + +class CircuitBreakerHistoryResponse(BaseModel): + """Response from circuit breaker history endpoint.""" + + model_config = ConfigDict(populate_by_name=True) + + history: list[CircuitBreakerHistoryEntry] = Field( + default_factory=list, description="Circuit history entries" + ) + count: int = Field(description="Number of history entries") + + +class CircuitBreakerConfig(BaseModel): + """Circuit breaker configuration (effective for a tenant or global).""" + + model_config = ConfigDict(populate_by_name=True) + + source: str = Field(description="Config source: 'global' or 'tenant'") + error_threshold: int = Field(description="Error threshold for auto-trip") + violation_threshold: int = Field(description="Policy violation threshold") + window_seconds: int = Field(description="Sliding window duration in seconds") + default_timeout_seconds: int = Field( + description="Default circuit open timeout in seconds" + ) + max_timeout_seconds: int = Field(description="Maximum allowed timeout in seconds") + enable_auto_recovery: bool = Field(description="Whether auto-recovery is enabled") + tenant_id: str | None = Field(default=None, description="Tenant ID if tenant-specific") + overrides: dict[str, Any] | None = Field( + default=None, description="Tenant-specific overrides" + ) + + +class CircuitBreakerConfigUpdate(BaseModel): + """Request to update per-tenant circuit breaker config.""" + + model_config = ConfigDict(populate_by_name=True) + + tenant_id: str = Field(description="Tenant ID to configure") + error_threshold: int | None = Field( + default=None, description="Override error threshold" + ) + violation_threshold: int | None = Field( + default=None, description="Override violation threshold" + ) + window_seconds: int | None = Field( + default=None, description="Override window duration" + ) + default_timeout_seconds: int | None = Field( + default=None, description="Override default timeout" + ) + max_timeout_seconds: int | None = Field( + default=None, description="Override max timeout" + ) + enable_auto_recovery: bool | None = Field( + default=None, description="Override auto-recovery" + ) diff --git a/tests/test_circuit_breaker.py b/tests/test_circuit_breaker.py new file mode 100644 index 0000000..65d0320 --- /dev/null +++ b/tests/test_circuit_breaker.py @@ -0,0 +1,557 @@ +"""Tests for circuit breaker observability methods.""" + +from __future__ import annotations + +import json + +import pytest +from pytest_httpx import HTTPXMock + +from axonflow import AxonFlow +from axonflow.exceptions import AxonFlowError +from axonflow.types import ( + CircuitBreakerConfig, + CircuitBreakerConfigUpdate, + CircuitBreakerHistoryEntry, + CircuitBreakerHistoryResponse, + CircuitBreakerStatusResponse, +) + + +class TestGetCircuitBreakerStatus: + """Tests for get_circuit_breaker_status method.""" + + @pytest.mark.asyncio + async def test_success_with_active_circuits( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test successful status response with active circuits.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "active_circuits": [ + { + "id": "cb_001", + "scope": "tenant", + "scope_id": "tenant-abc", + "state": "open", + "trip_reason": "error_threshold_exceeded", + }, + ], + "count": 1, + "emergency_stop_active": True, + }, + }, + ) + + result = await client.get_circuit_breaker_status() + + assert isinstance(result, CircuitBreakerStatusResponse) + assert result.count == 1 + assert result.emergency_stop_active is True + assert len(result.active_circuits) == 1 + assert result.active_circuits[0]["id"] == "cb_001" + + @pytest.mark.asyncio + async def test_success_no_active_circuits( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test status response when no circuits are active.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "active_circuits": [], + "count": 0, + "emergency_stop_active": False, + }, + }, + ) + + result = await client.get_circuit_breaker_status() + + assert result.count == 0 + assert result.emergency_stop_active is False + assert result.active_circuits == [] + + @pytest.mark.asyncio + async def test_success_without_data_wrapper( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test status response without data wrapper (flat response).""" + httpx_mock.add_response( + status_code=200, + json={ + "active_circuits": [], + "count": 0, + "emergency_stop_active": False, + }, + ) + + result = await client.get_circuit_breaker_status() + + assert result.count == 0 + assert result.emergency_stop_active is False + + @pytest.mark.asyncio + async def test_server_error( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that server errors are raised as AxonFlowError.""" + httpx_mock.add_response( + status_code=500, + json={"error": "internal server error"}, + ) + + with pytest.raises(AxonFlowError): + await client.get_circuit_breaker_status() + + +class TestGetCircuitBreakerHistory: + """Tests for get_circuit_breaker_history method.""" + + @pytest.mark.asyncio + async def test_success_with_entries( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test successful history response with entries.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "history": [ + { + "id": "cb_001", + "org_id": "org-123", + "scope": "tenant", + "scope_id": "tenant-abc", + "state": "open", + "trip_reason": "error_threshold_exceeded", + "tripped_by": "auto", + "tripped_at": "2026-03-16T10:00:00Z", + "expires_at": "2026-03-16T10:05:00Z", + "error_count": 15, + "violation_count": 3, + }, + { + "id": "cb_002", + "org_id": "org-123", + "scope": "global", + "state": "closed", + "reset_by": "admin@example.com", + "reset_at": "2026-03-16T09:30:00Z", + "error_count": 0, + "violation_count": 0, + }, + ], + "count": 2, + }, + }, + ) + + result = await client.get_circuit_breaker_history() + + assert isinstance(result, CircuitBreakerHistoryResponse) + assert result.count == 2 + assert len(result.history) == 2 + + entry = result.history[0] + assert isinstance(entry, CircuitBreakerHistoryEntry) + assert entry.id == "cb_001" + assert entry.scope == "tenant" + assert entry.scope_id == "tenant-abc" + assert entry.state == "open" + assert entry.trip_reason == "error_threshold_exceeded" + assert entry.error_count == 15 + assert entry.violation_count == 3 + + entry2 = result.history[1] + assert entry2.state == "closed" + assert entry2.reset_by == "admin@example.com" + assert entry2.scope_id == "" + + @pytest.mark.asyncio + async def test_with_limit_parameter( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that limit query parameter is sent correctly.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "history": [], + "count": 0, + }, + }, + ) + + await client.get_circuit_breaker_history(limit=10) + + sent_request = httpx_mock.get_request() + assert sent_request is not None + assert "limit=10" in str(sent_request.url) + + @pytest.mark.asyncio + async def test_without_limit_parameter( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that no limit query parameter is sent when not specified.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "history": [], + "count": 0, + }, + }, + ) + + await client.get_circuit_breaker_history() + + sent_request = httpx_mock.get_request() + assert sent_request is not None + assert "limit" not in str(sent_request.url) + + @pytest.mark.asyncio + async def test_server_error( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that server errors are raised as AxonFlowError.""" + httpx_mock.add_response( + status_code=500, + json={"error": "internal server error"}, + ) + + with pytest.raises(AxonFlowError): + await client.get_circuit_breaker_history() + + +class TestGetCircuitBreakerConfig: + """Tests for get_circuit_breaker_config method.""" + + @pytest.mark.asyncio + async def test_global_config( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test getting global circuit breaker config.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "source": "global", + "error_threshold": 10, + "violation_threshold": 5, + "window_seconds": 60, + "default_timeout_seconds": 300, + "max_timeout_seconds": 3600, + "enable_auto_recovery": True, + }, + }, + ) + + result = await client.get_circuit_breaker_config() + + assert isinstance(result, CircuitBreakerConfig) + assert result.source == "global" + assert result.error_threshold == 10 + assert result.violation_threshold == 5 + assert result.window_seconds == 60 + assert result.default_timeout_seconds == 300 + assert result.max_timeout_seconds == 3600 + assert result.enable_auto_recovery is True + assert result.tenant_id is None + assert result.overrides is None + + @pytest.mark.asyncio + async def test_tenant_config( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test getting tenant-specific circuit breaker config.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "source": "tenant", + "error_threshold": 20, + "violation_threshold": 10, + "window_seconds": 120, + "default_timeout_seconds": 600, + "max_timeout_seconds": 7200, + "enable_auto_recovery": False, + "tenant_id": "tenant-123", + "overrides": {"error_threshold": 20, "violation_threshold": 10}, + }, + }, + ) + + result = await client.get_circuit_breaker_config(tenant_id="tenant-123") + + assert result.source == "tenant" + assert result.error_threshold == 20 + assert result.tenant_id == "tenant-123" + assert result.overrides is not None + assert result.overrides["error_threshold"] == 20 + + sent_request = httpx_mock.get_request() + assert sent_request is not None + assert "tenant_id=tenant-123" in str(sent_request.url) + + @pytest.mark.asyncio + async def test_without_tenant_id( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that no tenant_id query parameter is sent when not specified.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "source": "global", + "error_threshold": 10, + "violation_threshold": 5, + "window_seconds": 60, + "default_timeout_seconds": 300, + "max_timeout_seconds": 3600, + "enable_auto_recovery": True, + }, + }, + ) + + await client.get_circuit_breaker_config() + + sent_request = httpx_mock.get_request() + assert sent_request is not None + assert "tenant_id" not in str(sent_request.url) + + @pytest.mark.asyncio + async def test_server_error( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that server errors are raised as AxonFlowError.""" + httpx_mock.add_response( + status_code=500, + json={"error": "internal server error"}, + ) + + with pytest.raises(AxonFlowError): + await client.get_circuit_breaker_config() + + +class TestUpdateCircuitBreakerConfig: + """Tests for update_circuit_breaker_config method.""" + + @pytest.mark.asyncio + async def test_success( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test successful config update.""" + httpx_mock.add_response( + status_code=200, + json={ + "data": { + "status": "updated", + "tenant_id": "tenant-123", + }, + }, + ) + + config = CircuitBreakerConfigUpdate( + tenant_id="tenant-123", + error_threshold=20, + violation_threshold=10, + ) + + result = await client.update_circuit_breaker_config(config) + + assert result["status"] == "updated" + assert result["tenant_id"] == "tenant-123" + + sent_request = httpx_mock.get_request() + assert sent_request is not None + body = json.loads(sent_request.content) + assert body["tenant_id"] == "tenant-123" + assert body["error_threshold"] == 20 + assert body["violation_threshold"] == 10 + # None fields should be excluded + assert "window_seconds" not in body + assert "default_timeout_seconds" not in body + + @pytest.mark.asyncio + async def test_empty_tenant_id_raises( + self, + client: AxonFlow, + ) -> None: + """Test that empty tenant_id raises ValueError.""" + config = CircuitBreakerConfigUpdate(tenant_id="") + + with pytest.raises(ValueError, match="tenant_id is required"): + await client.update_circuit_breaker_config(config) + + @pytest.mark.asyncio + async def test_whitespace_tenant_id_raises( + self, + client: AxonFlow, + ) -> None: + """Test that whitespace-only tenant_id raises ValueError.""" + config = CircuitBreakerConfigUpdate(tenant_id=" ") + + with pytest.raises(ValueError, match="tenant_id is required"): + await client.update_circuit_breaker_config(config) + + @pytest.mark.asyncio + async def test_server_error( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that server errors are raised as AxonFlowError.""" + httpx_mock.add_response( + status_code=500, + json={"error": "internal server error"}, + ) + + config = CircuitBreakerConfigUpdate( + tenant_id="tenant-123", + error_threshold=10, + ) + + with pytest.raises(AxonFlowError): + await client.update_circuit_breaker_config(config) + + @pytest.mark.asyncio + async def test_400_error( + self, + client: AxonFlow, + httpx_mock: HTTPXMock, + ) -> None: + """Test that 400 errors are raised as AxonFlowError.""" + httpx_mock.add_response( + status_code=400, + json={"error": "invalid configuration"}, + ) + + config = CircuitBreakerConfigUpdate( + tenant_id="tenant-123", + error_threshold=-1, + ) + + with pytest.raises(AxonFlowError): + await client.update_circuit_breaker_config(config) + + +class TestCircuitBreakerTypes: + """Tests for circuit breaker Pydantic model validation.""" + + def test_status_response_model_validate(self) -> None: + """Test CircuitBreakerStatusResponse model validation.""" + response = CircuitBreakerStatusResponse.model_validate( + { + "active_circuits": [{"id": "cb_001", "state": "open"}], + "count": 1, + "emergency_stop_active": True, + } + ) + + assert response.count == 1 + assert response.emergency_stop_active is True + assert len(response.active_circuits) == 1 + + def test_status_response_defaults(self) -> None: + """Test CircuitBreakerStatusResponse defaults.""" + response = CircuitBreakerStatusResponse( + count=0, + emergency_stop_active=False, + ) + + assert response.active_circuits == [] + + def test_history_entry_model_validate(self) -> None: + """Test CircuitBreakerHistoryEntry model validation.""" + entry = CircuitBreakerHistoryEntry.model_validate( + { + "id": "cb_001", + "org_id": "org-123", + "scope": "tenant", + "scope_id": "tenant-abc", + "state": "open", + "trip_reason": "error_threshold_exceeded", + "error_count": 15, + "violation_count": 3, + } + ) + + assert entry.id == "cb_001" + assert entry.scope == "tenant" + assert entry.trip_reason == "error_threshold_exceeded" + assert entry.tripped_by is None + assert entry.reset_by is None + + def test_history_entry_defaults(self) -> None: + """Test CircuitBreakerHistoryEntry defaults.""" + entry = CircuitBreakerHistoryEntry( + id="cb_001", + org_id="org-123", + scope="global", + state="closed", + ) + + assert entry.scope_id == "" + assert entry.trip_reason is None + assert entry.error_count == 0 + assert entry.violation_count == 0 + + def test_config_model_validate(self) -> None: + """Test CircuitBreakerConfig model validation.""" + config = CircuitBreakerConfig.model_validate( + { + "source": "global", + "error_threshold": 10, + "violation_threshold": 5, + "window_seconds": 60, + "default_timeout_seconds": 300, + "max_timeout_seconds": 3600, + "enable_auto_recovery": True, + } + ) + + assert config.source == "global" + assert config.error_threshold == 10 + assert config.tenant_id is None + + def test_config_update_serialization_excludes_none(self) -> None: + """Test that model_dump excludes None fields.""" + config = CircuitBreakerConfigUpdate( + tenant_id="tenant-123", + error_threshold=20, + ) + data = config.model_dump(by_alias=True, exclude_none=True) + + assert data == {"tenant_id": "tenant-123", "error_threshold": 20} + assert "violation_threshold" not in data + assert "window_seconds" not in data From 970c1758d41dea3fffc6246f7daf327e2e682280 Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Mon, 16 Mar 2026 14:56:13 +0530 Subject: [PATCH 2/6] fix: add X-Org-ID header + handle null list responses Circuit breaker endpoints require X-Org-ID header. API returns null for empty arrays which Pydantic rejects; use `or []` fallback. --- axonflow/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/axonflow/client.py b/axonflow/client.py index e3c3626..d723bf4 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -390,6 +390,7 @@ def __init__( # client_secret is optional for community mode but required for enterprise if client_id: headers["X-Tenant-ID"] = client_id # client_id is used as tenant ID for policy APIs + headers["X-Org-ID"] = client_id # circuit breaker endpoints require org context # OAuth2-style: Authorization: Basic base64(clientId:clientSecret) if client_secret: credentials = f"{client_id}:{client_secret}" @@ -1884,7 +1885,7 @@ async def get_circuit_breaker_status(self) -> CircuitBreakerStatusResponse: data = response.get("data", response) return CircuitBreakerStatusResponse( - active_circuits=data.get("active_circuits", []), + active_circuits=data.get("active_circuits") or [], count=data.get("count", 0), emergency_stop_active=data.get("emergency_stop_active", False), ) @@ -1926,7 +1927,7 @@ async def get_circuit_breaker_history( data = response.get("data", response) history = [ - CircuitBreakerHistoryEntry(**entry) for entry in data.get("history", []) + CircuitBreakerHistoryEntry(**entry) for entry in (data.get("history") or []) ] return CircuitBreakerHistoryResponse( From d9156fccbf676fc62b51b0467947816c23774253 Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Mon, 16 Mar 2026 14:59:05 +0530 Subject: [PATCH 3/6] revert: remove global X-Org-ID, fix on platform side instead Circuit breaker handler will fall back to X-Tenant-ID when X-Org-ID is missing. Keep null list fix. --- axonflow/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/axonflow/client.py b/axonflow/client.py index d723bf4..433f6d3 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -390,7 +390,6 @@ def __init__( # client_secret is optional for community mode but required for enterprise if client_id: headers["X-Tenant-ID"] = client_id # client_id is used as tenant ID for policy APIs - headers["X-Org-ID"] = client_id # circuit breaker endpoints require org context # OAuth2-style: Authorization: Basic base64(clientId:clientSecret) if client_secret: credentials = f"{client_id}:{client_secret}" From e2f74cb5661e0473bb45e5fde1f5142c0bc4f2e6 Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Mon, 16 Mar 2026 16:01:31 +0530 Subject: [PATCH 4/6] chore: add v4.2.0 changelog entry --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ea6b1d..9210f8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to the AxonFlow Python SDK will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [4.2.0] - 2026-03-16 + +### Added + +- `get_circuit_breaker_status()` — query active circuit breaker circuits and emergency stop state +- `get_circuit_breaker_history(limit)` — retrieve circuit breaker trip/reset audit trail +- `get_circuit_breaker_config(tenant_id)` — get effective circuit breaker config (global or tenant-specific) +- `update_circuit_breaker_config(config)` — update per-tenant circuit breaker thresholds + +--- + ## [4.1.0] - 2026-03-14 ### Added From 53e1fa0057f303371bd9d5e39855ebe66a084f7b Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Tue, 17 Mar 2026 00:08:51 +0530 Subject: [PATCH 5/6] style: apply ruff formatting --- axonflow/client.py | 16 ++++------------ axonflow/types.py | 28 +++++++--------------------- 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/axonflow/client.py b/axonflow/client.py index 433f6d3..86cf267 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -1925,9 +1925,7 @@ async def get_circuit_breaker_history( response = await self._request("GET", path) data = response.get("data", response) - history = [ - CircuitBreakerHistoryEntry(**entry) for entry in (data.get("history") or []) - ] + history = [CircuitBreakerHistoryEntry(**entry) for entry in (data.get("history") or [])] return CircuitBreakerHistoryResponse( history=history, @@ -6453,27 +6451,21 @@ def get_circuit_breaker_history( limit: int | None = None, ) -> CircuitBreakerHistoryResponse: """Get circuit breaker history for audit trail.""" - return self._run_sync( - self._async_client.get_circuit_breaker_history(limit=limit) - ) + return self._run_sync(self._async_client.get_circuit_breaker_history(limit=limit)) def get_circuit_breaker_config( self, tenant_id: str | None = None, ) -> CircuitBreakerConfig: """Get circuit breaker config (global or tenant-specific).""" - return self._run_sync( - self._async_client.get_circuit_breaker_config(tenant_id=tenant_id) - ) + return self._run_sync(self._async_client.get_circuit_breaker_config(tenant_id=tenant_id)) def update_circuit_breaker_config( self, config: CircuitBreakerConfigUpdate, ) -> dict[str, Any]: """Update per-tenant circuit breaker config.""" - return self._run_sync( - self._async_client.update_circuit_breaker_config(config) - ) + return self._run_sync(self._async_client.update_circuit_breaker_config(config)) # Policy CRUD sync wrappers diff --git a/axonflow/types.py b/axonflow/types.py index c641f3b..8ef290a 100644 --- a/axonflow/types.py +++ b/axonflow/types.py @@ -1204,9 +1204,7 @@ class CircuitBreakerHistoryEntry(BaseModel): reset_by: str | None = Field(default=None, description="Who reset the circuit") reset_at: str | None = Field(default=None, description="When the circuit was reset") error_count: int = Field(default=0, description="Number of errors in current window") - violation_count: int = Field( - default=0, description="Number of violations in current window" - ) + violation_count: int = Field(default=0, description="Number of violations in current window") class CircuitBreakerHistoryResponse(BaseModel): @@ -1229,15 +1227,11 @@ class CircuitBreakerConfig(BaseModel): error_threshold: int = Field(description="Error threshold for auto-trip") violation_threshold: int = Field(description="Policy violation threshold") window_seconds: int = Field(description="Sliding window duration in seconds") - default_timeout_seconds: int = Field( - description="Default circuit open timeout in seconds" - ) + default_timeout_seconds: int = Field(description="Default circuit open timeout in seconds") max_timeout_seconds: int = Field(description="Maximum allowed timeout in seconds") enable_auto_recovery: bool = Field(description="Whether auto-recovery is enabled") tenant_id: str | None = Field(default=None, description="Tenant ID if tenant-specific") - overrides: dict[str, Any] | None = Field( - default=None, description="Tenant-specific overrides" - ) + overrides: dict[str, Any] | None = Field(default=None, description="Tenant-specific overrides") class CircuitBreakerConfigUpdate(BaseModel): @@ -1246,21 +1240,13 @@ class CircuitBreakerConfigUpdate(BaseModel): model_config = ConfigDict(populate_by_name=True) tenant_id: str = Field(description="Tenant ID to configure") - error_threshold: int | None = Field( - default=None, description="Override error threshold" - ) + error_threshold: int | None = Field(default=None, description="Override error threshold") violation_threshold: int | None = Field( default=None, description="Override violation threshold" ) - window_seconds: int | None = Field( - default=None, description="Override window duration" - ) + window_seconds: int | None = Field(default=None, description="Override window duration") default_timeout_seconds: int | None = Field( default=None, description="Override default timeout" ) - max_timeout_seconds: int | None = Field( - default=None, description="Override max timeout" - ) - enable_auto_recovery: bool | None = Field( - default=None, description="Override auto-recovery" - ) + max_timeout_seconds: int | None = Field(default=None, description="Override max timeout") + enable_auto_recovery: bool | None = Field(default=None, description="Override auto-recovery") From 68e5d5aa46f46994bccd2aad8f224955147998bc Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Tue, 17 Mar 2026 00:15:20 +0530 Subject: [PATCH 6/6] fix: satisfy mypy no-any-return for update_circuit_breaker_config --- axonflow/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/axonflow/client.py b/axonflow/client.py index 86cf267..2744c4e 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -2017,7 +2017,8 @@ async def update_circuit_breaker_config( json_data=request_body, ) - return response.get("data", response) + result: dict[str, Any] = response.get("data", response) + return result # ========================================================================= # Audit Log Read Methods