Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,19 @@ What this does:
## Public API

- `init_assembly(gateway_url, api_key, agent_id=None, mode="auto", *, control_plane_url=None) -> AssemblyContext`
- `async GatewayClient.register_agent() -> dict`
- `async GatewayClient.check_policy_compliance(action: str) -> dict`
- Exceptions: `AssemblyError`, `AgentError`, `PolicyError`, `GatewayError`, `ConfigurationError`
- Data models: `AgentConfig`, `AgentState`, `PolicyEvaluation`

Agent **registration** and the **pre-execution policy check** are not REST
calls: the SDK goes through the native `aa-sdk-client` shim to the core over
gRPC/UDS (ADR 0004) — it never calls a core HTTP endpoint directly for those.
`init_assembly` registers the agent on startup, and a tool call is checked via
the native `query_policy` so a `deny` blocks it before the tool runs.

### Control-plane routing

By default the SDK issues its HTTP routes (agent registration, policy checks,
topology edges) against `gateway_url` — the single-host OSS dev setup. Pass
By default the SDK issues its remaining HTTP routes (topology edges, secret
dispatch) against `gateway_url` — the single-host OSS dev setup. Pass
`control_plane_url` to route those HTTP calls to a separate control-plane host
while `gateway_url` continues to serve the gRPC data path:

Expand Down
73 changes: 8 additions & 65 deletions agent_assembly/client/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,20 @@ def __init__(
api_key: Optional API key for authentication
timeout: Request timeout in seconds
control_plane_url: Optional URL of the control-plane HTTP API. When
set, the HTTP routes (``/agents/{id}/register``, ``/policy/check``,
``/topology/edges``) are issued against it; when ``None`` (the
set, the remaining HTTP routes (``/topology/edges``,
``/dispatch_tool``) are issued against it; when ``None`` (the
default) they fall back to ``gateway_url`` — the backwards-compatible
single-host OSS dev setup.
single-host OSS dev setup. Agent registration and policy checks
go through the native gRPC path (ADR 0004), not these HTTP routes.
parent_agent_id: Parent agent ID for topology tracking
team_id: Team ID this agent belongs to
delegation_reason: Human-readable reason for delegation
spawned_by_tool: Name of the tool that spawned this agent
depth: Spawn depth in the agent lineage tree
enforcement_mode: Per-agent governance posture sent to the gateway
at registration. ``None`` (the default) omits the field from
the request body so a legacy gateway sees the same wire shape
as before; the gateway then defaults to live enforcement.
enforcement_mode: Per-agent governance posture for this agent.
``None`` (the default) lets the gateway apply its server-side
default of live enforcement. Stored for reference; registration
itself now goes through the native gRPC path (ADR 0004).
"""
self.gateway_url = gateway_url.rstrip("/")
self.control_plane_url = control_plane_url.rstrip("/") if control_plane_url else None
Expand Down Expand Up @@ -96,64 +97,6 @@ def __exit__(self, *args: object) -> None:
"""Context manager exit."""
self.close()

async def register_agent(self) -> dict[str, Any]:
"""
Register the agent with the governance gateway.

Returns:
Registration response data

Raises:
GatewayError: If registration fails
"""
body: dict[str, str | int] = {}
if self.parent_agent_id is not None:
body["parent_agent_id"] = self.parent_agent_id
if self.team_id is not None:
body["team_id"] = self.team_id
if self.delegation_reason is not None:
body["delegation_reason"] = self.delegation_reason
if self.spawned_by_tool is not None:
body["spawned_by_tool"] = self.spawned_by_tool
if self.depth is not None:
body["depth"] = self.depth
if self.enforcement_mode is not None:
body["enforcement_mode"] = self.enforcement_mode
try:
response = self.client.post(
f"/agents/{self.agent_id}/register",
json=body if body else None,
)
response.raise_for_status()
data: dict[str, Any] = response.json()
return data
except httpx.HTTPError as e:
raise GatewayError(f"Failed to register agent: {e}") from e

async def check_policy_compliance(self, action: str) -> dict[str, Any]:
"""
Check if an action complies with governance policies.

Args:
action: The action to check

Returns:
Policy compliance response

Raises:
GatewayError: If policy check fails
"""
try:
response = self.client.post(
f"/agents/{self.agent_id}/policy/check",
json={"action": action},
)
response.raise_for_status()
data: dict[str, Any] = response.json()
return data
except httpx.HTTPError as e:
raise GatewayError(f"Failed to check policy compliance: {e}") from e

def report_edge(
self,
source_agent_id: str,
Expand Down
82 changes: 70 additions & 12 deletions agent_assembly/core/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from agent_assembly.adapters.registry import AdapterRegistry
from agent_assembly.client.gateway import GatewayClient
from agent_assembly.core.gateway_resolver import resolve_api_key, resolve_gateway_url
from agent_assembly.core.runtime_interceptor import build_governance_interceptor
from agent_assembly.core.runtime_interceptor import (
_native_core_available,
build_governance_interceptor,
connect_runtime_client,
register_agent,
)
from agent_assembly.core.spawn import _SPAWN_CTX
from agent_assembly.exceptions import AssemblyError, ConfigurationError

Expand Down Expand Up @@ -145,17 +150,24 @@ def init_assembly(
optional auto-start) per Epic 17 S-G — see
``agent_assembly.core.gateway_resolver``.

Agent registration and the pre-execution policy check go through the native
``aa-sdk-client`` shim to the core over gRPC/UDS (ADR 0004): ``init_assembly``
registers the agent on startup (storing the issued credential token on the
native client) and a tool call is checked via the native ``query_policy`` so
a ``deny`` blocks the tool before it runs. The SDK never calls a core HTTP
endpoint directly for registration or policy checks.

:param control_plane_url: Optional URL of the control-plane HTTP API. When
supplied, the SDK issues its HTTP routes (agent registration, policy
checks, topology edges) against it instead of ``gateway_url``. When
omitted it falls back to ``gateway_url`` — the backwards-compatible
single-host OSS dev setup. Resolution order: explicit kwarg >
``AA_CONTROL_PLANE_URL`` env-var > unset (falls back to ``gateway_url``).
:param enforcement_mode: Per-agent governance posture sent to the gateway
at registration (see :data:`EnforcementMode`). Defaults to ``None``,
which omits the field from the registration body — the gateway then
applies its server-side default (live ``enforce``). Pass ``"observe"``
to register the agent in dry-run / sandbox mode: every action
supplied, the SDK issues its remaining HTTP routes (topology edges,
secret dispatch) against it instead of ``gateway_url``. When omitted it
falls back to ``gateway_url`` — the backwards-compatible single-host OSS
dev setup. Resolution order: explicit kwarg > ``AA_CONTROL_PLANE_URL``
env-var > unset (falls back to ``gateway_url``).
:param enforcement_mode: Per-agent governance posture applied to this
agent's actions (see :data:`EnforcementMode`). Defaults to ``None``,
which lets the gateway apply its server-side default (live ``enforce``).
Pass ``"observe"`` to register the agent in dry-run / sandbox mode:
every action
proceeds and the gateway records would-be violations as shadow audit
events.
"""
Expand Down Expand Up @@ -210,10 +222,19 @@ def init_assembly(
network_mode: NetworkMode = "sdk-only"
network_shutdown: Callable[[], None] = _noop_shutdown
try:
native_available = _native_core_available()
runtime_client = connect_runtime_client(resolved_agent_id) if native_available else None
_register_agent_with_gateway(
runtime_client=runtime_client,
agent_id=resolved_agent_id,
enforcement_mode=enforcement_mode,
)
registered_adapters = _register_adapters(
client=client,
process_agent_id=resolved_agent_id,
enforcement_mode=enforcement_mode,
runtime_client=runtime_client,
native_available=native_available,
)
network_mode, network_shutdown = _start_network_layer(client=client, mode=mode)
except Exception as error:
Expand Down Expand Up @@ -261,10 +282,41 @@ def _validate_inputs(
return gateway_url, control_plane_url


def _register_agent_with_gateway(
*,
runtime_client: Any | None,
agent_id: str,
enforcement_mode: EnforcementMode | None,
) -> None:
"""Register the agent with the gateway over the native gRPC ``register``.

Registration goes through the native runtime client (AAASM-3399) so the
issued credential token is stored on the same client the
``RuntimeQueryInterceptor`` later uses for ``query_policy`` — the SDK never
calls a core HTTP endpoint directly (ADR 0004).

No native runtime (extension missing or socket unreachable) means there is
nothing to register against: the call is skipped. Under ``enforce`` a native
registration failure propagates so a misconfigured gateway surfaces at init;
under ``observe`` / ``disabled`` (or no mode) it is swallowed so the dry-run
/ hermetic-test layer never hard-fails on registration.
"""
if runtime_client is None:
return
framework = "python"
try:
register_agent(runtime_client, agent_id, framework)
except Exception:
if enforcement_mode == "enforce":
raise


def _register_adapters(
client: GatewayClient,
process_agent_id: str,
enforcement_mode: EnforcementMode | None = None,
runtime_client: Any | None = None,
native_available: bool = False,
) -> list[FrameworkAdapter]:
"""Detect available frameworks via AdapterRegistry and register hooks.

Expand All @@ -282,7 +334,13 @@ def _register_adapters(
adapters = registry.get_available_adapters_by_priority()

registered: list[FrameworkAdapter] = []
interceptor: Any = build_governance_interceptor(client, process_agent_id, enforcement_mode)
interceptor: Any = build_governance_interceptor(
client,
process_agent_id,
enforcement_mode,
runtime_client=runtime_client,
native_available=native_available,
)

for adapter in adapters:
adapter.set_process_agent_id(process_agent_id)
Expand Down
100 changes: 86 additions & 14 deletions agent_assembly/core/runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,32 +209,104 @@ def check_tool_start(self, **_kwargs: Any) -> dict[str, str]:
return {"status": "deny", "reason": self._reason}


def build_governance_interceptor(client: Any, agent_id: str, enforcement_mode: str | None = None) -> Any:
def connect_runtime_client(agent_id: str) -> Any | None:
"""Connect a native ``RuntimeClient`` to the runtime UDS, or ``None``.

Returns ``None`` when the native extension is not built or the runtime
socket is unreachable — both cases mean there is no native fast path to
register against or query. The single returned client is reused for both
:func:`register_agent` and the :class:`RuntimeQueryInterceptor`'s
``query_policy`` calls, so the credential token stored by ``register`` is
attached to subsequent checks.
"""
try:
from agent_assembly._core import RuntimeClient
except ImportError:
return None

socket_path = _resolve_runtime_socket_path(agent_id)
try:
return RuntimeClient.connect(socket_path)
except Exception:
return None


def register_agent(
runtime_client: Any,
agent_id: str,
framework: str,
gateway_endpoint: str | None = None,
) -> str | None:
"""Register ``agent_id`` with the gateway over the native ``register`` call.

Delegates to the native ``RuntimeClient.register`` (AAASM-3399), which makes
the SDK's only direct gateway gRPC call and stores the issued credential
token on the shared client so later ``query_policy`` checks authenticate
(ADR 0004 — the SDK never calls core HTTP endpoints directly).

Returns the policy id the gateway assigned, or ``None`` when ``register`` is
not exposed (older native build). Registration is authoritative: a native
failure raises ``RuntimeError`` and is allowed to propagate so init surfaces
a misconfigured gateway rather than silently running unregistered.
"""
register = getattr(runtime_client, "register", None)
if register is None:
return None
return str(register(agent_id, agent_id, framework, gateway_endpoint))


def _native_core_available() -> bool:
"""Whether the native ``_core`` extension can be imported."""
try:
from agent_assembly._core import RuntimeClient # noqa: F401
except ImportError:
return False
return True


def build_governance_interceptor(
client: Any,
agent_id: str,
enforcement_mode: str | None = None,
*,
runtime_client: Any | None = None,
native_available: bool | None = None,
) -> Any:
"""Return the interceptor adapters should use for pre-execution checks.

When the native extension is importable and a runtime socket is reachable,
wrap ``client`` in a :class:`RuntimeQueryInterceptor` so a runtime ``deny``
actually blocks the tool. The failure posture depends on ``enforcement_mode``
(AAASM-3106):
When a native runtime is reachable, wrap ``client`` in a
:class:`RuntimeQueryInterceptor` so a runtime ``deny`` actually blocks the
tool. The failure posture depends on ``enforcement_mode`` (AAASM-3106):

* The native extension is **missing**: return ``client`` unchanged in every
mode. There is no native authority to consult, so there is nothing to fail
closed *to* — the SDK fast path is simply not engaged.
* The native extension is **present** but the runtime socket is unreachable:
under ``enforce`` return a deny-all :class:`_FailClosedInterceptor`;
otherwise return ``client`` unchanged (fail open).

:param runtime_client: A pre-connected native runtime client (e.g. the one
:func:`register_agent` registered the token on). When supplied it is
reused so register and ``query_policy`` share one client; when ``None``
a fresh connection is established here.
:param native_available: Whether the native extension is importable. Pass
this alongside a ``runtime_client`` of ``None`` to distinguish a missing
extension (fail open in every mode) from an unreachable runtime socket
(fail closed under enforce). When ``None`` it is detected here.
"""
enforce = enforcement_mode == ENFORCE_MODE
try:
from agent_assembly._core import RuntimeClient
except ImportError:
# No native fast path at all — the SDK control is not engaged in any mode.
return client

socket_path = _resolve_runtime_socket_path(agent_id)
try:
runtime_client = RuntimeClient.connect(socket_path)
except Exception:
if runtime_client is None and native_available is None:
# No caller-supplied client or hint: detect + connect ourselves.
if not _native_core_available():
return client
runtime_client = connect_runtime_client(agent_id)
native_available = True

if runtime_client is None:
# Native missing → no authority to fail closed to, return bare client.
if not native_available:
return client
# Native present but runtime unreachable: deny everything under enforce
# (fail closed); proceed under observe / disabled (fail open).
if enforce:
Expand Down
Loading