diff --git a/engine/src/agent_control_engine/core.py b/engine/src/agent_control_engine/core.py index e2ae8b6e..4d8c78fe 100644 --- a/engine/src/agent_control_engine/core.py +++ b/engine/src/agent_control_engine/core.py @@ -7,7 +7,9 @@ import functools import logging import os -from collections.abc import Sequence +import time +from collections.abc import Iterator, Sequence +from contextlib import contextmanager from dataclasses import dataclass from typing import Any, Literal, Protocol @@ -27,11 +29,33 @@ logger = logging.getLogger(__name__) + +def _env_positive_int(*names: str, default: int) -> int: + """Read a positive integer from the first configured environment variable.""" + for name in names: + value = os.environ.get(name) + if value is None or value.strip() == "": + continue + try: + parsed = int(value) + except ValueError as exc: + raise RuntimeError(f"{name}={value!r} must be an integer.") from exc + if parsed < 1: + raise RuntimeError(f"{name}={value!r} must be greater than or equal to 1.") + return parsed + return default + + # Default timeout for evaluator execution (seconds) DEFAULT_EVALUATOR_TIMEOUT = float(os.environ.get("EVALUATOR_TIMEOUT_SECONDS", "30")) -# Max concurrent evaluations (limits task spawning overhead for large policies) -MAX_CONCURRENT_EVALUATIONS = int(os.environ.get("MAX_CONCURRENT_EVALUATIONS", "3")) +# Max concurrent evaluations (limits task spawning overhead for large policies). +# Prefer the namespaced env var; MAX_CONCURRENT_EVALUATIONS is kept for compatibility. +MAX_CONCURRENT_EVALUATIONS = _env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, +) SELECTED_DATA_PREVIEW_MAX_CHARS = int( os.environ.get("AGENT_CONTROL_SELECTED_DATA_PREVIEW_MAX_CHARS", "500") @@ -171,6 +195,86 @@ def control(self) -> ControlDefinitionLike: """Runtime control payload used during evaluation.""" +class TraceSpan(Protocol): + """Subset of tracing span behavior used by the engine.""" + + def set_data(self, key: str, value: object) -> None: + """Attach diagnostic data to the active span.""" + + +@contextmanager +def trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, +) -> Iterator[TraceSpan | None]: + """Start an optional tracing span when a tracing SDK is installed.""" + start_span = _load_start_span() + if start_span is None: + yield None + return + + with start_span(op=op, name=name) as span: + for key, value in (data or {}).items(): + span.set_data(key, value) + yield span + + +@functools.lru_cache(maxsize=1) +def _load_start_span() -> Any | None: + """Load the optional tracing span factory once per process.""" + try: + from sentry_sdk import start_span # type: ignore[import-not-found] + except ImportError: + return None + return start_span + + +def set_span_data(span: TraceSpan | None, key: str, value: object) -> None: + """Attach span data without letting tracing failures affect evaluation.""" + if span is None: + return + try: + span.set_data(key, value) + except Exception: + logger.debug("Tracing span failed while setting data", exc_info=True) + + +EvaluatorObserverOutcome = Literal["success", "timeout", "error", "cancelled"] +ControlObserverOutcome = Literal["matched", "not_matched", "error", "cancelled"] + + +class EvaluationObserver(Protocol): + """Receives timing observations from control evaluation.""" + + def observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + """Record time spent waiting for evaluator concurrency.""" + + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: EvaluatorObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent executing an evaluator.""" + + def observe_control_duration( + self, + *, + action: str, + outcome: ControlObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent evaluating a top-level control.""" + + @dataclass class _EvalTask: """Internal container for evaluation task context.""" @@ -206,15 +310,72 @@ def __init__( context: Literal["sdk", "server"] = "server", *, include_raw_selected_data: bool | None = None, + observer: EvaluationObserver | None = None, ): self.controls = controls self.context = context + self.observer = observer self.include_raw_selected_data = ( _env_flag("AGENT_CONTROL_INCLUDE_RAW_SELECTED_DATA") if include_raw_selected_data is None else include_raw_selected_data ) + def _observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + """Record evaluator queue timing without affecting evaluation.""" + if self.observer is None: + return + try: + self.observer.observe_evaluator_queue_duration( + evaluator_name=evaluator_name, + duration_seconds=duration_seconds, + ) + except Exception: + logger.debug("Evaluation observer failed while recording queue time", exc_info=True) + + def _observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: EvaluatorObserverOutcome, + duration_seconds: float, + ) -> None: + """Record evaluator execution timing without affecting evaluation.""" + if self.observer is None: + return + try: + self.observer.observe_evaluator_duration( + evaluator_name=evaluator_name, + outcome=outcome, + duration_seconds=duration_seconds, + ) + except Exception: + logger.debug("Evaluation observer failed while recording evaluator time", exc_info=True) + + def _observe_control_duration( + self, + *, + action: str, + outcome: ControlObserverOutcome, + duration_seconds: float, + ) -> None: + """Record control evaluation timing without affecting evaluation.""" + if self.observer is None: + return + try: + self.observer.observe_control_duration( + action=action, + outcome=outcome, + duration_seconds=duration_seconds, + ) + except Exception: + logger.debug("Evaluation observer failed while recording control time", exc_info=True) + @staticmethod def _truncated_message(message: str | None) -> str | None: """Truncate long evaluator messages in condition traces.""" @@ -290,28 +451,80 @@ async def _evaluate_leaf( selector_path = selector.path or "*" data = select_data(request.step, selector_path) + wait_started_at = time.perf_counter() + with trace_span( + op="agent_control.engine.evaluator.queue", + name="wait_for_evaluator_slot", + data={ + "evaluator.name": evaluator_spec.name, + "selector.path": selector_path, + }, + ): + await semaphore.acquire() + + self._observe_evaluator_queue_duration( + evaluator_name=evaluator_spec.name, + duration_seconds=time.perf_counter() - wait_started_at, + ) + evaluator_started_at = time.perf_counter() + outcome: EvaluatorObserverOutcome = "success" + timeout = DEFAULT_EVALUATOR_TIMEOUT try: - async with semaphore: + with trace_span( + op="agent_control.engine.evaluator.get_instance", + name="get_evaluator_instance", + data={"evaluator.name": evaluator_spec.name}, + ): evaluator = get_evaluator_instance(evaluator_spec) timeout = evaluator.get_timeout_seconds() if timeout <= 0: timeout = DEFAULT_EVALUATOR_TIMEOUT - result = await asyncio.wait_for( - evaluator.evaluate(data), - timeout=timeout, - ) - except TimeoutError: - error_msg = f"TimeoutError: Evaluator exceeded {timeout}s timeout" - logger.warning( - "Evaluator timeout for control '%s' (evaluator: %s): %s", - item.name, - evaluator_spec.name, - error_msg, - exc_info=True, - ) - result = self._build_error_result(error_msg) + with trace_span( + op="agent_control.engine.evaluator.evaluate", + name="evaluate_evaluator", + data={ + "evaluator.name": evaluator_spec.name, + "timeout.seconds": timeout, + }, + ) as span: + try: + result = await asyncio.wait_for( + evaluator.evaluate(data), + timeout=timeout, + ) + except asyncio.CancelledError: + outcome = "cancelled" + raise + except TimeoutError: + outcome = "timeout" + error_msg = f"TimeoutError: Evaluator exceeded {timeout}s timeout" + logger.warning( + "Evaluator timeout for control '%s' (evaluator: %s): %s", + item.name, + evaluator_spec.name, + error_msg, + exc_info=True, + ) + result = self._build_error_result(error_msg) + except Exception as e: + outcome = "error" + error_msg = self._format_exception(e) + logger.error( + "Evaluator error for control '%s' (evaluator: %s): %s", + item.name, + evaluator_spec.name, + error_msg, + exc_info=True, + ) + result = self._build_error_result(error_msg) + finally: + set_span_data(span, "outcome", outcome) + except asyncio.CancelledError: + outcome = "cancelled" + raise except Exception as e: + outcome = "error" error_msg = self._format_exception(e) logger.error( "Evaluator error for control '%s' (evaluator: %s): %s", @@ -321,6 +534,13 @@ async def _evaluate_leaf( exc_info=True, ) result = self._build_error_result(error_msg) + finally: + self._observe_evaluator_duration( + evaluator_name=evaluator_spec.name, + outcome=outcome, + duration_seconds=time.perf_counter() - evaluator_started_at, + ) + semaphore.release() trace = { "type": "leaf", @@ -639,32 +859,63 @@ async def process(self, request: EvaluationRequest) -> EvaluationResponse: async def evaluate_control(eval_task: _EvalTask) -> None: """Evaluate a single control, respecting cancellation and timeout.""" + started_at = time.perf_counter() + action = eval_task.item.control.action.decision + outcome: ControlObserverOutcome = "cancelled" try: - evaluation = await self._evaluate_condition( - eval_task.item, - eval_task.item.control.condition, - request, - semaphore, - ) - eval_task.result = evaluation.result + with trace_span( + op="agent_control.engine.control", + name="evaluate_control", + data={ + "control.id": eval_task.item.id, + "control.name": eval_task.item.name, + "control.action": action, + }, + ) as span: + try: + evaluation = await self._evaluate_condition( + eval_task.item, + eval_task.item.control.condition, + request, + semaphore, + ) + except asyncio.CancelledError: + outcome = "cancelled" + raise + except Exception as error: + error_msg = self._format_exception(error) + logger.exception( + "Unexpected condition evaluation error for control '%s': %s", + eval_task.item.name, + error_msg, + ) + eval_task.result = self._build_error_result( + error_msg, + message_prefix="Condition evaluation failed", + ) + outcome = "error" + else: + eval_task.result = evaluation.result + outcome = ( + "error" + if eval_task.result.error + else "matched" + if eval_task.result.matched + else "not_matched" + ) - if ( - eval_task.result.matched - and eval_task.item.control.action.decision == "deny" - ): - deny_found.set() - except asyncio.CancelledError: - raise - except Exception as error: - error_msg = self._format_exception(error) - logger.exception( - "Unexpected condition evaluation error for control '%s': %s", - eval_task.item.name, - error_msg, - ) - eval_task.result = self._build_error_result( - error_msg, - message_prefix="Condition evaluation failed", + if ( + eval_task.result.matched + and eval_task.item.control.action.decision == "deny" + ): + deny_found.set() + finally: + set_span_data(span, "outcome", outcome) + finally: + self._observe_control_duration( + action=action, + outcome=outcome, + duration_seconds=time.perf_counter() - started_at, ) # Create and start all tasks diff --git a/engine/tests/test_core.py b/engine/tests/test_core.py index ed4e6e00..f98378e5 100644 --- a/engine/tests/test_core.py +++ b/engine/tests/test_core.py @@ -7,7 +7,9 @@ """ import asyncio -from dataclasses import dataclass +from collections.abc import Iterator +from contextlib import contextmanager +from dataclasses import dataclass, field from typing import Any import pytest @@ -172,6 +174,70 @@ class MockControlWithIdentity: control: ControlDefinition +@dataclass +class RecordingObserver: + """Captures engine timing observations for assertions.""" + + evaluator_queue_durations: list[tuple[str, float]] = field(default_factory=list) + evaluator_durations: list[tuple[str, str, float]] = field(default_factory=list) + control_durations: list[tuple[str, str, float]] = field(default_factory=list) + + def observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + self.evaluator_queue_durations.append((evaluator_name, duration_seconds)) + + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: str, + duration_seconds: float, + ) -> None: + self.evaluator_durations.append((evaluator_name, outcome, duration_seconds)) + + def observe_control_duration( + self, + *, + action: str, + outcome: str, + duration_seconds: float, + ) -> None: + self.control_durations.append((action, outcome, duration_seconds)) + + +@dataclass +class RecordedSpan: + """Captures optional tracing span data for tests.""" + + op: str + name: str + data: dict[str, object] = field(default_factory=dict) + + def set_data(self, key: str, value: object) -> None: + self.data[key] = value + + +def trace_span_recorder(spans: list[RecordedSpan]): + """Return a trace_span replacement that records spans.""" + + @contextmanager + def _trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, + ) -> Iterator[RecordedSpan]: + span = RecordedSpan(op=op, name=name, data=dict(data or {})) + spans.append(span) + yield span + + return _trace_span + + @pytest.fixture(autouse=True) def setup_test_evaluators(): """Register test evaluators and reset state before each test.""" @@ -1280,6 +1346,57 @@ async def test_timeout_does_not_affect_fast_evaluators(self): class TestConcurrencyLimit: """Tests for semaphore-based concurrency limiting.""" + def test_max_concurrency_env_prefers_agent_control_name( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The canonical Agent Control env var overrides the legacy short name.""" + import agent_control_engine.core as core_module + + monkeypatch.setenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "7") + monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "2") + + assert ( + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + == 7 + ) + + def test_max_concurrency_env_reads_legacy_name( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The existing env var remains supported for compatibility.""" + import agent_control_engine.core as core_module + + monkeypatch.delenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", raising=False) + monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "5") + + assert ( + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + == 5 + ) + + def test_max_concurrency_env_rejects_non_positive_values( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The concurrency cap must always allow at least one evaluator.""" + import agent_control_engine.core as core_module + + monkeypatch.setenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "0") + + with pytest.raises(RuntimeError, match="greater than or equal to 1"): + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + @pytest.mark.asyncio async def test_concurrency_limited_to_max(self, monkeypatch: pytest.MonkeyPatch): """Test that concurrent evaluations are limited by semaphore. @@ -1342,6 +1459,117 @@ async def evaluate(self, data: Any) -> EvaluatorResult: assert _max_concurrent <= 2, f"Expected max 2 concurrent, got {_max_concurrent}" +class TestEvaluationObserver: + """Tests for optional engine timing observations.""" + + @pytest.mark.asyncio + async def test_observer_records_evaluator_and_control_timings(self): + """Test that observer callbacks receive bounded timing labels.""" + controls = [ + make_control(1, "allow", "test-allow", action="observe", config_value="a"), + make_control(2, "deny", "test-deny", action="deny", config_value="d"), + ] + observer = RecordingObserver() + engine = ControlEngine(controls, observer=observer) + + request = EvaluationRequest( + agent_name="00000000-0000-0000-0000-000000000001", + step=Step(type="llm", name="test-step", input="test", output=None), + stage="pre", + ) + await engine.process(request) + + assert {name for name, _ in observer.evaluator_queue_durations} == { + "test-allow", + "test-deny", + } + assert { + (name, outcome) + for name, outcome, _ in observer.evaluator_durations + } == { + ("test-allow", "success"), + ("test-deny", "success"), + } + assert { + (action, outcome) + for action, outcome, _ in observer.control_durations + } == { + ("observe", "not_matched"), + ("deny", "matched"), + } + assert all( + duration >= 0 + for _, duration in observer.evaluator_queue_durations + ) + assert all(duration >= 0 for _, _, duration in observer.evaluator_durations) + assert all(duration >= 0 for _, _, duration in observer.control_durations) + + @pytest.mark.asyncio + async def test_observer_errors_do_not_fail_evaluation(self): + """Test that observability failures do not affect control decisions.""" + + class RaisingObserver(RecordingObserver): + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: str, + duration_seconds: float, + ) -> None: + raise RuntimeError("metrics backend unavailable") + + controls = [make_control(1, "allow", "test-allow", action="observe")] + engine = ControlEngine(controls, observer=RaisingObserver()) + + request = EvaluationRequest( + agent_name="00000000-0000-0000-0000-000000000001", + step=Step(type="llm", name="test-step", input="test", output=None), + stage="pre", + ) + result = await engine.process(request) + + assert result.is_safe is True + + @pytest.mark.asyncio + async def test_engine_emits_fanout_trace_spans(self, monkeypatch: pytest.MonkeyPatch): + """Test that optional tracing spans capture control and evaluator phases.""" + import agent_control_engine.core as core_module + + spans: list[RecordedSpan] = [] + monkeypatch.setattr(core_module, "trace_span", trace_span_recorder(spans)) + + controls = [make_control(1, "allow", "test-allow", action="observe")] + engine = ControlEngine(controls) + + request = EvaluationRequest( + agent_name="00000000-0000-0000-0000-000000000001", + step=Step(type="llm", name="test-step", input="test", output=None), + stage="pre", + ) + result = await engine.process(request) + + assert result.is_safe is True + assert { + span.op + for span in spans + } >= { + "agent_control.engine.control", + "agent_control.engine.evaluator.queue", + "agent_control.engine.evaluator.get_instance", + "agent_control.engine.evaluator.evaluate", + } + control_span = next( + span for span in spans if span.op == "agent_control.engine.control" + ) + evaluator_span = next( + span for span in spans if span.op == "agent_control.engine.evaluator.evaluate" + ) + assert control_span.data["control.action"] == "observe" + assert control_span.data["outcome"] == "not_matched" + assert evaluator_span.data["evaluator.name"] == "test-allow" + assert evaluator_span.data["outcome"] == "success" + + # ============================================================================= # Test: Recursive Condition Trees # ============================================================================= diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 11e4881a..a61a8e07 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -4,25 +4,43 @@ import logging import os +import ssl +import warnings +from asyncio import Lock from base64 import urlsafe_b64encode from hashlib import sha256 from hmac import new as hmac_new from json import dumps -from time import time -from typing import Literal +from time import perf_counter, time +from typing import Any, Literal from urllib.parse import urlsplit, urlunsplit import httpx from agent_control_models import JSONObject, JSONValue from pydantic import BaseModel, Field, PrivateAttr, model_validator +from .metrics import observe_luna_client_stage, observe_luna_httpcore_phase +from .tracing import TraceSpan, set_span_data, trace_span + logger = logging.getLogger(__name__) DEFAULT_TIMEOUT_SECS = 10.0 DEFAULT_INTERNAL_TOKEN_TTL_SECS = 3600 +# Keep pooled-connection reuse shorter than typical server keepalive/worker +# recycle windows so requests do not pick up sockets the server already closed. +DEFAULT_KEEPALIVE_EXPIRY_SECS = 1.0 +DEFAULT_MAX_CONNECTIONS = 100 +DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 +DEFAULT_CLIENT_POOL_SIZE = 1 +LUNA_HTTP_PHASE_TRACING_ENV = "GALILEO_LUNA_HTTP_PHASE_TRACING" +LUNA_KEEPALIVE_EXPIRY_ENV = "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS" +LUNA_MAX_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_CONNECTIONS" +LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS" +LUNA_CLIENT_POOL_SIZE_ENV = "GALILEO_LUNA_CLIENT_POOL_SIZE" PUBLIC_SCORER_INVOKE_PATH = "/scorers/invoke" INTERNAL_SCORER_INVOKE_PATH = "/internal/scorers/invoke" AuthMode = Literal["public", "internal"] +ScorerIdentifierKind = Literal["label", "id", "version_id"] def _b64url(data: bytes) -> str: @@ -56,6 +74,13 @@ def _env_auth_mode() -> AuthMode | None: value = os.getenv("GALILEO_LUNA_AUTH_MODE") if value is None or value.strip() == "": return None + deprecation_message = ( + "GALILEO_LUNA_AUTH_MODE is deprecated. Configure exactly one credential " + "(GALILEO_API_KEY for public auth, GALILEO_API_SECRET_KEY for internal " + "auth) or pass auth_mode to GalileoLunaClient." + ) + warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2) + logger.warning(deprecation_message) normalized = value.strip().lower() if normalized == "public": return "public" @@ -64,6 +89,61 @@ def _env_auth_mode() -> AuthMode | None: raise ValueError("GALILEO_LUNA_AUTH_MODE must be either 'public' or 'internal'.") +def _load_float_env(env_name: str, default: float) -> float: + raw = os.getenv(env_name) + if raw is None: + return default + try: + return float(raw) + except ValueError as exc: + raise ValueError(f"{env_name}={raw!r} is not a number.") from exc + + +def _load_int_env(env_name: str, default: int) -> int: + raw = os.getenv(env_name) + if raw is None: + return default + try: + return int(raw) + except ValueError as exc: + raise ValueError(f"{env_name}={raw!r} is not an integer.") from exc + + +def _load_bool_env(env_name: str, default: bool = False) -> bool: + raw = os.getenv(env_name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +def _validate_connection_config( + *, + keepalive_expiry_seconds: float, + max_connections: int, + max_keepalive_connections: int, + client_pool_size: int, +) -> None: + if keepalive_expiry_seconds < 0: + raise ValueError( + f"{LUNA_KEEPALIVE_EXPIRY_ENV}={keepalive_expiry_seconds} " + "must be greater than or equal to 0." + ) + if max_connections <= 0: + raise ValueError(f"{LUNA_MAX_CONNECTIONS_ENV}={max_connections} must be greater than 0.") + if max_keepalive_connections < 0: + raise ValueError( + f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " + "must be greater than or equal to 0." + ) + if max_keepalive_connections > max_connections: + raise ValueError( + f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " + f"must be less than or equal to {LUNA_MAX_CONNECTIONS_ENV}={max_connections}." + ) + if client_pool_size <= 0: + raise ValueError(f"{LUNA_CLIENT_POOL_SIZE_ENV}={client_pool_size} must be greater than 0.") + + def _as_float_or_none(value: JSONValue) -> float | None: if isinstance(value, bool) or value is None: return None @@ -87,6 +167,85 @@ def _has_value(value: JSONValue) -> bool: return True +def _scorer_identifier_kind( + *, + scorer_label: str | None, + scorer_id: str | None, + scorer_version_id: str | None, +) -> ScorerIdentifierKind: + if scorer_label: + return "label" + if scorer_id: + return "id" + return "version_id" + + +def _endpoint_path(endpoint: str) -> str: + return urlsplit(endpoint).path + + +def _split_httpcore_trace_event(event_name: str) -> tuple[str, str] | None: + if event_name.endswith(".started"): + return event_name.removesuffix(".started"), "started" + if event_name.endswith(".complete"): + return event_name.removesuffix(".complete"), "complete" + if event_name.endswith(".failed"): + return event_name.removesuffix(".failed"), "failed" + return None + + +def _safe_trace_value(value: object) -> object: + if value is None or isinstance(value, bool | int | float | str): + return value + if isinstance(value, bytes): + return value.decode("ascii", errors="ignore") + return type(value).__name__ + + +class _HttpCorePhaseTrace: + """Convert httpcore phase trace events into optional tracing spans.""" + + def __init__(self) -> None: + self._active: dict[str, tuple[Any, TraceSpan | None, float]] = {} + + async def __call__(self, event_name: str, info: dict[str, Any]) -> None: + parsed = _split_httpcore_trace_event(event_name) + if parsed is None: + return + phase, state = parsed + + if state == "started": + manager = trace_span( + op=f"agent_control.luna.httpcore.{phase}", + name=phase, + data={"httpcore.phase": phase}, + ) + span = manager.__enter__() + self._active[phase] = (manager, span, perf_counter()) + return + + manager, span, started_at = self._active.pop(phase, (None, None, perf_counter())) + duration_seconds = perf_counter() - started_at + duration_ms = duration_seconds * 1000 + set_span_data(span, "httpcore.phase", phase) + set_span_data(span, "httpcore.outcome", state) + set_span_data(span, "httpcore.duration_ms", duration_ms) + observe_luna_httpcore_phase( + phase=phase, + outcome=state, + duration_seconds=duration_seconds, + ) + + if "return_value" in info: + set_span_data(span, "httpcore.return_type", _safe_trace_value(info["return_value"])) + if "exception" in info: + exception = info["exception"] + set_span_data(span, "exception.type", type(exception).__name__) + + if manager is not None: + manager.__exit__(None, None, None) + + class ScorerInvokeInputs(BaseModel): """Input values sent to Galileo's scorer invoke API.""" @@ -164,9 +323,17 @@ class GalileoLunaClient: """Thin HTTP client for Galileo Luna direct scorer invocation. Environment Variables: - GALILEO_API_SECRET_KEY or GALILEO_API_SECRET: Galileo API internal JWT signing secret. + GALILEO_API_SECRET_KEY: Deployment-provided Galileo API internal JWT signing secret. GALILEO_API_KEY: Galileo API key fallback for public scorer invocation. - GALILEO_LUNA_AUTH_MODE: Auth mode, either "public" or "internal". + GALILEO_LUNA_API_URL: Galileo Luna scorer invoke API URL override. + GALILEO_API_URL: Galileo API URL fallback. + GALILEO_LUNA_CA_FILE: CA bundle used to verify the scorer API endpoint, for + deployments whose API serves an internally-issued TLS certificate. + GALILEO_LUNA_HTTP_PHASE_TRACING: Enable per-request HTTP transport phase spans. + GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS: HTTP pooled connection expiry. + GALILEO_LUNA_MAX_CONNECTIONS: Maximum outbound HTTP connections. + GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS: Maximum idle pooled HTTP connections. + GALILEO_LUNA_CLIENT_POOL_SIZE: Number of outbound HTTP clients to rotate across. GALILEO_CONSOLE_URL: Galileo Console URL (optional, defaults to production). """ @@ -177,23 +344,27 @@ def __init__( console_url: str | None = None, api_url: str | None = None, auth_mode: AuthMode | None = None, + ca_file: str | None = None, ) -> None: """Initialize the Galileo Luna client. Args: api_key: Galileo API key. If not provided, reads from GALILEO_API_KEY. - api_secret: Galileo API secret for internal JWT auth. If not provided, - reads from GALILEO_API_SECRET_KEY or GALILEO_API_SECRET. + api_secret: Deployment-provided Galileo API secret for internal JWT auth. + If not provided, reads from GALILEO_API_SECRET_KEY. console_url: Galileo Console URL. If not provided, reads from GALILEO_CONSOLE_URL or uses the production console URL. - api_url: Galileo API URL. If not provided, reads from GALILEO_API_URL - before deriving from the console URL. - auth_mode: Auth mode to use. If not provided, reads from - GALILEO_LUNA_AUTH_MODE, or infers from the single available credential. + api_url: Galileo API URL. If not provided, reads from GALILEO_LUNA_API_URL, + then GALILEO_API_URL, before deriving from the console URL. + auth_mode: Auth mode to use. If not provided, inferred from the single + available credential. + ca_file: CA bundle path used to verify the scorer API endpoint. If not + provided, reads from GALILEO_LUNA_CA_FILE. Leave unset for endpoints + with publicly-trusted certificates. Raises: ValueError: If credentials are missing, ambiguous, or incompatible with - the selected auth mode. + the selected auth mode, or if the CA bundle cannot be loaded. """ resolved_api_secret = ( api_secret or os.getenv("GALILEO_API_SECRET_KEY") or os.getenv("GALILEO_API_SECRET") @@ -211,12 +382,52 @@ def __init__( self.console_url = ( console_url or os.getenv("GALILEO_CONSOLE_URL") or "https://console.galileo.ai" ) - self.api_base = (api_url or os.getenv("GALILEO_API_URL") or "").rstrip( - "/" - ) or self._derive_api_url(self.console_url) + self.api_base = self._resolve_api_base(api_url) + self.ca_file = (ca_file or os.getenv("GALILEO_LUNA_CA_FILE") or "").strip() or None + self._ssl_context = self._load_ssl_context(self.ca_file) + self.http_phase_tracing_enabled = _load_bool_env(LUNA_HTTP_PHASE_TRACING_ENV) + self.keepalive_expiry_seconds = _load_float_env( + LUNA_KEEPALIVE_EXPIRY_ENV, DEFAULT_KEEPALIVE_EXPIRY_SECS + ) + self.max_connections = _load_int_env(LUNA_MAX_CONNECTIONS_ENV, DEFAULT_MAX_CONNECTIONS) + self.max_keepalive_connections = _load_int_env( + LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV, DEFAULT_MAX_KEEPALIVE_CONNECTIONS + ) + self.client_pool_size = _load_int_env( + LUNA_CLIENT_POOL_SIZE_ENV, DEFAULT_CLIENT_POOL_SIZE + ) + _validate_connection_config( + keepalive_expiry_seconds=self.keepalive_expiry_seconds, + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + client_pool_size=self.client_pool_size, + ) self._client: httpx.AsyncClient | None = None + self._clients: list[httpx.AsyncClient] = [] + self._next_client_index = 0 + self._client_lock = Lock() logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) + def _resolve_api_base(self, api_url: str | None) -> str: + """Resolve the scorer invoke API base URL from explicit and environment config.""" + candidates = [api_url, os.getenv("GALILEO_LUNA_API_URL")] + candidates.append(os.getenv("GALILEO_API_URL")) + + for candidate in candidates: + if candidate and candidate.strip(): + return candidate.strip().rstrip("/") + return self._derive_api_url(self.console_url) + + @staticmethod + def _load_ssl_context(ca_file: str | None) -> ssl.SSLContext | None: + """Build a TLS verification context from a CA bundle path, if configured.""" + if ca_file is None: + return None + try: + return ssl.create_default_context(cafile=ca_file) + except (OSError, ssl.SSLError) as exc: + raise ValueError(f"Failed to load CA bundle from {ca_file!r}: {exc}") from exc + @staticmethod def _resolve_auth_mode( auth_mode: AuthMode | None, @@ -226,24 +437,21 @@ def _resolve_auth_mode( ) -> AuthMode: if auth_mode == "public": if not api_key: - raise ValueError( - "GALILEO_API_KEY is required when GALILEO_LUNA_AUTH_MODE=public." - ) + raise ValueError("GALILEO_API_KEY is required for public Luna auth.") return "public" if auth_mode == "internal": if not api_secret: raise ValueError( - "GALILEO_API_SECRET_KEY or GALILEO_API_SECRET is required when " - "GALILEO_LUNA_AUTH_MODE=internal." + "GALILEO_API_SECRET_KEY is required for internal Luna auth." ) return "internal" if api_key and api_secret: raise ValueError( - "Both Galileo API key and API secret are configured. Set " - "GALILEO_LUNA_AUTH_MODE to 'public' or 'internal' to choose the " - "runtime auth mode explicitly." + "Both a Galileo API key and a Galileo API secret are configured. " + "Unset one credential so the auth mode can be inferred, or pass " + "auth_mode='public' or auth_mode='internal' explicitly." ) if api_secret: return "internal" @@ -278,17 +486,71 @@ def _derive_api_url(self, console_url: str) -> str: parts._replace(netloc=parts.netloc.replace(host, new_host, 1)) ) + def _create_client(self) -> httpx.AsyncClient: + """Create an HTTP client with the configured auth, TLS, and connection limits.""" + headers = {"Content-Type": "application/json"} + if self.auth_mode == "public" and self.api_key is not None: + headers["Galileo-API-Key"] = self.api_key + verify: ssl.SSLContext | bool = ( + self._ssl_context if self._ssl_context is not None else True + ) + return httpx.AsyncClient( + headers=headers, + timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), + limits=httpx.Limits( + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + keepalive_expiry=self.keepalive_expiry_seconds, + ), + verify=verify, + ) + + def _select_pooled_client(self) -> httpx.AsyncClient: + """Select the next pooled client without awaiting on the hot path.""" + client = self._clients[self._next_client_index % len(self._clients)] + self._next_client_index = (self._next_client_index + 1) % len(self._clients) + return client + async def _get_client(self) -> httpx.AsyncClient: - """Get or create the HTTP client.""" - if self._client is None or self._client.is_closed: - headers = {"Content-Type": "application/json"} - if self.auth_mode == "public" and self.api_key is not None: - headers["Galileo-API-Key"] = self.api_key - self._client = httpx.AsyncClient( - headers=headers, - timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), - ) - return self._client + """Get or create the next HTTP client.""" + if self._client is not None and not self._client.is_closed: + return self._client + if self._client is not None and self._client.is_closed: + self._client = None + + if ( + self.client_pool_size > 1 + and len(self._clients) == self.client_pool_size + and all(not client.is_closed for client in self._clients) + ): + return self._select_pooled_client() + + async with self._client_lock: + if self._client is not None and not self._client.is_closed: + return self._client + + self._clients = [client for client in self._clients if not client.is_closed] + if len(self._clients) < self.client_pool_size: + with trace_span( + op="agent_control.luna.client.create", + name="create_http_client", + data={ + "auth.mode": self.auth_mode, + "limits.max_connections": self.max_connections, + "limits.max_keepalive_connections": self.max_keepalive_connections, + "limits.keepalive_expiry_seconds": self.keepalive_expiry_seconds, + "client.pool_size": self.client_pool_size, + "tls.ca_file_configured": self.ca_file is not None, + }, + ): + while len(self._clients) < self.client_pool_size: + self._clients.append(self._create_client()) + + if self.client_pool_size == 1: + self._client = self._clients[0] + return self._client + + return self._select_pooled_client() def _endpoint_and_headers( self, @@ -341,34 +603,122 @@ async def invoke( if not (_has_value(input) or _has_value(output)): raise ValueError("At least one of input or output must be provided.") - request_body = ScorerInvokeRequest( + identifier_kind = _scorer_identifier_kind( scorer_label=scorer_label, scorer_id=scorer_id, scorer_version_id=scorer_version_id, - inputs=ScorerInvokeInputs( - query="" if input is None else input, response="" if output is None else output - ), - config=config, - ).to_dict() - endpoint, request_headers = self._endpoint_and_headers(headers) + ) + endpoint_path = "unknown" + with trace_span( + op="agent_control.luna.request.build", + name="build_scorer_request", + data={"scorer.identifier_kind": identifier_kind}, + ): + with observe_luna_client_stage( + stage="build_request", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + request_body = ScorerInvokeRequest( + scorer_label=scorer_label, + scorer_id=scorer_id, + scorer_version_id=scorer_version_id, + inputs=ScorerInvokeInputs( + query="" if input is None else input, + response="" if output is None else output, + ), + config=config, + ).to_dict() + with trace_span( + op="agent_control.luna.request.endpoint", + name="resolve_scorer_endpoint", + data={"auth.mode": self.auth_mode}, + ) as span: + with observe_luna_client_stage( + stage="resolve_endpoint", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + endpoint, request_headers = self._endpoint_and_headers(headers) + endpoint_path = _endpoint_path(endpoint) + set_span_data(span, "endpoint.path", endpoint_path) logger.debug("[GalileoLunaClient] POST %s", endpoint) logger.debug("[GalileoLunaClient] Request body: %s", request_body) try: - client = await self._get_client() - response = await client.post( - endpoint, - json=request_body, - headers=request_headers, - timeout=timeout, - ) - response.raise_for_status() - response_data = response.json() + with trace_span( + op="agent_control.luna.client.get", + name="get_http_client", + data={"auth.mode": self.auth_mode}, + ): + with observe_luna_client_stage( + stage="get_http_client", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + client = await self._get_client() + with trace_span( + op="agent_control.luna.http.post", + name="post_scorer_invoke", + data={ + "auth.mode": self.auth_mode, + "endpoint.path": endpoint_path, + "scorer.identifier_kind": identifier_kind, + "http_phase_tracing.enabled": self.http_phase_tracing_enabled, + "timeout.seconds": timeout, + }, + ) as span: + extensions: dict[str, object] | None = ( + {"trace": _HttpCorePhaseTrace()} + if self.http_phase_tracing_enabled + else None + ) + with observe_luna_client_stage( + stage="http_post", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + response = await client.post( + endpoint, + json=request_body, + headers=request_headers, + timeout=timeout, + extensions=extensions, + ) + set_span_data(span, "http.status_code", response.status_code) + response.raise_for_status() + with trace_span( + op="agent_control.luna.response.parse", + name="parse_scorer_response", + data={"http.status_code": response.status_code}, + ): + with observe_luna_client_stage( + stage="parse_json", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + response_data = response.json() if not isinstance(response_data, dict): raise RuntimeError("Invalid response payload: not a JSON object") - parsed = ScorerInvokeResponse.from_dict(response_data) + with trace_span( + op="agent_control.luna.response.model", + name="model_scorer_response", + data={"http.status_code": response.status_code}, + ): + with observe_luna_client_stage( + stage="model_response", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + parsed = ScorerInvokeResponse.from_dict(response_data) logger.debug("[GalileoLunaClient] Response: %s", parsed.raw_response) return parsed except httpx.HTTPStatusError as exc: @@ -387,6 +737,11 @@ async def close(self) -> None: if self._client is not None: await self._client.aclose() self._client = None + for client in self._clients: + if not client.is_closed: + await client.aclose() + self._clients = [] + self._next_client_index = 0 async def __aenter__(self) -> GalileoLunaClient: """Async context manager entry.""" diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py index eff92f2a..05a65a10 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py @@ -8,11 +8,13 @@ from importlib.metadata import PackageNotFoundError, version from typing import Any +import httpx from agent_control_evaluators import Evaluator, EvaluatorMetadata, register_evaluator from agent_control_models import EvaluatorResult, JSONValue from .client import GalileoLunaClient, ScorerInvokeResponse from .config import LunaEvaluatorConfig, coerce_number +from .tracing import set_span_data, trace_span logger = logging.getLogger(__name__) @@ -27,6 +29,7 @@ def _resolve_package_version() -> str: _PACKAGE_VERSION = _resolve_package_version() LUNA_AVAILABLE = True +_HTTP_ERROR_BODY_LIMIT = 500 def _coerce_payload_text(value: Any) -> str | None: @@ -74,6 +77,32 @@ def _confidence_from_score(score: JSONValue) -> float: return 1.0 +def _truncated_http_response_body(body: str) -> tuple[str, bool]: + if len(body) <= _HTTP_ERROR_BODY_LIMIT: + return body, False + return body[:_HTTP_ERROR_BODY_LIMIT], True + + +def _http_status_error_metadata(error: httpx.HTTPStatusError) -> dict[str, Any]: + metadata: dict[str, Any] = {} + + request = error.request + metadata["http_method"] = request.method + metadata["http_endpoint_path"] = request.url.path + + response = error.response + metadata["http_status_code"] = response.status_code + metadata["http_response_content_type"] = response.headers.get("content-type") + + body = response.text + if body: + metadata["http_response_body"], metadata["http_response_body_truncated"] = ( + _truncated_http_response_body(body) + ) + + return {key: value for key, value in metadata.items() if value is not None} + + @register_evaluator class LunaEvaluator(Evaluator[LunaEvaluatorConfig]): """Galileo Luna evaluator using the direct scorer invocation API.""" @@ -174,7 +203,11 @@ async def evaluate(self, data: Any) -> EvaluatorResult: Returns: EvaluatorResult with local threshold decision and scorer metadata. """ - input_text, output_text = self._prepare_payload(data) + with trace_span( + op="agent_control.luna.evaluate.prepare_payload", + name="prepare_luna_payload", + ): + input_text, output_text = self._prepare_payload(data) if not (_has_text(input_text) or _has_text(output_text)): return EvaluatorResult( matched=False, @@ -184,21 +217,43 @@ async def evaluate(self, data: Any) -> EvaluatorResult: ) try: - scorer_kwargs = self._scorer_kwargs() - response = await self._get_client().invoke( - **scorer_kwargs, - input=input_text if _has_text(input_text) else None, - output=output_text if _has_text(output_text) else None, - config=self.config.scorer_config, - timeout=self.get_timeout_seconds(), - ) + with trace_span( + op="agent_control.luna.evaluate.scorer_kwargs", + name="build_scorer_kwargs", + ): + scorer_kwargs = self._scorer_kwargs() + with trace_span( + op="agent_control.luna.evaluate.invoke", + name="invoke_luna_scorer", + data={ + "payload.has_input": _has_text(input_text), + "payload.has_output": _has_text(output_text), + }, + ) as span: + response = await self._get_client().invoke( + **scorer_kwargs, + input=input_text if _has_text(input_text) else None, + output=output_text if _has_text(output_text) else None, + config=self.config.scorer_config, + timeout=self.get_timeout_seconds(), + ) + set_span_data(span, "scorer.status", response.status) if response.status.lower() != "success": message = response.error_message or f"Luna scorer status: {response.status}" raise RuntimeError(message) - matched = self._score_matches(response.score) - metadata = self._metadata(response) + with trace_span( + op="agent_control.luna.evaluate.score_match", + name="match_luna_score", + ) as span: + matched = self._score_matches(response.score) + set_span_data(span, "matched", matched) + with trace_span( + op="agent_control.luna.evaluate.metadata", + name="build_luna_metadata", + ): + metadata = self._metadata(response) operator = self.config.operator threshold = self.config.threshold state = "triggered" if matched else "not triggered" @@ -252,16 +307,20 @@ def _handle_error( error: Exception, ) -> EvaluatorResult: error_detail = str(error) + metadata: dict[str, Any] = { + "error_type": type(error).__name__, + "scorer_label": self.config.scorer_label, + "scorer_id": self.config.scorer_id, + "scorer_version_id": self.config.scorer_version_id, + } + if isinstance(error, httpx.HTTPStatusError): + metadata.update(_http_status_error_metadata(error)) + return EvaluatorResult( matched=False, confidence=0.0, message=f"Luna evaluation error: {error_detail}", - metadata={ - "error_type": type(error).__name__, - "scorer_label": self.config.scorer_label, - "scorer_id": self.config.scorer_id, - "scorer_version_id": self.config.scorer_version_id, - }, + metadata=metadata, error=error_detail, ) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/metrics.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/metrics.py new file mode 100644 index 00000000..5af6c255 --- /dev/null +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/metrics.py @@ -0,0 +1,92 @@ +"""Optional Prometheus metrics for Luna scorer client timing.""" + +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager +from time import perf_counter +from typing import Protocol, cast + +try: + from prometheus_client import Histogram # type: ignore[import-not-found] +except ImportError: # pragma: no cover - exercised when embedded without Prometheus. + Histogram = None # type: ignore[assignment] + + +class _HistogramChild(Protocol): + def observe(self, amount: float) -> None: ... + + +class _Histogram(Protocol): + def labels(self, **labels: str) -> _HistogramChild: ... + + +if Histogram is None: + _CLIENT_STAGE_DURATION: _Histogram | None = None + _HTTPCORE_PHASE_DURATION: _Histogram | None = None +else: + _CLIENT_STAGE_DURATION = cast( + _Histogram, + Histogram( + "agent_control_luna_client_stage_duration_seconds", + "Duration of Luna scorer client stages.", + ("stage", "outcome", "auth_mode", "endpoint_path", "scorer_identifier_kind"), + ), + ) + _HTTPCORE_PHASE_DURATION = cast( + _Histogram, + Histogram( + "agent_control_luna_httpcore_phase_duration_seconds", + "Duration of Luna scorer HTTP transport phases.", + ("phase", "outcome"), + ), + ) + + +def _stage_outcome(error: BaseException | None) -> str: + if error is None: + return "success" + if error.__class__.__name__ == "HTTPStatusError": + return "http_status_error" + if error.__class__.__name__.endswith("RequestError"): + return "request_error" + return "error" + + +def observe_luna_httpcore_phase( + *, + phase: str, + outcome: str, + duration_seconds: float, +) -> None: + """Record an HTTP transport phase duration when Prometheus is available.""" + if _HTTPCORE_PHASE_DURATION is None: + return + _HTTPCORE_PHASE_DURATION.labels(phase=phase, outcome=outcome).observe(duration_seconds) + + +@contextmanager +def observe_luna_client_stage( + *, + stage: str, + auth_mode: str, + endpoint_path: str, + scorer_identifier_kind: str, +) -> Iterator[None]: + """Record a Luna client stage duration when Prometheus is available.""" + started_at = perf_counter() + error: BaseException | None = None + try: + yield + except BaseException as exc: + error = exc + raise + finally: + if _CLIENT_STAGE_DURATION is not None: + _CLIENT_STAGE_DURATION.labels( + stage=stage, + outcome=_stage_outcome(error), + auth_mode=auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=scorer_identifier_kind, + ).observe(perf_counter() - started_at) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/tracing.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/tracing.py new file mode 100644 index 00000000..3302fede --- /dev/null +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/tracing.py @@ -0,0 +1,54 @@ +"""Optional tracing helpers for Luna scorer evaluation.""" + +from __future__ import annotations + +import functools +from collections.abc import Iterator +from contextlib import contextmanager +from typing import Any, Protocol + + +class TraceSpan(Protocol): + """Subset of tracing span behavior used by this package.""" + + def set_data(self, key: str, value: object) -> None: + """Attach diagnostic data to the active span.""" + + +@contextmanager +def trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, +) -> Iterator[TraceSpan | None]: + """Start an optional tracing span when a tracing SDK is installed.""" + start_span = _load_start_span() + if start_span is None: + yield None + return + + with start_span(op=op, name=name) as span: + for key, value in (data or {}).items(): + span.set_data(key, value) + yield span + + +@functools.lru_cache(maxsize=1) +def _load_start_span() -> Any | None: + """Load the optional tracing span factory once per process.""" + try: + from sentry_sdk import start_span # type: ignore[import-not-found] + except ImportError: + return None + return start_span + + +def set_span_data(span: TraceSpan | None, key: str, value: object) -> None: + """Attach span data without letting tracing failures affect evaluation.""" + if span is None: + return + try: + span.set_data(key, value) + except Exception: + pass diff --git a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py index 9d5f6766..4238bac7 100644 --- a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py +++ b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py @@ -7,13 +7,13 @@ from __future__ import annotations import json -import os +from collections.abc import Iterator +from contextlib import contextmanager from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest - # ============================================================================= # luna/evaluator.py: utility helpers # ============================================================================= @@ -425,11 +425,12 @@ class TestScorerInvokeRequestValidation: """``ScorerInvokeRequest`` rejects malformed input combos.""" def test_missing_all_identifiers_raises(self): + from pydantic import ValidationError + from agent_control_evaluator_galileo.luna.client import ( ScorerInvokeInputs, ScorerInvokeRequest, ) - from pydantic import ValidationError with pytest.raises(ValidationError, match="One of scorer_label"): ScorerInvokeRequest(inputs=ScorerInvokeInputs(query="hello")) @@ -457,7 +458,9 @@ def test_client_requires_explicit_mode_when_both_credentials_are_present(monkeyp monkeypatch.delenv("GALILEO_LUNA_AUTH_MODE", raising=False) from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - with pytest.raises(ValueError, match="Both Galileo API key and API secret"): + with pytest.raises( + ValueError, match="Both a Galileo API key and a Galileo API secret are configured" + ): GalileoLunaClient() @@ -468,7 +471,8 @@ def test_client_uses_explicit_public_mode_when_both_credentials_are_present(monk monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "public") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - client = GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + client = GalileoLunaClient() assert client.auth_mode == "public" endpoint, request_headers = client._endpoint_and_headers(None) @@ -483,7 +487,8 @@ def test_client_uses_explicit_internal_mode_when_both_credentials_are_present(mo monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "internal") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - client = GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + client = GalileoLunaClient() assert client.auth_mode == "internal" endpoint, request_headers = client._endpoint_and_headers(None) @@ -499,8 +504,9 @@ def test_client_rejects_mode_without_matching_credential(monkeypatch): monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "internal") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - with pytest.raises(ValueError, match="GALILEO_API_SECRET_KEY"): - GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + with pytest.raises(ValueError, match="GALILEO_API_SECRET_KEY"): + GalileoLunaClient() def test_client_rejects_invalid_auth_mode(monkeypatch): @@ -509,8 +515,9 @@ def test_client_rejects_invalid_auth_mode(monkeypatch): monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "sideways") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - with pytest.raises(ValueError, match="GALILEO_LUNA_AUTH_MODE"): - GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + with pytest.raises(ValueError, match="GALILEO_LUNA_AUTH_MODE"): + GalileoLunaClient() class TestDeriveApiUrl: @@ -672,6 +679,81 @@ async def test_invoke_propagates_request_error(monkeypatch): await client.close() +@pytest.mark.asyncio +async def test_invoke_records_luna_client_stage_metrics(monkeypatch): + """Successful scorer invocation records stage metrics without changing behavior.""" + monkeypatch.setenv("GALILEO_API_KEY", "test-key") + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna.client import GalileoLunaClient + + observed: list[dict[str, str]] = [] + + @contextmanager + def record_stage(**labels: str) -> Iterator[None]: + observed.append(labels) + yield + + monkeypatch.setattr(luna_client_module, "observe_luna_client_stage", record_stage) + + fake_response = MagicMock() + fake_response.status_code = 200 + fake_response.raise_for_status = MagicMock() + fake_response.json = MagicMock( + return_value={"scorer_label": "toxicity", "score": 0.1, "status": "success"} + ) + + fake_http = AsyncMock() + fake_http.post = AsyncMock(return_value=fake_response) + fake_http.is_closed = False + + client = GalileoLunaClient(api_url="https://api.example.test") + client._client = fake_http + + try: + response = await client.invoke(scorer_label="toxicity", input="hello") + finally: + await client.close() + + assert response.score == 0.1 + assert [item["stage"] for item in observed] == [ + "build_request", + "resolve_endpoint", + "get_http_client", + "http_post", + "parse_json", + "model_response", + ] + assert all(item["auth_mode"] == "public" for item in observed) + assert all(item["scorer_identifier_kind"] == "label" for item in observed) + assert next(item for item in observed if item["stage"] == "http_post")[ + "endpoint_path" + ] == "/scorers/invoke" + + +@pytest.mark.asyncio +async def test_httpcore_phase_trace_records_metrics(monkeypatch): + """HTTP transport phase tracing records matching Prometheus metrics.""" + import agent_control_evaluator_galileo.luna.client as luna_client_module + + observed: list[dict[str, object]] = [] + monkeypatch.setattr( + luna_client_module, + "observe_luna_httpcore_phase", + lambda **labels: observed.append(labels), + ) + + trace = luna_client_module._HttpCorePhaseTrace() + + await trace("connection.connect_tcp.started", {}) + await trace("connection.connect_tcp.complete", {"return_value": "ok"}) + + assert len(observed) == 1 + assert observed[0]["phase"] == "connection.connect_tcp" + assert observed[0]["outcome"] == "complete" + assert isinstance(observed[0]["duration_seconds"], float) + assert observed[0]["duration_seconds"] >= 0 + + @pytest.mark.asyncio async def test_client_async_context_manager_closes_on_exit(monkeypatch): """Entering/exiting the async context manager must close the client.""" diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index f123e214..82f4fa29 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -3,8 +3,13 @@ from __future__ import annotations import json +import logging import os from base64 import urlsafe_b64decode +from collections.abc import Awaitable, Callable, Iterator +from contextlib import contextmanager +from dataclasses import dataclass, field +from typing import cast from unittest.mock import AsyncMock, patch import httpx @@ -13,6 +18,35 @@ from pydantic import ValidationError +@dataclass +class RecordedSpan: + """Captures optional tracing span data for tests.""" + + op: str + name: str + data: dict[str, object] = field(default_factory=dict) + + def set_data(self, key: str, value: object) -> None: + self.data[key] = value + + +def trace_span_recorder(spans: list[RecordedSpan]): + """Return a trace_span replacement that records spans.""" + + @contextmanager + def _trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, + ) -> Iterator[RecordedSpan]: + span = RecordedSpan(op=op, name=name, data=dict(data or {})) + spans.append(span) + yield span + + return _trace_span + + def _decode_jwt_payload(token: str) -> dict[str, object]: payload_segment = token.split(".")[1] padded = payload_segment + ("=" * (-len(payload_segment) % 4)) @@ -135,7 +169,7 @@ def test_client_uses_protect_api_url_derivation(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient # Given: the same console URL shape used by Protect - with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}): + with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): client = GalileoLunaClient(console_url="https://console.demo-v2.galileocloud.io") # Then: the API URL is derived the same way @@ -144,28 +178,338 @@ def test_client_uses_protect_api_url_derivation(self) -> None: def test_client_uses_galileo_api_url_when_set(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient - # Given: an explicit devstack API URL + # Given: an explicit custom-environment API URL with patch.dict( os.environ, { "GALILEO_API_KEY": "test-key", - "GALILEO_API_URL": "https://api-test-luna.gcp-dev.galileo.ai/", + "GALILEO_API_URL": "https://api-test-luna.example.com/", }, + clear=True, ): - client = GalileoLunaClient(console_url="https://console-test-luna.gcp-dev.galileo.ai") + client = GalileoLunaClient(console_url="https://console-test-luna.example.com") # Then: the explicit API URL wins over console URL derivation - assert client.api_base == "https://api-test-luna.gcp-dev.galileo.ai" + assert client.api_base == "https://api-test-luna.example.com" + + def test_client_uses_luna_api_url_when_set(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: a Luna-specific API URL and a general API URL are both configured + with patch.dict( + os.environ, + { + "GALILEO_API_KEY": "test-key", + "GALILEO_LUNA_API_URL": "https://luna-api.example.com/", + "GALILEO_API_URL": "https://api.example.com", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + # Then: the Luna-specific URL wins without changing the general API URL contract + assert client.api_base == "https://luna-api.example.com" + + def test_client_uses_luna_api_url_for_internal_auth(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: internal auth and both Luna-specific and general API URLs are configured + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_API_URL": "https://internal-api.example.com", + "GALILEO_API_URL": "https://api-public.example.com", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + # Then: internal scorer invocation uses the Luna-specific API base + assert client.api_base == "https://internal-api.example.com" def test_client_derives_api_url_from_console_dash_hostname(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient - # Given: a console- devstack hostname - with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=False): - client = GalileoLunaClient(console_url="https://console-test-luna.gcp-dev.galileo.ai") + # Given: a console- hostname + with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): + client = GalileoLunaClient(console_url="https://console-test-luna.example.com") + + # Then: the matching api- hostname is used + assert client.api_base == "https://api-test-luna.example.com" + + def test_client_strips_whitespace_from_env_url(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: a URL override padded with whitespace and a trailing slash + with patch.dict( + os.environ, + { + "GALILEO_API_KEY": "test-key", + "GALILEO_LUNA_API_URL": " https://luna-api.example.com/ ", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + # Then: the resolved base URL is trimmed and slash-free + assert client.api_base == "https://luna-api.example.com" + + def test_client_warns_when_deprecated_auth_mode_env_is_set( + self, caplog: pytest.LogCaptureFixture + ) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: the deprecated auth-mode environment variable + caplog.set_level(logging.WARNING) + with patch.dict( + os.environ, + {"GALILEO_API_KEY": "test-key", "GALILEO_LUNA_AUTH_MODE": "public"}, + clear=True, + ): + # When/Then: construction still works but emits a deprecation warning + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + client = GalileoLunaClient(console_url="https://console.example.com") - # Then: the matching api- hostname is used - assert client.api_base == "https://api-test-luna.gcp-dev.galileo.ai" + assert client.auth_mode == "public" + assert "GALILEO_LUNA_AUTH_MODE is deprecated" in caplog.text + + def test_client_rejects_unreadable_ca_bundle(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: a CA bundle path that does not exist + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_CA_FILE": "/nonexistent/ca.pem", + }, + clear=True, + ): + # When/Then: client construction fails fast instead of at first request + with pytest.raises(ValueError, match="Failed to load CA bundle"): + GalileoLunaClient(console_url="https://console.example.com") + + @pytest.mark.asyncio + async def test_client_applies_ca_bundle_and_connection_limits(self) -> None: + import certifi + from agent_control_evaluator_galileo.luna import GalileoLunaClient + from agent_control_evaluator_galileo.luna.client import ( + DEFAULT_KEEPALIVE_EXPIRY_SECS, + DEFAULT_MAX_CONNECTIONS, + DEFAULT_MAX_KEEPALIVE_CONNECTIONS, + ) + + captured: dict[str, object] = {} + real_async_client = httpx.AsyncClient + + def recording_client(**kwargs: object) -> httpx.AsyncClient: + captured.update(kwargs) + return real_async_client(**kwargs) + + # Given: internal auth with a CA bundle configured + with patch.dict(os.environ, {"GALILEO_API_SECRET_KEY": "test-secret"}, clear=True): + client = GalileoLunaClient( + console_url="https://console.example.com", ca_file=certifi.where() + ) + + with patch( + "agent_control_evaluator_galileo.luna.client.httpx.AsyncClient", recording_client + ): + try: + await client._get_client() + finally: + await client.close() + + # Then: TLS verification uses the configured CA bundle and pooled + # connections expire quickly so closed server sockets are not reused + assert captured["verify"] is client._ssl_context + limits = captured["limits"] + assert isinstance(limits, httpx.Limits) + assert limits.keepalive_expiry == DEFAULT_KEEPALIVE_EXPIRY_SECS + assert limits.max_connections == DEFAULT_MAX_CONNECTIONS + assert limits.max_keepalive_connections == DEFAULT_MAX_KEEPALIVE_CONNECTIONS + + @pytest.mark.asyncio + async def test_client_applies_connection_tuning_env(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + captured: dict[str, object] = {} + real_async_client = httpx.AsyncClient + + def recording_client(**kwargs: object) -> httpx.AsyncClient: + captured.update(kwargs) + return real_async_client(**kwargs) + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "0.25", + "GALILEO_LUNA_MAX_CONNECTIONS": "17", + "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "4", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + with patch( + "agent_control_evaluator_galileo.luna.client.httpx.AsyncClient", recording_client + ): + try: + await client._get_client() + finally: + await client.close() + + assert client.keepalive_expiry_seconds == 0.25 + assert client.max_connections == 17 + assert client.max_keepalive_connections == 4 + limits = captured["limits"] + assert isinstance(limits, httpx.Limits) + assert limits.keepalive_expiry == 0.25 + assert limits.max_connections == 17 + assert limits.max_keepalive_connections == 4 + + @pytest.mark.asyncio + async def test_client_pool_size_rotates_across_http_clients(self) -> None: + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + class FakeAsyncClient: + def __init__(self, **kwargs: object) -> None: + self.kwargs = kwargs + self.is_closed = False + + async def aclose(self) -> None: + self.is_closed = True + + created: list[FakeAsyncClient] = [] + + def recording_client(**kwargs: object) -> FakeAsyncClient: + client = FakeAsyncClient(**kwargs) + created.append(client) + return client + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_CLIENT_POOL_SIZE": "3", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + with patch.object(luna_client_module.httpx, "AsyncClient", recording_client): + try: + selected = [await client._get_client() for _ in range(5)] + finally: + await client.close() + + assert client.client_pool_size == 3 + assert len(created) == 3 + assert selected == [created[0], created[1], created[2], created[0], created[1]] + assert all(created_client.is_closed for created_client in created) + + @pytest.mark.asyncio + async def test_client_emits_httpcore_phase_spans_when_enabled( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + spans: list[RecordedSpan] = [] + captured: dict[str, object] = {} + monkeypatch.setattr(luna_client_module, "trace_span", trace_span_recorder(spans)) + + class FakeAsyncClient: + is_closed = False + + async def post(self, url: str, **kwargs: object) -> httpx.Response: + captured.update(kwargs) + extensions = kwargs.get("extensions") + assert isinstance(extensions, dict) + trace = cast( + Callable[[str, dict[str, object]], Awaitable[None]], + extensions["trace"], + ) + await trace("connection.connect_tcp.started", {}) + await trace("connection.connect_tcp.complete", {"return_value": object()}) + await trace("http11.receive_response_headers.started", {}) + await trace("http11.receive_response_headers.complete", {}) + return httpx.Response( + 200, + json={"scorer_label": "toxicity", "score": 0.2, "status": "success"}, + request=httpx.Request("POST", url), + ) + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_HTTP_PHASE_TRACING": "true", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + client._client = FakeAsyncClient() # type: ignore[assignment] + + response = await client.invoke(scorer_label="toxicity", output="hello") + + assert response.score == 0.2 + assert captured["extensions"] is not None + phase_spans = { + span.op: span.data + for span in spans + if span.op.startswith("agent_control.luna.httpcore.") + } + assert "agent_control.luna.httpcore.connection.connect_tcp" in phase_spans + assert "agent_control.luna.httpcore.http11.receive_response_headers" in phase_spans + assert phase_spans["agent_control.luna.httpcore.connection.connect_tcp"][ + "httpcore.outcome" + ] == "complete" + assert "httpcore.duration_ms" in phase_spans[ + "agent_control.luna.httpcore.connection.connect_tcp" + ] + + @pytest.mark.parametrize( + "env_values, expected", + [ + ({"GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "soon"}, "not a number"), + ({"GALILEO_LUNA_MAX_CONNECTIONS": "many"}, "not an integer"), + ({"GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "some"}, "not an integer"), + ( + {"GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "-0.1"}, + "greater than or equal to 0", + ), + ({"GALILEO_LUNA_MAX_CONNECTIONS": "0"}, "greater than 0"), + ( + {"GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "-1"}, + "greater than or equal to 0", + ), + ( + { + "GALILEO_LUNA_MAX_CONNECTIONS": "2", + "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "3", + }, + "less than or equal", + ), + ({"GALILEO_LUNA_CLIENT_POOL_SIZE": "many"}, "not an integer"), + ({"GALILEO_LUNA_CLIENT_POOL_SIZE": "0"}, "greater than 0"), + ], + ) + def test_client_reports_invalid_connection_tuning_env( + self, env_values: dict[str, str], expected: str + ) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + env = {"GALILEO_API_SECRET_KEY": "test-secret"} | env_values + with patch.dict(os.environ, env, clear=True): + with pytest.raises(ValueError) as exc_info: + GalileoLunaClient(console_url="https://console.example.com") + + assert expected in str(exc_info.value) @pytest.mark.asyncio async def test_client_posts_to_scorers_invoke_without_protect_fields(self) -> None: @@ -188,7 +532,7 @@ def handler(request: httpx.Request) -> httpx.Response: ) # Given: a Luna client with a mock HTTP transport - with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}): + with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): client = GalileoLunaClient(console_url="https://console.demo-v2.galileocloud.io") client._client = httpx.AsyncClient( transport=httpx.MockTransport(handler), @@ -223,6 +567,54 @@ def handler(request: httpx.Request) -> httpx.Response: assert isinstance(headers, dict) assert headers["galileo-api-key"] == "test-key" + @pytest.mark.asyncio + async def test_client_emits_scorer_invoke_trace_spans( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + spans: list[RecordedSpan] = [] + monkeypatch.setattr(luna_client_module, "trace_span", trace_span_recorder(spans)) + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "scorer_label": "toxicity", + "score": 0.82, + "status": "success", + "execution_time": 0.12, + }, + ) + + with patch.dict(os.environ, {"GALILEO_API_SECRET_KEY": "test-secret"}, clear=True): + client = GalileoLunaClient(api_url="https://api.default.svc.cluster.local:8088") + client._client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + try: + response = await client.invoke(scorer_label="toxicity", output="model answer") + finally: + await client.close() + + assert response.status == "success" + assert { + span.op + for span in spans + } >= { + "agent_control.luna.request.build", + "agent_control.luna.request.endpoint", + "agent_control.luna.client.get", + "agent_control.luna.http.post", + "agent_control.luna.response.parse", + "agent_control.luna.response.model", + } + post_span = next(span for span in spans if span.op == "agent_control.luna.http.post") + assert post_span.data["auth.mode"] == "internal" + assert post_span.data["endpoint.path"] == "/internal/scorers/invoke" + assert post_span.data["http.status_code"] == 200 + @pytest.mark.asyncio async def test_client_uses_internal_jwt_when_api_secret_is_set(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient @@ -438,6 +830,53 @@ async def test_evaluator_returns_non_match_below_threshold(self) -> None: timeout=10.0, ) + @patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}) + @pytest.mark.asyncio + async def test_evaluator_emits_phase_trace_spans( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + import agent_control_evaluator_galileo.luna.evaluator as luna_evaluator_module + from agent_control_evaluator_galileo.luna import LunaEvaluator, ScorerInvokeResponse + from agent_control_evaluator_galileo.luna.client import GalileoLunaClient + + spans: list[RecordedSpan] = [] + monkeypatch.setattr(luna_evaluator_module, "trace_span", trace_span_recorder(spans)) + evaluator = LunaEvaluator.from_dict( + {"scorer_label": "toxicity", "threshold": 0.7, "operator": "gte"} + ) + + with patch.object(GalileoLunaClient, "invoke", new_callable=AsyncMock) as mock_invoke: + mock_invoke.return_value = ScorerInvokeResponse( + scorer_label="toxicity", + score=0.82, + status="success", + ) + + result = await evaluator.evaluate({"input": "user prompt", "output": "model answer"}) + + assert result.matched is True + assert { + span.op + for span in spans + } >= { + "agent_control.luna.evaluate.prepare_payload", + "agent_control.luna.evaluate.scorer_kwargs", + "agent_control.luna.evaluate.invoke", + "agent_control.luna.evaluate.score_match", + "agent_control.luna.evaluate.metadata", + } + invoke_span = next( + span for span in spans if span.op == "agent_control.luna.evaluate.invoke" + ) + score_span = next( + span for span in spans if span.op == "agent_control.luna.evaluate.score_match" + ) + assert invoke_span.data["payload.has_input"] is True + assert invoke_span.data["payload.has_output"] is True + assert invoke_span.data["scorer.status"] == "success" + assert score_span.data["matched"] is True + @patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}) @pytest.mark.asyncio @pytest.mark.parametrize("data", ["", " "]) @@ -480,3 +919,41 @@ async def test_evaluator_fail_open_sets_error(self) -> None: assert "error" not in result.metadata assert result.metadata["error_type"] == "RuntimeError" assert "fallback_action" not in result.metadata + + @patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}) + @pytest.mark.asyncio + async def test_evaluator_error_metadata_includes_http_status_context(self) -> None: + from agent_control_evaluator_galileo.luna import LunaEvaluator + from agent_control_evaluator_galileo.luna.client import GalileoLunaClient + + evaluator = LunaEvaluator.from_dict({"scorer_label": "toxicity", "threshold": 0.5}) + request = httpx.Request( + "POST", + "https://api.example.test/internal/scorers/invoke?token=secret", + ) + response = httpx.Response( + 503, + headers={"content-type": "application/json"}, + text='{"detail":"busy"}', + request=request, + ) + + with patch.object(GalileoLunaClient, "invoke", new_callable=AsyncMock) as mock_invoke: + mock_invoke.side_effect = httpx.HTTPStatusError( + "service unavailable", + request=request, + response=response, + ) + + result = await evaluator.evaluate("hello") + + assert result.matched is False + assert result.metadata is not None + assert result.metadata["error_type"] == "HTTPStatusError" + assert result.metadata["http_status_code"] == 503 + assert result.metadata["http_method"] == "POST" + assert result.metadata["http_endpoint_path"] == "/internal/scorers/invoke" + assert result.metadata["http_response_content_type"] == "application/json" + assert result.metadata["http_response_body"] == '{"detail":"busy"}' + assert result.metadata["http_response_body_truncated"] is False + assert "token=secret" not in str(result.metadata) diff --git a/examples/galileo_luna/README.md b/examples/galileo_luna/README.md index 5ac97cda..b81b034f 100644 --- a/examples/galileo_luna/README.md +++ b/examples/galileo_luna/README.md @@ -17,22 +17,43 @@ Start the Agent Control server from the repo root: make server-run ``` -Configure Galileo public API-key auth: +Configure exactly one Galileo credential. + +For most OSS users, only an API key is required. This uses public API-key auth +and calls the public scorer API: ```bash -export GALILEO_LUNA_AUTH_MODE="public" export GALILEO_API_KEY="your-api-key" export GALILEO_CONSOLE_URL="https://console.demo-v2.galileocloud.io" ``` -For internal deployments, use internal auth instead: +`GALILEO_CONSOLE_URL` is optional when using the production console URL. +`GALILEO_LUNA_API_URL` is not required for this path. The client uses +`GALILEO_API_URL` when set, otherwise it derives the API URL from +`GALILEO_CONSOLE_URL`. + +For deployments that use service-to-service internal auth, the deployment +environment should inject the API internal secret instead of an API key: ```bash -export GALILEO_LUNA_AUTH_MODE="internal" +# Set by deployment tooling, not by normal OSS users. export GALILEO_API_SECRET_KEY="your-api-secret" -export GALILEO_API_URL="https://api.default.svc.cluster.local:8088" ``` +OSS users do not need to set `GALILEO_API_SECRET_KEY` manually for the public +API-key path. Deployment tooling may also set a custom scorer API endpoint and +CA bundle. Use these only when the scorer API is not reachable through the +default public API URL derivation, or when the endpoint uses a private CA: + +```bash +export GALILEO_LUNA_API_URL="https://api.default.svc.cluster.local:8088" +export GALILEO_LUNA_CA_FILE="/etc/ssl/internal/ca.crt" +``` + +`GALILEO_LUNA_API_URL` overrides the scorer API URL in either auth mode. +`GALILEO_LUNA_CA_FILE` is only needed for endpoints that are not trusted by the +system CA store. + Optional scorer settings: ```bash @@ -50,9 +71,8 @@ scalar as the scorer `output` field. If a selector returns structured data with `input` and/or `output` keys, those keys are sent directly and override `GALILEO_LUNA_PAYLOAD_FIELD`. -If both `GALILEO_API_KEY` and `GALILEO_API_SECRET_KEY`/`GALILEO_API_SECRET` are -set, `GALILEO_LUNA_AUTH_MODE` is required so the client does not silently choose -an auth path. +Setting both `GALILEO_API_KEY` and `GALILEO_API_SECRET_KEY` is an error; unset +one so the auth mode can be inferred. Run: diff --git a/examples/galileo_luna/demo_agent.py b/examples/galileo_luna/demo_agent.py index 8c7f59b2..0b6a0f8a 100644 --- a/examples/galileo_luna/demo_agent.py +++ b/examples/galileo_luna/demo_agent.py @@ -21,7 +21,6 @@ AGENT_NAME = "galileo-luna-agent" SERVER_URL = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") -LUNA_AUTH_MODE = os.getenv("GALILEO_LUNA_AUTH_MODE") logging.basicConfig( level=logging.INFO, @@ -92,36 +91,28 @@ def init_agent() -> None: async def run_demo() -> None: """Run scripted scenarios.""" api_key = os.getenv("GALILEO_API_KEY") - api_secret = os.getenv("GALILEO_API_SECRET_KEY") or os.getenv("GALILEO_API_SECRET") + api_secret = os.getenv("GALILEO_API_SECRET_KEY") if not api_key and not api_secret: print( "Galileo credentials are required for the galileo.luna evaluator. " - "Set GALILEO_API_KEY for public mode or GALILEO_API_SECRET_KEY for " - "internal mode." + "Set GALILEO_API_KEY for public mode. Deployments using internal " + "mode should inject GALILEO_API_SECRET_KEY." ) return - if api_key and api_secret and LUNA_AUTH_MODE not in {"public", "internal"}: + if api_key and api_secret: print( - "Both GALILEO_API_KEY and GALILEO_API_SECRET_KEY/GALILEO_API_SECRET are set. " - "Set GALILEO_LUNA_AUTH_MODE to 'public' or 'internal'." - ) - return - if LUNA_AUTH_MODE == "public" and not api_key: - print("GALILEO_API_KEY is required when GALILEO_LUNA_AUTH_MODE=public.") - return - if LUNA_AUTH_MODE == "internal" and not api_secret: - print( - "GALILEO_API_SECRET_KEY or GALILEO_API_SECRET is required when " - "GALILEO_LUNA_AUTH_MODE=internal." + "Both GALILEO_API_KEY and GALILEO_API_SECRET_KEY are set. " + "Unset one so the auth mode can be inferred." ) return + auth_mode = "public" if api_key else "internal" print("=" * 72) print("Direct Galileo Luna Evaluator Demo") print("=" * 72) print(f"Server: {SERVER_URL}") print(f"Agent: {AGENT_NAME}") - print(f"Auth: GALILEO_LUNA_AUTH_MODE={LUNA_AUTH_MODE or '(auto if one credential)'}") + print(f"Auth: {auth_mode}") print() init_agent() diff --git a/examples/galileo_luna/setup_controls.py b/examples/galileo_luna/setup_controls.py index fb4c6c76..fe1434c8 100644 --- a/examples/galileo_luna/setup_controls.py +++ b/examples/galileo_luna/setup_controls.py @@ -4,8 +4,8 @@ Prerequisites: - Agent Control server running at AGENT_CONTROL_URL, default http://localhost:8000 - Galileo credentials set where demo_agent.py will run: - GALILEO_API_KEY with GALILEO_LUNA_AUTH_MODE=public, or - GALILEO_API_SECRET_KEY/GALILEO_API_SECRET with GALILEO_LUNA_AUTH_MODE=internal + GALILEO_API_KEY for public auth, or + deployment-injected GALILEO_API_SECRET_KEY for internal auth Usage: uv run python setup_controls.py @@ -29,7 +29,6 @@ LUNA_SCORER_VERSION_ID = os.getenv("GALILEO_LUNA_SCORER_VERSION_ID") LUNA_THRESHOLD = float(os.getenv("GALILEO_LUNA_THRESHOLD", "0.5")) LUNA_PAYLOAD_FIELD = os.getenv("GALILEO_LUNA_PAYLOAD_FIELD", "output") -LUNA_AUTH_MODE = os.getenv("GALILEO_LUNA_AUTH_MODE") if LUNA_PAYLOAD_FIELD not in {"input", "output"}: raise ValueError("GALILEO_LUNA_PAYLOAD_FIELD must be either 'input' or 'output'.") @@ -175,7 +174,7 @@ async def setup_demo() -> None: f"threshold={LUNA_THRESHOLD}, " f"payload_field={LUNA_PAYLOAD_FIELD!r}" ) - print(f"Auth: GALILEO_LUNA_AUTH_MODE={LUNA_AUTH_MODE or '(auto if one credential)'}") + print("Auth: inferred from the single configured Galileo credential") async with AgentControlClient(base_url=SERVER_URL, timeout=30.0) as client: await client.health_check() diff --git a/server/src/agent_control_server/endpoints/evaluation.py b/server/src/agent_control_server/endpoints/evaluation.py index bc66381f..dcbd4272 100644 --- a/server/src/agent_control_server/endpoints/evaluation.py +++ b/server/src/agent_control_server/endpoints/evaluation.py @@ -2,6 +2,7 @@ import json from dataclasses import dataclass +from time import perf_counter from agent_control_engine.core import ControlEngine from agent_control_models import ( @@ -19,6 +20,7 @@ from ..db import get_async_db from ..errors import APIValidationError, NotFoundError from ..logging_utils import get_logger +from ..metrics import observe_evaluation_stage, prometheus_evaluation_observer from ..models import Agent from ..services.controls import ControlService @@ -164,37 +166,56 @@ async def evaluate( the observability ingestion endpoint. """ namespace_key = principal.namespace_key - - agent_result = await db.execute( - select(Agent).where( - Agent.name == request.agent_name, - Agent.namespace_key == namespace_key, + load_started_at = perf_counter() + try: + agent_result = await db.execute( + select(Agent).where( + Agent.name == request.agent_name, + Agent.namespace_key == namespace_key, + ) ) - ) - agent = agent_result.scalar_one_or_none() - if agent is None: - raise NotFoundError( - error_code=ErrorCode.AGENT_NOT_FOUND, - detail=f"Agent '{request.agent_name}' not found", - resource="Agent", - resource_id=request.agent_name, - hint="Register the agent via initAgent before evaluating.", + agent = agent_result.scalar_one_or_none() + if agent is None: + raise NotFoundError( + error_code=ErrorCode.AGENT_NOT_FOUND, + detail=f"Agent '{request.agent_name}' not found", + resource="Agent", + resource_id=request.agent_name, + hint="Register the agent via initAgent before evaluating.", + ) + + runtime_controls = await ControlService(db).list_runtime_controls_for_agent( + request.agent_name, + namespace_key=namespace_key, + target_type=request.target_type, + target_id=request.target_id, + allow_invalid_step_name_regex=True, ) - runtime_controls = await ControlService(db).list_runtime_controls_for_agent( - request.agent_name, - namespace_key=namespace_key, - target_type=request.target_type, - target_id=request.target_id, - allow_invalid_step_name_regex=True, + engine_controls = [ControlAdapter(c.id, c.name, c.control) for c in runtime_controls] + except Exception: + observe_evaluation_stage( + stage="load_controls", + outcome="error", + duration_seconds=perf_counter() - load_started_at, + ) + raise + observe_evaluation_stage( + stage="load_controls", + outcome="success", + duration_seconds=perf_counter() - load_started_at, ) - engine_controls = [ControlAdapter(c.id, c.name, c.control) for c in runtime_controls] - - engine = ControlEngine(engine_controls) + engine = ControlEngine(engine_controls, observer=prometheus_evaluation_observer) + engine_started_at = perf_counter() try: raw_response = await engine.process(request) except ValueError: + observe_evaluation_stage( + stage="engine", + outcome="validation_error", + duration_seconds=perf_counter() - engine_started_at, + ) _logger.exception("Evaluation failed due to invalid configuration or input") raise APIValidationError( error_code=ErrorCode.EVALUATION_FAILED, @@ -210,5 +231,32 @@ async def evaluate( ) ], ) + except Exception: + observe_evaluation_stage( + stage="engine", + outcome="error", + duration_seconds=perf_counter() - engine_started_at, + ) + raise + observe_evaluation_stage( + stage="engine", + outcome="success", + duration_seconds=perf_counter() - engine_started_at, + ) - return _sanitize_evaluation_response(raw_response) + sanitize_started_at = perf_counter() + try: + response = _sanitize_evaluation_response(raw_response) + except Exception: + observe_evaluation_stage( + stage="sanitize_response", + outcome="error", + duration_seconds=perf_counter() - sanitize_started_at, + ) + raise + observe_evaluation_stage( + stage="sanitize_response", + outcome="success", + duration_seconds=perf_counter() - sanitize_started_at, + ) + return response diff --git a/server/src/agent_control_server/metrics.py b/server/src/agent_control_server/metrics.py new file mode 100644 index 00000000..d0badf37 --- /dev/null +++ b/server/src/agent_control_server/metrics.py @@ -0,0 +1,74 @@ +"""Prometheus metrics owned by the Agent Control server.""" + +from agent_control_engine.core import ControlObserverOutcome, EvaluatorObserverOutcome +from prometheus_client import Histogram + +_EVALUATION_STAGE_DURATION = Histogram( + "agent_control_server_evaluation_stage_duration_seconds", + "Duration of evaluation endpoint stages.", + ("stage", "outcome"), +) +_EVALUATOR_QUEUE_DURATION = Histogram( + "agent_control_server_engine_evaluator_queue_duration_seconds", + "Time evaluator leaves spend waiting for engine concurrency.", + ("evaluator",), +) +_EVALUATOR_DURATION = Histogram( + "agent_control_server_engine_evaluator_duration_seconds", + "Time spent executing evaluator leaves.", + ("evaluator", "outcome"), +) +_CONTROL_DURATION = Histogram( + "agent_control_server_engine_control_duration_seconds", + "Time spent evaluating top-level controls.", + ("action", "outcome"), +) + + +def observe_evaluation_stage( + *, + stage: str, + outcome: str, + duration_seconds: float, +) -> None: + """Record evaluation endpoint stage duration.""" + _EVALUATION_STAGE_DURATION.labels(stage=stage, outcome=outcome).observe(duration_seconds) + + +class PrometheusEvaluationObserver: + """Records engine timing observations as Prometheus histograms.""" + + def observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + """Record time spent waiting for evaluator concurrency.""" + _EVALUATOR_QUEUE_DURATION.labels(evaluator=evaluator_name).observe(duration_seconds) + + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: EvaluatorObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent executing an evaluator.""" + _EVALUATOR_DURATION.labels( + evaluator=evaluator_name, + outcome=outcome, + ).observe(duration_seconds) + + def observe_control_duration( + self, + *, + action: str, + outcome: ControlObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent evaluating a top-level control.""" + _CONTROL_DURATION.labels(action=action, outcome=outcome).observe(duration_seconds) + + +prometheus_evaluation_observer = PrometheusEvaluationObserver() diff --git a/server/tests/test_init_agent.py b/server/tests/test_init_agent.py index 2dfe9eaa..c5a144ea 100644 --- a/server/tests/test_init_agent.py +++ b/server/tests/test_init_agent.py @@ -4,7 +4,6 @@ from typing import Any import pytest -from fastapi import FastAPI from fastapi.testclient import TestClient from sqlalchemy import create_engine, select, text from sqlalchemy.orm import Session @@ -46,14 +45,17 @@ def make_agent_payload( } -def test_init_agent_route_exists(app: FastAPI) -> None: - # Given: an application router - paths = {getattr(route, "path", None) for route in app.router.routes} - # When: inspecting registered paths - # (computation done above to gather all paths) - # Then: initAgent and agent retrieval endpoints are present - assert "/api/v1/agents/initAgent" in paths - assert "/api/v1/agents/{agent_name}" in paths +def test_agent_routes_are_registered(client: TestClient) -> None: + # Given: malformed requests that should still match registered agent routes + # When: calling initAgent without the required body fields + init_resp = client.post("/api/v1/agents/initAgent", json={}) + + # And: using an unsupported method on the agent resource route + agent_resp = client.post("/api/v1/agents/some-agent") + + # Then: routing reached the expected endpoints instead of falling through to 404 + assert init_resp.status_code == 422 + assert agent_resp.status_code == 405 def test_init_agent_creates_and_gets_agent(client: TestClient) -> None: diff --git a/server/tests/test_metrics.py b/server/tests/test_metrics.py index e2d8b896..ee9755e6 100644 --- a/server/tests/test_metrics.py +++ b/server/tests/test_metrics.py @@ -1,8 +1,11 @@ -from fastapi import FastAPI -from fastapi.testclient import TestClient - from agent_control_server.config import settings from agent_control_server.main import METRICS_PATH, add_prometheus_metrics +from agent_control_server.metrics import ( + observe_evaluation_stage, + prometheus_evaluation_observer, +) +from fastapi import FastAPI +from fastapi.testclient import TestClient def test_metrics_endpoint_public(unauthenticated_client: TestClient) -> None: @@ -42,3 +45,34 @@ def ping() -> dict[str, bool]: # Then: metrics include the custom prefix assert response.status_code == 200 assert f"{custom_prefix}_" in response.text + + +def test_metrics_output_contains_evaluation_timing_metrics( + unauthenticated_client: TestClient, +) -> None: + # Given: evaluation timing observations have been recorded + observe_evaluation_stage(stage="engine", outcome="success", duration_seconds=0.001) + prometheus_evaluation_observer.observe_evaluator_queue_duration( + evaluator_name="test-evaluator", + duration_seconds=0.001, + ) + prometheus_evaluation_observer.observe_evaluator_duration( + evaluator_name="test-evaluator", + outcome="success", + duration_seconds=0.002, + ) + prometheus_evaluation_observer.observe_control_duration( + action="observe", + outcome="not_matched", + duration_seconds=0.003, + ) + + # When: requesting metrics output + response = unauthenticated_client.get(METRICS_PATH) + + # Then: the internal evaluation timing series are exported + assert response.status_code == 200 + assert "agent_control_server_evaluation_stage_duration_seconds" in response.text + assert "agent_control_server_engine_evaluator_queue_duration_seconds" in response.text + assert "agent_control_server_engine_evaluator_duration_seconds" in response.text + assert "agent_control_server_engine_control_duration_seconds" in response.text