Skip to content
65 changes: 52 additions & 13 deletions agent_assembly/adapters/crewai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,31 +193,69 @@
return f"[APPROVAL REJECTED] Action was reviewed and denied: {reason_text}"


# Reason string returned alongside "deny" when an unrecognized verdict is
# rejected under ``enforce``.
_UNKNOWN_DECISION_REASON = "Unrecognized governance decision; denied under enforce."


def _interceptor_enforces(callback_handler: Any) -> bool:
"""Return whether the wired interceptor is in fail-closed ``enforce`` posture.

The governance interceptor (``RuntimeQueryInterceptor`` /
``_FailClosedInterceptor``) carries ``_enforce`` set from
``enforcement_mode == "enforce"`` (AAASM-3106). A bare ``GatewayClient`` — used
when no native runtime authority is engaged — has no such attribute and
defaults to fail-open. AAASM-3107 reuses this flag so an unknown / malformed
verdict denies under enforce instead of silently allowing.

Compared strictly against ``True`` so a stub interceptor whose ``__getattr__``
synthesizes truthy values for missing attributes is not mistaken for the
enforce posture; the real flag is always a ``bool``.
"""
target = getattr(callback_handler, "_interceptor", callback_handler)
return getattr(target, "_enforce", False) is True


def _unknown_decision(enforce: bool) -> tuple[Literal["allow", "deny", "pending"], str | None]:
"""Map an unrecognized / malformed verdict, failing closed under ``enforce``.

Under ``enforce`` the SDK is a security control: an unknown, ``None``, or
malformed verdict must not be silently allowed (AAASM-3107), so it denies.
Under ``observe`` / ``disabled`` it proceeds (fail open), preserving the
dry-run / hermetic posture.
"""
if enforce:
return "deny", _UNKNOWN_DECISION_REASON
return "allow", None


def _normalize_decision(

Check failure on line 230 in agent_assembly/adapters/crewai/patch.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=AI-agent-assembly_python-sdk&issues=AZ7T1ar8qF0BTjQTQgiU&open=AZ7T1ar8qF0BTjQTQgiU&pullRequest=139
decision: object,
*,
enforce: bool = False,
) -> tuple[Literal["allow", "deny", "pending"], str | None]:
if isinstance(decision, str):
normalized = decision.strip().lower()
if normalized == "allow":
return "allow", None
if normalized == "deny":
return "deny", None
if normalized == "pending":
return "pending", None
return "allow", None
return _unknown_decision(enforce)

if isinstance(decision, Mapping):
raw_status = str(decision.get("status", "allow")).strip().lower()
if raw_status == "deny":
status: Literal["allow", "deny", "pending"] = "deny"
elif raw_status == "pending":
status = "pending"
else:
status = "allow"

raw_status = str(decision.get("status", "")).strip().lower()
reason_value = decision.get("reason")
reason = str(reason_value) if reason_value is not None else None
return status, reason
if raw_status == "allow":
return "allow", reason
if raw_status == "deny":
return "deny", reason
if raw_status == "pending":
return "pending", reason
unknown_status, unknown_reason = _unknown_decision(enforce)
return unknown_status, reason if reason is not None else unknown_reason

return "allow", None
return _unknown_decision(enforce)


def _invoke_sync_tool_check(
Expand Down Expand Up @@ -306,6 +344,7 @@
return None

original_run = base_tool_cls.run
enforce = _interceptor_enforces(callback_handler)

@wraps(original_run)
def patched_run(self: Any, *args: Any, **kwargs: Any) -> Any:
Expand All @@ -318,7 +357,7 @@
tool_args=tool_args,
agent_id=agent_id,
)
status, reason = _normalize_decision(decision)
status, reason = _normalize_decision(decision, enforce=enforce)
is_pending_flow = False
if status == "pending":
is_pending_flow = True
Expand All @@ -330,7 +369,7 @@
tool_args=tool_args,
agent_id=agent_id,
)
status, reason = _normalize_decision(final_decision)
status, reason = _normalize_decision(final_decision, enforce=enforce)

if status == "deny":
if is_pending_flow:
Expand Down
56 changes: 42 additions & 14 deletions agent_assembly/adapters/langchain/callback_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,40 @@
class AssemblyCallbackHandler(_CallbackHandlerBase): # type: ignore[valid-type,misc]
"""Callback handler that delegates runtime events to governance interception."""

# Reason string returned alongside "deny" when an unrecognized verdict is
# rejected under ``enforce``.
_UNKNOWN_DECISION_REASON = "Unrecognized governance decision; denied under enforce."

def __init__(self, interceptor: Any) -> None:
self._interceptor = interceptor

@property
def _enforce(self) -> bool:
"""Whether the wired interceptor is in fail-closed ``enforce`` posture.

The governance interceptor (``RuntimeQueryInterceptor`` /
``_FailClosedInterceptor``) carries ``_enforce`` set from
``enforcement_mode == "enforce"`` (AAASM-3106). A bare ``GatewayClient``
— used when no native runtime authority is engaged — lacks it and
defaults to fail-open. AAASM-3107 reuses this flag so an unknown / ``None``
/ malformed verdict denies under enforce instead of silently allowing.

Compared strictly against ``True`` so a stub interceptor whose
``__getattr__`` synthesizes truthy values for missing attributes is not
mistaken for the enforce posture; the real flag is always a ``bool``.
"""
return getattr(self._interceptor, "_enforce", False) is True

def _unknown_decision(self) -> tuple[Literal["allow", "deny", "pending"], str | None]:
"""Map an unrecognized / malformed verdict, failing closed under enforce.

Under ``enforce`` an unknown, ``None``, or malformed verdict must not be
silently allowed (AAASM-3107), so it denies. Under ``observe`` /
``disabled`` it proceeds (fail open).
"""
if self._enforce:
return "deny", self._UNKNOWN_DECISION_REASON
return "allow", None

def _normalize_decision(

Check failure on line 63 in agent_assembly/adapters/langchain/callback_handler.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=AI-agent-assembly_python-sdk&issues=AZ7T1apnqF0BTjQTQgiT&open=AZ7T1apnqF0BTjQTQgiT&pullRequest=139
self,
decision: object,
) -> tuple[Literal["allow", "deny", "pending"], str | None]:
Expand All @@ -42,24 +72,22 @@
return "deny", None
if normalized == "pending":
return "pending", None
return "allow", None
return self._unknown_decision()

if isinstance(decision, Mapping):
raw_status = str(decision.get("status", "allow")).strip().lower()
if raw_status == "allow":
status: Literal["allow", "deny", "pending"] = "allow"
elif raw_status == "deny":
status = "deny"
elif raw_status == "pending":
status = "pending"
else:
status = "allow"

raw_status = str(decision.get("status", "")).strip().lower()
reason_value = decision.get("reason")
reason = str(reason_value) if reason_value is not None else None
return status, reason

return "allow", None
if raw_status == "allow":
return "allow", reason
if raw_status == "deny":
return "deny", reason
if raw_status == "pending":
return "pending", reason
unknown_status, unknown_reason = self._unknown_decision()
return unknown_status, reason if reason is not None else unknown_reason

return self._unknown_decision()

def on_tool_start(
self,
Expand Down
13 changes: 10 additions & 3 deletions agent_assembly/adapters/mcp/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from agent_assembly.adapters.crewai.patch import (
_get_pending_tool_approval_timeout_seconds as _resolve_pending_timeout_seconds,
)
from agent_assembly.adapters.crewai.patch import (
_interceptor_enforces as _resolve_interceptor_enforces,
)
from agent_assembly.adapters.crewai.patch import (
_normalize_decision as _normalize_governance_decision,
)
Expand Down Expand Up @@ -113,8 +116,10 @@ def _extract_tool_call_inputs(

def _normalize_decision(
    decision: object,
    *,
    enforce: bool = False,
) -> tuple[Literal["allow", "deny", "pending"], str | None]:
    """Normalize a governance verdict using the shared CrewAI normalizer.

    Args:
        decision: Raw verdict returned by the governance check.
        enforce: Whether unknown / malformed verdicts must be denied; it is
            forwarded unchanged so the MCP adapter fails closed exactly like
            the CrewAI adapter.

    Returns:
        The ``(status, reason)`` tuple produced by the shared normalizer.
    """
    # ``enforce`` must always be forwarded: calling the shared normalizer
    # without it silently falls back to the fail-open default.
    return _normalize_governance_decision(decision, enforce=enforce)


def _get_pending_tool_approval_timeout_seconds(callback_handler: Any) -> int:
Expand Down Expand Up @@ -245,6 +250,8 @@ def _apply_client_session_patch(client_session_cls: type[Any], callback_handler:
if not callable(original_call_tool):
return None

enforce = _resolve_interceptor_enforces(callback_handler)

async def patched_call_tool(self: Any, *args: Any, **kwargs: Any) -> Any:
tool_name, tool_args = _extract_tool_call_inputs(args, kwargs)
agent_id = _get_process_agent_id()
Expand All @@ -257,7 +264,7 @@ async def patched_call_tool(self: Any, *args: Any, **kwargs: Any) -> Any:
agent_id=agent_id,
server_identifier=server_identifier,
)
status, reason = _normalize_decision(decision)
status, reason = _normalize_decision(decision, enforce=enforce)
is_pending_flow = False
if status == "pending":
is_pending_flow = True
Expand All @@ -270,7 +277,7 @@ async def patched_call_tool(self: Any, *args: Any, **kwargs: Any) -> Any:
agent_id=agent_id,
server_identifier=server_identifier,
)
status, reason = _normalize_decision(final_decision)
status, reason = _normalize_decision(final_decision, enforce=enforce)

if status == "deny":
raise _build_blocked_error(
Expand Down
47 changes: 47 additions & 0 deletions test/unit/adapters/crewai/test_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,53 @@ def on_task_complete(self, **kwargs: object) -> None:
assert lifecycle_events == ["start", "complete"]


# --- AAASM-3107: unknown/None/malformed verdicts must fail closed under enforce ---


@pytest.mark.parametrize("decision", [None, "maybe", 12345, {"status": "garbage"}, {}])
def test_normalize_decision_denies_unknown_under_enforce(decision: object) -> None:
    """Unknown verdicts are denied, with a non-empty reason, under enforce."""
    verdict, explanation = crewai_patch._normalize_decision(decision, enforce=True)
    assert verdict == "deny"
    assert explanation


@pytest.mark.parametrize("decision", [None, "maybe", 12345, {"status": "garbage"}, {}])
def test_normalize_decision_allows_unknown_when_not_enforcing(decision: object) -> None:
    """Unknown verdicts fall back to a reason-less allow when not enforcing."""
    outcome = crewai_patch._normalize_decision(decision, enforce=False)
    assert outcome == ("allow", None)


def test_normalize_decision_known_verdicts_unchanged_under_enforce() -> None:
    """Recognized verdicts pass through untouched even when enforcing."""
    cases = [
        ("allow", ("allow", None)),
        ("deny", ("deny", None)),
        ("pending", ("pending", None)),
        ({"status": "deny", "reason": "x"}, ("deny", "x")),
    ]
    for raw, expected in cases:
        assert crewai_patch._normalize_decision(raw, enforce=True) == expected


def test_interceptor_enforces_reads_enforce_flag() -> None:
    """The flag is read directly or through a handler's ``_interceptor``."""
    probe = crewai_patch._interceptor_enforces

    direct = SimpleNamespace(_enforce=True)
    assert probe(direct) is True

    without_flag = SimpleNamespace()
    assert probe(without_flag) is False

    wrapped = SimpleNamespace(_interceptor=SimpleNamespace(_enforce=True))
    assert probe(wrapped) is True


def test_unknown_verdict_blocks_tool_under_enforce(monkeypatch: pytest.MonkeyPatch) -> None:
    """A ``None`` verdict from an enforcing interceptor blocks the tool run."""
    fake_tool_cls, _ = _install_fake_crewai_modules(monkeypatch)

    class EnforcingUnknownInterceptor:
        # Marks the interceptor as being in the fail-closed posture.
        _enforce = True

        def check_tool_start(self, **kwargs: object) -> object:
            # ``None`` is not a recognized verdict.
            del kwargs
            return None

    applied = crewai_patch.CrewAIPatch(EnforcingUnknownInterceptor()).apply()
    assert applied is True

    outcome = fake_tool_cls().run(param="value")

    assert isinstance(outcome, str)
    assert "[BLOCKED by governance policy]" in outcome


def test_blocked_tool_returns_policy_string(monkeypatch: pytest.MonkeyPatch) -> None:
FakeBaseTool, _ = _install_fake_crewai_modules(monkeypatch)

Expand Down
39 changes: 39 additions & 0 deletions test/unit/adapters/langchain/test_callback_handler_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,42 @@ async def test_aon_llm_end_delegates_to_interceptor() -> None:
await handler.aon_llm_end(response={"text": "done"}, run_id=uuid4())

assert interceptor.llm_end_calls == 1


class _EnforcingAsyncInterceptor(AsyncInterceptor):
    """``AsyncInterceptor`` variant flagged as fail-closed ``enforce`` (AAASM-3106)."""

    # The handler reads this flag to decide whether unknown verdicts deny.
    _enforce = True


# --- AAASM-3107: unknown/None/malformed verdicts must fail closed under enforce ---


@pytest.mark.asyncio
@pytest.mark.parametrize("decision", [None, "maybe", 12345, {"status": "garbage"}, {}])
async def test_aon_tool_start_denies_unknown_under_enforce(decision: object) -> None:
    """Async tool start raises for unknown verdicts when enforcing."""
    enforcing_handler = AssemblyCallbackHandler(_EnforcingAsyncInterceptor())
    call_kwargs = {
        "serialized": {"name": "web_search"},
        "input_str": "query",
        "run_id": uuid4(),
        "decision": decision,
    }

    with pytest.raises(ToolExecutionBlockedError):
        await enforcing_handler.aon_tool_start(**call_kwargs)


@pytest.mark.asyncio
@pytest.mark.parametrize("decision", [None, "maybe", 12345, {"status": "garbage"}, {}])
async def test_aon_tool_start_allows_unknown_when_not_enforcing(decision: object) -> None:
    """Async tool start proceeds, without pending waits, when not enforcing."""
    plain_interceptor = AsyncInterceptor()
    call_kwargs = {
        "serialized": {"name": "web_search"},
        "input_str": "query",
        "run_id": uuid4(),
        "decision": decision,
    }

    await AssemblyCallbackHandler(plain_interceptor).aon_tool_start(**call_kwargs)

    assert plain_interceptor.pending_wait_calls == 0
63 changes: 63 additions & 0 deletions test/unit/adapters/langchain/test_callback_handler_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,66 @@ def test_on_llm_end_delegates_to_interceptor() -> None:
handler.on_llm_end(response={"text": "done"}, run_id=uuid4())

assert interceptor.llm_end_calls == 1


class _EnforcingInterceptor(SyncInterceptor):
    """``SyncInterceptor`` variant flagged as fail-closed ``enforce`` (AAASM-3106)."""

    # The handler reads this flag to decide whether unknown verdicts deny.
    _enforce = True


# --- AAASM-3107: unknown/None/malformed verdicts must fail closed under enforce ---


@pytest.mark.parametrize("decision", [None, "maybe", 12345, {"status": "garbage"}, {}])
def test_unknown_decision_denies_under_enforce(decision: object) -> None:
    """Sync tool start raises for unknown verdicts when enforcing."""
    enforcing_handler = AssemblyCallbackHandler(_EnforcingInterceptor())
    call_kwargs = {
        "serialized": {"name": "web_search"},
        "input_str": "query",
        "run_id": uuid4(),
        "decision": decision,
    }

    with pytest.raises(ToolExecutionBlockedError):
        enforcing_handler.on_tool_start(**call_kwargs)


@pytest.mark.parametrize("decision", [None, "maybe", 12345, {"status": "garbage"}, {}])
def test_unknown_decision_allows_when_not_enforcing(decision: object) -> None:
    """Sync tool start proceeds, without pending waits, when not enforcing."""
    plain_interceptor = SyncInterceptor()
    call_kwargs = {
        "serialized": {"name": "web_search"},
        "input_str": "query",
        "run_id": uuid4(),
        "decision": decision,
    }

    AssemblyCallbackHandler(plain_interceptor).on_tool_start(**call_kwargs)

    assert plain_interceptor.pending_wait_calls == 0


def test_known_verdicts_unchanged_under_enforce() -> None:
    """Recognized verdicts normalize identically under the enforce posture."""
    normalize = AssemblyCallbackHandler(_EnforcingInterceptor())._normalize_decision
    cases = [
        ("allow", ("allow", None)),
        ({"status": "deny", "reason": "nope"}, ("deny", "nope")),
        ("pending", ("pending", None)),
    ]

    for raw, expected in cases:
        assert normalize(raw) == expected
Loading