Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
ed28d63
fix(server): bound database connection pool
abhinav-galileo Jun 8, 2026
029eaf1
fix(server): release evaluation db session before processing
abhinav-galileo Jun 8, 2026
3f231c9
test(server): sort evaluation error imports
abhinav-galileo Jun 11, 2026
6d33097
feat(server): add db connect/statement timeouts and burst overflow
abhinav-galileo Jun 11, 2026
889f52e
fix(evaluator-galileo): prefer cluster API URL for internal Luna auth
abhinav-galileo Jun 11, 2026
dab9590
fix(evaluator-galileo): harden Luna client for in-cluster endpoints
abhinav-galileo Jun 11, 2026
d379148
fix(evaluator-galileo): align Luna auth config
abhinav-galileo Jun 11, 2026
4c92849
fix(evaluators): keep Luna API URL override generic
abhinav-galileo Jun 11, 2026
14787d8
docs(examples): clarify Luna evaluator configuration
abhinav-galileo Jun 11, 2026
001fdf9
docs(examples): distinguish deployment-injected Luna secret
abhinav-galileo Jun 11, 2026
7951332
feat(evaluator-galileo): add Luna HTTP connection tuning
abhinav-galileo Jun 12, 2026
d7deed8
feat: add evaluation timing metrics
abhinav-galileo Jun 13, 2026
258d101
feat(engine): expose evaluation concurrency setting
abhinav-galileo Jun 13, 2026
a1241b9
feat: add evaluation tracing spans
abhinav-galileo Jun 13, 2026
4068484
feat: add Luna HTTP phase tracing
abhinav-galileo Jun 13, 2026
8e2b031
feat: add Luna client timing metrics
abhinav-galileo Jun 13, 2026
42ed48f
feat: add Luna HTTP client pool setting
abhinav-galileo Jun 14, 2026
2ad873c
fix: expose luna http error metadata
abhinav-galileo Jun 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 291 additions & 41 deletions engine/src/agent_control_engine/core.py

Large diffs are not rendered by default.

230 changes: 229 additions & 1 deletion engine/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
"""

import asyncio
from dataclasses import dataclass
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any

import pytest
Expand Down Expand Up @@ -172,6 +174,70 @@ class MockControlWithIdentity:
control: ControlDefinition


@dataclass
class RecordingObserver:
    """Test double that stores every timing callback it receives.

    Each ``observe_*`` method appends one tuple to the matching list, so a
    test can inspect the labels and durations after the fact.
    """

    # (evaluator_name, duration_seconds) tuples from observe_evaluator_queue_duration.
    evaluator_queue_durations: list[tuple[str, float]] = field(default_factory=list)
    # (evaluator_name, outcome, duration_seconds) tuples from observe_evaluator_duration.
    evaluator_durations: list[tuple[str, str, float]] = field(default_factory=list)
    # (action, outcome, duration_seconds) tuples from observe_control_duration.
    control_durations: list[tuple[str, str, float]] = field(default_factory=list)

    def observe_evaluator_queue_duration(
        self, *, evaluator_name: str, duration_seconds: float
    ) -> None:
        """Append ``(evaluator_name, duration_seconds)`` to the queue samples."""
        sample = (evaluator_name, duration_seconds)
        self.evaluator_queue_durations.append(sample)

    def observe_evaluator_duration(
        self, *, evaluator_name: str, outcome: str, duration_seconds: float
    ) -> None:
        """Append ``(evaluator_name, outcome, duration_seconds)`` to the evaluator samples."""
        sample = (evaluator_name, outcome, duration_seconds)
        self.evaluator_durations.append(sample)

    def observe_control_duration(
        self, *, action: str, outcome: str, duration_seconds: float
    ) -> None:
        """Append ``(action, outcome, duration_seconds)`` to the control samples."""
        sample = (action, outcome, duration_seconds)
        self.control_durations.append(sample)


@dataclass
class RecordedSpan:
    """In-memory stand-in for a tracing span so tests can inspect its data."""

    op: str
    name: str
    # Key/value attributes attached to the span; each instance gets its own dict.
    data: dict[str, object] = field(default_factory=dict)

    def set_data(self, key: str, value: object) -> None:
        """Store ``value`` under ``key``, replacing any previous entry."""
        self.data.update({key: value})


def trace_span_recorder(spans: list[RecordedSpan]):
    """Build a ``trace_span`` substitute that appends each opened span to ``spans``.

    The returned callable is a context manager taking keyword-only ``op``,
    ``name`` and optional ``data``; it yields the ``RecordedSpan`` it created.
    """

    @contextmanager
    def _trace_span(
        *,
        op: str,
        name: str,
        data: dict[str, object] | None = None,
    ) -> Iterator[RecordedSpan]:
        # Copy the supplied mapping so set_data on the span never mutates
        # the caller's dict; a missing or empty mapping becomes a fresh dict.
        initial_data = {} if not data else dict(data)
        recorded = RecordedSpan(op=op, name=name, data=initial_data)
        spans.append(recorded)
        yield recorded

    return _trace_span


@pytest.fixture(autouse=True)
def setup_test_evaluators():
"""Register test evaluators and reset state before each test."""
Expand Down Expand Up @@ -1280,6 +1346,57 @@ async def test_timeout_does_not_affect_fast_evaluators(self):
class TestConcurrencyLimit:
"""Tests for semaphore-based concurrency limiting."""

def test_max_concurrency_env_prefers_agent_control_name(
    self, monkeypatch: pytest.MonkeyPatch
) -> None:
    """With both env vars set, the AGENT_CONTROL_-prefixed value is returned."""
    import agent_control_engine.core as core_module

    monkeypatch.setenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "7")
    monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "2")

    resolved = core_module._env_positive_int(
        "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS",
        "MAX_CONCURRENT_EVALUATIONS",
        default=3,
    )

    # 7 comes from the prefixed variable; 2 (legacy) and 3 (default) must lose.
    assert resolved == 7

def test_max_concurrency_env_reads_legacy_name(
    self, monkeypatch: pytest.MonkeyPatch
) -> None:
    """With only MAX_CONCURRENT_EVALUATIONS set, its value is returned."""
    import agent_control_engine.core as core_module

    # Make sure the prefixed variable is absent so only the legacy name applies.
    monkeypatch.delenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", raising=False)
    monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "5")

    resolved = core_module._env_positive_int(
        "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS",
        "MAX_CONCURRENT_EVALUATIONS",
        default=3,
    )

    assert resolved == 5

def test_max_concurrency_env_rejects_non_positive_values(
    self, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A configured value of "0" raises RuntimeError instead of being accepted."""
    import agent_control_engine.core as core_module

    env_names = (
        "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS",
        "MAX_CONCURRENT_EVALUATIONS",
    )
    monkeypatch.setenv(env_names[0], "0")

    with pytest.raises(RuntimeError, match="greater than or equal to 1"):
        core_module._env_positive_int(*env_names, default=3)

@pytest.mark.asyncio
async def test_concurrency_limited_to_max(self, monkeypatch: pytest.MonkeyPatch):
"""Test that concurrent evaluations are limited by semaphore.
Expand Down Expand Up @@ -1342,6 +1459,117 @@ async def evaluate(self, data: Any) -> EvaluatorResult:
assert _max_concurrent <= 2, f"Expected max 2 concurrent, got {_max_concurrent}"


class TestEvaluationObserver:
    """Covers the engine's optional observer callbacks and tracing spans."""

    @pytest.mark.asyncio
    async def test_observer_records_evaluator_and_control_timings(self):
        """Observer receives one labelled, non-negative sample per evaluator and control."""
        controls = [
            make_control(1, "allow", "test-allow", action="observe", config_value="a"),
            make_control(2, "deny", "test-deny", action="deny", config_value="d"),
        ]
        observer = RecordingObserver()
        engine = ControlEngine(controls, observer=observer)

        step = Step(type="llm", name="test-step", input="test", output=None)
        request = EvaluationRequest(
            agent_name="00000000-0000-0000-0000-000000000001",
            step=step,
            stage="pre",
        )
        await engine.process(request)

        # Labels first: compare as sets so callback ordering does not matter.
        queued_evaluators = {name for name, _ in observer.evaluator_queue_durations}
        assert queued_evaluators == {"test-allow", "test-deny"}

        evaluator_labels = {
            (name, outcome) for name, outcome, _ in observer.evaluator_durations
        }
        assert evaluator_labels == {
            ("test-allow", "success"),
            ("test-deny", "success"),
        }

        control_labels = {
            (action, outcome) for action, outcome, _ in observer.control_durations
        }
        assert control_labels == {
            ("observe", "not_matched"),
            ("deny", "matched"),
        }

        # Then the measurements: every recorded duration must be non-negative.
        for _, duration in observer.evaluator_queue_durations:
            assert duration >= 0
        for _, _, duration in observer.evaluator_durations:
            assert duration >= 0
        for _, _, duration in observer.control_durations:
            assert duration >= 0

    @pytest.mark.asyncio
    async def test_observer_errors_do_not_fail_evaluation(self):
        """An observer callback that raises must not change the evaluation result."""

        class RaisingObserver(RecordingObserver):
            """Observer whose evaluator-duration hook always fails."""

            def observe_evaluator_duration(
                self,
                *,
                evaluator_name: str,
                outcome: str,
                duration_seconds: float,
            ) -> None:
                raise RuntimeError("metrics backend unavailable")

        failing_observer = RaisingObserver()
        engine = ControlEngine(
            [make_control(1, "allow", "test-allow", action="observe")],
            observer=failing_observer,
        )

        step = Step(type="llm", name="test-step", input="test", output=None)
        request = EvaluationRequest(
            agent_name="00000000-0000-0000-0000-000000000001",
            step=step,
            stage="pre",
        )
        result = await engine.process(request)

        assert result.is_safe is True

    @pytest.mark.asyncio
    async def test_engine_emits_fanout_trace_spans(self, monkeypatch: pytest.MonkeyPatch):
        """Processing a request opens control and evaluator spans with outcome data."""
        import agent_control_engine.core as core_module

        spans: list[RecordedSpan] = []
        # Swap the engine's trace_span for a recorder that collects into `spans`.
        monkeypatch.setattr(core_module, "trace_span", trace_span_recorder(spans))

        engine = ControlEngine([make_control(1, "allow", "test-allow", action="observe")])

        step = Step(type="llm", name="test-step", input="test", output=None)
        request = EvaluationRequest(
            agent_name="00000000-0000-0000-0000-000000000001",
            step=step,
            stage="pre",
        )
        result = await engine.process(request)

        assert result.is_safe is True

        expected_ops = {
            "agent_control.engine.control",
            "agent_control.engine.evaluator.queue",
            "agent_control.engine.evaluator.get_instance",
            "agent_control.engine.evaluator.evaluate",
        }
        recorded_ops = {span.op for span in spans}
        # Extra span kinds are allowed; the four above are the required minimum.
        assert expected_ops <= recorded_ops

        # Keep the first span seen for each op, matching a first-match lookup.
        first_span_by_op: dict[str, RecordedSpan] = {}
        for span in spans:
            first_span_by_op.setdefault(span.op, span)

        control_span = first_span_by_op["agent_control.engine.control"]
        evaluator_span = first_span_by_op["agent_control.engine.evaluator.evaluate"]

        assert control_span.data["control.action"] == "observe"
        assert control_span.data["outcome"] == "not_matched"
        assert evaluator_span.data["evaluator.name"] == "test-allow"
        assert evaluator_span.data["outcome"] == "success"


# =============================================================================
# Test: Recursive Condition Trees
# =============================================================================
Expand Down
Loading
Loading