diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index 76f80015..41a9eada 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -1,3 +1,15 @@ +# ruff: noqa: E402 +# E402 suppressed: bootstrap_auto_instrumentation() must run before imports of +# auto-instrumented libraries (FastAPI, httpx, SQLAlchemy, etc.). + +from src.utils.otel_metrics import ( + bootstrap_auto_instrumentation, + init_otel_metrics, + shutdown_otel_metrics, +) + +bootstrap_auto_instrumentation() + import os from contextlib import asynccontextmanager from pathlib import Path @@ -38,7 +50,6 @@ from src.config.environment_variables import EnvVarKeys from src.domain.exceptions import GenericException from src.utils.logging import make_logger -from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics logger = make_logger(__name__) diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index e676fbe9..f286e9f1 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -1,19 +1,45 @@ """ -OpenTelemetry metrics configuration for Agentex. - -When auto-instrumentation (e.g. OTel Operator) has already installed a global -MeterProvider, custom app metrics attach to it instead of replacing it. -Otherwise this module creates its own provider with OTLP export when an endpoint -is configured. - -Environment Variables: - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics OTLP endpoint (falls back to +OpenTelemetry bootstrap and custom metrics for Agentex. + +Two responsibilities: + +1. **Auto-instrumentation** — call ``bootstrap_auto_instrumentation()`` from + ``app.py`` before importing FastAPI or other auto-instrumented libraries so + ``initialize()`` runs in each uvicorn spawn worker when contrib packages + are installed. + +2. **Custom app metrics** — ``init_otel_metrics()`` registers Agentex instruments + (``auth_cache_*``, ``db_*``, etc.). 
Attaches to an existing global + ``MeterProvider`` from bootstrap/operator when present; otherwise creates a + standalone OTLP pipeline when an endpoint is configured. + +**Datadog ``ddtrace-run`` coexistence:** Neither OTel nor ddtrace detects the other's +FastAPI patches. If both run in one process, ddtrace wraps the middleware stack +first; OTel skips ``OpenTelemetryMiddleware`` with "unexpected middleware stack" +and HTTP OTel metrics/traces are not emitted. Helm avoids this by using +``ddtrace-run`` only when ``datadog.env`` is set (OTel-only otherwise). If both +are required, set ``DD_TRACE_FASTAPI_ENABLED=false`` and +``DD_TRACE_STARLETTE_ENABLED=false`` so OTel owns HTTP instrumentation. + +**Per-worker ``service.instance.id``:** Uvicorn spawn workers share pod-level +``OTEL_RESOURCE_ATTRIBUTES``, so auto-instrumentation would otherwise emit all +workers on the same metric timeseries (see OTel +issue #4390). +``bootstrap_auto_instrumentation()`` appends ``.<pid>`` to ``service.instance.id`` +in ``OTEL_RESOURCE_ATTRIBUTES`` before ``initialize()``; standalone +``init_otel_metrics()`` applies the same via ``Resource.merge``. With +``--workers 1``, operator ``sitecustomize`` may have already called +``initialize()``; bootstrap calls it again (OTel providers and instrumentors +are set-once; duplicate calls only produce log warnings). 
+ +Environment variables (custom metrics / standalone mode): + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics endpoint (falls back to OTEL_EXPORTER_OTLP_ENDPOINT) - OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: Metrics export protocol (falls back to + OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: Export protocol (falls back to OTEL_EXPORTER_OTLP_PROTOCOL; default: grpc) OTEL_EXPORTER_OTLP_ENDPOINT: General OTLP endpoint URL - OTEL_EXPORTER_OTLP_HEADERS: Optional headers for authentication - OTEL_SERVICE_NAME: Service name for metrics (default: agentex) + OTEL_EXPORTER_OTLP_HEADERS: Passed through by OTLP exporters when set + OTEL_SERVICE_NAME: Service name (default: agentex) OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000) """ @@ -23,7 +49,6 @@ from typing import TYPE_CHECKING from opentelemetry import metrics -from opentelemetry.metrics import NoOpMeterProvider from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter as OTLPGrpcMetricExporter, ) @@ -32,7 +57,13 @@ ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource +from opentelemetry.sdk.resources import ( + SERVICE_NAME, + SERVICE_VERSION, + OTELResourceDetector, + Resource, + get_aggregated_resources, +) from src.utils.logging import make_logger @@ -42,15 +73,102 @@ logger = make_logger(__name__) -# Global state -_meter_provider: MeterProvider | None = None # Set only when this module creates the provider +# Module state +_auto_instrumentation_bootstrapped = False +_meter_provider: MeterProvider | None = None # Set only when this module creates the provider _initialized: bool = False -# Default configuration DEFAULT_SERVICE_NAME = "agentex" DEFAULT_EXPORT_INTERVAL_MS = 30000 # 30 seconds +def _detected_resource() -> Resource: + """Resource attributes from OTEL_* env (operator-injected or local).""" + return 
get_aggregated_resources([OTELResourceDetector()]) + + +def _unique_instance_id(resource: Resource) -> str: + """Worker-unique service.instance.id (OTel #4390).""" + pid = os.getpid() + existing = resource.attributes.get("service.instance.id") + if existing: + existing = str(existing) + suffix = f".{pid}" + return existing if existing.endswith(suffix) else f"{existing}{suffix}" + service = ( + resource.attributes.get("service.name") + or os.environ.get("OTEL_SERVICE_NAME") + or "unknown" + ) + pod = resource.attributes.get("k8s.pod.name") or "unknown" + return f"{service}.{pod}.{pid}" + + +def _sync_instance_id_to_env(instance_id: str) -> None: + """Write service.instance.id into OTEL_RESOURCE_ATTRIBUTES for auto-instrumentation.""" + key = "service.instance.id" + parts = [ + part.strip() + for part in os.environ.get("OTEL_RESOURCE_ATTRIBUTES", "").split(",") + if part.strip() and not part.strip().startswith(f"{key}=") + ] + parts.append(f"{key}={instance_id}") + os.environ["OTEL_RESOURCE_ATTRIBUTES"] = ",".join(parts) + + +# --- Auto-instrumentation bootstrap --- + + +def bootstrap_auto_instrumentation() -> bool: + """Call ``initialize()`` once per process when auto-instrumentation is available. + + Call from ``app.py`` before any auto-instrumented library (FastAPI, httpx, + SQLAlchemy, etc.) — instrumentors patch at import time. Each uvicorn spawn + worker imports ``app.py`` fresh, so one call per worker is enough. + + Runs when: contrib packages are installed (no ``ImportError``). + Skips when: bootstrap already succeeded in this process. + On ``ImportError`` or ``initialize()`` failure, returns False and leaves + the flag unset so a later call can retry. + + Export config, ``OTEL_SDK_DISABLED``, and disabled instrumentations are + handled inside ``initialize()`` — not gated here. Custom app metrics use + ``init_otel_metrics()`` separately. + + Returns: + True if ``initialize()`` completed; False if skipped or failed. 
+ """ + global _auto_instrumentation_bootstrapped + + if _auto_instrumentation_bootstrapped: + return False + + try: + from opentelemetry.instrumentation.auto_instrumentation import initialize + except ImportError: + return False + + try: + _sync_instance_id_to_env(_unique_instance_id(_detected_resource())) + initialize() + except Exception: + logger.warning( + "OpenTelemetry auto-instrumentation bootstrap failed; continuing without it", + exc_info=True, + ) + return False + + _auto_instrumentation_bootstrapped = True + logger.debug( + "OpenTelemetry auto-instrumentation bootstrapped (pid=%s)", + os.getpid(), + ) + return True + + +# --- Custom application metrics --- + + def _global_meter_provider() -> MeterProvider | None: """Return the global MeterProvider if installed, else None (proxy is ignored).""" provider = metrics.get_meter_provider() @@ -152,6 +270,10 @@ def init_otel_metrics( "deployment.environment": environment or os.environ.get("ENVIRONMENT", "development"), } + ).merge( + Resource.create( + {"service.instance.id": _unique_instance_id(_detected_resource())} + ) ) reader = PeriodicExportingMetricReader( exporter=_create_metric_exporter(endpoint, protocol), @@ -167,9 +289,11 @@ def init_otel_metrics( _meter_provider = provider _initialized = True logger.info( - f"OpenTelemetry metrics initialized: endpoint={endpoint}, " - f"protocol={protocol}, service={resolved_service_name}, " - f"interval={resolved_export_interval_ms}ms" + "OpenTelemetry metrics initialized: endpoint=%s, protocol=%s, service=%s, interval=%sms", + endpoint, + protocol, + resolved_service_name, + resolved_export_interval_ms, ) return _meter_provider @@ -209,11 +333,6 @@ def shutdown_otel_metrics() -> None: except Exception: logger.exception("OpenTelemetry metrics shutdown failed") finally: - if _meter_provider is not None: - try: - metrics.set_meter_provider(NoOpMeterProvider()) - except Exception: - logger.exception("Failed to reset global MeterProvider after shutdown") 
_meter_provider = None _initialized = False diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 22b0c974..1893b4ef 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -2,7 +2,12 @@ from __future__ import annotations -from unittest.mock import patch +import builtins +import os +from contextlib import AbstractContextManager +from types import ModuleType +from typing import Any +from unittest.mock import MagicMock, patch import pytest from opentelemetry import metrics @@ -14,9 +19,12 @@ ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader -from opentelemetry.sdk.resources import Resource -from src.utils import otel_metrics -from src.utils import cache_metrics +from opentelemetry.sdk.resources import ( + OTELResourceDetector, + Resource, + get_aggregated_resources, +) +from src.utils import cache_metrics, otel_metrics def _set_global_meter_provider(provider: object | None = None) -> None: @@ -33,19 +41,160 @@ def _set_global_meter_provider(provider: object | None = None) -> None: pytest.skip(f"OpenTelemetry SDK internals changed: {exc}") +def _fake_auto_instrumentation_import( + initialize: MagicMock | None = None, +) -> tuple[MagicMock, AbstractContextManager[Any]]: + """Inject a fake auto_instrumentation module (no contrib deps required).""" + mock_initialize = initialize or MagicMock() + real_import = builtins.__import__ + + def fake_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "opentelemetry.instrumentation.auto_instrumentation": + mod = ModuleType(name) + mod.initialize = mock_initialize + return mod + return real_import(name, globals, locals, fromlist, level) + + return mock_initialize, patch.object(builtins, "__import__", side_effect=fake_import) + + +def _block_auto_instrumentation_import() -> AbstractContextManager[Any]: + """Simulate missing 
opentelemetry-instrumentation contrib packages.""" + real_import = builtins.__import__ + + def fake_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "opentelemetry.instrumentation.auto_instrumentation": + raise ImportError(name) + return real_import(name, globals, locals, fromlist, level) + + return patch.object(builtins, "__import__", side_effect=fake_import) + + @pytest.fixture(autouse=True) def reset_otel_metrics_state(): """Reset module and global OTel state between tests.""" saved_provider = metrics.get_meter_provider() + saved_bootstrap = otel_metrics._auto_instrumentation_bootstrapped otel_metrics.shutdown_otel_metrics() + otel_metrics._auto_instrumentation_bootstrapped = False _set_global_meter_provider() yield otel_metrics.shutdown_otel_metrics() + otel_metrics._auto_instrumentation_bootstrapped = saved_bootstrap _set_global_meter_provider(saved_provider) +@pytest.mark.unit +def test_bootstrap_skips_when_auto_instrumentation_not_installed(monkeypatch): + monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False) + + with _block_auto_instrumentation_import(): + assert otel_metrics.bootstrap_auto_instrumentation() is False + assert otel_metrics._auto_instrumentation_bootstrapped is False + assert otel_metrics.bootstrap_auto_instrumentation() is False + + +@pytest.mark.unit +def test_bootstrap_runs_without_otlp_env(monkeypatch): + for key in list(os.environ): + if key.startswith("OTEL_EXPORTER_OTLP") and key.endswith("_ENDPOINT"): + monkeypatch.delenv(key, raising=False) + monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False) + + mock_initialize, import_patch = _fake_auto_instrumentation_import() + with import_patch: + assert otel_metrics.bootstrap_auto_instrumentation() is True + mock_initialize.assert_called_once() + + +@pytest.mark.unit +def test_bootstrap_calls_initialize_when_packages_available(monkeypatch): + monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False) + + mock_initialize, import_patch = 
_fake_auto_instrumentation_import() + with ( + import_patch, + patch.object(otel_metrics, "_sync_instance_id_to_env") as sync_env, + ): + assert otel_metrics.bootstrap_auto_instrumentation() is True + sync_env.assert_called_once() + mock_initialize.assert_called_once() + assert otel_metrics.bootstrap_auto_instrumentation() is False + + +@pytest.mark.unit +def test_bootstrap_initialize_failure_returns_false(monkeypatch): + monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False) + + mock_initialize = MagicMock(side_effect=RuntimeError("boom")) + _, import_patch = _fake_auto_instrumentation_import(mock_initialize) + with import_patch: + assert otel_metrics.bootstrap_auto_instrumentation() is False + assert otel_metrics._auto_instrumentation_bootstrapped is False + assert otel_metrics.bootstrap_auto_instrumentation() is False + assert mock_initialize.call_count == 2 + + +@pytest.mark.unit +def test_unique_instance_id_extends_operator_value(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.pod.name=my-pod,service.instance.id=agentex.my-pod.agentex", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + base = get_aggregated_resources([OTELResourceDetector()]) + assert otel_metrics._unique_instance_id(base) == "agentex.my-pod.agentex.42" + + +@pytest.mark.unit +def test_unique_instance_id_builds_when_missing(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv("OTEL_RESOURCE_ATTRIBUTES", "k8s.pod.name=my-pod") + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + base = get_aggregated_resources([OTELResourceDetector()]) + assert otel_metrics._unique_instance_id(base) == "agentex.my-pod.42" + + +@pytest.mark.unit +def test_sync_instance_id_to_env_updates_env(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + 
"k8s.pod.name=operator-pod,service.instance.id=agentex.operator-pod.agentex", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 6789) + + otel_metrics._sync_instance_id_to_env("agentex.operator-pod.agentex.6789") + + env = os.environ["OTEL_RESOURCE_ATTRIBUTES"] + assert "service.instance.id=agentex.operator-pod.agentex.6789" in env + assert "k8s.pod.name=operator-pod" in env + + +@pytest.mark.unit +def test_init_otel_metrics_standalone_resource_has_pid_suffixed_instance_id( + monkeypatch, +): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + original = ( + "k8s.pod.name=operator-pod,k8s.namespace.name=agentex," + "k8s.deployment.name=agentex,service.instance.id=agentex.operator-pod.agentex" + ) + monkeypatch.setenv("OTEL_RESOURCE_ATTRIBUTES", original) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 6789) + + provider = otel_metrics.init_otel_metrics() + assert provider is not None + attrs = provider._sdk_config.resource.attributes + assert attrs.get("service.name") == "agentex" + assert attrs.get("service.instance.id") == "agentex.operator-pod.agentex.6789" + assert os.environ["OTEL_RESOURCE_ATTRIBUTES"] == original + + def _set_operator_provider() -> MeterProvider: provider = MeterProvider(resource=Resource.create({})) _set_global_meter_provider(provider) @@ -238,14 +387,32 @@ def test_init_after_shutdown_in_standalone_mode(monkeypatch): first = otel_metrics.init_otel_metrics() assert first is not None + assert otel_metrics._meter_provider is first + otel_metrics.shutdown_otel_metrics() + assert otel_metrics._initialized is False + assert otel_metrics._meter_provider is None second = otel_metrics.init_otel_metrics() assert second is not None - assert second is not first + assert otel_metrics._initialized is True assert otel_metrics.get_meter("agentex.test") is not None +@pytest.mark.unit +def test_shutdown_does_not_replace_global_meter_provider(monkeypatch): + 
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + provider = otel_metrics.init_otel_metrics() + assert provider is not None + global_before = metrics.get_meter_provider() + + with patch.object(metrics, "set_meter_provider") as set_provider: + otel_metrics.shutdown_otel_metrics() + + set_provider.assert_not_called() + assert metrics.get_meter_provider() is global_before + + @pytest.mark.unit def test_shutdown_resets_state_when_provider_shutdown_raises(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")