From 81ad2e70d407f32edc7a257604d44818bd8ac54a Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 17 Jun 2026 10:26:51 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=94=92=20(runtime):=20Fail=20closed?= =?UTF-8?q?=20in=20interceptor=20when=20query=20is=20unauthoritative?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under enforce, a raising query_policy, an error-sentinel decision, or an unreachable runtime (with the native extension present) now denies the tool instead of allowing it. observe / disabled keep failing open. Refs AAASM-3106 --- agent_assembly/core/runtime_interceptor.py | 112 ++++++++++++++++----- 1 file changed, 88 insertions(+), 24 deletions(-) diff --git a/agent_assembly/core/runtime_interceptor.py b/agent_assembly/core/runtime_interceptor.py index 1bf323b3..19a98344 100644 --- a/agent_assembly/core/runtime_interceptor.py +++ b/agent_assembly/core/runtime_interceptor.py @@ -12,15 +12,22 @@ ``query_policy``. The runtime is the authority: it redacts in place, so this layer only ever *blocks* on an explicit ``deny`` and otherwise proceeds. -Fail-open is preserved at two levels: - -* When the native extension is missing or no runtime socket is reachable, - :func:`build_governance_interceptor` returns the bare ``GatewayClient`` - unchanged — no ``check_tool_start`` is present and the adapters proceed - exactly as before. -* When a runtime *is* connected, the native ``query_policy`` already returns - ``decision="allow"`` on QueryFailed / ChannelClosed / Shutdown, so a - transient runtime outage proceeds rather than blocks. +Failure posture is governed by ``enforcement_mode`` (AAASM-3106): + +* Under ``enforce``, the SDK is a security control and **fails closed**. When the + native extension is missing the bare client is returned (no native authority + exists to consult — see :func:`build_governance_interceptor`), but once the + native extension is present every other failure denies: an unreachable runtime + socket yields a deny-all interceptor, a raising ``query_policy`` maps to + ``deny``, and a native ``decision`` that is itself an error sentinel + (``query_failed`` / ``channel_closed``) maps to ``deny`` rather than allow. +* Under ``observe`` / ``disabled`` (or when no mode is supplied), the SDK is a + dry-run / hermetic-test layer and **fails open**: an unreachable runtime + returns the bare client unchanged, a raising or error ``query_policy`` + proceeds, exactly as before. + +The runtime remains the authority on *redaction* (it redacts in place); this +layer only ever decides allow / deny / pending. """ from __future__ import annotations @@ -31,6 +38,12 @@ ENV_RUNTIME_SOCKET = "AA_RUNTIME_SOCKET" ACTION_TYPE_TOOL_CALL = "tool_call" +ENFORCE_MODE = "enforce" + +# Native ``query_policy`` decisions that signal the runtime could not produce an +# authoritative verdict (mirrors aa-ffi-python mapping QueryFailed / ChannelClosed +# / Shutdown). Under ``enforce`` these must deny, not allow (AAASM-3106). +_ERROR_DECISIONS = frozenset({"query_failed", "channel_closed", "shutdown", "error"}) def _resolve_runtime_socket_path(agent_id: str) -> str: @@ -99,14 +112,20 @@ class RuntimeQueryInterceptor: Delegates every attribute the adapters look up (event reporting, tool-end hooks, approval timeout providers, ...) to the wrapped ``GatewayClient`` and - only *adds* ``check_tool_start``. The added check is fail-open: any path that - cannot produce an explicit ``deny`` proceeds. + only *adds* ``check_tool_start``. + + The failure posture of the added check is governed by ``enforce``: when + ``True`` (``enforcement_mode == "enforce"``) any path that cannot obtain an + authoritative allow — a raising ``query_policy`` or an error-sentinel + ``decision`` — maps to ``deny`` (fail closed). When ``False`` those paths + proceed (fail open), preserving the observe / disabled behavior. """ - def __init__(self, client: Any, runtime_client: Any, agent_id: str) -> None: + def __init__(self, client: Any, runtime_client: Any, agent_id: str, *, enforce: bool = False) -> None: self._client = client self._runtime_client = runtime_client self._agent_id = agent_id + self._enforce = enforce def __getattr__(self, name: str) -> Any: # Delegate anything not defined here (e.g. report_event, on_tool_end, @@ -125,12 +144,14 @@ def check_tool_start( Maps the runtime decision onto the adapter contract: - * ``"deny"`` → ``{"status": "deny", "reason": ...}`` (the only block). + * ``"deny"`` → ``{"status": "deny", "reason": ...}``. * ``"pending"`` → ``{"status": "pending", "reason": ...}`` so the adapter's existing approval path runs. - * anything else (``"allow"`` / ``"redact"`` / ``"unspecified"`` / an - unreachable runtime) → ``{"status": "allow"}``. The runtime redacts - authoritatively; this layer never redacts. + * ``"allow"`` / ``"redact"`` / ``"unspecified"`` → ``{"status": "allow"}``. + The runtime redacts authoritatively; this layer never redacts. + * A raising ``query_policy`` or an error-sentinel ``decision`` + (``query_failed`` / ``channel_closed`` / ``shutdown``) → ``deny`` under + ``enforce`` (fail closed, AAASM-3106), else ``allow`` (fail open). """ tool_name = _extract_tool_name(serialized, kwargs) tool_args_json = _extract_tool_args_json(input_str, kwargs) @@ -143,8 +164,10 @@ def check_tool_start( tool_args_json, ) except Exception: - # Native query raised unexpectedly — fail OPEN, never block. - return {"status": "allow"} + # Native query raised — the runtime gave no verdict. Under enforce the + # SDK is a security control and must block (fail closed); otherwise + # proceed (fail open). + return self._on_query_failure("runtime query failed") decision = str(result.get("decision", "allow")).strip().lower() reason = str(result.get("reason", "") or "") @@ -153,28 +176,69 @@ def check_tool_start( return {"status": "deny", "reason": reason} if decision == "pending": return {"status": "pending", "reason": reason} + if decision in _ERROR_DECISIONS: + # Native reported it could not reach an authoritative verdict. + return self._on_query_failure(reason or f"runtime returned {decision}") + return {"status": "allow"} + + def _on_query_failure(self, reason: str) -> dict[str, str]: + """Map an unauthoritative query to deny (enforce) or allow (observe).""" + if self._enforce: + return {"status": "deny", "reason": reason} return {"status": "allow"} -def build_governance_interceptor(client: Any, agent_id: str) -> Any: +class _FailClosedInterceptor: + """Deny-all interceptor used under ``enforce`` when no runtime is reachable. + + The native extension is present (so the SDK is configured as a security + control) but the runtime socket could not be connected, meaning no + authoritative verdict can be obtained. Under ``enforce`` this must block + every tool rather than silently allow it (AAASM-3106). Non-check attributes + delegate to the wrapped ``GatewayClient`` so event reporting still works. + """ + + def __init__(self, client: Any, reason: str) -> None: + self._client = client + self._reason = reason + + def __getattr__(self, name: str) -> Any: + return getattr(self._client, name) + + def check_tool_start(self, **_kwargs: Any) -> dict[str, str]: + return {"status": "deny", "reason": self._reason} + + +def build_governance_interceptor(client: Any, agent_id: str, enforcement_mode: str | None = None) -> Any: """Return the interceptor adapters should use for pre-execution checks. When the native extension is importable and a runtime socket is reachable, wrap ``client`` in a :class:`RuntimeQueryInterceptor` so a runtime ``deny`` - actually blocks the tool. Otherwise return ``client`` unchanged so the - existing fail-open / no-core path is preserved exactly. + actually blocks the tool. The failure posture depends on ``enforcement_mode`` + (AAASM-3106): + + * The native extension is **missing**: return ``client`` unchanged in every + mode. There is no native authority to consult, so there is nothing to fail + closed *to* — the SDK fast path is simply not engaged. + * The native extension is **present** but the runtime socket is unreachable: + under ``enforce`` return a deny-all :class:`_FailClosedInterceptor`; + otherwise return ``client`` unchanged (fail open). """ + enforce = enforcement_mode == ENFORCE_MODE try: from agent_assembly._core import RuntimeClient except ImportError: + # No native fast path at all — the SDK control is not engaged in any mode. return client socket_path = _resolve_runtime_socket_path(agent_id) try: runtime_client = RuntimeClient.connect(socket_path) except Exception: - # No reachable runtime / connect failed — fail OPEN: bare client has no - # check_tool_start, so adapters proceed exactly as before. + # Native present but runtime unreachable: deny everything under enforce + # (fail closed); proceed under observe / disabled (fail open). + if enforce: + return _FailClosedInterceptor(client, "runtime unreachable") return client - return RuntimeQueryInterceptor(client, runtime_client, agent_id) + return RuntimeQueryInterceptor(client, runtime_client, agent_id, enforce=enforce) From 034a9b6a195f77eb8dfa15fdc3968d4388616570 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 17 Jun 2026 10:26:58 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=F0=9F=94=92=20(assembly):=20Thread=20enfor?= =?UTF-8?q?cement=5Fmode=20into=20governance=20interceptor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit So build_governance_interceptor can pick the fail-closed vs fail-open posture per the agent's enforcement_mode. Refs AAASM-3106 --- agent_assembly/core/assembly.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/agent_assembly/core/assembly.py b/agent_assembly/core/assembly.py index 7eb96dc4..5f802254 100644 --- a/agent_assembly/core/assembly.py +++ b/agent_assembly/core/assembly.py @@ -213,6 +213,7 @@ def init_assembly( registered_adapters = _register_adapters( client=client, process_agent_id=resolved_agent_id, + enforcement_mode=enforcement_mode, ) network_mode, network_shutdown = _start_network_layer(client=client, mode=mode) except Exception as error: @@ -263,6 +264,7 @@ def _validate_inputs( def _register_adapters( client: GatewayClient, process_agent_id: str, + enforcement_mode: EnforcementMode | None = None, ) -> list[FrameworkAdapter]: """Detect available frameworks via AdapterRegistry and register hooks. @@ -272,14 +274,15 @@ def _register_adapters( When the native runtime is reachable, the bare ``GatewayClient`` is wrapped in a ``RuntimeQueryInterceptor`` so a runtime ``deny`` blocks the tool via - ``check_tool_start``; otherwise the bare client is used unchanged - (fail-open). + ``check_tool_start``. ``enforcement_mode`` decides the failure posture: under + ``enforce`` an unreachable runtime or a failed query blocks (fail closed, + AAASM-3106); under ``observe`` / ``disabled`` it proceeds (fail open). """ registry = AdapterRegistry() adapters = registry.get_available_adapters_by_priority() registered: list[FrameworkAdapter] = [] - interceptor: Any = build_governance_interceptor(client, process_agent_id) + interceptor: Any = build_governance_interceptor(client, process_agent_id, enforcement_mode) for adapter in adapters: adapter.set_process_agent_id(process_agent_id) From 3bbb2594069a7c4a573dbb3673546aa2dac1b525 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 17 Jun 2026 10:27:50 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E2=9C=85=20(runtime):=20Add=20fail-closed?= =?UTF-8?q?=20regression=20tests=20for=20enforce=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover enforce + raising query, enforce + error-sentinel decision, enforce + unreachable runtime (deny-all wrapper), and the observe fail-open counterparts. Refs AAASM-3106 --- test/unit/core/test_runtime_interceptor.py | 128 ++++++++++++++++++++- 1 file changed, 124 insertions(+), 4 deletions(-) diff --git a/test/unit/core/test_runtime_interceptor.py b/test/unit/core/test_runtime_interceptor.py index 7aad9560..2df11510 100644 --- a/test/unit/core/test_runtime_interceptor.py +++ b/test/unit/core/test_runtime_interceptor.py @@ -1,8 +1,11 @@ -"""Unit tests for the runtime-backed pre-execution check (AAASM-3049). +"""Unit tests for the runtime-backed pre-execution check (AAASM-3049, AAASM-3106). -Wave 3 of AAASM-3021: a reachable runtime's ``deny`` must block a tool via the -adapter ``check_tool_start`` contract, while every other path (allow / redact / -pending / unreachable runtime / missing native extension) proceeds (fail-open). +Wave 3 of AAASM-3021: a reachable runtime's ``deny`` blocks a tool via the +adapter ``check_tool_start`` contract. + +AAASM-3106 adds the failure posture: under ``enforce`` an unreachable runtime, a +raising ``query_policy``, or an error-sentinel ``decision`` must deny (fail +closed); under ``observe`` / ``disabled`` those paths still proceed (fail open). """ from __future__ import annotations @@ -209,6 +212,123 @@ def connect(_socket_path: str) -> Any: } +def test_enforce_query_raising_fails_closed() -> None: + """AAASM-3106: under enforce a raising query_policy denies, not allows.""" + + class _Raising: + def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]: + raise RuntimeError("native boom") + + interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _Raising(), "agent-001", enforce=True) + + result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i") + + assert result["status"] == "deny" + + +@pytest.mark.parametrize("decision", ["query_failed", "channel_closed", "shutdown"]) +def test_enforce_error_decision_fails_closed(decision: str) -> None: + """AAASM-3106: native error-sentinel decisions deny under enforce.""" + interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _FakeRuntimeClient(decision), "agent-001", enforce=True) + + result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i") + + assert result["status"] == "deny" + + +@pytest.mark.parametrize("decision", ["query_failed", "channel_closed"]) +def test_observe_error_decision_still_fails_open(decision: str) -> None: + """Without enforce the error-sentinel decisions keep proceeding.""" + interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _FakeRuntimeClient(decision), "agent-001") + + result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i") + + assert result == {"status": "allow"} + + +def test_enforce_unreachable_runtime_denies_all(monkeypatch: pytest.MonkeyPatch) -> None: + """AAASM-3106: native present but runtime unreachable yields a deny-all + interceptor under enforce, not the fail-open bare client.""" + + class _UnreachableRuntimeClient: + @staticmethod + def connect(_socket_path: str) -> Any: + raise OSError("no such socket") + + fake_core = types.ModuleType("agent_assembly._core") + fake_core.RuntimeClient = _UnreachableRuntimeClient # type: ignore[attr-defined] + monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core) + + client = _FakeGatewayClient() + result = build_governance_interceptor(client, "agent-001", "enforce") + + assert result is not client + assert result.check_tool_start(serialized={"name": "t"}, input_str="i")["status"] == "deny" + # Non-check attributes still delegate to the wrapped client. + result.close() + assert client.closed is True + + +def test_observe_unreachable_runtime_returns_bare_client(monkeypatch: pytest.MonkeyPatch) -> None: + """Without enforce an unreachable runtime keeps the fail-open bare client.""" + + class _UnreachableRuntimeClient: + @staticmethod + def connect(_socket_path: str) -> Any: + raise OSError("no such socket") + + fake_core = types.ModuleType("agent_assembly._core") + fake_core.RuntimeClient = _UnreachableRuntimeClient # type: ignore[attr-defined] + monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core) + + client = _FakeGatewayClient() + + assert build_governance_interceptor(client, "agent-001", "observe") is client + + +def test_enforce_wraps_with_fail_closed_query_path(monkeypatch: pytest.MonkeyPatch) -> None: + """build_governance_interceptor under enforce wraps a reachable runtime so a + raising query denies (the wrapper carries enforce=True).""" + + class _Raising: + def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]: + raise RuntimeError("boom") + + class _ConnectingRuntimeClient: + @staticmethod + def connect(_socket_path: str) -> Any: + return _Raising() + + fake_core = types.ModuleType("agent_assembly._core") + fake_core.RuntimeClient = _ConnectingRuntimeClient # type: ignore[attr-defined] + monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core) + + result = build_governance_interceptor(_FakeGatewayClient(), "agent-001", "enforce") + + assert isinstance(result, RuntimeQueryInterceptor) + assert result.check_tool_start(serialized={"name": "t"}, input_str="i")["status"] == "deny" + + +def test_callback_handler_blocks_on_enforce_fail_closed() -> None: + """End-to-end: under enforce a failing runtime drives on_tool_start to raise.""" + + class _Raising: + def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]: + raise RuntimeError("native down") + + interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _Raising(), "agent-001", enforce=True) + handler = AssemblyCallbackHandler(interceptor) + + with pytest.raises(ToolExecutionBlockedError): + handler.on_tool_start( + serialized={"name": "web_search"}, + input_str="query", + run_id=uuid4(), + tool_name="web_search", + args={"q": "x"}, + ) + + def test_resolve_socket_path_prefers_env(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("AA_RUNTIME_SOCKET", "/custom/runtime.sock") assert runtime_interceptor._resolve_runtime_socket_path("agent-001") == "/custom/runtime.sock"