Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions agent_assembly/core/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def init_assembly(
registered_adapters = _register_adapters(
client=client,
process_agent_id=resolved_agent_id,
enforcement_mode=enforcement_mode,
)
network_mode, network_shutdown = _start_network_layer(client=client, mode=mode)
except Exception as error:
Expand Down Expand Up @@ -263,6 +264,7 @@ def _validate_inputs(
def _register_adapters(
client: GatewayClient,
process_agent_id: str,
enforcement_mode: EnforcementMode | None = None,
) -> list[FrameworkAdapter]:
"""Detect available frameworks via AdapterRegistry and register hooks.

Expand All @@ -272,14 +274,15 @@ def _register_adapters(

When the native runtime is reachable, the bare ``GatewayClient`` is wrapped
in a ``RuntimeQueryInterceptor`` so a runtime ``deny`` blocks the tool via
``check_tool_start``; otherwise the bare client is used unchanged
(fail-open).
``check_tool_start``. ``enforcement_mode`` decides the failure posture: under
``enforce`` an unreachable runtime or a failed query blocks (fail closed,
AAASM-3106); under ``observe`` / ``disabled`` it proceeds (fail open).
"""
registry = AdapterRegistry()
adapters = registry.get_available_adapters_by_priority()

registered: list[FrameworkAdapter] = []
interceptor: Any = build_governance_interceptor(client, process_agent_id)
interceptor: Any = build_governance_interceptor(client, process_agent_id, enforcement_mode)

for adapter in adapters:
adapter.set_process_agent_id(process_agent_id)
Expand Down
112 changes: 88 additions & 24 deletions agent_assembly/core/runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,22 @@
``query_policy``. The runtime is the authority: it redacts in place, so this
layer only ever *blocks* on an explicit ``deny`` and otherwise proceeds.

Fail-open is preserved at two levels:

* When the native extension is missing or no runtime socket is reachable,
:func:`build_governance_interceptor` returns the bare ``GatewayClient``
unchanged β€” no ``check_tool_start`` is present and the adapters proceed
exactly as before.
* When a runtime *is* connected, the native ``query_policy`` already returns
``decision="allow"`` on QueryFailed / ChannelClosed / Shutdown, so a
transient runtime outage proceeds rather than blocks.
Failure posture is governed by ``enforcement_mode`` (AAASM-3106):

* Under ``enforce``, the SDK is a security control and **fails closed**. When the
native extension is missing the bare client is returned (no native authority
exists to consult β€” see :func:`build_governance_interceptor`), but once the
native extension is present every other failure denies: an unreachable runtime
socket yields a deny-all interceptor, a raising ``query_policy`` maps to
``deny``, and a native ``decision`` that is itself an error sentinel
(``query_failed`` / ``channel_closed``) maps to ``deny`` rather than allow.
* Under ``observe`` / ``disabled`` (or when no mode is supplied), the SDK is a
dry-run / hermetic-test layer and **fails open**: an unreachable runtime
returns the bare client unchanged, a raising or error ``query_policy``
proceeds, exactly as before.

The runtime remains the authority on *redaction* (it redacts in place); this
layer only ever decides allow / deny / pending.
"""

from __future__ import annotations
Expand All @@ -31,6 +38,12 @@

ENV_RUNTIME_SOCKET = "AA_RUNTIME_SOCKET"
ACTION_TYPE_TOOL_CALL = "tool_call"
ENFORCE_MODE = "enforce"

# Native ``query_policy`` decisions that signal the runtime could not produce an
# authoritative verdict (mirrors aa-ffi-python mapping QueryFailed / ChannelClosed
# / Shutdown). Under ``enforce`` these must deny, not allow (AAASM-3106).
_ERROR_DECISIONS = frozenset({"query_failed", "channel_closed", "shutdown", "error"})


def _resolve_runtime_socket_path(agent_id: str) -> str:
Expand Down Expand Up @@ -99,14 +112,20 @@ class RuntimeQueryInterceptor:

Delegates every attribute the adapters look up (event reporting, tool-end
hooks, approval timeout providers, ...) to the wrapped ``GatewayClient`` and
only *adds* ``check_tool_start``. The added check is fail-open: any path that
cannot produce an explicit ``deny`` proceeds.
only *adds* ``check_tool_start``.

The failure posture of the added check is governed by ``enforce``: when
``True`` (``enforcement_mode == "enforce"``) any path that cannot obtain an
authoritative allow β€” a raising ``query_policy`` or an error-sentinel
``decision`` β€” maps to ``deny`` (fail closed). When ``False`` those paths
proceed (fail open), preserving the observe / disabled behavior.
"""

def __init__(self, client: Any, runtime_client: Any, agent_id: str) -> None:
def __init__(self, client: Any, runtime_client: Any, agent_id: str, *, enforce: bool = False) -> None:
self._client = client
self._runtime_client = runtime_client
self._agent_id = agent_id
self._enforce = enforce

def __getattr__(self, name: str) -> Any:
# Delegate anything not defined here (e.g. report_event, on_tool_end,
Expand All @@ -125,12 +144,14 @@ def check_tool_start(

Maps the runtime decision onto the adapter contract:

* ``"deny"`` β†’ ``{"status": "deny", "reason": ...}`` (the only block).
* ``"deny"`` β†’ ``{"status": "deny", "reason": ...}``.
* ``"pending"`` β†’ ``{"status": "pending", "reason": ...}`` so the
adapter's existing approval path runs.
* anything else (``"allow"`` / ``"redact"`` / ``"unspecified"`` / an
unreachable runtime) β†’ ``{"status": "allow"}``. The runtime redacts
authoritatively; this layer never redacts.
* ``"allow"`` / ``"redact"`` / ``"unspecified"`` β†’ ``{"status": "allow"}``.
The runtime redacts authoritatively; this layer never redacts.
* A raising ``query_policy`` or an error-sentinel ``decision``
(``query_failed`` / ``channel_closed`` / ``shutdown``) β†’ ``deny`` under
``enforce`` (fail closed, AAASM-3106), else ``allow`` (fail open).
"""
tool_name = _extract_tool_name(serialized, kwargs)
tool_args_json = _extract_tool_args_json(input_str, kwargs)
Expand All @@ -143,8 +164,10 @@ def check_tool_start(
tool_args_json,
)
except Exception:
# Native query raised unexpectedly β€” fail OPEN, never block.
return {"status": "allow"}
# Native query raised β€” the runtime gave no verdict. Under enforce the
# SDK is a security control and must block (fail closed); otherwise
# proceed (fail open).
return self._on_query_failure("runtime query failed")

decision = str(result.get("decision", "allow")).strip().lower()
reason = str(result.get("reason", "") or "")
Expand All @@ -153,28 +176,69 @@ def check_tool_start(
return {"status": "deny", "reason": reason}
if decision == "pending":
return {"status": "pending", "reason": reason}
if decision in _ERROR_DECISIONS:
# Native reported it could not reach an authoritative verdict.
return self._on_query_failure(reason or f"runtime returned {decision}")
return {"status": "allow"}

def _on_query_failure(self, reason: str) -> dict[str, str]:
"""Map an unauthoritative query to deny (enforce) or allow (observe)."""
if self._enforce:
return {"status": "deny", "reason": reason}
return {"status": "allow"}


def build_governance_interceptor(client: Any, agent_id: str) -> Any:
class _FailClosedInterceptor:
"""Deny-all interceptor used under ``enforce`` when no runtime is reachable.

The native extension is present (so the SDK is configured as a security
control) but the runtime socket could not be connected, meaning no
authoritative verdict can be obtained. Under ``enforce`` this must block
every tool rather than silently allow it (AAASM-3106). Non-check attributes
delegate to the wrapped ``GatewayClient`` so event reporting still works.
"""

def __init__(self, client: Any, reason: str) -> None:
self._client = client
self._reason = reason

def __getattr__(self, name: str) -> Any:
return getattr(self._client, name)

def check_tool_start(self, **_kwargs: Any) -> dict[str, str]:
return {"status": "deny", "reason": self._reason}


def build_governance_interceptor(client: Any, agent_id: str, enforcement_mode: str | None = None) -> Any:
"""Return the interceptor adapters should use for pre-execution checks.

When the native extension is importable and a runtime socket is reachable,
wrap ``client`` in a :class:`RuntimeQueryInterceptor` so a runtime ``deny``
actually blocks the tool. Otherwise return ``client`` unchanged so the
existing fail-open / no-core path is preserved exactly.
actually blocks the tool. The failure posture depends on ``enforcement_mode``
(AAASM-3106):

* The native extension is **missing**: return ``client`` unchanged in every
mode. There is no native authority to consult, so there is nothing to fail
closed *to* β€” the SDK fast path is simply not engaged.
* The native extension is **present** but the runtime socket is unreachable:
under ``enforce`` return a deny-all :class:`_FailClosedInterceptor`;
otherwise return ``client`` unchanged (fail open).
"""
enforce = enforcement_mode == ENFORCE_MODE
try:
from agent_assembly._core import RuntimeClient
except ImportError:
# No native fast path at all β€” the SDK control is not engaged in any mode.
return client

socket_path = _resolve_runtime_socket_path(agent_id)
try:
runtime_client = RuntimeClient.connect(socket_path)
except Exception:
# No reachable runtime / connect failed β€” fail OPEN: bare client has no
# check_tool_start, so adapters proceed exactly as before.
# Native present but runtime unreachable: deny everything under enforce
# (fail closed); proceed under observe / disabled (fail open).
if enforce:
return _FailClosedInterceptor(client, "runtime unreachable")
return client

return RuntimeQueryInterceptor(client, runtime_client, agent_id)
return RuntimeQueryInterceptor(client, runtime_client, agent_id, enforce=enforce)
128 changes: 124 additions & 4 deletions test/unit/core/test_runtime_interceptor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Unit tests for the runtime-backed pre-execution check (AAASM-3049).
"""Unit tests for the runtime-backed pre-execution check (AAASM-3049, AAASM-3106).

Wave 3 of AAASM-3021: a reachable runtime's ``deny`` must block a tool via the
adapter ``check_tool_start`` contract, while every other path (allow / redact /
pending / unreachable runtime / missing native extension) proceeds (fail-open).
Wave 3 of AAASM-3021: a reachable runtime's ``deny`` blocks a tool via the
adapter ``check_tool_start`` contract.

AAASM-3106 adds the failure posture: under ``enforce`` an unreachable runtime, a
raising ``query_policy``, or an error-sentinel ``decision`` must deny (fail
closed); under ``observe`` / ``disabled`` those paths still proceed (fail open).
"""

from __future__ import annotations
Expand Down Expand Up @@ -209,6 +212,123 @@ def connect(_socket_path: str) -> Any:
}


def test_enforce_query_raising_fails_closed() -> None:
"""AAASM-3106: under enforce a raising query_policy denies, not allows."""

class _Raising:
def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]:
raise RuntimeError("native boom")

interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _Raising(), "agent-001", enforce=True)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result["status"] == "deny"


@pytest.mark.parametrize("decision", ["query_failed", "channel_closed", "shutdown"])
def test_enforce_error_decision_fails_closed(decision: str) -> None:
"""AAASM-3106: native error-sentinel decisions deny under enforce."""
interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _FakeRuntimeClient(decision), "agent-001", enforce=True)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result["status"] == "deny"


@pytest.mark.parametrize("decision", ["query_failed", "channel_closed"])
def test_observe_error_decision_still_fails_open(decision: str) -> None:
"""Without enforce the error-sentinel decisions keep proceeding."""
interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _FakeRuntimeClient(decision), "agent-001")

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result == {"status": "allow"}


def test_enforce_unreachable_runtime_denies_all(monkeypatch: pytest.MonkeyPatch) -> None:
"""AAASM-3106: native present but runtime unreachable yields a deny-all
interceptor under enforce, not the fail-open bare client."""

class _UnreachableRuntimeClient:
@staticmethod
def connect(_socket_path: str) -> Any:
raise OSError("no such socket")

fake_core = types.ModuleType("agent_assembly._core")
fake_core.RuntimeClient = _UnreachableRuntimeClient # type: ignore[attr-defined]
monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core)

client = _FakeGatewayClient()
result = build_governance_interceptor(client, "agent-001", "enforce")

assert result is not client
assert result.check_tool_start(serialized={"name": "t"}, input_str="i")["status"] == "deny"
# Non-check attributes still delegate to the wrapped client.
result.close()
assert client.closed is True


def test_observe_unreachable_runtime_returns_bare_client(monkeypatch: pytest.MonkeyPatch) -> None:
"""Without enforce an unreachable runtime keeps the fail-open bare client."""

class _UnreachableRuntimeClient:
@staticmethod
def connect(_socket_path: str) -> Any:
raise OSError("no such socket")

fake_core = types.ModuleType("agent_assembly._core")
fake_core.RuntimeClient = _UnreachableRuntimeClient # type: ignore[attr-defined]
monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core)

client = _FakeGatewayClient()

assert build_governance_interceptor(client, "agent-001", "observe") is client


def test_enforce_wraps_with_fail_closed_query_path(monkeypatch: pytest.MonkeyPatch) -> None:
"""build_governance_interceptor under enforce wraps a reachable runtime so a
raising query denies (the wrapper carries enforce=True)."""

class _Raising:
def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]:
raise RuntimeError("boom")

class _ConnectingRuntimeClient:
@staticmethod
def connect(_socket_path: str) -> Any:
return _Raising()

fake_core = types.ModuleType("agent_assembly._core")
fake_core.RuntimeClient = _ConnectingRuntimeClient # type: ignore[attr-defined]
monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core)

result = build_governance_interceptor(_FakeGatewayClient(), "agent-001", "enforce")

assert isinstance(result, RuntimeQueryInterceptor)
assert result.check_tool_start(serialized={"name": "t"}, input_str="i")["status"] == "deny"


def test_callback_handler_blocks_on_enforce_fail_closed() -> None:
"""End-to-end: under enforce a failing runtime drives on_tool_start to raise."""

class _Raising:
def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]:
raise RuntimeError("native down")

interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _Raising(), "agent-001", enforce=True)
handler = AssemblyCallbackHandler(interceptor)

with pytest.raises(ToolExecutionBlockedError):
handler.on_tool_start(
serialized={"name": "web_search"},
input_str="query",
run_id=uuid4(),
tool_name="web_search",
args={"q": "x"},
)


def test_resolve_socket_path_prefers_env(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("AA_RUNTIME_SOCKET", "/custom/runtime.sock")
assert runtime_interceptor._resolve_runtime_socket_path("agent-001") == "/custom/runtime.sock"
Expand Down