From 3b2f41fde706b7054f1cbb5611c8aaabf4ecbd0f Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Tue, 9 Jun 2026 07:18:42 +0200 Subject: [PATCH 1/3] feat(pep): decide -> fulfill -> forward Decision Mode PEP (#2571) Add the SDK analog of platform/shared/pep (ADR-056, epic #2563): a decide client that surfaces engine-fulfillable redact_pii obligations, plus a fulfill helper that discharges them by round-tripping content through the named engine endpoint (check-input) -- never by redacting locally. - decide() / fulfill_request() / decide_and_fulfill() on async + sync clients - DecideRequest/DecideResponse/Obligation/ObligationFulfillment types - redacted/redacted_statement/redaction_evaluated on MCPCheckInputResponse; redaction_evaluated on MCPCheckOutputResponse; content_type on check-input - ObligationNotFulfillableError fail-closed signal; PEP constants + helper - 27 unit tests + runtime-e2e (real enterprise agent: decide->fulfill->masked, demo creds refused); wire-shape baseline annotated (SHA unchanged) Minor bump 8.4.0 -> 8.5.0 (additive, SDK semver decoupled from platform). Signed-off-by: Saurabh Jain --- .lint_baselines/falsey_clobber.json | 36 +- CHANGELOG.md | 49 +++ axonflow/__init__.py | 39 ++ axonflow/_version.py | 2 +- axonflow/client.py | 206 ++++++++++ axonflow/exceptions.py | 16 + axonflow/pep.py | 96 +++++ axonflow/types.py | 123 ++++++ pyproject.toml | 2 +- runtime-e2e/decide_fulfill_obligation/test.py | 139 +++++++ tests/fixtures/wire_shape_baseline.json | 23 ++ tests/test_pep.py | 388 ++++++++++++++++++ 12 files changed, 1099 insertions(+), 20 deletions(-) create mode 100644 axonflow/pep.py create mode 100644 runtime-e2e/decide_fulfill_obligation/test.py create mode 100644 tests/test_pep.py diff --git a/.lint_baselines/falsey_clobber.json b/.lint_baselines/falsey_clobber.json index 9a0c3d4..6e92d93 100644 --- a/.lint_baselines/falsey_clobber.json +++ b/.lint_baselines/falsey_clobber.json @@ -22,24 +22,24 @@ "axonflow/adapters/tool_wrapper.py:190:20", "axonflow/adapters/tool_wrapper.py:208:20", "axonflow/adapters/tool_wrapper.py:220:20", - "axonflow/client.py:1104:16", - "axonflow/client.py:1181:16", - "axonflow/client.py:1653:37", - "axonflow/client.py:1694:18", - "axonflow/client.py:1752:37", - "axonflow/client.py:2270:24", - "axonflow/client.py:2291:33", - "axonflow/client.py:2292:31", - "axonflow/client.py:2304:25", - "axonflow/client.py:2365:28", - "axonflow/client.py:2406:69", - "axonflow/client.py:293:14", - "axonflow/client.py:298:24", - "axonflow/client.py:299:20", - "axonflow/client.py:522:44", - "axonflow/client.py:6284:25", - "axonflow/client.py:838:20", - "axonflow/client.py:924:20", + "axonflow/client.py:1111:16", + "axonflow/client.py:1188:16", + "axonflow/client.py:1669:37", + "axonflow/client.py:1710:18", + "axonflow/client.py:1768:37", + "axonflow/client.py:2286:24", + "axonflow/client.py:2307:33", + "axonflow/client.py:2308:31", + "axonflow/client.py:2320:25", + "axonflow/client.py:2381:28", + "axonflow/client.py:2422:69", + "axonflow/client.py:300:14", + "axonflow/client.py:305:24", + "axonflow/client.py:306:20", + "axonflow/client.py:529:44", + "axonflow/client.py:6456:25", + "axonflow/client.py:845:20", + "axonflow/client.py:931:20", "axonflow/execution.py:205:19", "axonflow/interceptors/anthropic.py:134:43", "axonflow/interceptors/anthropic.py:161:43", diff --git a/CHANGELOG.md b/CHANGELOG.md index 8450f42..60e65e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,55 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 and tag v{X.Y.Z}. The release workflow's preflight checks the section header matches the tag. --> +## [8.5.0] - 2026-06-09 — Decision Mode PEP: decide → fulfill → forward + +Adds the SDK analog of the platform PEP client (`platform/shared/pep`, +ADR-056, epic #2563). A Policy Enforcement Point now follows one path — +**decide → fulfill → forward** — and the SDK makes the engine-fulfillable +obligation contract impossible to misuse: there is **no local redaction +path**, so a `redact_pii` obligation can only be discharged by round-tripping +content through the engine endpoint the obligation names. + +### Added + +- **`AxonFlow.decide(DecideRequest)` / `SyncAxonFlow.decide`** — the PDP step. + `POST /api/v1/decide` returns a `DecideResponse` whose `obligations` is a + list of self-describing `Obligation`s. Decision Mode auth is HTTP Basic + (org:license), which the client already sends; wrong/demo credentials are + refused with `AuthenticationError`. +- **`AxonFlow.fulfill_request(decision, statement)`** — discharges every + request-phase `redact_pii` obligation by POSTing the statement to the + engine's `check-input` endpoint and returning the **engine-redacted** + statement. Fails closed with `ObligationNotFulfillableError` when an + obligation names no request-phase fulfillment, advertises a content-type the + PEP is not holding, names an endpoint the client will not call, the engine + call fails, or the engine reports `redaction_evaluated=false`. Never redacts + locally. +- **`AxonFlow.decide_and_fulfill(DecideRequest)`** — the blessed one-call path + (decide, then fulfill any request-phase obligation); fail-closed by + construction. +- **New types**: `DecideRequest`, `DecideResponse`, `Obligation`, + `ObligationFulfillment`, `DecisionCallerIdentity`, `DecisionTarget`. +- **New exception**: `ObligationNotFulfillableError` (a fail-closed signal). +- **PEP constants** + `has_request_redaction(obligations)` helper + (`OBLIGATION_REDACT_PII`, `PHASE_REQUEST`/`PHASE_RESPONSE`, + `CONTENT_TYPE_TEXT`, `VERDICT_ALLOW`/`VERDICT_DENY`/`VERDICT_NEEDS_APPROVAL`, + endpoint-path constants). +- **`redacted` / `redacted_statement` / `redaction_evaluated` on + `MCPCheckInputResponse`** and **`redaction_evaluated` on + `MCPCheckOutputResponse`** — the request-redaction contract fields the agent + emits (ADR-056). A PEP fulfilling an obligation fails closed when + `redaction_evaluated` is false. +- **`content_type` on `MCPCheckInputRequest` / `mcp_check_input(...)`** — + selects the request-redaction detector (defaults to `text/plain`). + +### Notes + +- SDK semver is decoupled from the platform: this is a **minor** bump from + 8.4.0 (purely additive, optional fields backward-compatible with older + platforms). The wire-shape baseline records the new fields as an + acknowledged SDK superset pending the OpenAPI spec catching up. + ## [8.4.0] - 2026-05-30 — Decision request context + Pasal 56(b) transfer basis Targets AxonFlow platform **v8.5.0**. diff --git a/axonflow/__init__.py b/axonflow/__init__.py index 9fe28ea..2c91d03 100644 --- a/axonflow/__init__.py +++ b/axonflow/__init__.py @@ -60,6 +60,7 @@ ConnectionError, ConnectorError, IdempotencyKeyMismatchError, + ObligationNotFulfillableError, PlanExecutionError, PolicyViolationError, RateLimitError, @@ -104,6 +105,19 @@ RegistrySummary, SystemStatus, ) +from axonflow.pep import ( + CONTENT_TYPE_TEXT, + DECIDE_PATH, + OBLIGATION_REDACT_PII, + PHASE_REQUEST, + PHASE_RESPONSE, + REQUEST_REDACTION_PATH, + RESPONSE_REDACTION_PATH, + VERDICT_ALLOW, + VERDICT_DENY, + VERDICT_NEEDS_APPROVAL, + has_request_redaction, +) from axonflow.policies import ( CreateDynamicPolicyRequest, CreatePolicyOverrideRequest, @@ -171,6 +185,10 @@ ConnectorPolicyInfo, ConnectorResponse, CreateBudgetRequest, + DecideRequest, + DecideResponse, + DecisionCallerIdentity, + DecisionTarget, DynamicPolicyInfo, DynamicPolicyMatch, ExecutionDetail, @@ -204,6 +222,8 @@ MediaGovernanceStatus, Mode, ModelPricing, + Obligation, + ObligationFulfillment, PaginationMeta, PlanExecutionResponse, PlanResponse, @@ -312,6 +332,25 @@ "MCPCheckInputResponse", "MCPCheckOutputRequest", "MCPCheckOutputResponse", + # Decision Mode PEP contract (ADR-056, #2563) + "DecideRequest", + "DecideResponse", + "DecisionCallerIdentity", + "DecisionTarget", + "Obligation", + "ObligationFulfillment", + "ObligationNotFulfillableError", + "OBLIGATION_REDACT_PII", + "PHASE_REQUEST", + "PHASE_RESPONSE", + "CONTENT_TYPE_TEXT", + "VERDICT_ALLOW", + "VERDICT_DENY", + "VERDICT_NEEDS_APPROVAL", + "DECIDE_PATH", + "REQUEST_REDACTION_PATH", + "RESPONSE_REDACTION_PATH", + "has_request_redaction", # Planning types "PlanStep", "PlanResponse", diff --git a/axonflow/_version.py b/axonflow/_version.py index e0dc035..72f0df3 100644 --- a/axonflow/_version.py +++ b/axonflow/_version.py @@ -1,3 +1,3 @@ """Single source of truth for the AxonFlow SDK version.""" -__version__ = "8.4.0" +__version__ = "8.5.0" diff --git a/axonflow/client.py b/axonflow/client.py index 2599fe6..9f3ac36 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -90,6 +90,7 @@ ConnectionError, ConnectorError, IdempotencyKeyMismatchError, + ObligationNotFulfillableError, PlanExecutionError, PolicyViolationError, RateLimitError, @@ -118,6 +119,10 @@ HITLReviewInput, HITLStats, ) +from axonflow.pep import CONTENT_TYPE_TEXT, OBLIGATION_REDACT_PII, PHASE_REQUEST, VERDICT_ALLOW +from axonflow.pep import DECIDE_PATH as PEP_DECIDE_PATH +from axonflow.pep import REQUEST_REDACTION_PATH as PEP_REQUEST_REDACTION_PATH +from axonflow.pep import _endpoint_path_matches as pep_endpoint_path_matches from axonflow.policies import ( CreateDynamicPolicyRequest, CreatePolicyOverrideRequest, @@ -165,6 +170,8 @@ ConnectorPolicyInfo, ConnectorResponse, CreateBudgetRequest, + DecideRequest, + DecideResponse, ExecutionDetail, ExecutionExportOptions, ExecutionMode, @@ -1436,6 +1443,7 @@ async def mcp_check_input( user_id: str | None = None, user_role: str | None = None, user_token: str | None = None, + content_type: str | None = None, ) -> MCPCheckInputResponse: """Validate an MCP request against configured policies without executing it. @@ -1452,9 +1460,15 @@ async def mcp_check_input( user_id: End-user identifier for per-user policies. user_role: User role for role-based policies. user_token: User auth token for downstream propagation. + content_type: Selects the request-redaction detector (ADR-056 / + #2563). ``None`` defaults to "text/plain" server-side. Used by + a PEP fulfilling a ``redact_pii`` obligation. Returns: MCPCheckInputResponse with allowed status, block reason, and policy info. + When the statement carries PII under a redact policy, the engine + returns ``redacted_statement`` so a PEP forwards engine-redacted + content without hand-rolling its own patterns. Raises: ConnectorError: If the request fails (non-403 errors only). @@ -1478,6 +1492,8 @@ async def mcp_check_input( body["user_role"] = user_role if user_token is not None: body["user_token"] = user_token + if content_type is not None: + body["content_type"] = content_type if self._config.debug: self._logger.debug( @@ -2873,6 +2889,162 @@ async def list_decisions( return [] return [DecisionSummary.model_validate(r) for r in rows] + # ------------------------------------------------------------------ # + # Decision Mode PEP: decide -> fulfill -> forward (ADR-056, #2563) # + # ------------------------------------------------------------------ # + + async def decide(self, request: DecideRequest) -> DecideResponse: + """Ask the PDP for a verdict on a request (``POST /api/v1/decide``). + + This is the PDP step of a PEP. ``/decide`` is a pure decision point: it + NEVER mutates content. When an allow verdict carries a ``redact_pii`` + obligation, discharge it with :meth:`fulfill_request` (or use the + one-call :meth:`decide_and_fulfill`) — never by redacting locally. + + Decision Mode auth is HTTP Basic (org:license), which this client + already sends; demo / wrong credentials are refused with 401 → + :class:`~axonflow.exceptions.AuthenticationError`. A deny verdict is + returned in the body with HTTP 200, not as an error. + + Args: + request: The :class:`DecideRequest` (``stage`` ∈ {"llm","tool", + "agent"} and ``query`` are required). + + Returns: + The :class:`DecideResponse` verdict, with ``obligations`` always a + (possibly empty) list. + + Raises: + AuthenticationError: 401 (bad / demo credentials). + AxonFlowError: Other non-200 responses. + """ + self._pre_request_hook() + url = f"{self._config.endpoint}{PEP_DECIDE_PATH}" + body = request.model_dump(exclude_none=True) + try: + response = await self._http_client.post(url, json=body) + except httpx.ConnectError as e: + msg = f"Failed to connect to AxonFlow Agent: {e}" + raise ConnectionError(msg) from e + except httpx.TimeoutException as e: + msg = f"Request timed out: {e}" + raise TimeoutError(msg) from e + + if response.status_code == 401: # noqa: PLR2004 + msg = "Invalid credentials" + raise AuthenticationError(msg) + if response.status_code >= 400: # noqa: PLR2004 + msg = f"HTTP {response.status_code}: {response.text}" + raise AxonFlowError(msg) + + data = response.json() + if not isinstance(data, dict): + data = {} + return DecideResponse.model_validate(data) + + async def fulfill_request( + self, + decision: DecideResponse, + statement: str, + ) -> tuple[str, bool]: + """Discharge every request-phase ``redact_pii`` obligation on ``decision``. + + For each request-phase ``redact_pii`` obligation, POSTs ``statement`` to + the engine endpoint the obligation names (``check-input``) and returns + the engine-redacted statement to forward. + + There is NO code path in which this method redacts locally — fulfillment + is always the engine round-trip (ADR-056 / #2563). + + Returns: + ``(content, did_redact)``. ``content`` is the engine-redacted + statement (or the original when no obligation mutates the request). + ``did_redact`` reflects whether the ENGINE actually changed the + content, not merely that an obligation was present. + + Raises: + ObligationNotFulfillableError: A ``redact_pii`` obligation could not + be discharged through the engine — it named no request-phase + fulfillment, advertised a content-type the PEP is not holding, + named an endpoint this client will not call, the engine call + failed, or the engine reported the redactor did not run + (``redaction_evaluated=false``). The caller MUST fail closed + (block) — never forward the original ``statement``. + """ + redacted = statement + did_redact = False + for ob in decision.obligations: + if ob.type != OBLIGATION_REDACT_PII: + # redact_pii is the only content-mutating obligation today; + # other types are pass-through by contract. + continue + if ob.fulfillment is None or ob.fulfillment.phase != PHASE_REQUEST: + msg = "redact_pii obligation missing request-phase fulfillment" + raise ObligationNotFulfillableError(msg) + content_types = ob.fulfillment.content_types + if content_types and CONTENT_TYPE_TEXT not in content_types: + msg = f"fulfillment endpoint does not advertise a {CONTENT_TYPE_TEXT} detector" + raise ObligationNotFulfillableError(msg) + if not pep_endpoint_path_matches(ob.fulfillment.endpoint, PEP_REQUEST_REDACTION_PATH): + msg = ( + f"fulfillment endpoint {ob.fulfillment.endpoint!r} is not " + "the request-redaction endpoint" + ) + raise ObligationNotFulfillableError(msg) + redacted = await self._fulfill_via_check_input(redacted) + if redacted != statement: + did_redact = True + return redacted, did_redact + + async def _fulfill_via_check_input(self, statement: str) -> str: + """POST ``statement`` to the request-redaction engine endpoint. + + Returns the engine-masked statement. Fails closed (raises + :class:`ObligationNotFulfillableError`) when the engine call errors, the + engine returns non-200, or ``redaction_evaluated`` is false — never + returns unredacted content under an unfulfillable condition. + """ + try: + result = await self.mcp_check_input( + connector_type="gateway", + statement=statement, + operation="execute", + content_type=CONTENT_TYPE_TEXT, + ) + except AxonFlowError as e: + msg = f"request-redaction engine call failed: {e}" + raise ObligationNotFulfillableError(msg) from e + # FAIL CLOSED if the redactor did not actually run (#2563 B1). Without + # this the PEP cannot distinguish "engine looked, found nothing" (safe + # to forward) from "engine wasn't looking" (would leak PII). + if not result.redaction_evaluated: + msg = "engine reported the redactor did not run (redaction disabled)" + raise ObligationNotFulfillableError(msg) + if result.redacted and result.redacted_statement: + return result.redacted_statement + # Redactor ran and found nothing to mask — forward unchanged. + return statement + + async def decide_and_fulfill( + self, + request: DecideRequest, + ) -> tuple[str, str, DecideResponse]: + """One-call PEP path: decide, then fulfill any request-phase obligation. + + Returns ``(verdict, content, decision)``. Branch on ``verdict``: forward + ``content`` on ``"allow"``; block on ``"deny"`` / ``"needs_approval"``. + + On the not-fulfillable path this raises + :class:`ObligationNotFulfillableError` AFTER having computed an empty + ``content`` internally, so a caller that catches the error cannot + accidentally forward the unredacted query — fail-closed by construction. + """ + decision = await self.decide(request) + if decision.verdict != VERDICT_ALLOW: + return decision.verdict, request.query, decision + redacted, _ = await self.fulfill_request(decision, request.query) + return decision.verdict, redacted, decision + async def get_audit_logs_by_tenant( self, tenant_id: str, @@ -7375,6 +7547,7 @@ def mcp_check_input( user_id: str | None = None, user_role: str | None = None, user_token: str | None = None, + content_type: str | None = None, ) -> MCPCheckInputResponse: """Validate an MCP request against configured policies without executing it.""" return self._run_sync( @@ -7388,9 +7561,42 @@ def mcp_check_input( user_id=user_id, user_role=user_role, user_token=user_token, + content_type=content_type, ) ) + def decide(self, request: DecideRequest) -> DecideResponse: + """Ask the PDP for a verdict on a request (``POST /api/v1/decide``). + + Synchronous wrapper for :meth:`AxonFlow.decide`. See that method for the + decide → fulfill → forward PEP contract (ADR-056, #2563). + """ + return self._run_sync(self._async_client.decide(request)) + + def fulfill_request( + self, + decision: DecideResponse, + statement: str, + ) -> tuple[str, bool]: + """Discharge every request-phase ``redact_pii`` obligation via the engine. + + Synchronous wrapper for :meth:`AxonFlow.fulfill_request`. Raises + :class:`~axonflow.exceptions.ObligationNotFulfillableError` (fail-closed) + when an obligation cannot be discharged through the engine — never + redacts locally. + """ + return self._run_sync(self._async_client.fulfill_request(decision, statement)) + + def decide_and_fulfill( + self, + request: DecideRequest, + ) -> tuple[str, str, DecideResponse]: + """One-call PEP path: decide, then fulfill any request-phase obligation. + + Synchronous wrapper for :meth:`AxonFlow.decide_and_fulfill`. + """ + return self._run_sync(self._async_client.decide_and_fulfill(request)) + def mcp_check_output( self, connector_type: str, diff --git a/axonflow/exceptions.py b/axonflow/exceptions.py index 60c70b1..aca4963 100644 --- a/axonflow/exceptions.py +++ b/axonflow/exceptions.py @@ -42,6 +42,22 @@ def __init__( self.block_reason = block_reason +class ObligationNotFulfillableError(AxonFlowError): + """A Decision Mode obligation could not be discharged through the engine. + + Raised by the PEP fulfillment path (``client.fulfill_request`` / + ``client.decide_and_fulfill``) when a ``redact_pii`` obligation named no + request-phase fulfillment endpoint, named an endpoint the client will not + call, advertised a content-type the PEP is not holding, the engine endpoint + failed, or the engine reported the redactor did not run + (``redaction_evaluated=false``). + + This is a FAIL-CLOSED signal (ADR-056 / #2563): the caller MUST block the + request, never forward the unredacted content. The PEP contains no local + redaction path, so it cannot silently substitute its own masking. + """ + + class UpgradeInfo: """Pricing-tier upgrade context emitted in a V1 429 envelope. diff --git a/axonflow/pep.py b/axonflow/pep.py new file mode 100644 index 0000000..ac7d254 --- /dev/null +++ b/axonflow/pep.py @@ -0,0 +1,96 @@ +"""Decision Mode PEP (Policy Enforcement Point) contract constants and helpers. + +A PEP follows one path: **decide → fulfill → forward** (ADR-056, epic #2563). + + - decide: ask the PDP (``POST /api/v1/decide``) for a verdict on a request. + - fulfill: for every obligation the verdict carries, call the ENGINE endpoint + named in the obligation's ``fulfillment`` block to obtain engine-redacted + content. + - forward: forward the (possibly redacted) content, or block, per verdict. + +The structural guarantee #2563 demands: a PEP built on this SDK contains NO +redaction logic of its own. The ONLY way it discharges a ``redact_pii`` +obligation is by POSTing the source content to the engine endpoint the +obligation names (``client.fulfill_request`` / ``client.decide_and_fulfill``) +and forwarding what the engine returns. If an obligation arrives without a +fulfillable engine endpoint — or the engine reports the redactor did not run — +the helper raises :class:`~axonflow.exceptions.ObligationNotFulfillableError` +and the caller MUST fail closed (block), never forward unredacted. + +This mirrors ``platform/shared/pep`` (the Go reference PEP) so the SDK PEP +cannot reimplement redaction the way a hand-rolled regex would. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from axonflow.types import Obligation + +# --- Obligation contract constants (mirror platform/agent/decision_handler.go) --- + +#: The obligation a PEP discharges by replacing request content with +#: engine-redacted content before forwarding. +OBLIGATION_REDACT_PII = "redact_pii" + +#: Fulfillment phases. ``/decide`` runs pre-call so it only emits request-phase +#: obligations; the response-phase value is part of the contract for PEP helpers +#: that fan out to the response-redaction endpoint after the backend call. +PHASE_REQUEST = "request" +PHASE_RESPONSE = "response" + +#: The only redaction content-type wired today. The contract is content-type +#: agnostic — a PEP holding content of a type not advertised by an obligation's +#: ``content_types`` must fail closed rather than forward it unredacted. +CONTENT_TYPE_TEXT = "text/plain" + +# --- Verdict values returned by the PDP --- +VERDICT_ALLOW = "allow" +VERDICT_DENY = "deny" +VERDICT_NEEDS_APPROVAL = "needs_approval" + +# --- Engine endpoints a PEP will POST content to for fulfillment --- +# An obligation whose fulfillment endpoint is not one of these is rejected — a +# PEP must not be steered into calling an arbitrary URL by a malformed verdict. +REQUEST_REDACTION_PATH = "/api/v1/mcp/check-input" +RESPONSE_REDACTION_PATH = "/api/v1/mcp/check-output" + +DECIDE_PATH = "/api/v1/decide" + + +def has_request_redaction(obligations: list[Obligation]) -> bool: + """Report whether any obligation requires request-phase PII redaction. + + Exposed so a PEP can branch ("does this verdict carry work for me?") before + calling ``client.fulfill_request``. + """ + return any( + o.type == OBLIGATION_REDACT_PII + and o.fulfillment is not None + and o.fulfillment.phase == PHASE_REQUEST + for o in obligations + ) + + +def _endpoint_path_matches(endpoint: str, expected: str) -> bool: + """Report whether ``endpoint`` is the expected engine path. + + Tolerates an absolute URL whose path component matches (some PDPs return a + fully-qualified obligation endpoint); a blank endpoint never matches. + """ + e = (endpoint or "").strip() + if e == expected: + return True + marker = "://" + idx = e.find(marker) + if idx >= 0: + rest = e[idx + len(marker) :] + slash = rest.find("/") + if slash >= 0: + path = rest[slash:] + q = path.find("?") + if q >= 0: + path = path[:q] + return path == expected + return False diff --git a/axonflow/types.py b/axonflow/types.py index 8ac7465..8e1d249 100644 --- a/axonflow/types.py +++ b/axonflow/types.py @@ -417,6 +417,12 @@ class MCPCheckInputRequest(BaseModel): user_token: str | None = Field( default=None, description="User token for downstream auth propagation." ) + # ContentType selects the request-redaction detector (ADR-056 / #2563 + # addendum). None defaults to "text/plain" server-side. A content_type with + # no registered detector is rejected (415) so a PEP fulfilling a redact_pii + # obligation fails closed rather than forwarding content the engine cannot + # govern. Source of truth: platform/agent/mcp_handler.go MCPCheckInputRequest. + content_type: str | None = Field(default=None) class MCPCheckInputResponse(BaseModel): @@ -436,6 +442,20 @@ class MCPCheckInputResponse(BaseModel): policy_matches: list[ExplainPolicy] | None = Field(default=None) override_available: bool | None = Field(default=None) override_existing_id: str | None = Field(default=None) + # Request-phase redaction (ADR-056 / #2563). When an allowed statement + # carries PII under a redact (not block) policy, the engine returns the + # masked statement here so a PEP can forward redacted content WITHOUT + # hand-rolling its own patterns — this is what makes a /decide redact_pii + # obligation engine-fulfillable. Source of truth: + # platform/agent/mcp_handler.go MCPCheckInputResponse. + redacted: bool = Field(default=False) + redacted_statement: str | None = Field(default=None) + # redaction_evaluated reports whether the redaction detector actually RAN + # (regardless of whether it masked anything). A PEP fulfilling a redact_pii + # obligation MUST fail closed when this is False — it means the redactor did + # not run (detection disabled), so "redacted:false" would otherwise be + # indistinguishable from "looked, found nothing" (#2563 B1). + redaction_evaluated: bool = Field(default=False) class MCPCheckOutputRequest(BaseModel): @@ -475,6 +495,109 @@ class MCPCheckOutputResponse(BaseModel): # MCPCheckInputResponse fields on the same call site). decision_id: str | None = Field(default=None) policy_matches: list[ExplainPolicy] | None = Field(default=None) + # redaction_evaluated mirrors the check-input field for the response phase + # (ADR-056 / #2563). A PEP fulfilling a response-phase redact_pii obligation + # MUST fail closed when this is False — the redactor did not run, so absence + # of redacted output cannot be trusted as "nothing to mask". The agent does + # not populate this on every path today; default False keeps a PEP fail-closed + # when the platform predates the field. Source of truth: platform/agent/mcp_handler.go. + redaction_evaluated: bool = Field(default=False) + + +class ObligationFulfillment(BaseModel): + """Names the engine call a PEP makes to discharge an obligation. + + Fulfillment is a property of the contract, not of PEP-author discipline: a + conforming PEP POSTs the obligation's source content to ``endpoint`` and + forwards the engine-redacted content the endpoint returns. + + ``content_types`` advertises the mime-types the endpoint's detectors can + handle today. The contract is content-type-agnostic: a PEP holding content + of a type NOT in this list must fail closed rather than forward it + unredacted. Mirrors platform ObligationFulfillment. + """ + + endpoint: str = Field(description='Engine path, e.g. "/api/v1/mcp/check-input".') + method: str = Field(default="POST", description='HTTP method, e.g. "POST".') + phase: str = Field(description='"request" | "response".') + content_types: list[str] | None = Field( + default=None, description="Mime-types the endpoint can redact today." + ) + + +class Obligation(BaseModel): + """A self-describing, engine-fulfillable PEP requirement on an allow verdict. + + Obligations are SELF-DESCRIBING and ENGINE-FULFILLABLE (ADR-056, #2563): + ``/decide`` is a pure PDP and never mutates content, so a ``redact_pii`` + obligation is not "go redact this yourself with your own patterns" — it is + "call the AxonFlow engine endpoint named in ``fulfillment`` to obtain + engine-redacted content." There is no other blessed way to satisfy it; + client-side redaction is forbidden. Mirrors platform DecisionObligation. + """ + + type: str = Field(description='Obligation type, e.g. "redact_pii".') + detail: str | None = Field(default=None, description="Human-readable detail for audit logs.") + fulfillment: ObligationFulfillment | None = Field( + default=None, description="How a PEP discharges this obligation via the engine." + ) + + +class DecisionCallerIdentity(BaseModel): + """Gateway-asserted identity for a /decide request. + + org_id / tenant_id are optional in the body — the auth-derived identity is + authoritative; body-supplied values are accepted only when they match. + Mirrors platform DecisionCallerIdentity. + """ + + gateway_id: str | None = Field(default=None) + org_id: str | None = Field(default=None) + tenant_id: str | None = Field(default=None) + + +class DecisionTarget(BaseModel): + """Describes what the gateway is about to call. Mirrors platform DecisionTarget.""" + + type: str | None = Field(default=None, description='"llm" | "tool" | "agent".') + model: str | None = Field(default=None, description="When type=llm.") + provider: str | None = Field(default=None, description="When type=llm.") + tool: str | None = Field(default=None, description="When type=tool.") + + +class DecideRequest(BaseModel): + """Inbound contract for POST /api/v1/decide. Mirrors platform DecideRequest. + + Required: ``stage`` (one of "llm" | "tool" | "agent") and ``query``. + ``user_token`` is optional — a PEP that supplies one gets the validated-user + record on the audit row; one that doesn't gets a synthesized service user. + """ + + stage: str + query: str + caller_identity: DecisionCallerIdentity = Field(default_factory=DecisionCallerIdentity) + target: DecisionTarget = Field(default_factory=DecisionTarget) + user_token: str | None = Field(default=None) + context: dict[str, Any] | None = Field(default=None) + + +class DecideResponse(BaseModel): + """PDP verdict returned by POST /api/v1/decide. Mirrors platform DecideResponse. + + ``obligations`` is always a list so PEP code can iterate without a None-check. + ``trace_id`` is W3C-format (32 lowercase hex chars). ``error`` is set on the + deny path when the request was malformed. + """ + + verdict: str + decision_id: str | None = Field(default=None) + trace_id: str | None = Field(default=None) + reasons: list[str] | None = Field(default=None) + obligations: list[Obligation] = Field(default_factory=list) + evaluated_policies: list[str] = Field(default_factory=list) + stage: str | None = Field(default=None) + expires_at: datetime | None = Field(default=None) + error: str | None = Field(default=None) class PlanStep(BaseModel): diff --git a/pyproject.toml b/pyproject.toml index 6e7a7ab..5dd3ebc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "axonflow" -version = "8.4.0" +version = "8.5.0" description = "AxonFlow Python SDK - Enterprise AI Governance in 3 Lines of Code" readme = "README.md" license = {text = "MIT"} diff --git a/runtime-e2e/decide_fulfill_obligation/test.py b/runtime-e2e/decide_fulfill_obligation/test.py new file mode 100644 index 0000000..24e9cea --- /dev/null +++ b/runtime-e2e/decide_fulfill_obligation/test.py @@ -0,0 +1,139 @@ +"""Real-stack assertion: Decision Mode PEP decide -> fulfill -> forward (#2571 / #2563). + +Per CLAUDE.md HARD RULE #0 this test MUST hit a real running AxonFlow agent — +no mocks. It proves the engine-fulfillable obligation contract end to end: + + 1. ``client.decide(...)`` on a PII-bearing request returns verdict=allow with a + self-describing ``redact_pii`` obligation whose fulfillment names the + check-input engine endpoint (request phase, text/plain). + 2. ``client.fulfill_request(...)`` discharges it by round-tripping the statement + through that engine endpoint and returns ENGINE-redacted content — the + original PII no longer appears, and the masking is the engine's (the SDK + contains no local redaction path). + 3. ``client.decide_and_fulfill(...)`` does both in one call. + 4. Demo / wrong credentials are refused (401 -> AuthenticationError); the PEP + cannot decide with credentials the enterprise PDP does not accept. + +Enterprise auth is HTTP Basic (org:license) — the SDK builds it from +``client_id`` + ``client_secret``. + +Run locally (after `source /tmp/axonflow-e2e-env.sh` from the enterprise +setup script): + + AXONFLOW_AGENT_URL=http://localhost:8080 \ + AXONFLOW_CLIENT_ID="$AXONFLOW_CLIENT_ID" \ + AXONFLOW_CLIENT_SECRET="$AXONFLOW_CLIENT_SECRET" \ + python3 runtime-e2e/decide_fulfill_obligation/test.py +""" + +from __future__ import annotations + +import asyncio +import os +import sys + +from axonflow import AxonFlow +from axonflow.exceptions import AuthenticationError, ObligationNotFulfillableError +from axonflow.pep import OBLIGATION_REDACT_PII, PHASE_REQUEST, VERDICT_ALLOW +from axonflow.types import DecideRequest + +ENDPOINT = os.environ.get("AXONFLOW_AGENT_URL", "http://localhost:8080") +CLIENT_ID = os.environ.get("AXONFLOW_CLIENT_ID") +CLIENT_SECRET = os.environ.get("AXONFLOW_CLIENT_SECRET") + +# The PII the request carries. The engine's redactor must mask the email + +# credit card; neither raw value may survive into the fulfilled content. +RAW_EMAIL = "john.doe@example.com" +RAW_CARD = "4111111111111111" +QUERY = f"Send the receipt to {RAW_EMAIL} and charge card {RAW_CARD}" + + +def _fail(msg: str) -> None: + sys.stderr.write(f"FAIL: {msg}\n") + sys.exit(1) + + +async def main() -> None: + # Track presence as booleans so the secret value never flows into a log + # expression (keeps CodeQL's clear-text-logging taint analysis happy). + present = { + "AXONFLOW_CLIENT_ID": bool(CLIENT_ID), + "AXONFLOW_CLIENT_SECRET": bool(CLIENT_SECRET), + } + missing = [name for name, ok in present.items() if not ok] + if missing: + sys.stderr.write(f"required env vars not set: {', '.join(missing)}; see module docstring\n") + sys.exit(2) + + async with AxonFlow( + endpoint=ENDPOINT, client_id=CLIENT_ID, client_secret=CLIENT_SECRET + ) as client: + # 1. decide() surfaces the engine-fulfillable redact_pii obligation. + decision = await client.decide( + DecideRequest( + stage="tool", + query=QUERY, + target={"type": "tool", "tool": "send_receipt"}, + caller_identity={"gateway_id": "sdk-runtime-e2e"}, + ) + ) + print(f"decide -> verdict={decision.verdict} obligations={len(decision.obligations)}") + if decision.verdict != VERDICT_ALLOW: + _fail(f"expected allow, got {decision.verdict} ({decision.error})") + if not decision.trace_id: + _fail("decide response did not surface a trace_id") + redact = [o for o in decision.obligations if o.type == OBLIGATION_REDACT_PII] + if not redact: + _fail(f"no redact_pii obligation on a PII request; got {decision.obligations}") + ful = redact[0].fulfillment + if ful is None or ful.phase != PHASE_REQUEST: + _fail(f"obligation not request-phase engine-fulfillable: {ful}") + if "check-input" not in ful.endpoint: + _fail(f"fulfillment endpoint is not the request-redaction endpoint: {ful.endpoint}") + print( + f" obligation fulfillment -> {ful.endpoint} phase={ful.phase} types={ful.content_types}" + ) + + # 2. fulfill_request() returns ENGINE-redacted content; raw PII is gone. + content, did_redact = await client.fulfill_request(decision, QUERY) + print(f"fulfill_request -> did_redact={did_redact} content={content!r}") + if not did_redact: + _fail("engine reported no redaction on a request that carries PII") + if RAW_EMAIL in content: + _fail(f"raw email survived fulfillment — PII leak: {content!r}") + if RAW_CARD in content: + _fail(f"raw card survived fulfillment — PII leak: {content!r}") + if content == QUERY: + _fail("fulfilled content is byte-identical to the unredacted query") + + # 3. decide_and_fulfill() one-call path yields the same masked content. + verdict, one_call, _decision = await client.decide_and_fulfill( + DecideRequest( + stage="tool", query=QUERY, target={"type": "tool", "tool": "send_receipt"} + ) + ) + print(f"decide_and_fulfill -> verdict={verdict} content={one_call!r}") + if verdict != VERDICT_ALLOW: + _fail(f"decide_and_fulfill verdict {verdict}, expected allow") + if RAW_EMAIL in one_call or RAW_CARD in one_call: + _fail(f"decide_and_fulfill leaked PII: {one_call!r}") + + # 4. Demo / wrong credentials are refused by the enterprise PDP. + async with AxonFlow( + endpoint=ENDPOINT, client_id="demo-org", client_secret="demo-license-not-real" + ) as bad_client: + try: + await bad_client.decide(DecideRequest(stage="tool", query="hi")) + except AuthenticationError: + print("demo creds -> AuthenticationError (refused) OK") + else: + _fail("demo credentials were NOT refused by the PDP") + + print("PASS: decide -> fulfill -> forward verified against real agent") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except ObligationNotFulfillableError as e: + _fail(f"obligation unexpectedly not fulfillable against real agent: {e}") diff --git a/tests/fixtures/wire_shape_baseline.json b/tests/fixtures/wire_shape_baseline.json index 04ed62a..7d592a2 100644 --- a/tests/fixtures/wire_shape_baseline.json +++ b/tests/fixtures/wire_shape_baseline.json @@ -296,6 +296,29 @@ ], "spec_only": [] }, + "MCPCheckInputRequest": { + "note": "acknowledged-sdk-superset: tracked in #2563/#2571 \u2014 SDK declares `content_type` (request-redaction detector selector, ADR-056); the agent consumes it on POST /api/v1/mcp/check-input but agent-api.yaml doesn't yet declare it.", + "sdk_only": [ + "content_type" + ], + "spec_only": [] + }, + "MCPCheckInputResponse": { + "note": "acknowledged-sdk-superset: tracked in #2563/#2571 \u2014 SDK declares `redacted`/`redacted_statement`/`redaction_evaluated`; the agent emits them on every evaluated check-input allow path (engine-fulfillable redact_pii obligations, ADR-056). Verified live against an enterprise agent.", + "sdk_only": [ + "redacted", + "redacted_statement", + "redaction_evaluated" + ], + "spec_only": [] + }, + "MCPCheckOutputResponse": { + "note": "acknowledged-sdk-superset: tracked in #2563/#2571 \u2014 SDK declares `redaction_evaluated` for response-phase obligation fail-closed parity with check-input (ADR-056); platform predates the field on check-output, so it defaults False.", + "sdk_only": [ + "redaction_evaluated" + ], + "spec_only": [] + }, "MarkStepCompletedRequest": { "note": "acknowledged-sdk-superset: tracked in #1745. SDK accepts `metadata` on step-completed; platform reads it but spec doesn't yet declare it.", "sdk_only": [ diff --git a/tests/test_pep.py b/tests/test_pep.py new file mode 100644 index 0000000..de32a30 --- /dev/null +++ b/tests/test_pep.py @@ -0,0 +1,388 @@ +"""Unit tests for the Decision Mode PEP contract (ADR-056, epic #2563). + +Covers decide → fulfill → forward: the decide() parsing, the fulfill_request() +fail-closed semantics, decide_and_fulfill(), and the pure helpers. The +load-bearing property under test is that the PEP NEVER redacts locally and +fails CLOSED on every unfulfillable condition — it can only discharge a +redact_pii obligation by round-tripping content through the engine. +""" + +from __future__ import annotations + +import pytest +from pytest_httpx import HTTPXMock + +from axonflow import AxonFlow +from axonflow.exceptions import AuthenticationError, ObligationNotFulfillableError +from axonflow.pep import ( + CONTENT_TYPE_TEXT, + OBLIGATION_REDACT_PII, + PHASE_REQUEST, + PHASE_RESPONSE, + VERDICT_ALLOW, + _endpoint_path_matches, + has_request_redaction, +) +from axonflow.types import ( + DecideRequest, + DecideResponse, + Obligation, + ObligationFulfillment, +) + +CHECK_INPUT_URL = "https://test.axonflow.com/api/v1/mcp/check-input" +DECIDE_URL = "https://test.axonflow.com/api/v1/decide" + +# The exact obligation the real agent emits on /decide for a request carrying +# PII under a redact policy (verified live against an enterprise agent). +REDACT_OBLIGATION = { + "type": "redact_pii", + "fulfillment": { + "endpoint": "/api/v1/mcp/check-input", + "method": "POST", + "phase": "request", + "content_types": ["text/plain"], + }, +} + + +def _decide_allow(obligations: list[dict]) -> dict: + return { + "verdict": "allow", + "decision_id": "dec-1", + "trace_id": "04110a0b50577bbbdda23a00dcbaf6da", + "obligations": obligations, + "evaluated_policies": ["sys_pii_email"], + "stage": "tool", + "expires_at": "2026-06-09T05:05:06.801139966Z", + } + + +# --------------------------------------------------------------------------- +# Pure helpers +# --------------------------------------------------------------------------- + + +class TestHelpers: + def test_has_request_redaction_true(self) -> None: + obs = [Obligation.model_validate(REDACT_OBLIGATION)] + assert has_request_redaction(obs) is True + + def test_has_request_redaction_false_when_response_phase(self) -> None: + obs = [ + Obligation( + type=OBLIGATION_REDACT_PII, + fulfillment=ObligationFulfillment( + endpoint="/api/v1/mcp/check-output", + phase=PHASE_RESPONSE, + ), + ) + ] + assert has_request_redaction(obs) is False + + def test_has_request_redaction_false_when_empty(self) -> None: + assert has_request_redaction([]) is False + + @pytest.mark.parametrize( + ("endpoint", "expected", "want"), + [ + ("/api/v1/mcp/check-input", "/api/v1/mcp/check-input", True), + ("https://pdp:8443/api/v1/mcp/check-input", "/api/v1/mcp/check-input", True), + ("https://pdp/api/v1/mcp/check-input?x=1", "/api/v1/mcp/check-input", True), + ("", "/api/v1/mcp/check-input", False), + ("/api/v1/other", "/api/v1/mcp/check-input", False), + ("https://evil.example.com/steal", "/api/v1/mcp/check-input", False), + ], + ) + def test_endpoint_path_matches(self, endpoint: str, expected: str, want: bool) -> None: + assert _endpoint_path_matches(endpoint, expected) is want + + +# --------------------------------------------------------------------------- +# decide method +# --------------------------------------------------------------------------- + + +class TestDecide: + @pytest.mark.asyncio + async def test_decide_parses_obligations(self, client: AxonFlow, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(url=DECIDE_URL, json=_decide_allow([REDACT_OBLIGATION])) + resp = await client.decide( + DecideRequest(stage="tool", query="Email a@b.com", target={"type": "tool"}) + ) + assert isinstance(resp, DecideResponse) + assert resp.verdict == VERDICT_ALLOW + assert resp.trace_id == "04110a0b50577bbbdda23a00dcbaf6da" + assert len(resp.obligations) == 1 + ob = resp.obligations[0] + assert ob.type == OBLIGATION_REDACT_PII + assert ob.fulfillment is not None + assert ob.fulfillment.endpoint == "/api/v1/mcp/check-input" + assert ob.fulfillment.phase == PHASE_REQUEST + assert ob.fulfillment.content_types == [CONTENT_TYPE_TEXT] + + @pytest.mark.asyncio + async def test_decide_empty_obligations_is_list( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + body = _decide_allow([]) + del body["obligations"] # platform always sends [], but be defensive + httpx_mock.add_response(url=DECIDE_URL, json=body) + resp = await client.decide(DecideRequest(stage="tool", query="hi")) + assert resp.obligations == [] + + @pytest.mark.asyncio + async def test_decide_401_raises_auth_error( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + httpx_mock.add_response(url=DECIDE_URL, status_code=401, json={"error": "unauthorized"}) + with pytest.raises(AuthenticationError): + await client.decide(DecideRequest(stage="tool", query="hi")) + + @pytest.mark.asyncio + async def test_decide_omits_none_fields_on_wire( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + httpx_mock.add_response(url=DECIDE_URL, json=_decide_allow([])) + await client.decide(DecideRequest(stage="tool", query="hi")) + sent = httpx_mock.get_requests()[0] + import json as _json + + body = _json.loads(sent.content) + assert body["stage"] == "tool" + assert "user_token" not in body # None excluded + assert "context" not in body + + +# --------------------------------------------------------------------------- +# fulfill_request() — the fail-closed core +# --------------------------------------------------------------------------- + + +class TestFulfillRequest: + @pytest.mark.asyncio + async def test_engine_redacts_and_forwards( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + decision = DecideResponse.model_validate(_decide_allow([REDACT_OBLIGATION])) + httpx_mock.add_response( + url=CHECK_INPUT_URL, + json={ + "allowed": True, + "policies_evaluated": 1, + "redacted": True, + "redacted_statement": "Email jo****om", + "redaction_evaluated": True, + }, + ) + content, did_redact = await client.fulfill_request(decision, "Email john@x.com") + assert content == "Email jo****om" + assert did_redact is True + # The PEP submitted the source content to the engine with text/plain. + sent = httpx_mock.get_requests()[0] + import json as _json + + body = _json.loads(sent.content) + assert body["statement"] == "Email john@x.com" + assert body["content_type"] == CONTENT_TYPE_TEXT + + @pytest.mark.asyncio + async def test_no_obligations_passthrough( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + decision = DecideResponse.model_validate(_decide_allow([])) + content, did_redact = await client.fulfill_request(decision, "nothing to mask") + assert content == "nothing to mask" + assert did_redact is False + # No engine call was made. + assert httpx_mock.get_requests() == [] + + @pytest.mark.asyncio + async def test_engine_found_nothing_forwards_original( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + decision = DecideResponse.model_validate(_decide_allow([REDACT_OBLIGATION])) + httpx_mock.add_response( + url=CHECK_INPUT_URL, + json={"allowed": True, "redacted": False, "redaction_evaluated": True}, + ) + content, did_redact = await client.fulfill_request(decision, "clean text") + assert content == "clean text" + assert did_redact is False + + @pytest.mark.asyncio + async def test_redaction_not_evaluated_fails_closed( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + """redaction_evaluated=false ⇒ redactor disabled ⇒ MUST fail closed (#2563 B1).""" + decision = DecideResponse.model_validate(_decide_allow([REDACT_OBLIGATION])) + httpx_mock.add_response( + url=CHECK_INPUT_URL, + json={"allowed": True, "redacted": False, "redaction_evaluated": False}, + ) + with pytest.raises(ObligationNotFulfillableError, match="redactor did not run"): + await client.fulfill_request(decision, "Email john@x.com") + + @pytest.mark.asyncio + async def test_redaction_evaluated_absent_fails_closed( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + """Absent redaction_evaluated defaults False (older platform) ⇒ fail closed.""" + decision = DecideResponse.model_validate(_decide_allow([REDACT_OBLIGATION])) + httpx_mock.add_response( + url=CHECK_INPUT_URL, + json={"allowed": True, "redacted": True, "redacted_statement": "x"}, + ) + with pytest.raises(ObligationNotFulfillableError): + await client.fulfill_request(decision, "Email john@x.com") + + @pytest.mark.asyncio + async def test_missing_fulfillment_fails_closed( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + decision = DecideResponse.model_validate( + _decide_allow([{"type": "redact_pii"}]) # no fulfillment block + ) + with pytest.raises(ObligationNotFulfillableError, match="missing request-phase"): + await client.fulfill_request(decision, "Email john@x.com") + assert httpx_mock.get_requests() == [] + + @pytest.mark.asyncio + async def test_response_phase_fulfillment_fails_closed( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + decision = DecideResponse.model_validate( + _decide_allow( + [ + { + "type": "redact_pii", + "fulfillment": { + "endpoint": "/api/v1/mcp/check-output", + "phase": "response", + "content_types": ["text/plain"], + }, + } + ] + ) + ) + with pytest.raises(ObligationNotFulfillableError): + await client.fulfill_request(decision, "Email john@x.com") + + @pytest.mark.asyncio + async def test_unadvertised_content_type_fails_closed( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + decision = DecideResponse.model_validate( + _decide_allow( + [ + { + "type": "redact_pii", + "fulfillment": { + "endpoint": "/api/v1/mcp/check-input", + "phase": "request", + "content_types": ["image/png"], # text/plain not advertised + }, + } + ] + ) + ) + with pytest.raises(ObligationNotFulfillableError, match="text/plain"): + await client.fulfill_request(decision, "Email john@x.com") + assert httpx_mock.get_requests() == [] + + @pytest.mark.asyncio + async def test_foreign_endpoint_fails_closed( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + """A malformed verdict must not steer the PEP into calling an arbitrary URL.""" + decision = DecideResponse.model_validate( + _decide_allow( + [ + { + "type": "redact_pii", + "fulfillment": { + "endpoint": "https://evil.example.com/exfil", + "phase": "request", + "content_types": ["text/plain"], + }, + } + ] + ) + ) + with pytest.raises(ObligationNotFulfillableError, match="not the request-redaction"): + await client.fulfill_request(decision, "Email john@x.com") + assert httpx_mock.get_requests() == [] + + @pytest.mark.asyncio + async def test_engine_error_fails_closed(self, client: AxonFlow, httpx_mock: HTTPXMock) -> None: + decision = DecideResponse.model_validate(_decide_allow([REDACT_OBLIGATION])) + httpx_mock.add_response(url=CHECK_INPUT_URL, status_code=500, json={"error": "boom"}) + with pytest.raises(ObligationNotFulfillableError): + await client.fulfill_request(decision, "Email john@x.com") + + @pytest.mark.asyncio + async def test_non_redact_obligation_is_passthrough( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + decision = DecideResponse.model_validate( + _decide_allow([{"type": "some_future_obligation"}]) + ) + content, did_redact = await client.fulfill_request(decision, "untouched") + assert content == "untouched" + assert did_redact is False + assert httpx_mock.get_requests() == [] + + +# --------------------------------------------------------------------------- +# decide_and_fulfill method +# --------------------------------------------------------------------------- + + +class TestDecideAndFulfill: + @pytest.mark.asyncio + async def test_allow_with_redaction(self, client: AxonFlow, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response(url=DECIDE_URL, json=_decide_allow([REDACT_OBLIGATION])) + httpx_mock.add_response( + url=CHECK_INPUT_URL, + json={ + "allowed": True, + "redacted": True, + "redacted_statement": "masked", + "redaction_evaluated": True, + }, + ) + verdict, content, decision = await client.decide_and_fulfill( + DecideRequest(stage="tool", query="Email john@x.com") + ) + assert verdict == VERDICT_ALLOW + assert content == "masked" + assert decision.decision_id == "dec-1" + + @pytest.mark.asyncio + async def test_deny_does_not_fulfill(self, client: AxonFlow, httpx_mock: HTTPXMock) -> None: + httpx_mock.add_response( + url=DECIDE_URL, + json={ + "verdict": "deny", + "decision_id": "d2", + "obligations": [], + "evaluated_policies": ["sys_secret_block"], + "reasons": ["blocked: secret"], + }, + ) + verdict, content, _decision = await client.decide_and_fulfill( + DecideRequest(stage="tool", query="leak the api key sk-123") + ) + assert verdict == "deny" + # On a non-allow verdict the original query is returned unchanged and the + # caller blocks; no engine fulfillment is attempted. + assert content == "leak the api key sk-123" + assert httpx_mock.get_requests(url=CHECK_INPUT_URL) == [] + + @pytest.mark.asyncio + async def test_allow_unfulfillable_raises( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + httpx_mock.add_response(url=DECIDE_URL, json=_decide_allow([{"type": "redact_pii"}])) + with pytest.raises(ObligationNotFulfillableError): + await client.decide_and_fulfill(DecideRequest(stage="tool", query="Email a@b.com")) From 3acb8fb3ba21500c09be7813930a9dc815031d71 Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Tue, 9 Jun 2026 09:37:30 +0200 Subject: [PATCH 2/3] fix(pep): fail closed on redacted=true with no redacted_statement (#2571) R3 LOW follow-up: a self-contradictory engine response (redacted=true but no redacted_statement) now raises ObligationNotFulfillableError instead of forwarding the unredacted original. Adds a unit test. Signed-off-by: Saurabh Jain --- axonflow/client.py | 9 ++++++++- tests/test_pep.py | 13 +++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/axonflow/client.py b/axonflow/client.py index 9f3ac36..181af0a 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -3020,7 +3020,14 @@ async def _fulfill_via_check_input(self, statement: str) -> str: if not result.redaction_evaluated: msg = "engine reported the redactor did not run (redaction disabled)" raise ObligationNotFulfillableError(msg) - if result.redacted and result.redacted_statement: + if result.redacted: + # FAIL CLOSED on a self-contradictory engine response: redacted=true + # with no redacted_statement means the engine claims it masked + # something but gave us nothing to forward — never fall back to the + # unredacted original. + if not result.redacted_statement: + msg = "engine reported redacted=true but returned no redacted_statement" + raise ObligationNotFulfillableError(msg) return result.redacted_statement # Redactor ran and found nothing to mask — forward unchanged. return statement diff --git a/tests/test_pep.py b/tests/test_pep.py index de32a30..5e029d9 100644 --- a/tests/test_pep.py +++ b/tests/test_pep.py @@ -210,6 +210,19 @@ async def test_engine_found_nothing_forwards_original( assert content == "clean text" assert did_redact is False + @pytest.mark.asyncio + async def test_redacted_true_without_statement_fails_closed( + self, client: AxonFlow, httpx_mock: HTTPXMock + ) -> None: + """Self-contradictory engine response (redacted=true, no statement) ⇒ fail closed.""" + decision = DecideResponse.model_validate(_decide_allow([REDACT_OBLIGATION])) + httpx_mock.add_response( + url=CHECK_INPUT_URL, + json={"allowed": True, "redacted": True, "redaction_evaluated": True}, + ) + with pytest.raises(ObligationNotFulfillableError, match="no redacted_statement"): + await client.fulfill_request(decision, "Email john@x.com") + @pytest.mark.asyncio async def test_redaction_not_evaluated_fails_closed( self, client: AxonFlow, httpx_mock: HTTPXMock From 272ad5cec0201b29b705875ec14f01fef10bbe96 Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Tue, 9 Jun 2026 09:53:49 +0200 Subject: [PATCH 3/3] chore(pep): refresh falsey-clobber baseline line numbers (#2571) The LOW-1 fix insertion shifted one client.py baselined finding's line number; re-keyed via --write-baseline (line-shift only, no new findings). Signed-off-by: Saurabh Jain --- .lint_baselines/falsey_clobber.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.lint_baselines/falsey_clobber.json b/.lint_baselines/falsey_clobber.json index 6e92d93..45ab20a 100644 --- a/.lint_baselines/falsey_clobber.json +++ b/.lint_baselines/falsey_clobber.json @@ -37,7 +37,7 @@ "axonflow/client.py:305:24", "axonflow/client.py:306:20", "axonflow/client.py:529:44", - "axonflow/client.py:6456:25", + "axonflow/client.py:6463:25", "axonflow/client.py:845:20", "axonflow/client.py:931:20", "axonflow/execution.py:205:19",