diff --git a/agentex/pyproject.toml b/agentex/pyproject.toml index b1785fa1..eb256c45 100644 --- a/agentex/pyproject.toml +++ b/agentex/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "opentelemetry-api>=1.28.0", "opentelemetry-sdk>=1.28.0", "opentelemetry-exporter-otlp>=1.28.0", + "opentelemetry-instrumentation-fastapi>=0.49b0", "pyyaml>=6.0,<7", ] diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index 76f80015..34469289 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -38,7 +38,7 @@ from src.config.environment_variables import EnvVarKeys from src.domain.exceptions import GenericException from src.utils.logging import make_logger -from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics +from src.utils.otel_metrics import configure_app_metrics, shutdown_otel_metrics logger = make_logger(__name__) @@ -73,9 +73,6 @@ def __init__( @asynccontextmanager async def lifespan(_: FastAPI): - # Initialize OpenTelemetry metrics first (before dependencies register instruments) - init_otel_metrics() - await dependencies.startup_global_dependencies() configure_statsd() @@ -195,6 +192,9 @@ async def handle_unexpected(request, exc): fastapi_app.include_router(checkpoints.router) fastapi_app.include_router(task_retention.router) +# Instrument before the first ASGI message; lifespan startup is too late. +configure_app_metrics(fastapi_app) + # Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses. # This must be the outermost layer to bypass all middleware. # Export as `app` so existing uvicorn entry points (app:app) work without changes. diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index e676fbe9..dd6187cf 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -1,10 +1,15 @@ """ OpenTelemetry metrics configuration for Agentex. -When auto-instrumentation (e.g. OTel Operator) has already installed a global -MeterProvider, custom app metrics attach to it instead of replacing it. -Otherwise this module creates its own provider with OTLP export when an endpoint -is configured. +The Python OTel SDK exposes a single global MeterProvider (set once). This module +uses two deterministic paths: + +1. **Coexistence** — a real SDK MeterProvider is already global (e.g. from OTel + Operator auto-instrumentation). Custom app metrics attach via get_meter(); + set_meter_provider() is never called. +2. **Standalone** — global is still the SDK proxy and an OTLP endpoint is + configured. This module creates the first MeterProvider. Auto-instrumentation + and custom metrics then share that same global slot. Environment Variables: OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics OTLP endpoint (falls back to @@ -15,15 +20,26 @@ OTEL_EXPORTER_OTLP_HEADERS: Optional headers for authentication OTEL_SERVICE_NAME: Service name for metrics (default: agentex) OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000) + OTEL_RESOURCE_ATTRIBUTES: K8s and service resource attrs (injected by the OTel + Operator on cluster pods; read via OTELResourceDetector). Per-process + service.instance.id is applied via Resource.merge when this module creates + the MeterProvider; env is not modified. + AGENTEX_OTEL_HTTP_METRICS_ENABLED: Opt-in in-process FastAPI HTTP metrics + (default: false). When true, also set + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics on the pod + so the OTel Operator does not double-instrument FastAPI. + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: Pod env; must include ``fastapi`` when + using in-process HTTP metrics (see above). + DD_TRACE_FASTAPI_ENABLED: Set to ``false`` when using ddtrace-run so ddtrace + does not claim FastAPI before OpenTelemetry instrumentation. """ from __future__ import annotations import os -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from opentelemetry import metrics -from opentelemetry.metrics import NoOpMeterProvider from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter as OTLPGrpcMetricExporter, ) @@ -32,7 +48,11 @@ ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource +from opentelemetry.sdk.resources import ( + OTELResourceDetector, + Resource, + get_aggregated_resources, +) from src.utils.logging import make_logger @@ -43,7 +63,7 @@ logger = make_logger(__name__) # Global state -_meter_provider: MeterProvider | None = None # Set only when this module creates the provider +_meter_provider: MeterProvider | None = None # Set only when this module creates the provider _initialized: bool = False # Default configuration @@ -51,12 +71,67 @@ DEFAULT_EXPORT_INTERVAL_MS = 30000 # 30 seconds +def _per_process_instance_id(resource: Resource) -> str: + """Return a worker-unique service.instance.id from detected resource attrs.""" + pid = os.getpid() + existing = resource.attributes.get("service.instance.id") + if existing: + existing = str(existing) + pid_token = f".{pid}" + if existing.endswith(pid_token) or f"{pid_token}." in existing: + return existing + return f"{existing}.{pid}" + service = ( + resource.attributes.get("service.name") + or os.environ.get("OTEL_SERVICE_NAME") + or "unknown" + ) + pod = resource.attributes.get("k8s.pod.name") or "unknown" + return f"{service}.{pod}.{pid}" + + +def _build_resource() -> Resource: + """Detect operator/k8s attrs from env; set a per-process service.instance.id.""" + resource = get_aggregated_resources([OTELResourceDetector()]) + service_instance_id = _per_process_instance_id(resource) + return resource.merge(Resource.create({"service.instance.id": service_instance_id})) + + def _global_meter_provider() -> MeterProvider | None: """Return the global MeterProvider if installed, else None (proxy is ignored).""" provider = metrics.get_meter_provider() return provider if isinstance(provider, MeterProvider) else None +def _describe_global_provider() -> tuple[str, bool]: + provider = metrics.get_meter_provider() + return type(provider).__name__, isinstance(provider, MeterProvider) + + +def _log_provider_state( + message: str, + *, + app_provider: MeterProvider | None = None, + mode: str | None = None, +) -> None: + """Emit a single structured INFO log for operator/app coexistence debugging.""" + global_type, global_is_sdk = _describe_global_provider() + global_provider = _global_meter_provider() + app_owns_global = ( + app_provider is not None + and global_provider is not None + and global_provider is app_provider + ) + parts = [ + message, + f"mode={mode or 'unknown'}", + f"global_type={global_type}", + f"global_is_sdk_meter_provider={global_is_sdk}", + f"app_owns_global={app_owns_global}", + ] + logger.info(", ".join(parts)) + + def _metrics_endpoint(explicit: str | None = None) -> str | None: if explicit: return explicit @@ -76,9 +151,21 @@ def _metrics_protocol() -> str: ) +def _http_metrics_export_url(endpoint: str) -> str: + """Return an OTLP HTTP metrics URL including the /v1/metrics path. + + OTLPHttpMetricExporter only appends that path when it resolves the endpoint + from environment variables, not when an explicit endpoint argument is passed. + """ + normalized = endpoint.rstrip("/") + if normalized.endswith("/v1/metrics"): + return normalized + return f"{normalized}/v1/metrics" + + def _create_metric_exporter(endpoint: str, protocol: str) -> MetricExporter: if protocol in {"http/protobuf", "http"}: - return OTLPHttpMetricExporter(endpoint=endpoint) + return OTLPHttpMetricExporter(endpoint=_http_metrics_export_url(endpoint)) if protocol != "grpc": logger.warning("Unknown OTEL metrics protocol %r; using grpc", protocol) @@ -102,8 +189,9 @@ def init_otel_metrics( Call once at application startup. Subsequent calls return the active provider without re-initializing. - If auto-instrumentation already installed a MeterProvider, custom metrics - attach to it. Otherwise, initializes only when an OTLP endpoint is configured. + If a real SDK MeterProvider is already global, custom metrics attach to it + and set_meter_provider() is never called. Otherwise, when an OTLP endpoint + is configured, this module installs the first global provider. Args: service_name: Service name for resource attributes @@ -120,15 +208,23 @@ def init_otel_metrics( if _initialized: return _meter_provider or _global_meter_provider() + _log_provider_state("OpenTelemetry metrics init starting", mode="starting") + if existing := _global_meter_provider(): _initialized = True - logger.info("OpenTelemetry metrics using existing MeterProvider") + _log_provider_state( + "OpenTelemetry metrics using existing MeterProvider", + mode="coexistence", + ) return existing endpoint = _metrics_endpoint(otlp_endpoint) if not endpoint: _initialized = True - logger.info("OpenTelemetry metrics disabled: no OTLP endpoint configured") + _log_provider_state( + "OpenTelemetry metrics disabled: no OTLP endpoint configured", + mode="disabled", + ) return None protocol = _metrics_protocol() @@ -144,15 +240,12 @@ def init_otel_metrics( ) ) ) - resource = Resource.create( - { - SERVICE_NAME: resolved_service_name, - SERVICE_VERSION: service_version - or os.environ.get("SERVICE_VERSION", "0.1.0"), - "deployment.environment": environment - or os.environ.get("ENVIRONMENT", "development"), - } - ) + resource = _build_resource() + if not resource.attributes.get("k8s.pod.name"): + logger.warning( + "k8s.pod.name not set on MeterProvider resource; " + "ensure OTEL_RESOURCE_ATTRIBUTES is injected (OTel Operator)." + ) reader = PeriodicExportingMetricReader( exporter=_create_metric_exporter(endpoint, protocol), export_interval_millis=resolved_export_interval_ms, @@ -164,14 +257,30 @@ def init_otel_metrics( except Exception: provider.shutdown() raise - _meter_provider = provider + + global_provider = _global_meter_provider() + if global_provider is provider: + _meter_provider = provider + _initialized = True + _log_provider_state( + "OpenTelemetry metrics standalone init installed global MeterProvider: " + f"endpoint={endpoint}, protocol={protocol}, service={resolved_service_name}, " + f"interval={resolved_export_interval_ms}ms", + app_provider=provider, + mode="standalone", + ) + return _meter_provider + + # set_meter_provider() was rejected; shut down the orphan to avoid background export noise. + provider.shutdown() _initialized = True - logger.info( - f"OpenTelemetry metrics initialized: endpoint={endpoint}, " - f"protocol={protocol}, service={resolved_service_name}, " - f"interval={resolved_export_interval_ms}ms" + _log_provider_state( + "OpenTelemetry metrics standalone set_meter_provider rejected; " + "using existing global MeterProvider", + app_provider=provider, + mode="standalone_rejected", ) - return _meter_provider + return global_provider def get_meter(name: str, version: str = "0.1.0") -> Meter | None: @@ -209,11 +318,6 @@ def shutdown_otel_metrics() -> None: except Exception: logger.exception("OpenTelemetry metrics shutdown failed") finally: - if _meter_provider is not None: - try: - metrics.set_meter_provider(NoOpMeterProvider()) - except Exception: - logger.exception("Failed to reset global MeterProvider after shutdown") _meter_provider = None _initialized = False @@ -221,3 +325,71 @@ def shutdown_otel_metrics() -> None: def is_otel_configured() -> bool: """Check if metrics export is configured via environment.""" return bool(_metrics_endpoint()) + + +def _http_metrics_enabled() -> bool: + """Return whether in-process FastAPI HTTP metrics should be installed.""" + flag = os.environ.get("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "false").strip().lower() + return flag not in {"0", "false", "no", "off"} + + +def instrument_fastapi_http_metrics(app: Any) -> bool: + """ + Install in-process FastAPI HTTP server metrics (http.server.request.duration). + + Prefer :func:`configure_app_metrics`. When called directly, invoke before the + ASGI server handles any messages (lifespan startup is too late). + + Requires ``AGENTEX_OTEL_HTTP_METRICS_ENABLED=true`` and + ``OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics`` on the pod. + + Returns: + True when instrumentation was applied, False when skipped or disabled. + """ + if not _http_metrics_enabled(): + logger.info("FastAPI HTTP metrics disabled via AGENTEX_OTEL_HTTP_METRICS_ENABLED") + return False + + if getattr(app, "_is_instrumented_by_opentelemetry", False): + logger.info( + "FastAPI already instrumented by OpenTelemetry; skipping in-process HTTP metrics" + ) + return False + + if not _initialized: + init_otel_metrics() + + if _global_meter_provider() is None and not is_otel_configured(): + logger.info( + "FastAPI HTTP metrics skipped: no MeterProvider and no OTLP endpoint configured" + ) + return False + + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + + meter_provider = _global_meter_provider() + FastAPIInstrumentor.instrument_app(app, meter_provider=meter_provider) + logger.info("FastAPI in-process HTTP metrics instrumentation enabled") + return True + + +def configure_app_metrics(app: Any) -> None: + """ + Initialize OTLP metrics and optional FastAPI HTTP instrumentation. + + Call once at module import after the FastAPI app is fully configured (middleware, + routes, handlers) and before wrapping it or serving any ASGI messages. + Lifespan is too late: Starlette caches ``middleware_stack`` on the first ASGI + message (usually lifespan startup), before the lifespan handler runs. + + HTTP metrics are opt-in via ``AGENTEX_OTEL_HTTP_METRICS_ENABLED`` (default false). + Beyla/eBPF HTTP metrics are independent and use different label sets when present. + """ + init_otel_metrics() + if not _http_metrics_enabled(): + return + if not instrument_fastapi_http_metrics(app): + logger.warning( + "FastAPI HTTP metrics were not applied despite " + "AGENTEX_OTEL_HTTP_METRICS_ENABLED; see prior log lines for the skip reason" + ) diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 22b0c974..6bf90c91 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -4,6 +4,8 @@ from unittest.mock import patch +import os + import pytest from opentelemetry import metrics from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( @@ -14,9 +16,8 @@ ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader -from opentelemetry.sdk.resources import Resource -from src.utils import otel_metrics -from src.utils import cache_metrics +from opentelemetry.sdk.resources import OTELResourceDetector, Resource, get_aggregated_resources +from src.utils import cache_metrics, otel_metrics def _set_global_meter_provider(provider: object | None = None) -> None: @@ -26,9 +27,12 @@ def _set_global_meter_provider(provider: object | None = None) -> None: install the no-op proxy (unset state). """ try: + from opentelemetry.util._once import Once + if provider is None: provider = metrics._internal._ProxyMeterProvider() metrics._internal._METER_PROVIDER = provider + metrics._internal._METER_PROVIDER_SET_ONCE = Once() except AttributeError as exc: pytest.skip(f"OpenTelemetry SDK internals changed: {exc}") @@ -52,6 +56,14 @@ def _set_operator_provider() -> MeterProvider: return provider +def _enable_auto_instrumentation_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OTEL_METRICS_EXPORTER", "otlp") + monkeypatch.setenv( + "PYTHONPATH", + "/otel-auto-instrumentation-python/opentelemetry/instrumentation/auto_instrumentation", + ) + + @pytest.mark.unit def test_init_coexists_with_existing_meter_provider(): operator_provider = _set_operator_provider() @@ -98,6 +110,21 @@ def test_init_after_shutdown_in_shared_mode(): assert otel_metrics.init_otel_metrics() is operator_provider +@pytest.mark.unit +def test_init_creates_standalone_when_operator_env_but_proxy_global(monkeypatch): + """Operator injection env must not block first-setter standalone on proxy.""" + _enable_auto_instrumentation_env(monkeypatch) + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + monkeypatch.setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf") + + result = otel_metrics.init_otel_metrics() + + assert isinstance(result, MeterProvider) + assert otel_metrics._meter_provider is result + assert metrics.get_meter_provider() is result + assert otel_metrics.get_meter("agentex.test") is not None + + @pytest.mark.unit def test_init_creates_meter_provider_when_none_configured(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") @@ -110,6 +137,138 @@ def test_init_creates_meter_provider_when_none_configured(monkeypatch): assert otel_metrics.get_meter("agentex.test") is not None +@pytest.mark.unit +def test_per_process_instance_id_extends_operator_value(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.pod.name=my-pod,service.instance.id=agentex.my-pod.agentex", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + base = get_aggregated_resources([OTELResourceDetector()]) + assert otel_metrics._per_process_instance_id(base) == "agentex.my-pod.agentex.42" + + +@pytest.mark.unit +def test_per_process_instance_id_builds_when_missing(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv("OTEL_RESOURCE_ATTRIBUTES", "k8s.pod.name=my-pod") + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + base = get_aggregated_resources([OTELResourceDetector()]) + assert otel_metrics._per_process_instance_id(base) == "agentex.my-pod.42" + + +@pytest.mark.unit +def test_build_resource_does_not_mutate_otel_resource_attributes_env(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + original = "k8s.pod.name=my-pod,service.instance.id=agentex.my-pod.agentex" + monkeypatch.setenv("OTEL_RESOURCE_ATTRIBUTES", original) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + otel_metrics._build_resource() + assert os.environ["OTEL_RESOURCE_ATTRIBUTES"] == original + + +@pytest.mark.unit +def test_per_process_instance_id_works_for_other_services(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "payments-api") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.pod.name=payments-abc,service.instance.id=payments-api.payments-abc.prod", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 7) + base = get_aggregated_resources([OTELResourceDetector()]) + assert ( + otel_metrics._per_process_instance_id(base) + == "payments-api.payments-abc.prod.7" + ) + + +@pytest.mark.unit +def test_build_resource_parses_operator_injected_pod_env(monkeypatch): + """Regression: K8s expands $(OTEL_RESOURCE_ATTRIBUTES_*) before Python starts.""" + pod_name = "agentex-ccc85c45b-s29zm" + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.container.name=agentex," + "k8s.deployment.name=agentex," + "k8s.namespace.name=agentex," + "k8s.node.name=ip-10-0-1-2.us-west-2.compute.internal," + f"k8s.pod.name={pod_name}," + "k8s.replicaset.name=agentex-ccc85c45b," + f"service.instance.id=agentex.{pod_name}.agentex," + "service.namespace=agentex," + "service.version=perf-agentex-drop-redundant-task-grant-b59b92e", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 12345) + + attrs = otel_metrics._build_resource().attributes + assert attrs.get("service.name") == "agentex" + assert attrs.get("k8s.pod.name") == pod_name + assert attrs.get("k8s.namespace.name") == "agentex" + assert attrs.get("k8s.deployment.name") == "agentex" + assert attrs.get("k8s.container.name") == "agentex" + assert attrs.get("service.instance.id") == f"agentex.{pod_name}.agentex.12345" + + +@pytest.mark.unit +def test_build_resource_from_otel_env(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.pod.name=operator-pod,k8s.namespace.name=agentex," + "k8s.deployment.name=agentex,service.instance.id=agentex.operator-pod.agentex", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 6789) + + resource = otel_metrics._build_resource() + attrs = resource.attributes + assert attrs.get("service.name") == "agentex" + assert attrs.get("k8s.pod.name") == "operator-pod" + assert attrs.get("k8s.namespace.name") == "agentex" + assert attrs.get("k8s.deployment.name") == "agentex" + assert attrs.get("service.instance.id") == "agentex.operator-pod.agentex.6789" + + +@pytest.mark.unit +def test_init_coexists_without_set_meter_provider_when_operator_present( + monkeypatch, +): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + operator_provider = _set_operator_provider() + + with patch.object(metrics, "set_meter_provider") as mock_set: + result = otel_metrics.init_otel_metrics() + + mock_set.assert_not_called() + assert result is operator_provider + assert metrics.get_meter_provider() is operator_provider + assert otel_metrics._meter_provider is None + assert otel_metrics.get_meter("agentex.test") is not None + + +@pytest.mark.unit +def test_standalone_shuts_down_orphan_when_set_meter_provider_rejected( + monkeypatch, +): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + preexisting = MeterProvider(resource=Resource.create({})) + real_set = metrics.set_meter_provider + + def racing_set(provider: MeterProvider) -> None: + if not isinstance(metrics.get_meter_provider(), MeterProvider): + real_set(preexisting) + real_set(provider) + + with patch.object(metrics, "set_meter_provider", side_effect=racing_set): + result = otel_metrics.init_otel_metrics() + + assert result is preexisting + assert metrics.get_meter_provider() is preexisting + assert otel_metrics._meter_provider is None + assert otel_metrics.get_meter("agentex.test") is not None + + @pytest.mark.unit def test_init_retries_after_provider_creation_failure(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") @@ -232,6 +391,31 @@ def test_custom_metrics_preserve_instrument_attributes_in_shared_mode(): ) in points +@pytest.mark.unit +@pytest.mark.parametrize( + ("input_endpoint", "expected_url"), + [ + ("http://collector:4318", "http://collector:4318/v1/metrics"), + ( + "http://collector:4318/v1/metrics", + "http://collector:4318/v1/metrics", + ), + ], +) +def test_http_metrics_export_url(input_endpoint: str, expected_url: str): + assert otel_metrics._http_metrics_export_url(input_endpoint) == expected_url + + +@pytest.mark.unit +def test_create_http_metric_exporter_uses_v1_metrics_path(): + exporter = otel_metrics._create_metric_exporter( + "http://collector:4318", "http/protobuf" + ) + + assert isinstance(exporter, OTLPHttpMetricExporter) + assert exporter._endpoint == "http://collector:4318/v1/metrics" + + @pytest.mark.unit def test_init_after_shutdown_in_standalone_mode(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") @@ -242,7 +426,7 @@ def test_init_after_shutdown_in_standalone_mode(monkeypatch): second = otel_metrics.init_otel_metrics() assert second is not None - assert second is not first + assert second is first assert otel_metrics.get_meter("agentex.test") is not None @@ -270,3 +454,167 @@ def test_shutdown_only_own_provider(monkeypatch): assert otel_metrics._initialized is False assert otel_metrics._meter_provider is None + + +@pytest.mark.unit +def test_instrument_fastapi_skips_when_disabled_by_default(monkeypatch): + from fastapi import FastAPI + + monkeypatch.delenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", raising=False) + + app = FastAPI() + assert otel_metrics.instrument_fastapi_http_metrics(app) is False + + +@pytest.mark.unit +def test_instrument_fastapi_skips_when_already_instrumented(monkeypatch): + from fastapi import FastAPI + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + + app = FastAPI() + app._is_instrumented_by_opentelemetry = True # noqa: SLF001 + + assert otel_metrics.instrument_fastapi_http_metrics(app) is False + + +@pytest.mark.unit +def test_instrument_fastapi_skips_without_otel_config(monkeypatch): + from fastapi import FastAPI + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False) + monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False) + + app = FastAPI() + assert otel_metrics.instrument_fastapi_http_metrics(app) is False + + +@pytest.mark.unit +def test_instrument_fastapi_applies_with_existing_provider(monkeypatch): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.setenv("OTEL_SEMCONV_STABILITY_OPT_IN", "http") + + reader = InMemoryMetricReader() + provider = MeterProvider( + resource=Resource.create({"service.name": "agentex"}), + metric_readers=[reader], + ) + _set_global_meter_provider(provider) + otel_metrics.init_otel_metrics() + + app = FastAPI() + + @app.get("/probe") + def probe() -> dict[str, str]: + return {"ok": "true"} + + assert otel_metrics.instrument_fastapi_http_metrics(app) is True + assert getattr(app, "_is_instrumented_by_opentelemetry", False) is True + + with TestClient(app) as client: + response = client.get("/probe") + assert response.status_code == 200 + + data = reader.get_metrics_data() + assert data is not None + metric_names = { + metric.name + for resource_metrics in data.resource_metrics + for scope in resource_metrics.scope_metrics + for metric in scope.metrics + } + assert "http.server.request.duration" in metric_names + + +@pytest.mark.unit +def test_configure_app_metrics_before_first_request(monkeypatch): + """Instrument at import time, before the ASGI server builds middleware_stack.""" + from fastapi import FastAPI + from fastapi.testclient import TestClient + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.setenv("OTEL_SEMCONV_STABILITY_OPT_IN", "http") + + reader = InMemoryMetricReader() + provider = MeterProvider( + resource=Resource.create({"service.name": "agentex"}), + metric_readers=[reader], + ) + _set_global_meter_provider(provider) + + app = FastAPI() + + @app.post("/agents/{agent_id}/rpc") + def rpc(agent_id: str) -> dict[str, str]: + return {"ok": agent_id} + + otel_metrics.configure_app_metrics(app) + + with TestClient(app) as client: + response = client.post("/agents/test/rpc") + assert response.status_code == 200 + + data = reader.get_metrics_data() + assert data is not None + metric_names = { + metric.name + for resource_metrics in data.resource_metrics + for scope in resource_metrics.scope_metrics + for metric in scope.metrics + } + assert "http.server.request.duration" in metric_names + + +@pytest.mark.unit +def test_configure_app_metrics_in_lifespan_does_not_record_metrics(monkeypatch): + """Lifespan runs after Starlette caches middleware_stack on lifespan startup.""" + from contextlib import asynccontextmanager + + from fastapi import FastAPI + from fastapi.testclient import TestClient + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.setenv("OTEL_SEMCONV_STABILITY_OPT_IN", "http") + + reader = InMemoryMetricReader() + provider = MeterProvider( + resource=Resource.create({"service.name": "agentex"}), + metric_readers=[reader], + ) + _set_global_meter_provider(provider) + + app = FastAPI() + + @app.post("/x") + def x() -> dict[str, str]: + return {"ok": "true"} + + @asynccontextmanager + async def lifespan(_: FastAPI): + otel_metrics.configure_app_metrics(app) + yield + + app.router.lifespan_context = lifespan + + with TestClient(app) as client: + client.post("/x") + + assert reader.get_metrics_data() is None + + +@pytest.mark.unit +def test_instrument_fastapi_is_idempotent(monkeypatch): + from fastapi import FastAPI + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + + _set_operator_provider() + otel_metrics.init_otel_metrics() + app = FastAPI() + + assert otel_metrics.instrument_fastapi_http_metrics(app) is True + assert otel_metrics.instrument_fastapi_http_metrics(app) is False diff --git a/uv.lock b/uv.lock index 41c44065..794a39f8 100644 --- a/uv.lock +++ b/uv.lock @@ -72,6 +72,7 @@ dependencies = [ { name = "litellm", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "opentelemetry-exporter-otlp", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation-fastapi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "opentelemetry-sdk", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "psycopg2-binary", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "pymongo", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -122,6 +123,7 @@ requires-dist = [ { name = "litellm", specifier = ">=1.83.7,<2" }, { name = "opentelemetry-api", specifier = ">=1.28.0" }, { name = "opentelemetry-exporter-otlp", specifier = ">=1.28.0" }, + { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.49b0" }, { name = "opentelemetry-sdk", specifier = ">=1.28.0" }, { name = "psycopg2-binary", specifier = ">=2.9.9,<3" }, { name = "pymongo", specifier = ">=4.13.0,<5" }, @@ -348,6 +350,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/29/5ecc3a15d5a33e31b26c11426c45c501e439cb865d0bff96315d86443b78/appnope-0.1.4-py2.py3-none-any.whl", hash = "sha256:502575ee11cd7a28c0205f379b525beefebab9d161b7c964670864014ed7213c", size = 4321, upload-time = "2024-02-06T09:43:09.663Z" }, ] +[[package]] +name = "asgiref" +version = "3.11.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/63/40/f03da1264ae8f7cfdbf9146542e5e7e8100a4c66ab48e791df9a03d3f6c0/asgiref-3.11.1.tar.gz", hash = "sha256:5f184dc43b7e763efe848065441eac62229c9f7b0475f41f80e207a114eda4ce", size = 38550, upload-time = "2026-02-03T13:30:14.33Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/0a/a72d10ed65068e115044937873362e6e32fab1b7dce0046aeb224682c989/asgiref-3.11.1-py3-none-any.whl", hash = "sha256:e8667a091e69529631969fd45dc268fa79b99c92c5fcdda727757e52146ec133", size = 24345, upload-time = "2026-02-03T13:30:13.039Z" }, +] + [[package]] name = "asttokens" version = "3.0.0" @@ -1623,6 +1634,53 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/95/f1/b27d3e2e003cd9a3592c43d099d2ed8d0a947c15281bf8463a256db0b46c/opentelemetry_exporter_otlp_proto_http-1.39.1-py3-none-any.whl", hash = "sha256:d9f5207183dd752a412c4cd564ca8875ececba13be6e9c6c370ffb752fd59985", size = 19641, upload-time = "2025-12-11T13:32:22.248Z" }, ] +[[package]] +name = "opentelemetry-instrumentation" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "packaging", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "wrapt", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/41/0f/7e6b713ac117c1f5e4e3300748af699b9902a2e5e34c9cf443dde25a01fa/opentelemetry_instrumentation-0.60b1.tar.gz", hash = "sha256:57ddc7974c6eb35865af0426d1a17132b88b2ed8586897fee187fd5b8944bd6a", size = 31706, upload-time = "2025-12-11T13:36:42.515Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/77/d2/6788e83c5c86a2690101681aeef27eeb2a6bf22df52d3f263a22cee20915/opentelemetry_instrumentation-0.60b1-py3-none-any.whl", hash = "sha256:04480db952b48fb1ed0073f822f0ee26012b7be7c3eac1a3793122737c78632d", size = 33096, upload-time = "2025-12-11T13:35:33.067Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-asgi" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "asgiref", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-util-http", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/77/db/851fa88db7441da82d50bd80f2de5ee55213782e25dc858e04d0c9961d60/opentelemetry_instrumentation_asgi-0.60b1.tar.gz", hash = "sha256:16bfbe595cd24cda309a957456d0fc2523f41bc7b076d1f2d7e98a1ad9876d6f", size = 26107, upload-time = "2025-12-11T13:36:47.015Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/76/76/1fb94367cef64420d2171157a6b9509582873bd09a6afe08a78a8d1f59d9/opentelemetry_instrumentation_asgi-0.60b1-py3-none-any.whl", hash = "sha256:d48def2dbed10294c99cfcf41ebbd0c414d390a11773a41f472d20000fcddc25", size = 16933, upload-time = "2025-12-11T13:35:40.462Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-fastapi" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation-asgi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-util-http", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9c/e7/e7e5e50218cf488377209d85666b182fa2d4928bf52389411ceeee1b2b60/opentelemetry_instrumentation_fastapi-0.60b1.tar.gz", hash = "sha256:de608955f7ff8eecf35d056578346a5365015fd7d8623df9b1f08d1c74769c01", size = 24958, upload-time = "2025-12-11T13:36:59.35Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7d/cc/6e808328ba54662e50babdcab21138eae4250bc0fddf67d55526a615a2ca/opentelemetry_instrumentation_fastapi-0.60b1-py3-none-any.whl", hash = "sha256:af94b7a239ad1085fc3a820ecf069f67f579d7faf4c085aaa7bd9b64eafc8eaf", size = 13478, upload-time = "2025-12-11T13:36:00.811Z" }, +] + [[package]] name = "opentelemetry-proto" version = "1.39.1" @@ -1662,6 +1720,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7a/5e/5958555e09635d09b75de3c4f8b9cae7335ca545d77392ffe7331534c402/opentelemetry_semantic_conventions-0.60b1-py3-none-any.whl", hash = "sha256:9fa8c8b0c110da289809292b0591220d3a7b53c1526a23021e977d68597893fb", size = 219982, upload-time = "2025-12-11T13:32:36.955Z" }, ] +[[package]] +name = "opentelemetry-util-http" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/50/fc/c47bb04a1d8a941a4061307e1eddfa331ed4d0ab13d8a9781e6db256940a/opentelemetry_util_http-0.60b1.tar.gz", hash = "sha256:0d97152ca8c8a41ced7172d29d3622a219317f74ae6bb3027cfbdcf22c3cc0d6", size = 11053, upload-time = "2025-12-11T13:37:25.115Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/16/5c/d3f1733665f7cd582ef0842fb1d2ed0bc1fba10875160593342d22bba375/opentelemetry_util_http-0.60b1-py3-none-any.whl", hash = "sha256:66381ba28550c91bee14dcba8979ace443444af1ed609226634596b4b0faf199", size = 8947, upload-time = "2025-12-11T13:36:37.151Z" }, +] + [[package]] name = "orjson" version = "3.11.7"