From b344ae1045ddee16991778cad9aa18eb83646aba Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Thu, 11 Jun 2026 16:44:43 +0530 Subject: [PATCH 01/18] fix(evaluator-galileo): prefer cluster API URL for internal Luna auth --- .../luna/client.py | 22 ++++++-- .../galileo/tests/test_luna_evaluator.py | 54 +++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 11e4881a..e076f60a 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -167,6 +167,9 @@ class GalileoLunaClient: GALILEO_API_SECRET_KEY or GALILEO_API_SECRET: Galileo API internal JWT signing secret. GALILEO_API_KEY: Galileo API key fallback for public scorer invocation. GALILEO_LUNA_AUTH_MODE: Auth mode, either "public" or "internal". + GALILEO_LUNA_API_URL: Galileo Luna scorer invoke API URL override. + GALILEO_API_CLUSTER_URL: Internal Galileo API URL used in internal auth mode. + GALILEO_API_URL: Galileo API URL fallback. GALILEO_CONSOLE_URL: Galileo Console URL (optional, defaults to production). """ @@ -186,7 +189,8 @@ def __init__( reads from GALILEO_API_SECRET_KEY or GALILEO_API_SECRET. console_url: Galileo Console URL. If not provided, reads from GALILEO_CONSOLE_URL or uses the production console URL. - api_url: Galileo API URL. If not provided, reads from GALILEO_API_URL + api_url: Galileo API URL. If not provided, reads from GALILEO_LUNA_API_URL, + then GALILEO_API_CLUSTER_URL in internal auth mode, then GALILEO_API_URL, before deriving from the console URL. auth_mode: Auth mode to use. If not provided, reads from GALILEO_LUNA_AUTH_MODE, or infers from the single available credential. @@ -211,12 +215,22 @@ def __init__( self.console_url = ( console_url or os.getenv("GALILEO_CONSOLE_URL") or "https://console.galileo.ai" ) - self.api_base = (api_url or os.getenv("GALILEO_API_URL") or "").rstrip( - "/" - ) or self._derive_api_url(self.console_url) + self.api_base = self._resolve_api_base(api_url, resolved_auth_mode) self._client: httpx.AsyncClient | None = None logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) + def _resolve_api_base(self, api_url: str | None, auth_mode: AuthMode) -> str: + """Resolve the scorer invoke API base URL from explicit and environment config.""" + candidates = [api_url, os.getenv("GALILEO_LUNA_API_URL")] + if auth_mode == "internal": + candidates.append(os.getenv("GALILEO_API_CLUSTER_URL")) + candidates.append(os.getenv("GALILEO_API_URL")) + + for candidate in candidates: + if candidate and candidate.strip(): + return candidate.rstrip("/") + return self._derive_api_url(self.console_url) + @staticmethod def _resolve_auth_mode( auth_mode: AuthMode | None, diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index f123e214..0a2bf3cd 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -157,6 +157,60 @@ def test_client_uses_galileo_api_url_when_set(self) -> None: # Then: the explicit API URL wins over console URL derivation assert client.api_base == "https://api-test-luna.gcp-dev.galileo.ai" + def test_client_uses_luna_api_url_when_set(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: a Luna-specific API URL and a general API URL are both configured + with patch.dict( + os.environ, + { + "GALILEO_API_KEY": "test-key", + "GALILEO_LUNA_API_URL": "https://luna-api.example.com/", + "GALILEO_API_URL": "https://api.example.com", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + # Then: the Luna-specific URL wins without changing the general API URL contract + assert client.api_base == "https://luna-api.example.com" + + def test_client_uses_cluster_url_for_internal_auth(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: internal auth and both cluster and public API URLs are configured + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_API_CLUSTER_URL": "https://api.default.svc.cluster.local:8088", + "GALILEO_API_URL": "https://api-public.example.com", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + # Then: internal scorer invocation uses the cluster-local API base + assert client.api_base == "https://api.default.svc.cluster.local:8088" + + def test_client_ignores_cluster_url_for_public_auth(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: public auth with a cluster URL present in the environment + with patch.dict( + os.environ, + { + "GALILEO_API_KEY": "test-key", + "GALILEO_API_CLUSTER_URL": "https://api.default.svc.cluster.local:8088", + "GALILEO_API_URL": "https://api-public.example.com", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + # Then: public scorer invocation still uses the public API URL + assert client.api_base == "https://api-public.example.com" + def test_client_derives_api_url_from_console_dash_hostname(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient From 7255e1f33938a2e8087888245b9b32301ecd5895 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Thu, 11 Jun 2026 17:31:07 +0530 Subject: [PATCH 02/18] fix(evaluator-galileo): harden Luna client for in-cluster endpoints - Verify TLS against an optional CA bundle (ca_file arg or GALILEO_LUNA_CA_FILE) so internal API endpoints with internally-issued certificates work in internal auth mode. - Bound connection pooling (keepalive expiry 1s, connection limits) so requests do not reuse sockets the server already closed, and retry the idempotent scorer invoke once on connection errors. - Deprecate GALILEO_LUNA_AUTH_MODE; the auth mode is inferred from the configured credential, and setting both credentials remains an explicit error. - Trim whitespace when resolving API base URLs from the environment. - Update the Luna example README and cover URL precedence, TLS, retry, and deprecation with tests. --- .../luna/client.py | 85 ++++++++++-- .../galileo/tests/test_luna_coverage_gaps.py | 4 +- .../galileo/tests/test_luna_evaluator.py | 122 +++++++++++++++++- examples/galileo_luna/README.md | 22 ++-- 4 files changed, 207 insertions(+), 26 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index e076f60a..2a995703 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -4,6 +4,8 @@ import logging import os +import ssl +import warnings from base64 import urlsafe_b64encode from hashlib import sha256 from hmac import new as hmac_new @@ -20,6 +22,11 @@ DEFAULT_TIMEOUT_SECS = 10.0 DEFAULT_INTERNAL_TOKEN_TTL_SECS = 3600 +# Keep pooled-connection reuse shorter than typical server keepalive/worker +# recycle windows so requests do not pick up sockets the server already closed. +DEFAULT_KEEPALIVE_EXPIRY_SECS = 1.0 +DEFAULT_MAX_CONNECTIONS = 100 +DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 PUBLIC_SCORER_INVOKE_PATH = "/scorers/invoke" INTERNAL_SCORER_INVOKE_PATH = "/internal/scorers/invoke" AuthMode = Literal["public", "internal"] @@ -56,6 +63,13 @@ def _env_auth_mode() -> AuthMode | None: value = os.getenv("GALILEO_LUNA_AUTH_MODE") if value is None or value.strip() == "": return None + warnings.warn( + "GALILEO_LUNA_AUTH_MODE is deprecated. Configure exactly one credential " + "(GALILEO_API_KEY for public auth, GALILEO_API_SECRET_KEY for internal " + "auth) or pass auth_mode to GalileoLunaClient.", + DeprecationWarning, + stacklevel=2, + ) normalized = value.strip().lower() if normalized == "public": return "public" @@ -166,10 +180,13 @@ class GalileoLunaClient: Environment Variables: GALILEO_API_SECRET_KEY or GALILEO_API_SECRET: Galileo API internal JWT signing secret. GALILEO_API_KEY: Galileo API key fallback for public scorer invocation. - GALILEO_LUNA_AUTH_MODE: Auth mode, either "public" or "internal". + GALILEO_LUNA_AUTH_MODE: Deprecated. The auth mode is inferred from which + credential is configured; set exactly one credential instead. GALILEO_LUNA_API_URL: Galileo Luna scorer invoke API URL override. GALILEO_API_CLUSTER_URL: Internal Galileo API URL used in internal auth mode. GALILEO_API_URL: Galileo API URL fallback. + GALILEO_LUNA_CA_FILE: CA bundle used to verify the scorer API endpoint, for + deployments whose API serves an internally-issued TLS certificate. GALILEO_CONSOLE_URL: Galileo Console URL (optional, defaults to production). """ @@ -180,6 +197,7 @@ def __init__( console_url: str | None = None, api_url: str | None = None, auth_mode: AuthMode | None = None, + ca_file: str | None = None, ) -> None: """Initialize the Galileo Luna client. @@ -192,12 +210,16 @@ def __init__( api_url: Galileo API URL. If not provided, reads from GALILEO_LUNA_API_URL, then GALILEO_API_CLUSTER_URL in internal auth mode, then GALILEO_API_URL, before deriving from the console URL. - auth_mode: Auth mode to use. If not provided, reads from - GALILEO_LUNA_AUTH_MODE, or infers from the single available credential. + auth_mode: Auth mode to use. If not provided, inferred from the single + available credential. (Reading GALILEO_LUNA_AUTH_MODE from the + environment is deprecated.) + ca_file: CA bundle path used to verify the scorer API endpoint. If not + provided, reads from GALILEO_LUNA_CA_FILE. Leave unset for endpoints + with publicly-trusted certificates. Raises: ValueError: If credentials are missing, ambiguous, or incompatible with - the selected auth mode. + the selected auth mode, or if the CA bundle cannot be loaded. """ resolved_api_secret = ( api_secret or os.getenv("GALILEO_API_SECRET_KEY") or os.getenv("GALILEO_API_SECRET") @@ -216,6 +238,8 @@ def __init__( console_url or os.getenv("GALILEO_CONSOLE_URL") or "https://console.galileo.ai" ) self.api_base = self._resolve_api_base(api_url, resolved_auth_mode) + self.ca_file = (ca_file or os.getenv("GALILEO_LUNA_CA_FILE") or "").strip() or None + self._ssl_context = self._load_ssl_context(self.ca_file) self._client: httpx.AsyncClient | None = None logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) @@ -228,9 +252,19 @@ def _resolve_api_base(self, api_url: str | None, auth_mode: AuthMode) -> str: for candidate in candidates: if candidate and candidate.strip(): - return candidate.rstrip("/") + return candidate.strip().rstrip("/") return self._derive_api_url(self.console_url) + @staticmethod + def _load_ssl_context(ca_file: str | None) -> ssl.SSLContext | None: + """Build a TLS verification context from a CA bundle path, if configured.""" + if ca_file is None: + return None + try: + return ssl.create_default_context(cafile=ca_file) + except (OSError, ssl.SSLError) as exc: + raise ValueError(f"Failed to load CA bundle from {ca_file!r}: {exc}") from exc + @staticmethod def _resolve_auth_mode( auth_mode: AuthMode | None, @@ -255,9 +289,9 @@ def _resolve_auth_mode( if api_key and api_secret: raise ValueError( - "Both Galileo API key and API secret are configured. Set " - "GALILEO_LUNA_AUTH_MODE to 'public' or 'internal' to choose the " - "runtime auth mode explicitly." + "Both a Galileo API key and a Galileo API secret are configured. " + "Unset one credential so the auth mode can be inferred, or pass " + "auth_mode='public' or auth_mode='internal' explicitly." ) if api_secret: return "internal" @@ -298,9 +332,18 @@ async def _get_client(self) -> httpx.AsyncClient: headers = {"Content-Type": "application/json"} if self.auth_mode == "public" and self.api_key is not None: headers["Galileo-API-Key"] = self.api_key + verify: ssl.SSLContext | bool = ( + self._ssl_context if self._ssl_context is not None else True + ) self._client = httpx.AsyncClient( headers=headers, timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), + limits=httpx.Limits( + max_connections=DEFAULT_MAX_CONNECTIONS, + max_keepalive_connections=DEFAULT_MAX_KEEPALIVE_CONNECTIONS, + keepalive_expiry=DEFAULT_KEEPALIVE_EXPIRY_SECS, + ), + verify=verify, ) return self._client @@ -371,12 +414,26 @@ async def invoke( try: client = await self._get_client() - response = await client.post( - endpoint, - json=request_body, - headers=request_headers, - timeout=timeout, - ) + try: + response = await client.post( + endpoint, + json=request_body, + headers=request_headers, + timeout=timeout, + ) + except (httpx.ConnectError, httpx.RemoteProtocolError) as exc: + # These errors occur before a response is received, typically when + # the server closed a pooled connection. The scorer invoke request + # is idempotent, so a single retry on a fresh connection is safe. + logger.warning( + "[GalileoLunaClient] Retrying once after connection error: %s", exc + ) + response = await client.post( + endpoint, + json=request_body, + headers=request_headers, + timeout=timeout, + ) response.raise_for_status() response_data = response.json() if not isinstance(response_data, dict): diff --git a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py index 9d5f6766..ab304a74 100644 --- a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py +++ b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py @@ -457,7 +457,9 @@ def test_client_requires_explicit_mode_when_both_credentials_are_present(monkeyp monkeypatch.delenv("GALILEO_LUNA_AUTH_MODE", raising=False) from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - with pytest.raises(ValueError, match="Both Galileo API key and API secret"): + with pytest.raises( + ValueError, match="Both a Galileo API key and a Galileo API secret are configured" + ): GalileoLunaClient() diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 0a2bf3cd..1dc90c8d 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -135,7 +135,7 @@ def test_client_uses_protect_api_url_derivation(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient # Given: the same console URL shape used by Protect - with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}): + with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): client = GalileoLunaClient(console_url="https://console.demo-v2.galileocloud.io") # Then: the API URL is derived the same way @@ -151,6 +151,7 @@ def test_client_uses_galileo_api_url_when_set(self) -> None: "GALILEO_API_KEY": "test-key", "GALILEO_API_URL": "https://api-test-luna.gcp-dev.galileo.ai/", }, + clear=True, ): client = GalileoLunaClient(console_url="https://console-test-luna.gcp-dev.galileo.ai") @@ -215,12 +216,127 @@ def test_client_derives_api_url_from_console_dash_hostname(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient # Given: a console- devstack hostname - with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=False): + with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): client = GalileoLunaClient(console_url="https://console-test-luna.gcp-dev.galileo.ai") # Then: the matching api- hostname is used assert client.api_base == "https://api-test-luna.gcp-dev.galileo.ai" + def test_client_strips_whitespace_from_env_url(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: a URL override padded with whitespace and a trailing slash + with patch.dict( + os.environ, + { + "GALILEO_API_KEY": "test-key", + "GALILEO_LUNA_API_URL": " https://luna-api.example.com/ ", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + # Then: the resolved base URL is trimmed and slash-free + assert client.api_base == "https://luna-api.example.com" + + def test_client_warns_when_deprecated_auth_mode_env_is_set(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: the deprecated auth-mode environment variable + with patch.dict( + os.environ, + {"GALILEO_API_KEY": "test-key", "GALILEO_LUNA_AUTH_MODE": "public"}, + clear=True, + ): + # When/Then: construction still works but emits a deprecation warning + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + client = GalileoLunaClient(console_url="https://console.example.com") + + assert client.auth_mode == "public" + + def test_client_rejects_unreadable_ca_bundle(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + # Given: a CA bundle path that does not exist + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_CA_FILE": "/nonexistent/ca.pem", + }, + clear=True, + ): + # When/Then: client construction fails fast instead of at first request + with pytest.raises(ValueError, match="Failed to load CA bundle"): + GalileoLunaClient(console_url="https://console.example.com") + + @pytest.mark.asyncio + async def test_client_applies_ca_bundle_and_connection_limits(self) -> None: + import certifi + + from agent_control_evaluator_galileo.luna import GalileoLunaClient + from agent_control_evaluator_galileo.luna.client import DEFAULT_KEEPALIVE_EXPIRY_SECS + + captured: dict[str, object] = {} + real_async_client = httpx.AsyncClient + + def recording_client(**kwargs: object) -> httpx.AsyncClient: + captured.update(kwargs) + return real_async_client(**kwargs) + + # Given: internal auth with a CA bundle configured + with patch.dict(os.environ, {"GALILEO_API_SECRET_KEY": "test-secret"}, clear=True): + client = GalileoLunaClient( + console_url="https://console.example.com", ca_file=certifi.where() + ) + + with patch( + "agent_control_evaluator_galileo.luna.client.httpx.AsyncClient", recording_client + ): + try: + await client._get_client() + finally: + await client.close() + + # Then: TLS verification uses the configured CA bundle and pooled + # connections expire quickly so closed server sockets are not reused + assert captured["verify"] is client._ssl_context + limits = captured["limits"] + assert isinstance(limits, httpx.Limits) + assert limits.keepalive_expiry == DEFAULT_KEEPALIVE_EXPIRY_SECS + + @pytest.mark.asyncio + async def test_client_retries_once_after_server_disconnect(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + calls = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + calls["count"] += 1 + if calls["count"] == 1: + raise httpx.RemoteProtocolError( + "Server disconnected without sending a response.", request=request + ) + return httpx.Response( + 200, + json={"scorer_label": "toxicity", "score": 0.1, "status": "success"}, + ) + + # Given: a transport whose first attempt fails like a stale keepalive socket + with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): + client = GalileoLunaClient(console_url="https://console.example.com") + client._client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + try: + # When: invoking a scorer + response = await client.invoke(scorer_label="toxicity", output="hello") + finally: + await client.close() + + # Then: the request is retried once and succeeds + assert calls["count"] == 2 + assert response.score == 0.1 + @pytest.mark.asyncio async def test_client_posts_to_scorers_invoke_without_protect_fields(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient @@ -242,7 +358,7 @@ def handler(request: httpx.Request) -> httpx.Response: ) # Given: a Luna client with a mock HTTP transport - with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}): + with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): client = GalileoLunaClient(console_url="https://console.demo-v2.galileocloud.io") client._client = httpx.AsyncClient( transport=httpx.MockTransport(handler), diff --git a/examples/galileo_luna/README.md b/examples/galileo_luna/README.md index 5ac97cda..9b078021 100644 --- a/examples/galileo_luna/README.md +++ b/examples/galileo_luna/README.md @@ -17,22 +17,28 @@ Start the Agent Control server from the repo root: make server-run ``` -Configure Galileo public API-key auth: +Configure exactly one Galileo credential. With an API key, the evaluator uses +public API-key auth and calls the public scorer API: ```bash -export GALILEO_LUNA_AUTH_MODE="public" export GALILEO_API_KEY="your-api-key" export GALILEO_CONSOLE_URL="https://console.demo-v2.galileocloud.io" ``` -For internal deployments, use internal auth instead: +With the Galileo API internal secret, it uses internal auth and calls the +internal scorer API. In-cluster deployments should point it at the +cluster-local API endpoint and, when that endpoint serves an internally-issued +TLS certificate, provide the CA bundle: ```bash -export GALILEO_LUNA_AUTH_MODE="internal" export GALILEO_API_SECRET_KEY="your-api-secret" -export GALILEO_API_URL="https://api.default.svc.cluster.local:8088" +export GALILEO_API_CLUSTER_URL="https://api.default.svc.cluster.local:8088" +export GALILEO_LUNA_CA_FILE="/etc/ssl/internal/ca.crt" ``` +`GALILEO_LUNA_API_URL` overrides the scorer API URL in either mode, and +`GALILEO_API_URL` remains the public API URL fallback. + Optional scorer settings: ```bash @@ -50,9 +56,9 @@ scalar as the scorer `output` field. If a selector returns structured data with `input` and/or `output` keys, those keys are sent directly and override `GALILEO_LUNA_PAYLOAD_FIELD`. -If both `GALILEO_API_KEY` and `GALILEO_API_SECRET_KEY`/`GALILEO_API_SECRET` are -set, `GALILEO_LUNA_AUTH_MODE` is required so the client does not silently choose -an auth path. +Setting both `GALILEO_API_KEY` and `GALILEO_API_SECRET_KEY`/`GALILEO_API_SECRET` +is an error; unset one so the auth mode can be inferred. `GALILEO_LUNA_AUTH_MODE` +is deprecated and only honored as a legacy override. Run: From 9d960852f624ca2853ef78983a6147d32e1e1b31 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Thu, 11 Jun 2026 17:46:02 +0530 Subject: [PATCH 03/18] fix(evaluator-galileo): align Luna auth config --- .../luna/client.py | 39 +++++-------------- .../galileo/tests/test_luna_coverage_gaps.py | 16 +++++--- .../galileo/tests/test_luna_evaluator.py | 32 --------------- examples/galileo_luna/README.md | 3 +- examples/galileo_luna/demo_agent.py | 17 ++------ examples/galileo_luna/setup_controls.py | 7 ++-- 6 files changed, 28 insertions(+), 86 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 2a995703..adb408fe 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -180,8 +180,6 @@ class GalileoLunaClient: Environment Variables: GALILEO_API_SECRET_KEY or GALILEO_API_SECRET: Galileo API internal JWT signing secret. GALILEO_API_KEY: Galileo API key fallback for public scorer invocation. - GALILEO_LUNA_AUTH_MODE: Deprecated. The auth mode is inferred from which - credential is configured; set exactly one credential instead. GALILEO_LUNA_API_URL: Galileo Luna scorer invoke API URL override. GALILEO_API_CLUSTER_URL: Internal Galileo API URL used in internal auth mode. GALILEO_API_URL: Galileo API URL fallback. @@ -211,8 +209,7 @@ def __init__( then GALILEO_API_CLUSTER_URL in internal auth mode, then GALILEO_API_URL, before deriving from the console URL. auth_mode: Auth mode to use. If not provided, inferred from the single - available credential. (Reading GALILEO_LUNA_AUTH_MODE from the - environment is deprecated.) + available credential. ca_file: CA bundle path used to verify the scorer API endpoint. If not provided, reads from GALILEO_LUNA_CA_FILE. Leave unset for endpoints with publicly-trusted certificates. @@ -274,16 +271,14 @@ def _resolve_auth_mode( ) -> AuthMode: if auth_mode == "public": if not api_key: - raise ValueError( - "GALILEO_API_KEY is required when GALILEO_LUNA_AUTH_MODE=public." - ) + raise ValueError("GALILEO_API_KEY is required for public Luna auth.") return "public" if auth_mode == "internal": if not api_secret: raise ValueError( - "GALILEO_API_SECRET_KEY or GALILEO_API_SECRET is required when " - "GALILEO_LUNA_AUTH_MODE=internal." + "GALILEO_API_SECRET_KEY or GALILEO_API_SECRET is required for " + "internal Luna auth." ) return "internal" @@ -414,26 +409,12 @@ async def invoke( try: client = await self._get_client() - try: - response = await client.post( - endpoint, - json=request_body, - headers=request_headers, - timeout=timeout, - ) - except (httpx.ConnectError, httpx.RemoteProtocolError) as exc: - # These errors occur before a response is received, typically when - # the server closed a pooled connection. The scorer invoke request - # is idempotent, so a single retry on a fresh connection is safe. - logger.warning( - "[GalileoLunaClient] Retrying once after connection error: %s", exc - ) - response = await client.post( - endpoint, - json=request_body, - headers=request_headers, - timeout=timeout, - ) + response = await client.post( + endpoint, + json=request_body, + headers=request_headers, + timeout=timeout, + ) response.raise_for_status() response_data = response.json() if not isinstance(response_data, dict): diff --git a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py index ab304a74..e1518eec 100644 --- a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py +++ b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py @@ -470,7 +470,8 @@ def test_client_uses_explicit_public_mode_when_both_credentials_are_present(monk monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "public") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - client = GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + client = GalileoLunaClient() assert client.auth_mode == "public" endpoint, request_headers = client._endpoint_and_headers(None) @@ -485,7 +486,8 @@ def test_client_uses_explicit_internal_mode_when_both_credentials_are_present(mo monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "internal") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - client = GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + client = GalileoLunaClient() assert client.auth_mode == "internal" endpoint, request_headers = client._endpoint_and_headers(None) @@ -501,8 +503,9 @@ def test_client_rejects_mode_without_matching_credential(monkeypatch): monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "internal") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - with pytest.raises(ValueError, match="GALILEO_API_SECRET_KEY"): - GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + with pytest.raises(ValueError, match="GALILEO_API_SECRET_KEY"): + GalileoLunaClient() def test_client_rejects_invalid_auth_mode(monkeypatch): @@ -511,8 +514,9 @@ def test_client_rejects_invalid_auth_mode(monkeypatch): monkeypatch.setenv("GALILEO_LUNA_AUTH_MODE", "sideways") from agent_control_evaluator_galileo.luna.client import GalileoLunaClient - with pytest.raises(ValueError, match="GALILEO_LUNA_AUTH_MODE"): - GalileoLunaClient() + with pytest.warns(DeprecationWarning, match="GALILEO_LUNA_AUTH_MODE is deprecated"): + with pytest.raises(ValueError, match="GALILEO_LUNA_AUTH_MODE"): + GalileoLunaClient() class TestDeriveApiUrl: diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 1dc90c8d..f61020e1 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -305,38 +305,6 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: assert isinstance(limits, httpx.Limits) assert limits.keepalive_expiry == DEFAULT_KEEPALIVE_EXPIRY_SECS - @pytest.mark.asyncio - async def test_client_retries_once_after_server_disconnect(self) -> None: - from agent_control_evaluator_galileo.luna import GalileoLunaClient - - calls = {"count": 0} - - def handler(request: httpx.Request) -> httpx.Response: - calls["count"] += 1 - if calls["count"] == 1: - raise httpx.RemoteProtocolError( - "Server disconnected without sending a response.", request=request - ) - return httpx.Response( - 200, - json={"scorer_label": "toxicity", "score": 0.1, "status": "success"}, - ) - - # Given: a transport whose first attempt fails like a stale keepalive socket - with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): - client = GalileoLunaClient(console_url="https://console.example.com") - client._client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) - - try: - # When: invoking a scorer - response = await client.invoke(scorer_label="toxicity", output="hello") - finally: - await client.close() - - # Then: the request is retried once and succeeds - assert calls["count"] == 2 - assert response.score == 0.1 - @pytest.mark.asyncio async def test_client_posts_to_scorers_invoke_without_protect_fields(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient diff --git a/examples/galileo_luna/README.md b/examples/galileo_luna/README.md index 9b078021..8fc1528b 100644 --- a/examples/galileo_luna/README.md +++ b/examples/galileo_luna/README.md @@ -57,8 +57,7 @@ scalar as the scorer `output` field. If a selector returns structured data with `GALILEO_LUNA_PAYLOAD_FIELD`. Setting both `GALILEO_API_KEY` and `GALILEO_API_SECRET_KEY`/`GALILEO_API_SECRET` -is an error; unset one so the auth mode can be inferred. `GALILEO_LUNA_AUTH_MODE` -is deprecated and only honored as a legacy override. +is an error; unset one so the auth mode can be inferred. Run: diff --git a/examples/galileo_luna/demo_agent.py b/examples/galileo_luna/demo_agent.py index 8c7f59b2..1c9b4ab4 100644 --- a/examples/galileo_luna/demo_agent.py +++ b/examples/galileo_luna/demo_agent.py @@ -21,7 +21,6 @@ AGENT_NAME = "galileo-luna-agent" SERVER_URL = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") -LUNA_AUTH_MODE = os.getenv("GALILEO_LUNA_AUTH_MODE") logging.basicConfig( level=logging.INFO, @@ -100,28 +99,20 @@ async def run_demo() -> None: "internal mode." ) return - if api_key and api_secret and LUNA_AUTH_MODE not in {"public", "internal"}: + if api_key and api_secret: print( "Both GALILEO_API_KEY and GALILEO_API_SECRET_KEY/GALILEO_API_SECRET are set. " - "Set GALILEO_LUNA_AUTH_MODE to 'public' or 'internal'." - ) - return - if LUNA_AUTH_MODE == "public" and not api_key: - print("GALILEO_API_KEY is required when GALILEO_LUNA_AUTH_MODE=public.") - return - if LUNA_AUTH_MODE == "internal" and not api_secret: - print( - "GALILEO_API_SECRET_KEY or GALILEO_API_SECRET is required when " - "GALILEO_LUNA_AUTH_MODE=internal." + "Unset one so the auth mode can be inferred." ) return + auth_mode = "public" if api_key else "internal" print("=" * 72) print("Direct Galileo Luna Evaluator Demo") print("=" * 72) print(f"Server: {SERVER_URL}") print(f"Agent: {AGENT_NAME}") - print(f"Auth: GALILEO_LUNA_AUTH_MODE={LUNA_AUTH_MODE or '(auto if one credential)'}") + print(f"Auth: {auth_mode}") print() init_agent() diff --git a/examples/galileo_luna/setup_controls.py b/examples/galileo_luna/setup_controls.py index fb4c6c76..b4534c58 100644 --- a/examples/galileo_luna/setup_controls.py +++ b/examples/galileo_luna/setup_controls.py @@ -4,8 +4,8 @@ Prerequisites: - Agent Control server running at AGENT_CONTROL_URL, default http://localhost:8000 - Galileo credentials set where demo_agent.py will run: - GALILEO_API_KEY with GALILEO_LUNA_AUTH_MODE=public, or - GALILEO_API_SECRET_KEY/GALILEO_API_SECRET with GALILEO_LUNA_AUTH_MODE=internal + GALILEO_API_KEY for public auth, or + GALILEO_API_SECRET_KEY/GALILEO_API_SECRET for internal auth Usage: uv run python setup_controls.py @@ -29,7 +29,6 @@ LUNA_SCORER_VERSION_ID = os.getenv("GALILEO_LUNA_SCORER_VERSION_ID") LUNA_THRESHOLD = float(os.getenv("GALILEO_LUNA_THRESHOLD", "0.5")) LUNA_PAYLOAD_FIELD = os.getenv("GALILEO_LUNA_PAYLOAD_FIELD", "output") -LUNA_AUTH_MODE = os.getenv("GALILEO_LUNA_AUTH_MODE") if LUNA_PAYLOAD_FIELD not in {"input", "output"}: raise ValueError("GALILEO_LUNA_PAYLOAD_FIELD must be either 'input' or 'output'.") @@ -175,7 +174,7 @@ async def setup_demo() -> None: f"threshold={LUNA_THRESHOLD}, " f"payload_field={LUNA_PAYLOAD_FIELD!r}" ) - print(f"Auth: GALILEO_LUNA_AUTH_MODE={LUNA_AUTH_MODE or '(auto if one credential)'}") + print("Auth: inferred from the single configured Galileo credential") async with AgentControlClient(base_url=SERVER_URL, timeout=30.0) as client: await client.health_check() From 454acdf57621767db6d1d69a78fb1e97ffe8fc29 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Thu, 11 Jun 2026 18:03:23 +0530 Subject: [PATCH 04/18] fix(evaluators): keep Luna API URL override generic --- .../luna/client.py | 17 ++++------- .../galileo/tests/test_luna_evaluator.py | 28 ++++--------------- examples/galileo_luna/README.md | 8 +++--- examples/galileo_luna/demo_agent.py | 4 +-- examples/galileo_luna/setup_controls.py | 2 +- 5 files changed, 18 insertions(+), 41 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index adb408fe..2c34d8c3 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -178,10 +178,9 @@ class GalileoLunaClient: """Thin HTTP client for Galileo Luna direct scorer invocation. Environment Variables: - GALILEO_API_SECRET_KEY or GALILEO_API_SECRET: Galileo API internal JWT signing secret. + GALILEO_API_SECRET_KEY: Galileo API internal JWT signing secret. GALILEO_API_KEY: Galileo API key fallback for public scorer invocation. GALILEO_LUNA_API_URL: Galileo Luna scorer invoke API URL override. - GALILEO_API_CLUSTER_URL: Internal Galileo API URL used in internal auth mode. GALILEO_API_URL: Galileo API URL fallback. GALILEO_LUNA_CA_FILE: CA bundle used to verify the scorer API endpoint, for deployments whose API serves an internally-issued TLS certificate. @@ -202,12 +201,11 @@ def __init__( Args: api_key: Galileo API key. If not provided, reads from GALILEO_API_KEY. api_secret: Galileo API secret for internal JWT auth. If not provided, - reads from GALILEO_API_SECRET_KEY or GALILEO_API_SECRET. + reads from GALILEO_API_SECRET_KEY. console_url: Galileo Console URL. If not provided, reads from GALILEO_CONSOLE_URL or uses the production console URL. api_url: Galileo API URL. If not provided, reads from GALILEO_LUNA_API_URL, - then GALILEO_API_CLUSTER_URL in internal auth mode, then GALILEO_API_URL, - before deriving from the console URL. + then GALILEO_API_URL, before deriving from the console URL. auth_mode: Auth mode to use. If not provided, inferred from the single available credential. ca_file: CA bundle path used to verify the scorer API endpoint. If not @@ -234,17 +232,15 @@ def __init__( self.console_url = ( console_url or os.getenv("GALILEO_CONSOLE_URL") or "https://console.galileo.ai" ) - self.api_base = self._resolve_api_base(api_url, resolved_auth_mode) + self.api_base = self._resolve_api_base(api_url) self.ca_file = (ca_file or os.getenv("GALILEO_LUNA_CA_FILE") or "").strip() or None self._ssl_context = self._load_ssl_context(self.ca_file) self._client: httpx.AsyncClient | None = None logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) - def _resolve_api_base(self, api_url: str | None, auth_mode: AuthMode) -> str: + def _resolve_api_base(self, api_url: str | None) -> str: """Resolve the scorer invoke API base URL from explicit and environment config.""" candidates = [api_url, os.getenv("GALILEO_LUNA_API_URL")] - if auth_mode == "internal": - candidates.append(os.getenv("GALILEO_API_CLUSTER_URL")) candidates.append(os.getenv("GALILEO_API_URL")) for candidate in candidates: @@ -277,8 +273,7 @@ def _resolve_auth_mode( if auth_mode == "internal": if not api_secret: raise ValueError( - "GALILEO_API_SECRET_KEY or GALILEO_API_SECRET is required for " - "internal Luna auth." + "GALILEO_API_SECRET_KEY is required for internal Luna auth." ) return "internal" diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index f61020e1..7aab57d6 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -176,41 +176,23 @@ def test_client_uses_luna_api_url_when_set(self) -> None: # Then: the Luna-specific URL wins without changing the general API URL contract assert client.api_base == "https://luna-api.example.com" - def test_client_uses_cluster_url_for_internal_auth(self) -> None: + def test_client_uses_luna_api_url_for_internal_auth(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient - # Given: internal auth and both cluster and public API URLs are configured + # Given: internal auth and both Luna-specific and general API URLs are configured with patch.dict( os.environ, { "GALILEO_API_SECRET_KEY": "test-secret", - "GALILEO_API_CLUSTER_URL": "https://api.default.svc.cluster.local:8088", + "GALILEO_LUNA_API_URL": "https://internal-api.example.com", "GALILEO_API_URL": "https://api-public.example.com", }, clear=True, ): client = GalileoLunaClient(console_url="https://console.example.com") - # Then: internal scorer invocation uses the cluster-local API base - assert client.api_base == "https://api.default.svc.cluster.local:8088" - - def test_client_ignores_cluster_url_for_public_auth(self) -> None: - from agent_control_evaluator_galileo.luna import GalileoLunaClient - - # Given: public auth with a cluster URL present in the environment - with patch.dict( - os.environ, - { - "GALILEO_API_KEY": "test-key", - "GALILEO_API_CLUSTER_URL": "https://api.default.svc.cluster.local:8088", - "GALILEO_API_URL": "https://api-public.example.com", - }, - clear=True, - ): - client = GalileoLunaClient(console_url="https://console.example.com") - - # Then: public scorer invocation still uses the public API URL - assert client.api_base == "https://api-public.example.com" + # Then: internal scorer invocation uses the Luna-specific API base + assert client.api_base == "https://internal-api.example.com" def test_client_derives_api_url_from_console_dash_hostname(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient diff --git a/examples/galileo_luna/README.md b/examples/galileo_luna/README.md index 8fc1528b..de68d3f5 100644 --- a/examples/galileo_luna/README.md +++ b/examples/galileo_luna/README.md @@ -32,11 +32,11 @@ TLS certificate, provide the CA bundle: ```bash export GALILEO_API_SECRET_KEY="your-api-secret" -export GALILEO_API_CLUSTER_URL="https://api.default.svc.cluster.local:8088" +export GALILEO_LUNA_API_URL="https://api.default.svc.cluster.local:8088" export GALILEO_LUNA_CA_FILE="/etc/ssl/internal/ca.crt" ``` -`GALILEO_LUNA_API_URL` overrides the scorer API URL in either mode, and +`GALILEO_LUNA_API_URL` overrides the scorer API URL in either mode. `GALILEO_API_URL` remains the public API URL fallback. Optional scorer settings: @@ -56,8 +56,8 @@ scalar as the scorer `output` field. If a selector returns structured data with `input` and/or `output` keys, those keys are sent directly and override `GALILEO_LUNA_PAYLOAD_FIELD`. -Setting both `GALILEO_API_KEY` and `GALILEO_API_SECRET_KEY`/`GALILEO_API_SECRET` -is an error; unset one so the auth mode can be inferred. +Setting both `GALILEO_API_KEY` and `GALILEO_API_SECRET_KEY` is an error; unset +one so the auth mode can be inferred. Run: diff --git a/examples/galileo_luna/demo_agent.py b/examples/galileo_luna/demo_agent.py index 1c9b4ab4..6ae1f5bb 100644 --- a/examples/galileo_luna/demo_agent.py +++ b/examples/galileo_luna/demo_agent.py @@ -91,7 +91,7 @@ def init_agent() -> None: async def run_demo() -> None: """Run scripted scenarios.""" api_key = os.getenv("GALILEO_API_KEY") - api_secret = os.getenv("GALILEO_API_SECRET_KEY") or os.getenv("GALILEO_API_SECRET") + api_secret = os.getenv("GALILEO_API_SECRET_KEY") if not api_key and not api_secret: print( "Galileo credentials are required for the galileo.luna evaluator. " @@ -101,7 +101,7 @@ async def run_demo() -> None: return if api_key and api_secret: print( - "Both GALILEO_API_KEY and GALILEO_API_SECRET_KEY/GALILEO_API_SECRET are set. " + "Both GALILEO_API_KEY and GALILEO_API_SECRET_KEY are set. " "Unset one so the auth mode can be inferred." ) return diff --git a/examples/galileo_luna/setup_controls.py b/examples/galileo_luna/setup_controls.py index b4534c58..97eb4c13 100644 --- a/examples/galileo_luna/setup_controls.py +++ b/examples/galileo_luna/setup_controls.py @@ -5,7 +5,7 @@ - Agent Control server running at AGENT_CONTROL_URL, default http://localhost:8000 - Galileo credentials set where demo_agent.py will run: GALILEO_API_KEY for public auth, or - GALILEO_API_SECRET_KEY/GALILEO_API_SECRET for internal auth + GALILEO_API_SECRET_KEY for internal auth Usage: uv run python setup_controls.py From b16fddf98edcd0a4cf0f7a0d2d0378cca09d2a70 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Thu, 11 Jun 2026 18:12:43 +0530 Subject: [PATCH 05/18] docs(examples): clarify Luna evaluator configuration --- examples/galileo_luna/README.md | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/examples/galileo_luna/README.md b/examples/galileo_luna/README.md index de68d3f5..20df4005 100644 --- a/examples/galileo_luna/README.md +++ b/examples/galileo_luna/README.md @@ -17,27 +17,40 @@ Start the Agent Control server from the repo root: make server-run ``` -Configure exactly one Galileo credential. With an API key, the evaluator uses -public API-key auth and calls the public scorer API: +Configure exactly one Galileo credential. + +For most OSS users, only an API key is required. This uses public API-key auth +and calls the public scorer API: ```bash export GALILEO_API_KEY="your-api-key" export GALILEO_CONSOLE_URL="https://console.demo-v2.galileocloud.io" ``` -With the Galileo API internal secret, it uses internal auth and calls the -internal scorer API. In-cluster deployments should point it at the -cluster-local API endpoint and, when that endpoint serves an internally-issued -TLS certificate, provide the CA bundle: +`GALILEO_CONSOLE_URL` is optional when using the production console URL. +`GALILEO_LUNA_API_URL` is not required for this path. The client uses +`GALILEO_API_URL` when set, otherwise it derives the API URL from +`GALILEO_CONSOLE_URL`. + +For deployments that use service-to-service internal auth, provide the API +internal secret instead of an API key: ```bash export GALILEO_API_SECRET_KEY="your-api-secret" +``` + +Deployment tooling may also set a custom scorer API endpoint and CA bundle. Use +these only when the scorer API is not reachable through the default public API +URL derivation, or when the endpoint uses a private CA: + +```bash export GALILEO_LUNA_API_URL="https://api.default.svc.cluster.local:8088" export GALILEO_LUNA_CA_FILE="/etc/ssl/internal/ca.crt" ``` -`GALILEO_LUNA_API_URL` overrides the scorer API URL in either mode. -`GALILEO_API_URL` remains the public API URL fallback. +`GALILEO_LUNA_API_URL` overrides the scorer API URL in either auth mode. +`GALILEO_LUNA_CA_FILE` is only needed for endpoints that are not trusted by the +system CA store. Optional scorer settings: From 6842669b406e71ba747b850094d0141b28d8c498 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Thu, 11 Jun 2026 18:15:11 +0530 Subject: [PATCH 06/18] docs(examples): distinguish deployment-injected Luna secret --- .../agent_control_evaluator_galileo/luna/client.py | 6 +++--- examples/galileo_luna/README.md | 12 +++++++----- examples/galileo_luna/demo_agent.py | 4 ++-- examples/galileo_luna/setup_controls.py | 2 +- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 2c34d8c3..4c7d4162 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -178,7 +178,7 @@ class GalileoLunaClient: """Thin HTTP client for Galileo Luna direct scorer invocation. Environment Variables: - GALILEO_API_SECRET_KEY: Galileo API internal JWT signing secret. + GALILEO_API_SECRET_KEY: Deployment-provided Galileo API internal JWT signing secret. GALILEO_API_KEY: Galileo API key fallback for public scorer invocation. GALILEO_LUNA_API_URL: Galileo Luna scorer invoke API URL override. GALILEO_API_URL: Galileo API URL fallback. @@ -200,8 +200,8 @@ def __init__( Args: api_key: Galileo API key. If not provided, reads from GALILEO_API_KEY. - api_secret: Galileo API secret for internal JWT auth. If not provided, - reads from GALILEO_API_SECRET_KEY. + api_secret: Deployment-provided Galileo API secret for internal JWT auth. + If not provided, reads from GALILEO_API_SECRET_KEY. console_url: Galileo Console URL. If not provided, reads from GALILEO_CONSOLE_URL or uses the production console URL. api_url: Galileo API URL. If not provided, reads from GALILEO_LUNA_API_URL, diff --git a/examples/galileo_luna/README.md b/examples/galileo_luna/README.md index 20df4005..b81b034f 100644 --- a/examples/galileo_luna/README.md +++ b/examples/galileo_luna/README.md @@ -32,16 +32,18 @@ export GALILEO_CONSOLE_URL="https://console.demo-v2.galileocloud.io" `GALILEO_API_URL` when set, otherwise it derives the API URL from `GALILEO_CONSOLE_URL`. -For deployments that use service-to-service internal auth, provide the API -internal secret instead of an API key: +For deployments that use service-to-service internal auth, the deployment +environment should inject the API internal secret instead of an API key: ```bash +# Set by deployment tooling, not by normal OSS users. export GALILEO_API_SECRET_KEY="your-api-secret" ``` -Deployment tooling may also set a custom scorer API endpoint and CA bundle. Use -these only when the scorer API is not reachable through the default public API -URL derivation, or when the endpoint uses a private CA: +OSS users do not need to set `GALILEO_API_SECRET_KEY` manually for the public +API-key path. Deployment tooling may also set a custom scorer API endpoint and +CA bundle. Use these only when the scorer API is not reachable through the +default public API URL derivation, or when the endpoint uses a private CA: ```bash export GALILEO_LUNA_API_URL="https://api.default.svc.cluster.local:8088" diff --git a/examples/galileo_luna/demo_agent.py b/examples/galileo_luna/demo_agent.py index 6ae1f5bb..0b6a0f8a 100644 --- a/examples/galileo_luna/demo_agent.py +++ b/examples/galileo_luna/demo_agent.py @@ -95,8 +95,8 @@ async def run_demo() -> None: if not api_key and not api_secret: print( "Galileo credentials are required for the galileo.luna evaluator. " - "Set GALILEO_API_KEY for public mode or GALILEO_API_SECRET_KEY for " - "internal mode." + "Set GALILEO_API_KEY for public mode. Deployments using internal " + "mode should inject GALILEO_API_SECRET_KEY." ) return if api_key and api_secret: diff --git a/examples/galileo_luna/setup_controls.py b/examples/galileo_luna/setup_controls.py index 97eb4c13..fe1434c8 100644 --- a/examples/galileo_luna/setup_controls.py +++ b/examples/galileo_luna/setup_controls.py @@ -5,7 +5,7 @@ - Agent Control server running at AGENT_CONTROL_URL, default http://localhost:8000 - Galileo credentials set where demo_agent.py will run: GALILEO_API_KEY for public auth, or - GALILEO_API_SECRET_KEY for internal auth + deployment-injected GALILEO_API_SECRET_KEY for internal auth Usage: uv run python setup_controls.py From 5ee5fc31ff2ebc37127444eb540c029bb42f9075 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Mon, 15 Jun 2026 19:14:22 +0530 Subject: [PATCH 07/18] fix(evaluators): log deprecated Luna auth mode --- .../src/agent_control_evaluator_galileo/luna/client.py | 8 ++++---- evaluators/contrib/galileo/tests/test_luna_evaluator.py | 8 ++++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 4c7d4162..252e519b 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -63,13 +63,13 @@ def _env_auth_mode() -> AuthMode | None: value = os.getenv("GALILEO_LUNA_AUTH_MODE") if value is None or value.strip() == "": return None - warnings.warn( + deprecation_message = ( "GALILEO_LUNA_AUTH_MODE is deprecated. Configure exactly one credential " "(GALILEO_API_KEY for public auth, GALILEO_API_SECRET_KEY for internal " - "auth) or pass auth_mode to GalileoLunaClient.", - DeprecationWarning, - stacklevel=2, + "auth) or pass auth_mode to GalileoLunaClient." ) + warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2) + logger.warning(deprecation_message) normalized = value.strip().lower() if normalized == "public": return "public" diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 7aab57d6..3280029c 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import logging import os from base64 import urlsafe_b64decode from unittest.mock import AsyncMock, patch @@ -221,10 +222,13 @@ def test_client_strips_whitespace_from_env_url(self) -> None: # Then: the resolved base URL is trimmed and slash-free assert client.api_base == "https://luna-api.example.com" - def test_client_warns_when_deprecated_auth_mode_env_is_set(self) -> None: + def test_client_warns_when_deprecated_auth_mode_env_is_set( + self, caplog: pytest.LogCaptureFixture + ) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient # Given: the deprecated auth-mode environment variable + caplog.set_level(logging.WARNING) with patch.dict( os.environ, {"GALILEO_API_KEY": "test-key", "GALILEO_LUNA_AUTH_MODE": "public"}, @@ -235,6 +239,7 @@ def test_client_warns_when_deprecated_auth_mode_env_is_set(self) -> None: client = GalileoLunaClient(console_url="https://console.example.com") assert client.auth_mode == "public" + assert "GALILEO_LUNA_AUTH_MODE is deprecated" in caplog.text def test_client_rejects_unreadable_ca_bundle(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient @@ -255,7 +260,6 @@ def test_client_rejects_unreadable_ca_bundle(self) -> None: @pytest.mark.asyncio async def test_client_applies_ca_bundle_and_connection_limits(self) -> None: import certifi - from agent_control_evaluator_galileo.luna import GalileoLunaClient from agent_control_evaluator_galileo.luna.client import DEFAULT_KEEPALIVE_EXPIRY_SECS From 739a6070986a66ad31bb22e7be41558fd09d4402 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Mon, 15 Jun 2026 21:01:55 +0530 Subject: [PATCH 08/18] test(evaluators): use neutral Luna URL fixtures --- .../contrib/galileo/tests/test_luna_evaluator.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 3280029c..7332d982 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -145,19 +145,19 @@ def test_client_uses_protect_api_url_derivation(self) -> None: def test_client_uses_galileo_api_url_when_set(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient - # Given: an explicit devstack API URL + # Given: an explicit custom-environment API URL with patch.dict( os.environ, { "GALILEO_API_KEY": "test-key", - "GALILEO_API_URL": "https://api-test-luna.gcp-dev.galileo.ai/", + "GALILEO_API_URL": "https://api-test-luna.example.com/", }, clear=True, ): - client = GalileoLunaClient(console_url="https://console-test-luna.gcp-dev.galileo.ai") + client = GalileoLunaClient(console_url="https://console-test-luna.example.com") # Then: the explicit API URL wins over console URL derivation - assert client.api_base == "https://api-test-luna.gcp-dev.galileo.ai" + assert client.api_base == "https://api-test-luna.example.com" def test_client_uses_luna_api_url_when_set(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient @@ -198,12 +198,12 @@ def test_client_uses_luna_api_url_for_internal_auth(self) -> None: def test_client_derives_api_url_from_console_dash_hostname(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient - # Given: a console- devstack hostname + # Given: a console- hostname with patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}, clear=True): - client = GalileoLunaClient(console_url="https://console-test-luna.gcp-dev.galileo.ai") + client = GalileoLunaClient(console_url="https://console-test-luna.example.com") - # Then: the matching api- hostname is used - assert client.api_base == "https://api-test-luna.gcp-dev.galileo.ai" + # Then: the matching api- hostname is used + assert client.api_base == "https://api-test-luna.example.com" def test_client_strips_whitespace_from_env_url(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient From 44185bb24805f9fa42c9df324dfdc53c049d47c6 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Mon, 15 Jun 2026 21:31:23 +0530 Subject: [PATCH 09/18] test(server): exercise agent route registration --- server/tests/test_init_agent.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/server/tests/test_init_agent.py b/server/tests/test_init_agent.py index 2dfe9eaa..c5a144ea 100644 --- a/server/tests/test_init_agent.py +++ b/server/tests/test_init_agent.py @@ -4,7 +4,6 @@ from typing import Any import pytest -from fastapi import FastAPI from fastapi.testclient import TestClient from sqlalchemy import create_engine, select, text from sqlalchemy.orm import Session @@ -46,14 +45,17 @@ def make_agent_payload( } -def test_init_agent_route_exists(app: FastAPI) -> None: - # Given: an application router - paths = {getattr(route, "path", None) for route in app.router.routes} - # When: inspecting registered paths - # (computation done above to gather all paths) - # Then: initAgent and agent retrieval endpoints are present - assert "/api/v1/agents/initAgent" in paths - assert "/api/v1/agents/{agent_name}" in paths +def test_agent_routes_are_registered(client: TestClient) -> None: + # Given: malformed requests that should still match registered agent routes + # When: calling initAgent without the required body fields + init_resp = client.post("/api/v1/agents/initAgent", json={}) + + # And: using an unsupported method on the agent resource route + agent_resp = client.post("/api/v1/agents/some-agent") + + # Then: routing reached the expected endpoints instead of falling through to 404 + assert init_resp.status_code == 422 + assert agent_resp.status_code == 405 def test_init_agent_creates_and_gets_agent(client: TestClient) -> None: From 6f781e947a2bd28931abbc01b4f200414cfb4450 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Fri, 12 Jun 2026 18:05:17 +0530 Subject: [PATCH 10/18] feat(evaluator-galileo): add Luna HTTP connection tuning --- .../luna/client.py | 69 ++++++++++++++- .../galileo/tests/test_luna_evaluator.py | 84 ++++++++++++++++++- 2 files changed, 149 insertions(+), 4 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 252e519b..e43cfb1f 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -27,6 +27,9 @@ DEFAULT_KEEPALIVE_EXPIRY_SECS = 1.0 DEFAULT_MAX_CONNECTIONS = 100 DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 +LUNA_KEEPALIVE_EXPIRY_ENV = "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS" +LUNA_MAX_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_CONNECTIONS" +LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS" PUBLIC_SCORER_INVOKE_PATH = "/scorers/invoke" INTERNAL_SCORER_INVOKE_PATH = "/internal/scorers/invoke" AuthMode = Literal["public", "internal"] @@ -78,6 +81,51 @@ def _env_auth_mode() -> AuthMode | None: raise ValueError("GALILEO_LUNA_AUTH_MODE must be either 'public' or 'internal'.") +def _load_float_env(env_name: str, default: float) -> float: + raw = os.getenv(env_name) + if raw is None: + return default + try: + return float(raw) + except ValueError as exc: + raise ValueError(f"{env_name}={raw!r} is not a number.") from exc + + +def _load_int_env(env_name: str, default: int) -> int: + raw = os.getenv(env_name) + if raw is None: + return default + try: + return int(raw) + except ValueError as exc: + raise ValueError(f"{env_name}={raw!r} is not an integer.") from exc + + +def _validate_connection_config( + *, + keepalive_expiry_seconds: float, + max_connections: int, + max_keepalive_connections: int, +) -> None: + if keepalive_expiry_seconds < 0: + raise ValueError( + f"{LUNA_KEEPALIVE_EXPIRY_ENV}={keepalive_expiry_seconds} " + "must be greater than or equal to 0." + ) + if max_connections <= 0: + raise ValueError(f"{LUNA_MAX_CONNECTIONS_ENV}={max_connections} must be greater than 0.") + if max_keepalive_connections < 0: + raise ValueError( + f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " + "must be greater than or equal to 0." + ) + if max_keepalive_connections > max_connections: + raise ValueError( + f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " + f"must be less than or equal to {LUNA_MAX_CONNECTIONS_ENV}={max_connections}." + ) + + def _as_float_or_none(value: JSONValue) -> float | None: if isinstance(value, bool) or value is None: return None @@ -184,6 +232,9 @@ class GalileoLunaClient: GALILEO_API_URL: Galileo API URL fallback. GALILEO_LUNA_CA_FILE: CA bundle used to verify the scorer API endpoint, for deployments whose API serves an internally-issued TLS certificate. + GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS: HTTP pooled connection expiry. + GALILEO_LUNA_MAX_CONNECTIONS: Maximum outbound HTTP connections. + GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS: Maximum idle pooled HTTP connections. GALILEO_CONSOLE_URL: Galileo Console URL (optional, defaults to production). """ @@ -235,6 +286,18 @@ def __init__( self.api_base = self._resolve_api_base(api_url) self.ca_file = (ca_file or os.getenv("GALILEO_LUNA_CA_FILE") or "").strip() or None self._ssl_context = self._load_ssl_context(self.ca_file) + self.keepalive_expiry_seconds = _load_float_env( + LUNA_KEEPALIVE_EXPIRY_ENV, DEFAULT_KEEPALIVE_EXPIRY_SECS + ) + self.max_connections = _load_int_env(LUNA_MAX_CONNECTIONS_ENV, DEFAULT_MAX_CONNECTIONS) + self.max_keepalive_connections = _load_int_env( + LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV, DEFAULT_MAX_KEEPALIVE_CONNECTIONS + ) + _validate_connection_config( + keepalive_expiry_seconds=self.keepalive_expiry_seconds, + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + ) self._client: httpx.AsyncClient | None = None logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) @@ -329,9 +392,9 @@ async def _get_client(self) -> httpx.AsyncClient: headers=headers, timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), limits=httpx.Limits( - max_connections=DEFAULT_MAX_CONNECTIONS, - max_keepalive_connections=DEFAULT_MAX_KEEPALIVE_CONNECTIONS, - keepalive_expiry=DEFAULT_KEEPALIVE_EXPIRY_SECS, + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + keepalive_expiry=self.keepalive_expiry_seconds, ), verify=verify, ) diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 7332d982..192ec4d7 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -261,7 +261,11 @@ def test_client_rejects_unreadable_ca_bundle(self) -> None: async def test_client_applies_ca_bundle_and_connection_limits(self) -> None: import certifi from agent_control_evaluator_galileo.luna import GalileoLunaClient - from agent_control_evaluator_galileo.luna.client import DEFAULT_KEEPALIVE_EXPIRY_SECS + from agent_control_evaluator_galileo.luna.client import ( + DEFAULT_KEEPALIVE_EXPIRY_SECS, + DEFAULT_MAX_CONNECTIONS, + DEFAULT_MAX_KEEPALIVE_CONNECTIONS, + ) captured: dict[str, object] = {} real_async_client = httpx.AsyncClient @@ -290,6 +294,84 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: limits = captured["limits"] assert isinstance(limits, httpx.Limits) assert limits.keepalive_expiry == DEFAULT_KEEPALIVE_EXPIRY_SECS + assert limits.max_connections == DEFAULT_MAX_CONNECTIONS + assert limits.max_keepalive_connections == DEFAULT_MAX_KEEPALIVE_CONNECTIONS + + @pytest.mark.asyncio + async def test_client_applies_connection_tuning_env(self) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + captured: dict[str, object] = {} + real_async_client = httpx.AsyncClient + + def recording_client(**kwargs: object) -> httpx.AsyncClient: + captured.update(kwargs) + return real_async_client(**kwargs) + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "0.25", + "GALILEO_LUNA_MAX_CONNECTIONS": "17", + "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "4", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + with patch( + "agent_control_evaluator_galileo.luna.client.httpx.AsyncClient", recording_client + ): + try: + await client._get_client() + finally: + await client.close() + + assert client.keepalive_expiry_seconds == 0.25 + assert client.max_connections == 17 + assert client.max_keepalive_connections == 4 + limits = captured["limits"] + assert isinstance(limits, httpx.Limits) + assert limits.keepalive_expiry == 0.25 + assert limits.max_connections == 17 + assert limits.max_keepalive_connections == 4 + + @pytest.mark.parametrize( + "env_values, expected", + [ + ({"GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "soon"}, "not a number"), + ({"GALILEO_LUNA_MAX_CONNECTIONS": "many"}, "not an integer"), + ({"GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "some"}, "not an integer"), + ( + {"GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS": "-0.1"}, + "greater than or equal to 0", + ), + ({"GALILEO_LUNA_MAX_CONNECTIONS": "0"}, "greater than 0"), + ( + {"GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "-1"}, + "greater than or equal to 0", + ), + ( + { + "GALILEO_LUNA_MAX_CONNECTIONS": "2", + "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS": "3", + }, + "less than or equal", + ), + ], + ) + def test_client_reports_invalid_connection_tuning_env( + self, env_values: dict[str, str], expected: str + ) -> None: + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + env = {"GALILEO_API_SECRET_KEY": "test-secret"} | env_values + with patch.dict(os.environ, env, clear=True): + with pytest.raises(ValueError) as exc_info: + GalileoLunaClient(console_url="https://console.example.com") + + assert expected in str(exc_info.value) @pytest.mark.asyncio async def test_client_posts_to_scorers_invoke_without_protect_fields(self) -> None: From 387e26c6086609bf2aaa02597eb0b8ef930aa740 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sat, 13 Jun 2026 15:44:03 +0530 Subject: [PATCH 11/18] feat(engine): expose evaluation concurrency setting --- engine/src/agent_control_engine/core.py | 23 ++++++++++- engine/tests/test_core.py | 51 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/engine/src/agent_control_engine/core.py b/engine/src/agent_control_engine/core.py index e2ae8b6e..9bff6680 100644 --- a/engine/src/agent_control_engine/core.py +++ b/engine/src/agent_control_engine/core.py @@ -27,11 +27,32 @@ logger = logging.getLogger(__name__) + +def _env_positive_int(*names: str, default: int) -> int: + """Read a positive integer from the first configured environment variable.""" + for name in names: + value = os.environ.get(name) + if value is None or value.strip() == "": + continue + try: + parsed = int(value) + except ValueError as exc: + raise RuntimeError(f"{name}={value!r} must be an integer.") from exc + if parsed < 1: + raise RuntimeError(f"{name}={value!r} must be greater than or equal to 1.") + return parsed + return default + + # Default timeout for evaluator execution (seconds) DEFAULT_EVALUATOR_TIMEOUT = float(os.environ.get("EVALUATOR_TIMEOUT_SECONDS", "30")) # Max concurrent evaluations (limits task spawning overhead for large policies) -MAX_CONCURRENT_EVALUATIONS = int(os.environ.get("MAX_CONCURRENT_EVALUATIONS", "3")) +MAX_CONCURRENT_EVALUATIONS = _env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, +) SELECTED_DATA_PREVIEW_MAX_CHARS = int( os.environ.get("AGENT_CONTROL_SELECTED_DATA_PREVIEW_MAX_CHARS", "500") diff --git a/engine/tests/test_core.py b/engine/tests/test_core.py index ed4e6e00..baa46bab 100644 --- a/engine/tests/test_core.py +++ b/engine/tests/test_core.py @@ -1280,6 +1280,57 @@ async def test_timeout_does_not_affect_fast_evaluators(self): class TestConcurrencyLimit: """Tests for semaphore-based concurrency limiting.""" + def test_max_concurrency_env_prefers_agent_control_name( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The canonical Agent Control env var overrides the legacy short name.""" + import agent_control_engine.core as core_module + + monkeypatch.setenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "7") + monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "2") + + assert ( + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + == 7 + ) + + def test_max_concurrency_env_reads_legacy_name( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The existing env var remains supported for compatibility.""" + import agent_control_engine.core as core_module + + monkeypatch.delenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", raising=False) + monkeypatch.setenv("MAX_CONCURRENT_EVALUATIONS", "5") + + assert ( + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + == 5 + ) + + def test_max_concurrency_env_rejects_non_positive_values( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The concurrency cap must always allow at least one evaluator.""" + import agent_control_engine.core as core_module + + monkeypatch.setenv("AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "0") + + with pytest.raises(RuntimeError, match="greater than or equal to 1"): + core_module._env_positive_int( + "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", + "MAX_CONCURRENT_EVALUATIONS", + default=3, + ) + @pytest.mark.asyncio async def test_concurrency_limited_to_max(self, monkeypatch: pytest.MonkeyPatch): """Test that concurrent evaluations are limited by semaphore. From c8218436d72e459c1e048a8a0aff1be5d1031655 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sun, 14 Jun 2026 07:35:27 +0530 Subject: [PATCH 12/18] feat: add Luna HTTP client pool setting --- .../luna/client.py | 89 +++++++++++++++---- .../galileo/tests/test_luna_evaluator.py | 43 +++++++++ 2 files changed, 113 insertions(+), 19 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index e43cfb1f..cc2cc1cb 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -6,6 +6,7 @@ import os import ssl import warnings +from asyncio import Lock from base64 import urlsafe_b64encode from hashlib import sha256 from hmac import new as hmac_new @@ -27,9 +28,11 @@ DEFAULT_KEEPALIVE_EXPIRY_SECS = 1.0 DEFAULT_MAX_CONNECTIONS = 100 DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 +DEFAULT_CLIENT_POOL_SIZE = 1 LUNA_KEEPALIVE_EXPIRY_ENV = "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS" LUNA_MAX_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_CONNECTIONS" LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS" +LUNA_CLIENT_POOL_SIZE_ENV = "GALILEO_LUNA_CLIENT_POOL_SIZE" PUBLIC_SCORER_INVOKE_PATH = "/scorers/invoke" INTERNAL_SCORER_INVOKE_PATH = "/internal/scorers/invoke" AuthMode = Literal["public", "internal"] @@ -106,6 +109,7 @@ def _validate_connection_config( keepalive_expiry_seconds: float, max_connections: int, max_keepalive_connections: int, + client_pool_size: int, ) -> None: if keepalive_expiry_seconds < 0: raise ValueError( @@ -124,6 +128,8 @@ def _validate_connection_config( f"{LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV}={max_keepalive_connections} " f"must be less than or equal to {LUNA_MAX_CONNECTIONS_ENV}={max_connections}." ) + if client_pool_size <= 0: + raise ValueError(f"{LUNA_CLIENT_POOL_SIZE_ENV}={client_pool_size} must be greater than 0.") def _as_float_or_none(value: JSONValue) -> float | None: @@ -235,6 +241,7 @@ class GalileoLunaClient: GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS: HTTP pooled connection expiry. GALILEO_LUNA_MAX_CONNECTIONS: Maximum outbound HTTP connections. GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS: Maximum idle pooled HTTP connections. + GALILEO_LUNA_CLIENT_POOL_SIZE: Number of outbound HTTP clients to rotate across. GALILEO_CONSOLE_URL: Galileo Console URL (optional, defaults to production). """ @@ -293,12 +300,19 @@ def __init__( self.max_keepalive_connections = _load_int_env( LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV, DEFAULT_MAX_KEEPALIVE_CONNECTIONS ) + self.client_pool_size = _load_int_env( + LUNA_CLIENT_POOL_SIZE_ENV, DEFAULT_CLIENT_POOL_SIZE + ) _validate_connection_config( keepalive_expiry_seconds=self.keepalive_expiry_seconds, max_connections=self.max_connections, max_keepalive_connections=self.max_keepalive_connections, + client_pool_size=self.client_pool_size, ) self._client: httpx.AsyncClient | None = None + self._clients: list[httpx.AsyncClient] = [] + self._next_client_index = 0 + self._client_lock = Lock() logger.info("[GalileoLunaClient] Auth mode selected: %s", self.auth_mode) def _resolve_api_base(self, api_url: str | None) -> str: @@ -379,26 +393,58 @@ def _derive_api_url(self, console_url: str) -> str: parts._replace(netloc=parts.netloc.replace(host, new_host, 1)) ) + def _create_client(self) -> httpx.AsyncClient: + """Create an HTTP client with the configured auth, TLS, and connection limits.""" + headers = {"Content-Type": "application/json"} + if self.auth_mode == "public" and self.api_key is not None: + headers["Galileo-API-Key"] = self.api_key + verify: ssl.SSLContext | bool = ( + self._ssl_context if self._ssl_context is not None else True + ) + return httpx.AsyncClient( + headers=headers, + timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), + limits=httpx.Limits( + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive_connections, + keepalive_expiry=self.keepalive_expiry_seconds, + ), + verify=verify, + ) + + def _select_pooled_client(self) -> httpx.AsyncClient: + """Select the next pooled client without awaiting on the hot path.""" + client = self._clients[self._next_client_index % len(self._clients)] + self._next_client_index = (self._next_client_index + 1) % len(self._clients) + return client + async def _get_client(self) -> httpx.AsyncClient: - """Get or create the HTTP client.""" - if self._client is None or self._client.is_closed: - headers = {"Content-Type": "application/json"} - if self.auth_mode == "public" and self.api_key is not None: - headers["Galileo-API-Key"] = self.api_key - verify: ssl.SSLContext | bool = ( - self._ssl_context if self._ssl_context is not None else True - ) - self._client = httpx.AsyncClient( - headers=headers, - timeout=httpx.Timeout(DEFAULT_TIMEOUT_SECS), - limits=httpx.Limits( - max_connections=self.max_connections, - max_keepalive_connections=self.max_keepalive_connections, - keepalive_expiry=self.keepalive_expiry_seconds, - ), - verify=verify, - ) - return self._client + """Get or create the next HTTP client.""" + if self._client is not None and not self._client.is_closed: + return self._client + if self._client is not None and self._client.is_closed: + self._client = None + + if ( + self.client_pool_size > 1 + and len(self._clients) == self.client_pool_size + and all(not client.is_closed for client in self._clients) + ): + return self._select_pooled_client() + + async with self._client_lock: + if self._client is not None and not self._client.is_closed: + return self._client + + self._clients = [client for client in self._clients if not client.is_closed] + while len(self._clients) < self.client_pool_size: + self._clients.append(self._create_client()) + + if self.client_pool_size == 1: + self._client = self._clients[0] + return self._client + + return self._select_pooled_client() def _endpoint_and_headers( self, @@ -497,6 +543,11 @@ async def close(self) -> None: if self._client is not None: await self._client.aclose() self._client = None + for client in self._clients: + if not client.is_closed: + await client.aclose() + self._clients = [] + self._next_client_index = 0 async def __aenter__(self) -> GalileoLunaClient: """Async context manager entry.""" diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index 192ec4d7..bbad0f85 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -337,6 +337,47 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: assert limits.max_connections == 17 assert limits.max_keepalive_connections == 4 + @pytest.mark.asyncio + async def test_client_pool_size_rotates_across_http_clients(self) -> None: + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + class FakeAsyncClient: + def __init__(self, **kwargs: object) -> None: + self.kwargs = kwargs + self.is_closed = False + + async def aclose(self) -> None: + self.is_closed = True + + created: list[FakeAsyncClient] = [] + + def recording_client(**kwargs: object) -> FakeAsyncClient: + client = FakeAsyncClient(**kwargs) + created.append(client) + return client + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_CLIENT_POOL_SIZE": "3", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + + with patch.object(luna_client_module.httpx, "AsyncClient", recording_client): + try: + selected = [await client._get_client() for _ in range(5)] + finally: + await client.close() + + assert client.client_pool_size == 3 + assert len(created) == 3 + assert selected == [created[0], created[1], created[2], created[0], created[1]] + assert all(created_client.is_closed for created_client in created) + @pytest.mark.parametrize( "env_values, expected", [ @@ -359,6 +400,8 @@ def recording_client(**kwargs: object) -> httpx.AsyncClient: }, "less than or equal", ), + ({"GALILEO_LUNA_CLIENT_POOL_SIZE": "many"}, "not an integer"), + ({"GALILEO_LUNA_CLIENT_POOL_SIZE": "0"}, "greater than 0"), ], ) def test_client_reports_invalid_connection_tuning_env( From fd1d77c76a7c375199fe565e1dba7b5a4e3e962d Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sun, 14 Jun 2026 17:24:20 +0530 Subject: [PATCH 13/18] fix: expose luna http error metadata --- .../luna/evaluator.py | 44 ++++++++++++++++--- .../galileo/tests/test_luna_evaluator.py | 38 ++++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py index eff92f2a..1221cedb 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py @@ -8,6 +8,7 @@ from importlib.metadata import PackageNotFoundError, version from typing import Any +import httpx from agent_control_evaluators import Evaluator, EvaluatorMetadata, register_evaluator from agent_control_models import EvaluatorResult, JSONValue @@ -27,6 +28,7 @@ def _resolve_package_version() -> str: _PACKAGE_VERSION = _resolve_package_version() LUNA_AVAILABLE = True +_HTTP_ERROR_BODY_LIMIT = 500 def _coerce_payload_text(value: Any) -> str | None: @@ -74,6 +76,32 @@ def _confidence_from_score(score: JSONValue) -> float: return 1.0 +def _truncated_http_response_body(body: str) -> tuple[str, bool]: + if len(body) <= _HTTP_ERROR_BODY_LIMIT: + return body, False + return body[:_HTTP_ERROR_BODY_LIMIT], True + + +def _http_status_error_metadata(error: httpx.HTTPStatusError) -> dict[str, Any]: + metadata: dict[str, Any] = {} + + request = error.request + metadata["http_method"] = request.method + metadata["http_endpoint_path"] = request.url.path + + response = error.response + metadata["http_status_code"] = response.status_code + metadata["http_response_content_type"] = response.headers.get("content-type") + + body = response.text + if body: + metadata["http_response_body"], metadata["http_response_body_truncated"] = ( + _truncated_http_response_body(body) + ) + + return {key: value for key, value in metadata.items() if value is not None} + + @register_evaluator class LunaEvaluator(Evaluator[LunaEvaluatorConfig]): """Galileo Luna evaluator using the direct scorer invocation API.""" @@ -252,16 +280,20 @@ def _handle_error( error: Exception, ) -> EvaluatorResult: error_detail = str(error) + metadata: dict[str, Any] = { + "error_type": type(error).__name__, + "scorer_label": self.config.scorer_label, + "scorer_id": self.config.scorer_id, + "scorer_version_id": self.config.scorer_version_id, + } + if isinstance(error, httpx.HTTPStatusError): + metadata.update(_http_status_error_metadata(error)) + return EvaluatorResult( matched=False, confidence=0.0, message=f"Luna evaluation error: {error_detail}", - metadata={ - "error_type": type(error).__name__, - "scorer_label": self.config.scorer_label, - "scorer_id": self.config.scorer_id, - "scorer_version_id": self.config.scorer_version_id, - }, + metadata=metadata, error=error_detail, ) diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index bbad0f85..fe6c679f 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -729,3 +729,41 @@ async def test_evaluator_fail_open_sets_error(self) -> None: assert "error" not in result.metadata assert result.metadata["error_type"] == "RuntimeError" assert "fallback_action" not in result.metadata + + @patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}) + @pytest.mark.asyncio + async def test_evaluator_error_metadata_includes_http_status_context(self) -> None: + from agent_control_evaluator_galileo.luna import LunaEvaluator + from agent_control_evaluator_galileo.luna.client import GalileoLunaClient + + evaluator = LunaEvaluator.from_dict({"scorer_label": "toxicity", "threshold": 0.5}) + request = httpx.Request( + "POST", + "https://api.example.test/internal/scorers/invoke?token=secret", + ) + response = httpx.Response( + 503, + headers={"content-type": "application/json"}, + text='{"detail":"busy"}', + request=request, + ) + + with patch.object(GalileoLunaClient, "invoke", new_callable=AsyncMock) as mock_invoke: + mock_invoke.side_effect = httpx.HTTPStatusError( + "service unavailable", + request=request, + response=response, + ) + + result = await evaluator.evaluate("hello") + + assert result.matched is False + assert result.metadata is not None + assert result.metadata["error_type"] == "HTTPStatusError" + assert result.metadata["http_status_code"] == 503 + assert result.metadata["http_method"] == "POST" + assert result.metadata["http_endpoint_path"] == "/internal/scorers/invoke" + assert result.metadata["http_response_content_type"] == "application/json" + assert result.metadata["http_response_body"] == '{"detail":"busy"}' + assert result.metadata["http_response_body_truncated"] is False + assert "token=secret" not in str(result.metadata) From 9b234e468b8cd381d24caba3dffc0fc42e0a554b Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Mon, 15 Jun 2026 20:47:34 +0530 Subject: [PATCH 14/18] docs(engine): clarify concurrency env fallback --- engine/src/agent_control_engine/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/src/agent_control_engine/core.py b/engine/src/agent_control_engine/core.py index 9bff6680..b2cd81b3 100644 --- a/engine/src/agent_control_engine/core.py +++ b/engine/src/agent_control_engine/core.py @@ -47,7 +47,8 @@ def _env_positive_int(*names: str, default: int) -> int: # Default timeout for evaluator execution (seconds) DEFAULT_EVALUATOR_TIMEOUT = float(os.environ.get("EVALUATOR_TIMEOUT_SECONDS", "30")) -# Max concurrent evaluations (limits task spawning overhead for large policies) +# Max concurrent evaluations (limits task spawning overhead for large policies). +# Prefer the namespaced env var; MAX_CONCURRENT_EVALUATIONS is kept for compatibility. MAX_CONCURRENT_EVALUATIONS = _env_positive_int( "AGENT_CONTROL_MAX_CONCURRENT_EVALUATIONS", "MAX_CONCURRENT_EVALUATIONS", From 09fc7b70a3e7f4631ef05ad755788c019f3ff285 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sat, 13 Jun 2026 09:25:00 +0530 Subject: [PATCH 15/18] feat: add evaluation timing metrics --- engine/src/agent_control_engine/core.py | 172 +++++++++++++++--- engine/tests/test_core.py | 109 ++++++++++- .../endpoints/evaluation.py | 96 +++++++--- server/src/agent_control_server/metrics.py | 74 ++++++++ server/tests/test_metrics.py | 40 +++- 5 files changed, 441 insertions(+), 50 deletions(-) create mode 100644 server/src/agent_control_server/metrics.py diff --git a/engine/src/agent_control_engine/core.py b/engine/src/agent_control_engine/core.py index b2cd81b3..9354001e 100644 --- a/engine/src/agent_control_engine/core.py +++ b/engine/src/agent_control_engine/core.py @@ -7,6 +7,7 @@ import functools import logging import os +import time from collections.abc import Sequence from dataclasses import dataclass from typing import Any, Literal, Protocol @@ -193,6 +194,40 @@ def control(self) -> ControlDefinitionLike: """Runtime control payload used during evaluation.""" +EvaluatorObserverOutcome = Literal["success", "timeout", "error", "cancelled"] +ControlObserverOutcome = Literal["matched", "not_matched", "error", "cancelled"] + + +class EvaluationObserver(Protocol): + """Receives timing observations from control evaluation.""" + + def observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + """Record time spent waiting for evaluator concurrency.""" + + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: EvaluatorObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent executing an evaluator.""" + + def observe_control_duration( + self, + *, + action: str, + outcome: ControlObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent evaluating a top-level control.""" + + @dataclass class _EvalTask: """Internal container for evaluation task context.""" @@ -228,15 +263,72 @@ def __init__( context: Literal["sdk", "server"] = "server", *, include_raw_selected_data: bool | None = None, + observer: EvaluationObserver | None = None, ): self.controls = controls self.context = context + self.observer = observer self.include_raw_selected_data = ( _env_flag("AGENT_CONTROL_INCLUDE_RAW_SELECTED_DATA") if include_raw_selected_data is None else include_raw_selected_data ) + def _observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + """Record evaluator queue timing without affecting evaluation.""" + if self.observer is None: + return + try: + self.observer.observe_evaluator_queue_duration( + evaluator_name=evaluator_name, + duration_seconds=duration_seconds, + ) + except Exception: + logger.debug("Evaluation observer failed while recording queue time", exc_info=True) + + def _observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: EvaluatorObserverOutcome, + duration_seconds: float, + ) -> None: + """Record evaluator execution timing without affecting evaluation.""" + if self.observer is None: + return + try: + self.observer.observe_evaluator_duration( + evaluator_name=evaluator_name, + outcome=outcome, + duration_seconds=duration_seconds, + ) + except Exception: + logger.debug("Evaluation observer failed while recording evaluator time", exc_info=True) + + def _observe_control_duration( + self, + *, + action: str, + outcome: ControlObserverOutcome, + duration_seconds: float, + ) -> None: + """Record control evaluation timing without affecting evaluation.""" + if self.observer is None: + return + try: + self.observer.observe_control_duration( + action=action, + outcome=outcome, + duration_seconds=duration_seconds, + ) + except Exception: + logger.debug("Evaluation observer failed while recording control time", exc_info=True) + @staticmethod def _truncated_message(message: str | None) -> str | None: """Truncate long evaluator messages in condition traces.""" @@ -312,8 +404,16 @@ async def _evaluate_leaf( selector_path = selector.path or "*" data = select_data(request.step, selector_path) - try: - async with semaphore: + wait_started_at = time.perf_counter() + async with semaphore: + self._observe_evaluator_queue_duration( + evaluator_name=evaluator_spec.name, + duration_seconds=time.perf_counter() - wait_started_at, + ) + evaluator_started_at = time.perf_counter() + outcome: EvaluatorObserverOutcome = "success" + timeout = DEFAULT_EVALUATOR_TIMEOUT + try: evaluator = get_evaluator_instance(evaluator_spec) timeout = evaluator.get_timeout_seconds() if timeout <= 0: @@ -323,26 +423,37 @@ async def _evaluate_leaf( evaluator.evaluate(data), timeout=timeout, ) - except TimeoutError: - error_msg = f"TimeoutError: Evaluator exceeded {timeout}s timeout" - logger.warning( - "Evaluator timeout for control '%s' (evaluator: %s): %s", - item.name, - evaluator_spec.name, - error_msg, - exc_info=True, - ) - result = self._build_error_result(error_msg) - except Exception as e: - error_msg = self._format_exception(e) - logger.error( - "Evaluator error for control '%s' (evaluator: %s): %s", - item.name, - evaluator_spec.name, - error_msg, - exc_info=True, - ) - result = self._build_error_result(error_msg) + except asyncio.CancelledError: + outcome = "cancelled" + raise + except TimeoutError: + outcome = "timeout" + error_msg = f"TimeoutError: Evaluator exceeded {timeout}s timeout" + logger.warning( + "Evaluator timeout for control '%s' (evaluator: %s): %s", + item.name, + evaluator_spec.name, + error_msg, + exc_info=True, + ) + result = self._build_error_result(error_msg) + except Exception as e: + outcome = "error" + error_msg = self._format_exception(e) + logger.error( + "Evaluator error for control '%s' (evaluator: %s): %s", + item.name, + evaluator_spec.name, + error_msg, + exc_info=True, + ) + result = self._build_error_result(error_msg) + finally: + self._observe_evaluator_duration( + evaluator_name=evaluator_spec.name, + outcome=outcome, + duration_seconds=time.perf_counter() - evaluator_started_at, + ) trace = { "type": "leaf", @@ -661,6 +772,9 @@ async def process(self, request: EvaluationRequest) -> EvaluationResponse: async def evaluate_control(eval_task: _EvalTask) -> None: """Evaluate a single control, respecting cancellation and timeout.""" + started_at = time.perf_counter() + action = eval_task.item.control.action.decision + outcome: ControlObserverOutcome = "cancelled" try: evaluation = await self._evaluate_condition( eval_task.item, @@ -669,6 +783,13 @@ async def evaluate_control(eval_task: _EvalTask) -> None: semaphore, ) eval_task.result = evaluation.result + outcome = ( + "error" + if eval_task.result.error + else "matched" + if eval_task.result.matched + else "not_matched" + ) if ( eval_task.result.matched @@ -688,6 +809,13 @@ async def evaluate_control(eval_task: _EvalTask) -> None: error_msg, message_prefix="Condition evaluation failed", ) + outcome = "error" + finally: + self._observe_control_duration( + action=action, + outcome=outcome, + duration_seconds=time.perf_counter() - started_at, + ) # Create and start all tasks for eval_task in eval_tasks: diff --git a/engine/tests/test_core.py b/engine/tests/test_core.py index baa46bab..1cdeb87c 100644 --- a/engine/tests/test_core.py +++ b/engine/tests/test_core.py @@ -7,7 +7,7 @@ """ import asyncio -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any import pytest @@ -172,6 +172,41 @@ class MockControlWithIdentity: control: ControlDefinition +@dataclass +class RecordingObserver: + """Captures engine timing observations for assertions.""" + + evaluator_queue_durations: list[tuple[str, float]] = field(default_factory=list) + evaluator_durations: list[tuple[str, str, float]] = field(default_factory=list) + control_durations: list[tuple[str, str, float]] = field(default_factory=list) + + def observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + self.evaluator_queue_durations.append((evaluator_name, duration_seconds)) + + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: str, + duration_seconds: float, + ) -> None: + self.evaluator_durations.append((evaluator_name, outcome, duration_seconds)) + + def observe_control_duration( + self, + *, + action: str, + outcome: str, + duration_seconds: float, + ) -> None: + self.control_durations.append((action, outcome, duration_seconds)) + + @pytest.fixture(autouse=True) def setup_test_evaluators(): """Register test evaluators and reset state before each test.""" @@ -1393,6 +1428,78 @@ async def evaluate(self, data: Any) -> EvaluatorResult: assert _max_concurrent <= 2, f"Expected max 2 concurrent, got {_max_concurrent}" +class TestEvaluationObserver: + """Tests for optional engine timing observations.""" + + @pytest.mark.asyncio + async def test_observer_records_evaluator_and_control_timings(self): + """Test that observer callbacks receive bounded timing labels.""" + controls = [ + make_control(1, "allow", "test-allow", action="observe", config_value="a"), + make_control(2, "deny", "test-deny", action="deny", config_value="d"), + ] + observer = RecordingObserver() + engine = ControlEngine(controls, observer=observer) + + request = EvaluationRequest( + agent_name="00000000-0000-0000-0000-000000000001", + step=Step(type="llm", name="test-step", input="test", output=None), + stage="pre", + ) + await engine.process(request) + + assert {name for name, _ in observer.evaluator_queue_durations} == { + "test-allow", + "test-deny", + } + assert { + (name, outcome) + for name, outcome, _ in observer.evaluator_durations + } == { + ("test-allow", "success"), + ("test-deny", "success"), + } + assert { + (action, outcome) + for action, outcome, _ in observer.control_durations + } == { + ("observe", "not_matched"), + ("deny", "matched"), + } + assert all( + duration >= 0 + for _, duration in observer.evaluator_queue_durations + ) + assert all(duration >= 0 for _, _, duration in observer.evaluator_durations) + assert all(duration >= 0 for _, _, duration in observer.control_durations) + + @pytest.mark.asyncio + async def test_observer_errors_do_not_fail_evaluation(self): + """Test that observability failures do not affect control decisions.""" + + class RaisingObserver(RecordingObserver): + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: str, + duration_seconds: float, + ) -> None: + raise RuntimeError("metrics backend unavailable") + + controls = [make_control(1, "allow", "test-allow", action="observe")] + engine = ControlEngine(controls, observer=RaisingObserver()) + + request = EvaluationRequest( + agent_name="00000000-0000-0000-0000-000000000001", + step=Step(type="llm", name="test-step", input="test", output=None), + stage="pre", + ) + result = await engine.process(request) + + assert result.is_safe is True + + # ============================================================================= # Test: Recursive Condition Trees # ============================================================================= diff --git a/server/src/agent_control_server/endpoints/evaluation.py b/server/src/agent_control_server/endpoints/evaluation.py index bc66381f..dcbd4272 100644 --- a/server/src/agent_control_server/endpoints/evaluation.py +++ b/server/src/agent_control_server/endpoints/evaluation.py @@ -2,6 +2,7 @@ import json from dataclasses import dataclass +from time import perf_counter from agent_control_engine.core import ControlEngine from agent_control_models import ( @@ -19,6 +20,7 @@ from ..db import get_async_db from ..errors import APIValidationError, NotFoundError from ..logging_utils import get_logger +from ..metrics import observe_evaluation_stage, prometheus_evaluation_observer from ..models import Agent from ..services.controls import ControlService @@ -164,37 +166,56 @@ async def evaluate( the observability ingestion endpoint. """ namespace_key = principal.namespace_key - - agent_result = await db.execute( - select(Agent).where( - Agent.name == request.agent_name, - Agent.namespace_key == namespace_key, + load_started_at = perf_counter() + try: + agent_result = await db.execute( + select(Agent).where( + Agent.name == request.agent_name, + Agent.namespace_key == namespace_key, + ) ) - ) - agent = agent_result.scalar_one_or_none() - if agent is None: - raise NotFoundError( - error_code=ErrorCode.AGENT_NOT_FOUND, - detail=f"Agent '{request.agent_name}' not found", - resource="Agent", - resource_id=request.agent_name, - hint="Register the agent via initAgent before evaluating.", + agent = agent_result.scalar_one_or_none() + if agent is None: + raise NotFoundError( + error_code=ErrorCode.AGENT_NOT_FOUND, + detail=f"Agent '{request.agent_name}' not found", + resource="Agent", + resource_id=request.agent_name, + hint="Register the agent via initAgent before evaluating.", + ) + + runtime_controls = await ControlService(db).list_runtime_controls_for_agent( + request.agent_name, + namespace_key=namespace_key, + target_type=request.target_type, + target_id=request.target_id, + allow_invalid_step_name_regex=True, ) - runtime_controls = await ControlService(db).list_runtime_controls_for_agent( - request.agent_name, - namespace_key=namespace_key, - target_type=request.target_type, - target_id=request.target_id, - allow_invalid_step_name_regex=True, + engine_controls = [ControlAdapter(c.id, c.name, c.control) for c in runtime_controls] + except Exception: + observe_evaluation_stage( + stage="load_controls", + outcome="error", + duration_seconds=perf_counter() - load_started_at, + ) + raise + observe_evaluation_stage( + stage="load_controls", + outcome="success", + duration_seconds=perf_counter() - load_started_at, ) - engine_controls = [ControlAdapter(c.id, c.name, c.control) for c in runtime_controls] - - engine = ControlEngine(engine_controls) + engine = ControlEngine(engine_controls, observer=prometheus_evaluation_observer) + engine_started_at = perf_counter() try: raw_response = await engine.process(request) except ValueError: + observe_evaluation_stage( + stage="engine", + outcome="validation_error", + duration_seconds=perf_counter() - engine_started_at, + ) _logger.exception("Evaluation failed due to invalid configuration or input") raise APIValidationError( error_code=ErrorCode.EVALUATION_FAILED, @@ -210,5 +231,32 @@ async def evaluate( ) ], ) + except Exception: + observe_evaluation_stage( + stage="engine", + outcome="error", + duration_seconds=perf_counter() - engine_started_at, + ) + raise + observe_evaluation_stage( + stage="engine", + outcome="success", + duration_seconds=perf_counter() - engine_started_at, + ) - return _sanitize_evaluation_response(raw_response) + sanitize_started_at = perf_counter() + try: + response = _sanitize_evaluation_response(raw_response) + except Exception: + observe_evaluation_stage( + stage="sanitize_response", + outcome="error", + duration_seconds=perf_counter() - sanitize_started_at, + ) + raise + observe_evaluation_stage( + stage="sanitize_response", + outcome="success", + duration_seconds=perf_counter() - sanitize_started_at, + ) + return response diff --git a/server/src/agent_control_server/metrics.py b/server/src/agent_control_server/metrics.py new file mode 100644 index 00000000..d0badf37 --- /dev/null +++ b/server/src/agent_control_server/metrics.py @@ -0,0 +1,74 @@ +"""Prometheus metrics owned by the Agent Control server.""" + +from agent_control_engine.core import ControlObserverOutcome, EvaluatorObserverOutcome +from prometheus_client import Histogram + +_EVALUATION_STAGE_DURATION = Histogram( + "agent_control_server_evaluation_stage_duration_seconds", + "Duration of evaluation endpoint stages.", + ("stage", "outcome"), +) +_EVALUATOR_QUEUE_DURATION = Histogram( + "agent_control_server_engine_evaluator_queue_duration_seconds", + "Time evaluator leaves spend waiting for engine concurrency.", + ("evaluator",), +) +_EVALUATOR_DURATION = Histogram( + "agent_control_server_engine_evaluator_duration_seconds", + "Time spent executing evaluator leaves.", + ("evaluator", "outcome"), +) +_CONTROL_DURATION = Histogram( + "agent_control_server_engine_control_duration_seconds", + "Time spent evaluating top-level controls.", + ("action", "outcome"), +) + + +def observe_evaluation_stage( + *, + stage: str, + outcome: str, + duration_seconds: float, +) -> None: + """Record evaluation endpoint stage duration.""" + _EVALUATION_STAGE_DURATION.labels(stage=stage, outcome=outcome).observe(duration_seconds) + + +class PrometheusEvaluationObserver: + """Records engine timing observations as Prometheus histograms.""" + + def observe_evaluator_queue_duration( + self, + *, + evaluator_name: str, + duration_seconds: float, + ) -> None: + """Record time spent waiting for evaluator concurrency.""" + _EVALUATOR_QUEUE_DURATION.labels(evaluator=evaluator_name).observe(duration_seconds) + + def observe_evaluator_duration( + self, + *, + evaluator_name: str, + outcome: EvaluatorObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent executing an evaluator.""" + _EVALUATOR_DURATION.labels( + evaluator=evaluator_name, + outcome=outcome, + ).observe(duration_seconds) + + def observe_control_duration( + self, + *, + action: str, + outcome: ControlObserverOutcome, + duration_seconds: float, + ) -> None: + """Record time spent evaluating a top-level control.""" + _CONTROL_DURATION.labels(action=action, outcome=outcome).observe(duration_seconds) + + +prometheus_evaluation_observer = PrometheusEvaluationObserver() diff --git a/server/tests/test_metrics.py b/server/tests/test_metrics.py index e2d8b896..ee9755e6 100644 --- a/server/tests/test_metrics.py +++ b/server/tests/test_metrics.py @@ -1,8 +1,11 @@ -from fastapi import FastAPI -from fastapi.testclient import TestClient - from agent_control_server.config import settings from agent_control_server.main import METRICS_PATH, add_prometheus_metrics +from agent_control_server.metrics import ( + observe_evaluation_stage, + prometheus_evaluation_observer, +) +from fastapi import FastAPI +from fastapi.testclient import TestClient def test_metrics_endpoint_public(unauthenticated_client: TestClient) -> None: @@ -42,3 +45,34 @@ def ping() -> dict[str, bool]: # Then: metrics include the custom prefix assert response.status_code == 200 assert f"{custom_prefix}_" in response.text + + +def test_metrics_output_contains_evaluation_timing_metrics( + unauthenticated_client: TestClient, +) -> None: + # Given: evaluation timing observations have been recorded + observe_evaluation_stage(stage="engine", outcome="success", duration_seconds=0.001) + prometheus_evaluation_observer.observe_evaluator_queue_duration( + evaluator_name="test-evaluator", + duration_seconds=0.001, + ) + prometheus_evaluation_observer.observe_evaluator_duration( + evaluator_name="test-evaluator", + outcome="success", + duration_seconds=0.002, + ) + prometheus_evaluation_observer.observe_control_duration( + action="observe", + outcome="not_matched", + duration_seconds=0.003, + ) + + # When: requesting metrics output + response = unauthenticated_client.get(METRICS_PATH) + + # Then: the internal evaluation timing series are exported + assert response.status_code == 200 + assert "agent_control_server_evaluation_stage_duration_seconds" in response.text + assert "agent_control_server_engine_evaluator_queue_duration_seconds" in response.text + assert "agent_control_server_engine_evaluator_duration_seconds" in response.text + assert "agent_control_server_engine_control_duration_seconds" in response.text From bf3925ec57423ff3aa2014a51fa524208723e6a6 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sat, 13 Jun 2026 21:28:45 +0530 Subject: [PATCH 16/18] feat: add evaluation tracing spans --- engine/src/agent_control_engine/core.py | 257 ++++++++++++------ engine/tests/test_core.py | 70 +++++ .../luna/client.py | 114 ++++++-- .../luna/evaluator.py | 49 +++- .../luna/tracing.py | 54 ++++ .../galileo/tests/test_luna_evaluator.py | 127 +++++++++ 6 files changed, 563 insertions(+), 108 deletions(-) create mode 100644 evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/tracing.py diff --git a/engine/src/agent_control_engine/core.py b/engine/src/agent_control_engine/core.py index 9354001e..4d8c78fe 100644 --- a/engine/src/agent_control_engine/core.py +++ b/engine/src/agent_control_engine/core.py @@ -8,7 +8,8 @@ import logging import os import time -from collections.abc import Sequence +from collections.abc import Iterator, Sequence +from contextlib import contextmanager from dataclasses import dataclass from typing import Any, Literal, Protocol @@ -194,6 +195,52 @@ def control(self) -> ControlDefinitionLike: """Runtime control payload used during evaluation.""" +class TraceSpan(Protocol): + """Subset of tracing span behavior used by the engine.""" + + def set_data(self, key: str, value: object) -> None: + """Attach diagnostic data to the active span.""" + + +@contextmanager +def trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, +) -> Iterator[TraceSpan | None]: + """Start an optional tracing span when a tracing SDK is installed.""" + start_span = _load_start_span() + if start_span is None: + yield None + return + + with start_span(op=op, name=name) as span: + for key, value in (data or {}).items(): + span.set_data(key, value) + yield span + + +@functools.lru_cache(maxsize=1) +def _load_start_span() -> Any | None: + """Load the optional tracing span factory once per process.""" + try: + from sentry_sdk import start_span # type: ignore[import-not-found] + except ImportError: + return None + return start_span + + +def set_span_data(span: TraceSpan | None, key: str, value: object) -> None: + """Attach span data without letting tracing failures affect evaluation.""" + if span is None: + return + try: + span.set_data(key, value) + except Exception: + logger.debug("Tracing span failed while setting data", exc_info=True) + + EvaluatorObserverOutcome = Literal["success", "timeout", "error", "cancelled"] ControlObserverOutcome = Literal["matched", "not_matched", "error", "cancelled"] @@ -405,55 +452,95 @@ async def _evaluate_leaf( data = select_data(request.step, selector_path) wait_started_at = time.perf_counter() - async with semaphore: - self._observe_evaluator_queue_duration( - evaluator_name=evaluator_spec.name, - duration_seconds=time.perf_counter() - wait_started_at, - ) - evaluator_started_at = time.perf_counter() - outcome: EvaluatorObserverOutcome = "success" - timeout = DEFAULT_EVALUATOR_TIMEOUT - try: + with trace_span( + op="agent_control.engine.evaluator.queue", + name="wait_for_evaluator_slot", + data={ + "evaluator.name": evaluator_spec.name, + "selector.path": selector_path, + }, + ): + await semaphore.acquire() + + self._observe_evaluator_queue_duration( + evaluator_name=evaluator_spec.name, + duration_seconds=time.perf_counter() - wait_started_at, + ) + evaluator_started_at = time.perf_counter() + outcome: EvaluatorObserverOutcome = "success" + timeout = DEFAULT_EVALUATOR_TIMEOUT + try: + with trace_span( + op="agent_control.engine.evaluator.get_instance", + name="get_evaluator_instance", + data={"evaluator.name": evaluator_spec.name}, + ): evaluator = get_evaluator_instance(evaluator_spec) timeout = evaluator.get_timeout_seconds() if timeout <= 0: timeout = DEFAULT_EVALUATOR_TIMEOUT - result = await asyncio.wait_for( - evaluator.evaluate(data), - timeout=timeout, - ) - except asyncio.CancelledError: - outcome = "cancelled" - raise - except TimeoutError: - outcome = "timeout" - error_msg = f"TimeoutError: Evaluator exceeded {timeout}s timeout" - logger.warning( - "Evaluator timeout for control '%s' (evaluator: %s): %s", - item.name, - evaluator_spec.name, - error_msg, - exc_info=True, - ) - result = self._build_error_result(error_msg) - except Exception as e: - outcome = "error" - error_msg = self._format_exception(e) - logger.error( - "Evaluator error for control '%s' (evaluator: %s): %s", - item.name, - evaluator_spec.name, - error_msg, - exc_info=True, - ) - result = self._build_error_result(error_msg) - finally: - self._observe_evaluator_duration( - evaluator_name=evaluator_spec.name, - outcome=outcome, - duration_seconds=time.perf_counter() - evaluator_started_at, - ) + with trace_span( + op="agent_control.engine.evaluator.evaluate", + name="evaluate_evaluator", + data={ + "evaluator.name": evaluator_spec.name, + "timeout.seconds": timeout, + }, + ) as span: + try: + result = await asyncio.wait_for( + evaluator.evaluate(data), + timeout=timeout, + ) + except asyncio.CancelledError: + outcome = "cancelled" + raise + except TimeoutError: + outcome = "timeout" + error_msg = f"TimeoutError: Evaluator exceeded {timeout}s timeout" + logger.warning( + "Evaluator timeout for control '%s' (evaluator: %s): %s", + item.name, + evaluator_spec.name, + error_msg, + exc_info=True, + ) + result = self._build_error_result(error_msg) + except Exception as e: + outcome = "error" + error_msg = self._format_exception(e) + logger.error( + "Evaluator error for control '%s' (evaluator: %s): %s", + item.name, + evaluator_spec.name, + error_msg, + exc_info=True, + ) + result = self._build_error_result(error_msg) + finally: + set_span_data(span, "outcome", outcome) + except asyncio.CancelledError: + outcome = "cancelled" + raise + except Exception as e: + outcome = "error" + error_msg = self._format_exception(e) + logger.error( + "Evaluator error for control '%s' (evaluator: %s): %s", + item.name, + evaluator_spec.name, + error_msg, + exc_info=True, + ) + result = self._build_error_result(error_msg) + finally: + self._observe_evaluator_duration( + evaluator_name=evaluator_spec.name, + outcome=outcome, + duration_seconds=time.perf_counter() - evaluator_started_at, + ) + semaphore.release() trace = { "type": "leaf", @@ -776,40 +863,54 @@ async def evaluate_control(eval_task: _EvalTask) -> None: action = eval_task.item.control.action.decision outcome: ControlObserverOutcome = "cancelled" try: - evaluation = await self._evaluate_condition( - eval_task.item, - eval_task.item.control.condition, - request, - semaphore, - ) - eval_task.result = evaluation.result - outcome = ( - "error" - if eval_task.result.error - else "matched" - if eval_task.result.matched - else "not_matched" - ) + with trace_span( + op="agent_control.engine.control", + name="evaluate_control", + data={ + "control.id": eval_task.item.id, + "control.name": eval_task.item.name, + "control.action": action, + }, + ) as span: + try: + evaluation = await self._evaluate_condition( + eval_task.item, + eval_task.item.control.condition, + request, + semaphore, + ) + except asyncio.CancelledError: + outcome = "cancelled" + raise + except Exception as error: + error_msg = self._format_exception(error) + logger.exception( + "Unexpected condition evaluation error for control '%s': %s", + eval_task.item.name, + error_msg, + ) + eval_task.result = self._build_error_result( + error_msg, + message_prefix="Condition evaluation failed", + ) + outcome = "error" + else: + eval_task.result = evaluation.result + outcome = ( + "error" + if eval_task.result.error + else "matched" + if eval_task.result.matched + else "not_matched" + ) - if ( - eval_task.result.matched - and eval_task.item.control.action.decision == "deny" - ): - deny_found.set() - except asyncio.CancelledError: - raise - except Exception as error: - error_msg = self._format_exception(error) - logger.exception( - "Unexpected condition evaluation error for control '%s': %s", - eval_task.item.name, - error_msg, - ) - eval_task.result = self._build_error_result( - error_msg, - message_prefix="Condition evaluation failed", - ) - outcome = "error" + if ( + eval_task.result.matched + and eval_task.item.control.action.decision == "deny" + ): + deny_found.set() + finally: + set_span_data(span, "outcome", outcome) finally: self._observe_control_duration( action=action, diff --git a/engine/tests/test_core.py b/engine/tests/test_core.py index 1cdeb87c..f98378e5 100644 --- a/engine/tests/test_core.py +++ b/engine/tests/test_core.py @@ -7,6 +7,8 @@ """ import asyncio +from collections.abc import Iterator +from contextlib import contextmanager from dataclasses import dataclass, field from typing import Any @@ -207,6 +209,35 @@ def observe_control_duration( self.control_durations.append((action, outcome, duration_seconds)) +@dataclass +class RecordedSpan: + """Captures optional tracing span data for tests.""" + + op: str + name: str + data: dict[str, object] = field(default_factory=dict) + + def set_data(self, key: str, value: object) -> None: + self.data[key] = value + + +def trace_span_recorder(spans: list[RecordedSpan]): + """Return a trace_span replacement that records spans.""" + + @contextmanager + def _trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, + ) -> Iterator[RecordedSpan]: + span = RecordedSpan(op=op, name=name, data=dict(data or {})) + spans.append(span) + yield span + + return _trace_span + + @pytest.fixture(autouse=True) def setup_test_evaluators(): """Register test evaluators and reset state before each test.""" @@ -1499,6 +1530,45 @@ def observe_evaluator_duration( assert result.is_safe is True + @pytest.mark.asyncio + async def test_engine_emits_fanout_trace_spans(self, monkeypatch: pytest.MonkeyPatch): + """Test that optional tracing spans capture control and evaluator phases.""" + import agent_control_engine.core as core_module + + spans: list[RecordedSpan] = [] + monkeypatch.setattr(core_module, "trace_span", trace_span_recorder(spans)) + + controls = [make_control(1, "allow", "test-allow", action="observe")] + engine = ControlEngine(controls) + + request = EvaluationRequest( + agent_name="00000000-0000-0000-0000-000000000001", + step=Step(type="llm", name="test-step", input="test", output=None), + stage="pre", + ) + result = await engine.process(request) + + assert result.is_safe is True + assert { + span.op + for span in spans + } >= { + "agent_control.engine.control", + "agent_control.engine.evaluator.queue", + "agent_control.engine.evaluator.get_instance", + "agent_control.engine.evaluator.evaluate", + } + control_span = next( + span for span in spans if span.op == "agent_control.engine.control" + ) + evaluator_span = next( + span for span in spans if span.op == "agent_control.engine.evaluator.evaluate" + ) + assert control_span.data["control.action"] == "observe" + assert control_span.data["outcome"] == "not_matched" + assert evaluator_span.data["evaluator.name"] == "test-allow" + assert evaluator_span.data["outcome"] == "success" + # ============================================================================= # Test: Recursive Condition Trees diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index cc2cc1cb..48f88aab 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -19,6 +19,8 @@ from agent_control_models import JSONObject, JSONValue from pydantic import BaseModel, Field, PrivateAttr, model_validator +from .tracing import set_span_data, trace_span + logger = logging.getLogger(__name__) DEFAULT_TIMEOUT_SECS = 10.0 @@ -36,6 +38,7 @@ PUBLIC_SCORER_INVOKE_PATH = "/scorers/invoke" INTERNAL_SCORER_INVOKE_PATH = "/internal/scorers/invoke" AuthMode = Literal["public", "internal"] +ScorerIdentifierKind = Literal["label", "id", "version_id"] def _b64url(data: bytes) -> str: @@ -155,6 +158,23 @@ def _has_value(value: JSONValue) -> bool: return True +def _scorer_identifier_kind( + *, + scorer_label: str | None, + scorer_id: str | None, + scorer_version_id: str | None, +) -> ScorerIdentifierKind: + if scorer_label: + return "label" + if scorer_id: + return "id" + return "version_id" + + +def _endpoint_path(endpoint: str) -> str: + return urlsplit(endpoint).path + + class ScorerInvokeInputs(BaseModel): """Input values sent to Galileo's scorer invoke API.""" @@ -437,8 +457,21 @@ async def _get_client(self) -> httpx.AsyncClient: return self._client self._clients = [client for client in self._clients if not client.is_closed] - while len(self._clients) < self.client_pool_size: - self._clients.append(self._create_client()) + if len(self._clients) < self.client_pool_size: + with trace_span( + op="agent_control.luna.client.create", + name="create_http_client", + data={ + "auth.mode": self.auth_mode, + "limits.max_connections": self.max_connections, + "limits.max_keepalive_connections": self.max_keepalive_connections, + "limits.keepalive_expiry_seconds": self.keepalive_expiry_seconds, + "client.pool_size": self.client_pool_size, + "tls.ca_file_configured": self.ca_file is not None, + }, + ): + while len(self._clients) < self.client_pool_size: + self._clients.append(self._create_client()) if self.client_pool_size == 1: self._client = self._clients[0] @@ -497,34 +530,77 @@ async def invoke( if not (_has_value(input) or _has_value(output)): raise ValueError("At least one of input or output must be provided.") - request_body = ScorerInvokeRequest( + identifier_kind = _scorer_identifier_kind( scorer_label=scorer_label, scorer_id=scorer_id, scorer_version_id=scorer_version_id, - inputs=ScorerInvokeInputs( - query="" if input is None else input, response="" if output is None else output - ), - config=config, - ).to_dict() - endpoint, request_headers = self._endpoint_and_headers(headers) + ) + with trace_span( + op="agent_control.luna.request.build", + name="build_scorer_request", + data={"scorer.identifier_kind": identifier_kind}, + ): + request_body = ScorerInvokeRequest( + scorer_label=scorer_label, + scorer_id=scorer_id, + scorer_version_id=scorer_version_id, + inputs=ScorerInvokeInputs( + query="" if input is None else input, + response="" if output is None else output, + ), + config=config, + ).to_dict() + with trace_span( + op="agent_control.luna.request.endpoint", + name="resolve_scorer_endpoint", + data={"auth.mode": self.auth_mode}, + ) as span: + endpoint, request_headers = self._endpoint_and_headers(headers) + set_span_data(span, "endpoint.path", _endpoint_path(endpoint)) logger.debug("[GalileoLunaClient] POST %s", endpoint) logger.debug("[GalileoLunaClient] Request body: %s", request_body) try: - client = await self._get_client() - response = await client.post( - endpoint, - json=request_body, - headers=request_headers, - timeout=timeout, - ) - response.raise_for_status() - response_data = response.json() + with trace_span( + op="agent_control.luna.client.get", + name="get_http_client", + data={"auth.mode": self.auth_mode}, + ): + client = await self._get_client() + with trace_span( + op="agent_control.luna.http.post", + name="post_scorer_invoke", + data={ + "auth.mode": self.auth_mode, + "endpoint.path": _endpoint_path(endpoint), + "scorer.identifier_kind": identifier_kind, + "timeout.seconds": timeout, + }, + ) as span: + response = await client.post( + endpoint, + json=request_body, + headers=request_headers, + timeout=timeout, + ) + set_span_data(span, "http.status_code", response.status_code) + response.raise_for_status() + with trace_span( + op="agent_control.luna.response.parse", + name="parse_scorer_response", + data={"http.status_code": response.status_code}, + ): + response_data = response.json() if not isinstance(response_data, dict): raise RuntimeError("Invalid response payload: not a JSON object") - parsed = ScorerInvokeResponse.from_dict(response_data) + with trace_span( + op="agent_control.luna.response.model", + name="model_scorer_response", + data={"http.status_code": response.status_code}, + ): + parsed = ScorerInvokeResponse.from_dict(response_data) logger.debug("[GalileoLunaClient] Response: %s", parsed.raw_response) return parsed except httpx.HTTPStatusError as exc: diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py index 1221cedb..05a65a10 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/evaluator.py @@ -14,6 +14,7 @@ from .client import GalileoLunaClient, ScorerInvokeResponse from .config import LunaEvaluatorConfig, coerce_number +from .tracing import set_span_data, trace_span logger = logging.getLogger(__name__) @@ -202,7 +203,11 @@ async def evaluate(self, data: Any) -> EvaluatorResult: Returns: EvaluatorResult with local threshold decision and scorer metadata. """ - input_text, output_text = self._prepare_payload(data) + with trace_span( + op="agent_control.luna.evaluate.prepare_payload", + name="prepare_luna_payload", + ): + input_text, output_text = self._prepare_payload(data) if not (_has_text(input_text) or _has_text(output_text)): return EvaluatorResult( matched=False, @@ -212,21 +217,43 @@ async def evaluate(self, data: Any) -> EvaluatorResult: ) try: - scorer_kwargs = self._scorer_kwargs() - response = await self._get_client().invoke( - **scorer_kwargs, - input=input_text if _has_text(input_text) else None, - output=output_text if _has_text(output_text) else None, - config=self.config.scorer_config, - timeout=self.get_timeout_seconds(), - ) + with trace_span( + op="agent_control.luna.evaluate.scorer_kwargs", + name="build_scorer_kwargs", + ): + scorer_kwargs = self._scorer_kwargs() + with trace_span( + op="agent_control.luna.evaluate.invoke", + name="invoke_luna_scorer", + data={ + "payload.has_input": _has_text(input_text), + "payload.has_output": _has_text(output_text), + }, + ) as span: + response = await self._get_client().invoke( + **scorer_kwargs, + input=input_text if _has_text(input_text) else None, + output=output_text if _has_text(output_text) else None, + config=self.config.scorer_config, + timeout=self.get_timeout_seconds(), + ) + set_span_data(span, "scorer.status", response.status) if response.status.lower() != "success": message = response.error_message or f"Luna scorer status: {response.status}" raise RuntimeError(message) - matched = self._score_matches(response.score) - metadata = self._metadata(response) + with trace_span( + op="agent_control.luna.evaluate.score_match", + name="match_luna_score", + ) as span: + matched = self._score_matches(response.score) + set_span_data(span, "matched", matched) + with trace_span( + op="agent_control.luna.evaluate.metadata", + name="build_luna_metadata", + ): + metadata = self._metadata(response) operator = self.config.operator threshold = self.config.threshold state = "triggered" if matched else "not triggered" diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/tracing.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/tracing.py new file mode 100644 index 00000000..3302fede --- /dev/null +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/tracing.py @@ -0,0 +1,54 @@ +"""Optional tracing helpers for Luna scorer evaluation.""" + +from __future__ import annotations + +import functools +from collections.abc import Iterator +from contextlib import contextmanager +from typing import Any, Protocol + + +class TraceSpan(Protocol): + """Subset of tracing span behavior used by this package.""" + + def set_data(self, key: str, value: object) -> None: + """Attach diagnostic data to the active span.""" + + +@contextmanager +def trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, +) -> Iterator[TraceSpan | None]: + """Start an optional tracing span when a tracing SDK is installed.""" + start_span = _load_start_span() + if start_span is None: + yield None + return + + with start_span(op=op, name=name) as span: + for key, value in (data or {}).items(): + span.set_data(key, value) + yield span + + +@functools.lru_cache(maxsize=1) +def _load_start_span() -> Any | None: + """Load the optional tracing span factory once per process.""" + try: + from sentry_sdk import start_span # type: ignore[import-not-found] + except ImportError: + return None + return start_span + + +def set_span_data(span: TraceSpan | None, key: str, value: object) -> None: + """Attach span data without letting tracing failures affect evaluation.""" + if span is None: + return + try: + span.set_data(key, value) + except Exception: + pass diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index fe6c679f..f016f796 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -6,6 +6,9 @@ import logging import os from base64 import urlsafe_b64decode +from collections.abc import Iterator +from contextlib import contextmanager +from dataclasses import dataclass, field from unittest.mock import AsyncMock, patch import httpx @@ -14,6 +17,35 @@ from pydantic import ValidationError +@dataclass +class RecordedSpan: + """Captures optional tracing span data for tests.""" + + op: str + name: str + data: dict[str, object] = field(default_factory=dict) + + def set_data(self, key: str, value: object) -> None: + self.data[key] = value + + +def trace_span_recorder(spans: list[RecordedSpan]): + """Return a trace_span replacement that records spans.""" + + @contextmanager + def _trace_span( + *, + op: str, + name: str, + data: dict[str, object] | None = None, + ) -> Iterator[RecordedSpan]: + span = RecordedSpan(op=op, name=name, data=dict(data or {})) + spans.append(span) + yield span + + return _trace_span + + def _decode_jwt_payload(token: str) -> dict[str, object]: payload_segment = token.split(".")[1] padded = payload_segment + ("=" * (-len(payload_segment) % 4)) @@ -472,6 +504,54 @@ def handler(request: httpx.Request) -> httpx.Response: assert isinstance(headers, dict) assert headers["galileo-api-key"] == "test-key" + @pytest.mark.asyncio + async def test_client_emits_scorer_invoke_trace_spans( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + spans: list[RecordedSpan] = [] + monkeypatch.setattr(luna_client_module, "trace_span", trace_span_recorder(spans)) + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "scorer_label": "toxicity", + "score": 0.82, + "status": "success", + "execution_time": 0.12, + }, + ) + + with patch.dict(os.environ, {"GALILEO_API_SECRET_KEY": "test-secret"}, clear=True): + client = GalileoLunaClient(api_url="https://api.default.svc.cluster.local:8088") + client._client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + try: + response = await client.invoke(scorer_label="toxicity", output="model answer") + finally: + await client.close() + + assert response.status == "success" + assert { + span.op + for span in spans + } >= { + "agent_control.luna.request.build", + "agent_control.luna.request.endpoint", + "agent_control.luna.client.get", + "agent_control.luna.http.post", + "agent_control.luna.response.parse", + "agent_control.luna.response.model", + } + post_span = next(span for span in spans if span.op == "agent_control.luna.http.post") + assert post_span.data["auth.mode"] == "internal" + assert post_span.data["endpoint.path"] == "/internal/scorers/invoke" + assert post_span.data["http.status_code"] == 200 + @pytest.mark.asyncio async def test_client_uses_internal_jwt_when_api_secret_is_set(self) -> None: from agent_control_evaluator_galileo.luna import GalileoLunaClient @@ -687,6 +767,53 @@ async def test_evaluator_returns_non_match_below_threshold(self) -> None: timeout=10.0, ) + @patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}) + @pytest.mark.asyncio + async def test_evaluator_emits_phase_trace_spans( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + import agent_control_evaluator_galileo.luna.evaluator as luna_evaluator_module + from agent_control_evaluator_galileo.luna import LunaEvaluator, ScorerInvokeResponse + from agent_control_evaluator_galileo.luna.client import GalileoLunaClient + + spans: list[RecordedSpan] = [] + monkeypatch.setattr(luna_evaluator_module, "trace_span", trace_span_recorder(spans)) + evaluator = LunaEvaluator.from_dict( + {"scorer_label": "toxicity", "threshold": 0.7, "operator": "gte"} + ) + + with patch.object(GalileoLunaClient, "invoke", new_callable=AsyncMock) as mock_invoke: + mock_invoke.return_value = ScorerInvokeResponse( + scorer_label="toxicity", + score=0.82, + status="success", + ) + + result = await evaluator.evaluate({"input": "user prompt", "output": "model answer"}) + + assert result.matched is True + assert { + span.op + for span in spans + } >= { + "agent_control.luna.evaluate.prepare_payload", + "agent_control.luna.evaluate.scorer_kwargs", + "agent_control.luna.evaluate.invoke", + "agent_control.luna.evaluate.score_match", + "agent_control.luna.evaluate.metadata", + } + invoke_span = next( + span for span in spans if span.op == "agent_control.luna.evaluate.invoke" + ) + score_span = next( + span for span in spans if span.op == "agent_control.luna.evaluate.score_match" + ) + assert invoke_span.data["payload.has_input"] is True + assert invoke_span.data["payload.has_output"] is True + assert invoke_span.data["scorer.status"] == "success" + assert score_span.data["matched"] is True + @patch.dict(os.environ, {"GALILEO_API_KEY": "test-key"}) @pytest.mark.asyncio @pytest.mark.parametrize("data", ["", " "]) From 963cba64f83c6e3176ca7cb5800b5d8fd2eaaf12 Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sat, 13 Jun 2026 23:00:56 +0530 Subject: [PATCH 17/18] feat: add Luna HTTP phase tracing --- .../luna/client.py | 79 ++++++++++++++++++- .../galileo/tests/test_luna_evaluator.py | 65 ++++++++++++++- 2 files changed, 140 insertions(+), 4 deletions(-) diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index 48f88aab..ecd12924 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -11,15 +11,15 @@ from hashlib import sha256 from hmac import new as hmac_new from json import dumps -from time import time -from typing import Literal +from time import perf_counter, time +from typing import Any, Literal from urllib.parse import urlsplit, urlunsplit import httpx from agent_control_models import JSONObject, JSONValue from pydantic import BaseModel, Field, PrivateAttr, model_validator -from .tracing import set_span_data, trace_span +from .tracing import TraceSpan, set_span_data, trace_span logger = logging.getLogger(__name__) @@ -31,6 +31,7 @@ DEFAULT_MAX_CONNECTIONS = 100 DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 DEFAULT_CLIENT_POOL_SIZE = 1 +LUNA_HTTP_PHASE_TRACING_ENV = "GALILEO_LUNA_HTTP_PHASE_TRACING" LUNA_KEEPALIVE_EXPIRY_ENV = "GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS" LUNA_MAX_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_CONNECTIONS" LUNA_MAX_KEEPALIVE_CONNECTIONS_ENV = "GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS" @@ -107,6 +108,13 @@ def _load_int_env(env_name: str, default: int) -> int: raise ValueError(f"{env_name}={raw!r} is not an integer.") from exc +def _load_bool_env(env_name: str, default: bool = False) -> bool: + raw = os.getenv(env_name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + def _validate_connection_config( *, keepalive_expiry_seconds: float, @@ -175,6 +183,62 @@ def _endpoint_path(endpoint: str) -> str: return urlsplit(endpoint).path +def _split_httpcore_trace_event(event_name: str) -> tuple[str, str] | None: + if event_name.endswith(".started"): + return event_name.removesuffix(".started"), "started" + if event_name.endswith(".complete"): + return event_name.removesuffix(".complete"), "complete" + if event_name.endswith(".failed"): + return event_name.removesuffix(".failed"), "failed" + return None + + +def _safe_trace_value(value: object) -> object: + if value is None or isinstance(value, bool | int | float | str): + return value + if isinstance(value, bytes): + return value.decode("ascii", errors="ignore") + return type(value).__name__ + + +class _HttpCorePhaseTrace: + """Convert httpcore phase trace events into optional tracing spans.""" + + def __init__(self) -> None: + self._active: dict[str, tuple[Any, TraceSpan | None, float]] = {} + + async def __call__(self, event_name: str, info: dict[str, Any]) -> None: + parsed = _split_httpcore_trace_event(event_name) + if parsed is None: + return + phase, state = parsed + + if state == "started": + manager = trace_span( + op=f"agent_control.luna.httpcore.{phase}", + name=phase, + data={"httpcore.phase": phase}, + ) + span = manager.__enter__() + self._active[phase] = (manager, span, perf_counter()) + return + + manager, span, started_at = self._active.pop(phase, (None, None, perf_counter())) + duration_ms = (perf_counter() - started_at) * 1000 + set_span_data(span, "httpcore.phase", phase) + set_span_data(span, "httpcore.outcome", state) + set_span_data(span, "httpcore.duration_ms", duration_ms) + + if "return_value" in info: + set_span_data(span, "httpcore.return_type", _safe_trace_value(info["return_value"])) + if "exception" in info: + exception = info["exception"] + set_span_data(span, "exception.type", type(exception).__name__) + + if manager is not None: + manager.__exit__(None, None, None) + + class ScorerInvokeInputs(BaseModel): """Input values sent to Galileo's scorer invoke API.""" @@ -258,6 +322,7 @@ class GalileoLunaClient: GALILEO_API_URL: Galileo API URL fallback. GALILEO_LUNA_CA_FILE: CA bundle used to verify the scorer API endpoint, for deployments whose API serves an internally-issued TLS certificate. + GALILEO_LUNA_HTTP_PHASE_TRACING: Enable per-request HTTP transport phase spans. GALILEO_LUNA_KEEPALIVE_EXPIRY_SECONDS: HTTP pooled connection expiry. GALILEO_LUNA_MAX_CONNECTIONS: Maximum outbound HTTP connections. GALILEO_LUNA_MAX_KEEPALIVE_CONNECTIONS: Maximum idle pooled HTTP connections. @@ -313,6 +378,7 @@ def __init__( self.api_base = self._resolve_api_base(api_url) self.ca_file = (ca_file or os.getenv("GALILEO_LUNA_CA_FILE") or "").strip() or None self._ssl_context = self._load_ssl_context(self.ca_file) + self.http_phase_tracing_enabled = _load_bool_env(LUNA_HTTP_PHASE_TRACING_ENV) self.keepalive_expiry_seconds = _load_float_env( LUNA_KEEPALIVE_EXPIRY_ENV, DEFAULT_KEEPALIVE_EXPIRY_SECS ) @@ -575,14 +641,21 @@ async def invoke( "auth.mode": self.auth_mode, "endpoint.path": _endpoint_path(endpoint), "scorer.identifier_kind": identifier_kind, + "http_phase_tracing.enabled": self.http_phase_tracing_enabled, "timeout.seconds": timeout, }, ) as span: + extensions: dict[str, object] | None = ( + {"trace": _HttpCorePhaseTrace()} + if self.http_phase_tracing_enabled + else None + ) response = await client.post( endpoint, json=request_body, headers=request_headers, timeout=timeout, + extensions=extensions, ) set_span_data(span, "http.status_code", response.status_code) response.raise_for_status() diff --git a/evaluators/contrib/galileo/tests/test_luna_evaluator.py b/evaluators/contrib/galileo/tests/test_luna_evaluator.py index f016f796..82f4fa29 100644 --- a/evaluators/contrib/galileo/tests/test_luna_evaluator.py +++ b/evaluators/contrib/galileo/tests/test_luna_evaluator.py @@ -6,9 +6,10 @@ import logging import os from base64 import urlsafe_b64decode -from collections.abc import Iterator +from collections.abc import Awaitable, Callable, Iterator from contextlib import contextmanager from dataclasses import dataclass, field +from typing import cast from unittest.mock import AsyncMock, patch import httpx @@ -410,6 +411,68 @@ def recording_client(**kwargs: object) -> FakeAsyncClient: assert selected == [created[0], created[1], created[2], created[0], created[1]] assert all(created_client.is_closed for created_client in created) + @pytest.mark.asyncio + async def test_client_emits_httpcore_phase_spans_when_enabled( + self, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna import GalileoLunaClient + + spans: list[RecordedSpan] = [] + captured: dict[str, object] = {} + monkeypatch.setattr(luna_client_module, "trace_span", trace_span_recorder(spans)) + + class FakeAsyncClient: + is_closed = False + + async def post(self, url: str, **kwargs: object) -> httpx.Response: + captured.update(kwargs) + extensions = kwargs.get("extensions") + assert isinstance(extensions, dict) + trace = cast( + Callable[[str, dict[str, object]], Awaitable[None]], + extensions["trace"], + ) + await trace("connection.connect_tcp.started", {}) + await trace("connection.connect_tcp.complete", {"return_value": object()}) + await trace("http11.receive_response_headers.started", {}) + await trace("http11.receive_response_headers.complete", {}) + return httpx.Response( + 200, + json={"scorer_label": "toxicity", "score": 0.2, "status": "success"}, + request=httpx.Request("POST", url), + ) + + with patch.dict( + os.environ, + { + "GALILEO_API_SECRET_KEY": "test-secret", + "GALILEO_LUNA_HTTP_PHASE_TRACING": "true", + }, + clear=True, + ): + client = GalileoLunaClient(console_url="https://console.example.com") + client._client = FakeAsyncClient() # type: ignore[assignment] + + response = await client.invoke(scorer_label="toxicity", output="hello") + + assert response.score == 0.2 + assert captured["extensions"] is not None + phase_spans = { + span.op: span.data + for span in spans + if span.op.startswith("agent_control.luna.httpcore.") + } + assert "agent_control.luna.httpcore.connection.connect_tcp" in phase_spans + assert "agent_control.luna.httpcore.http11.receive_response_headers" in phase_spans + assert phase_spans["agent_control.luna.httpcore.connection.connect_tcp"][ + "httpcore.outcome" + ] == "complete" + assert "httpcore.duration_ms" in phase_spans[ + "agent_control.luna.httpcore.connection.connect_tcp" + ] + @pytest.mark.parametrize( "env_values, expected", [ From 5a546cf5616ea2ea192172916a609d46b7995f4b Mon Sep 17 00:00:00 2001 From: abhinav-galileo Date: Sun, 14 Jun 2026 02:50:27 +0530 Subject: [PATCH 18/18] feat: add Luna client timing metrics --- .../luna/client.py | 97 ++++++++++++++----- .../luna/metrics.py | 92 ++++++++++++++++++ .../galileo/tests/test_luna_coverage_gaps.py | 82 +++++++++++++++- 3 files changed, 242 insertions(+), 29 deletions(-) create mode 100644 evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/metrics.py diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py index ecd12924..a61a8e07 100644 --- a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/client.py @@ -19,6 +19,7 @@ from agent_control_models import JSONObject, JSONValue from pydantic import BaseModel, Field, PrivateAttr, model_validator +from .metrics import observe_luna_client_stage, observe_luna_httpcore_phase from .tracing import TraceSpan, set_span_data, trace_span logger = logging.getLogger(__name__) @@ -224,10 +225,16 @@ async def __call__(self, event_name: str, info: dict[str, Any]) -> None: return manager, span, started_at = self._active.pop(phase, (None, None, perf_counter())) - duration_ms = (perf_counter() - started_at) * 1000 + duration_seconds = perf_counter() - started_at + duration_ms = duration_seconds * 1000 set_span_data(span, "httpcore.phase", phase) set_span_data(span, "httpcore.outcome", state) set_span_data(span, "httpcore.duration_ms", duration_ms) + observe_luna_httpcore_phase( + phase=phase, + outcome=state, + duration_seconds=duration_seconds, + ) if "return_value" in info: set_span_data(span, "httpcore.return_type", _safe_trace_value(info["return_value"])) @@ -601,28 +608,42 @@ async def invoke( scorer_id=scorer_id, scorer_version_id=scorer_version_id, ) + endpoint_path = "unknown" with trace_span( op="agent_control.luna.request.build", name="build_scorer_request", data={"scorer.identifier_kind": identifier_kind}, ): - request_body = ScorerInvokeRequest( - scorer_label=scorer_label, - scorer_id=scorer_id, - scorer_version_id=scorer_version_id, - inputs=ScorerInvokeInputs( - query="" if input is None else input, - response="" if output is None else output, - ), - config=config, - ).to_dict() + with observe_luna_client_stage( + stage="build_request", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + request_body = ScorerInvokeRequest( + scorer_label=scorer_label, + scorer_id=scorer_id, + scorer_version_id=scorer_version_id, + inputs=ScorerInvokeInputs( + query="" if input is None else input, + response="" if output is None else output, + ), + config=config, + ).to_dict() with trace_span( op="agent_control.luna.request.endpoint", name="resolve_scorer_endpoint", data={"auth.mode": self.auth_mode}, ) as span: - endpoint, request_headers = self._endpoint_and_headers(headers) - set_span_data(span, "endpoint.path", _endpoint_path(endpoint)) + with observe_luna_client_stage( + stage="resolve_endpoint", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + endpoint, request_headers = self._endpoint_and_headers(headers) + endpoint_path = _endpoint_path(endpoint) + set_span_data(span, "endpoint.path", endpoint_path) logger.debug("[GalileoLunaClient] POST %s", endpoint) logger.debug("[GalileoLunaClient] Request body: %s", request_body) @@ -633,13 +654,19 @@ async def invoke( name="get_http_client", data={"auth.mode": self.auth_mode}, ): - client = await self._get_client() + with observe_luna_client_stage( + stage="get_http_client", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + client = await self._get_client() with trace_span( op="agent_control.luna.http.post", name="post_scorer_invoke", data={ "auth.mode": self.auth_mode, - "endpoint.path": _endpoint_path(endpoint), + "endpoint.path": endpoint_path, "scorer.identifier_kind": identifier_kind, "http_phase_tracing.enabled": self.http_phase_tracing_enabled, "timeout.seconds": timeout, @@ -650,21 +677,33 @@ async def invoke( if self.http_phase_tracing_enabled else None ) - response = await client.post( - endpoint, - json=request_body, - headers=request_headers, - timeout=timeout, - extensions=extensions, - ) - set_span_data(span, "http.status_code", response.status_code) - response.raise_for_status() + with observe_luna_client_stage( + stage="http_post", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + response = await client.post( + endpoint, + json=request_body, + headers=request_headers, + timeout=timeout, + extensions=extensions, + ) + set_span_data(span, "http.status_code", response.status_code) + response.raise_for_status() with trace_span( op="agent_control.luna.response.parse", name="parse_scorer_response", data={"http.status_code": response.status_code}, ): - response_data = response.json() + with observe_luna_client_stage( + stage="parse_json", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + response_data = response.json() if not isinstance(response_data, dict): raise RuntimeError("Invalid response payload: not a JSON object") @@ -673,7 +712,13 @@ async def invoke( name="model_scorer_response", data={"http.status_code": response.status_code}, ): - parsed = ScorerInvokeResponse.from_dict(response_data) + with observe_luna_client_stage( + stage="model_response", + auth_mode=self.auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=identifier_kind, + ): + parsed = ScorerInvokeResponse.from_dict(response_data) logger.debug("[GalileoLunaClient] Response: %s", parsed.raw_response) return parsed except httpx.HTTPStatusError as exc: diff --git a/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/metrics.py b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/metrics.py new file mode 100644 index 00000000..5af6c255 --- /dev/null +++ b/evaluators/contrib/galileo/src/agent_control_evaluator_galileo/luna/metrics.py @@ -0,0 +1,92 @@ +"""Optional Prometheus metrics for Luna scorer client timing.""" + +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager +from time import perf_counter +from typing import Protocol, cast + +try: + from prometheus_client import Histogram # type: ignore[import-not-found] +except ImportError: # pragma: no cover - exercised when embedded without Prometheus. + Histogram = None # type: ignore[assignment] + + +class _HistogramChild(Protocol): + def observe(self, amount: float) -> None: ... + + +class _Histogram(Protocol): + def labels(self, **labels: str) -> _HistogramChild: ... + + +if Histogram is None: + _CLIENT_STAGE_DURATION: _Histogram | None = None + _HTTPCORE_PHASE_DURATION: _Histogram | None = None +else: + _CLIENT_STAGE_DURATION = cast( + _Histogram, + Histogram( + "agent_control_luna_client_stage_duration_seconds", + "Duration of Luna scorer client stages.", + ("stage", "outcome", "auth_mode", "endpoint_path", "scorer_identifier_kind"), + ), + ) + _HTTPCORE_PHASE_DURATION = cast( + _Histogram, + Histogram( + "agent_control_luna_httpcore_phase_duration_seconds", + "Duration of Luna scorer HTTP transport phases.", + ("phase", "outcome"), + ), + ) + + +def _stage_outcome(error: BaseException | None) -> str: + if error is None: + return "success" + if error.__class__.__name__ == "HTTPStatusError": + return "http_status_error" + if error.__class__.__name__.endswith("RequestError"): + return "request_error" + return "error" + + +def observe_luna_httpcore_phase( + *, + phase: str, + outcome: str, + duration_seconds: float, +) -> None: + """Record an HTTP transport phase duration when Prometheus is available.""" + if _HTTPCORE_PHASE_DURATION is None: + return + _HTTPCORE_PHASE_DURATION.labels(phase=phase, outcome=outcome).observe(duration_seconds) + + +@contextmanager +def observe_luna_client_stage( + *, + stage: str, + auth_mode: str, + endpoint_path: str, + scorer_identifier_kind: str, +) -> Iterator[None]: + """Record a Luna client stage duration when Prometheus is available.""" + started_at = perf_counter() + error: BaseException | None = None + try: + yield + except BaseException as exc: + error = exc + raise + finally: + if _CLIENT_STAGE_DURATION is not None: + _CLIENT_STAGE_DURATION.labels( + stage=stage, + outcome=_stage_outcome(error), + auth_mode=auth_mode, + endpoint_path=endpoint_path, + scorer_identifier_kind=scorer_identifier_kind, + ).observe(perf_counter() - started_at) diff --git a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py index e1518eec..4238bac7 100644 --- a/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py +++ b/evaluators/contrib/galileo/tests/test_luna_coverage_gaps.py @@ -7,13 +7,13 @@ from __future__ import annotations import json -import os +from collections.abc import Iterator +from contextlib import contextmanager from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest - # ============================================================================= # luna/evaluator.py: utility helpers # ============================================================================= @@ -425,11 +425,12 @@ class TestScorerInvokeRequestValidation: """``ScorerInvokeRequest`` rejects malformed input combos.""" def test_missing_all_identifiers_raises(self): + from pydantic import ValidationError + from agent_control_evaluator_galileo.luna.client import ( ScorerInvokeInputs, ScorerInvokeRequest, ) - from pydantic import ValidationError with pytest.raises(ValidationError, match="One of scorer_label"): ScorerInvokeRequest(inputs=ScorerInvokeInputs(query="hello")) @@ -678,6 +679,81 @@ async def test_invoke_propagates_request_error(monkeypatch): await client.close() +@pytest.mark.asyncio +async def test_invoke_records_luna_client_stage_metrics(monkeypatch): + """Successful scorer invocation records stage metrics without changing behavior.""" + monkeypatch.setenv("GALILEO_API_KEY", "test-key") + import agent_control_evaluator_galileo.luna.client as luna_client_module + from agent_control_evaluator_galileo.luna.client import GalileoLunaClient + + observed: list[dict[str, str]] = [] + + @contextmanager + def record_stage(**labels: str) -> Iterator[None]: + observed.append(labels) + yield + + monkeypatch.setattr(luna_client_module, "observe_luna_client_stage", record_stage) + + fake_response = MagicMock() + fake_response.status_code = 200 + fake_response.raise_for_status = MagicMock() + fake_response.json = MagicMock( + return_value={"scorer_label": "toxicity", "score": 0.1, "status": "success"} + ) + + fake_http = AsyncMock() + fake_http.post = AsyncMock(return_value=fake_response) + fake_http.is_closed = False + + client = GalileoLunaClient(api_url="https://api.example.test") + client._client = fake_http + + try: + response = await client.invoke(scorer_label="toxicity", input="hello") + finally: + await client.close() + + assert response.score == 0.1 + assert [item["stage"] for item in observed] == [ + "build_request", + "resolve_endpoint", + "get_http_client", + "http_post", + "parse_json", + "model_response", + ] + assert all(item["auth_mode"] == "public" for item in observed) + assert all(item["scorer_identifier_kind"] == "label" for item in observed) + assert next(item for item in observed if item["stage"] == "http_post")[ + "endpoint_path" + ] == "/scorers/invoke" + + +@pytest.mark.asyncio +async def test_httpcore_phase_trace_records_metrics(monkeypatch): + """HTTP transport phase tracing records matching Prometheus metrics.""" + import agent_control_evaluator_galileo.luna.client as luna_client_module + + observed: list[dict[str, object]] = [] + monkeypatch.setattr( + luna_client_module, + "observe_luna_httpcore_phase", + lambda **labels: observed.append(labels), + ) + + trace = luna_client_module._HttpCorePhaseTrace() + + await trace("connection.connect_tcp.started", {}) + await trace("connection.connect_tcp.complete", {"return_value": "ok"}) + + assert len(observed) == 1 + assert observed[0]["phase"] == "connection.connect_tcp" + assert observed[0]["outcome"] == "complete" + assert isinstance(observed[0]["duration_seconds"], float) + assert observed[0]["duration_seconds"] >= 0 + + @pytest.mark.asyncio async def test_client_async_context_manager_closes_on_exit(monkeypatch): """Entering/exiting the async context manager must close the client."""