Skip to content
13 changes: 12 additions & 1 deletion agentex/src/api/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# ruff: noqa: E402
# E402 suppressed: bootstrap_auto_instrumentation() must run before imports of
# auto-instrumented libraries (FastAPI, httpx, SQLAlchemy, etc.).

from src.utils.otel_metrics import (
bootstrap_auto_instrumentation,
init_otel_metrics,
shutdown_otel_metrics,
)

bootstrap_auto_instrumentation()

import os
from contextlib import asynccontextmanager
from pathlib import Path
Expand Down Expand Up @@ -38,7 +50,6 @@
from src.config.environment_variables import EnvVarKeys
from src.domain.exceptions import GenericException
from src.utils.logging import make_logger
from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics

logger = make_logger(__name__)

Expand Down
169 changes: 144 additions & 25 deletions agentex/src/utils/otel_metrics.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,45 @@
"""
OpenTelemetry metrics configuration for Agentex.

When auto-instrumentation (e.g. OTel Operator) has already installed a global
MeterProvider, custom app metrics attach to it instead of replacing it.
Otherwise this module creates its own provider with OTLP export when an endpoint
is configured.

Environment Variables:
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics OTLP endpoint (falls back to
OpenTelemetry bootstrap and custom metrics for Agentex.

Two responsibilities:

1. **Auto-instrumentation** — call ``bootstrap_auto_instrumentation()`` from
``app.py`` before importing FastAPI or other auto-instrumented libraries so
``initialize()`` runs in each uvicorn spawn worker when contrib packages
are installed.

2. **Custom app metrics** — ``init_otel_metrics()`` registers Agentex instruments
(``auth_cache_*``, ``db_*``, etc.). Attaches to an existing global
``MeterProvider`` from bootstrap/operator when present; otherwise creates a
standalone OTLP pipeline when an endpoint is configured.

**Datadog ``ddtrace-run`` coexistence:** Neither OTel nor ddtrace detects the other's
FastAPI patches. If both run in one process, ddtrace wraps the middleware stack
first; OTel skips ``OpenTelemetryMiddleware`` with "unexpected middleware stack"
and HTTP OTel metrics/traces are not emitted. Helm avoids this by using
``ddtrace-run`` only when ``datadog.env`` is set (OTel-only otherwise). If both
are required, set ``DD_TRACE_FASTAPI_ENABLED=false`` and
``DD_TRACE_STARLETTE_ENABLED=false`` so OTel owns HTTP instrumentation.

**Per-worker ``service.instance.id``:** Uvicorn spawn workers share pod-level
``OTEL_RESOURCE_ATTRIBUTES``, so auto-instrumentation would otherwise emit all
workers on the same metric timeseries (see `OTel #4390
<https://github.com/open-telemetry/opentelemetry-python/issues/4390>`_).
``bootstrap_auto_instrumentation()`` appends ``.<pid>`` to ``service.instance.id``
in ``OTEL_RESOURCE_ATTRIBUTES`` before ``initialize()``; standalone
``init_otel_metrics()`` applies the same via ``Resource.merge``. With
``--workers 1``, operator ``sitecustomize`` may have already called
``initialize()``; bootstrap calls it again (OTel providers and instrumentors
are set-once; duplicate calls only produce log warnings).

Environment variables (custom metrics / standalone mode):
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics endpoint (falls back to
OTEL_EXPORTER_OTLP_ENDPOINT)
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: Metrics export protocol (falls back to
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: Export protocol (falls back to
OTEL_EXPORTER_OTLP_PROTOCOL; default: grpc)
OTEL_EXPORTER_OTLP_ENDPOINT: General OTLP endpoint URL
OTEL_EXPORTER_OTLP_HEADERS: Optional headers for authentication
OTEL_SERVICE_NAME: Service name for metrics (default: agentex)
OTEL_EXPORTER_OTLP_HEADERS: Passed through by OTLP exporters when set
OTEL_SERVICE_NAME: Service name (default: agentex)
OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000)
"""

Expand All @@ -23,7 +49,6 @@
from typing import TYPE_CHECKING

from opentelemetry import metrics
from opentelemetry.metrics import NoOpMeterProvider
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter as OTLPGrpcMetricExporter,
)
Expand All @@ -32,7 +57,13 @@
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.resources import (
SERVICE_NAME,
SERVICE_VERSION,
OTELResourceDetector,
Resource,
get_aggregated_resources,
)

from src.utils.logging import make_logger

Expand All @@ -42,15 +73,102 @@

logger = make_logger(__name__)

# Global state
_meter_provider: MeterProvider | None = None # Set only when this module creates the provider
# Module state
_auto_instrumentation_bootstrapped = False
_meter_provider: MeterProvider | None = None # Set only when this module creates the provider
_initialized: bool = False

# Default configuration
DEFAULT_SERVICE_NAME = "agentex"
DEFAULT_EXPORT_INTERVAL_MS = 30000 # 30 seconds


def _detected_resource() -> Resource:
"""Resource attributes from OTEL_* env (operator-injected or local)."""
return get_aggregated_resources([OTELResourceDetector()])


def _unique_instance_id(resource: Resource) -> str:
"""Worker-unique service.instance.id (OTel #4390)."""
pid = os.getpid()
existing = resource.attributes.get("service.instance.id")
if existing:
existing = str(existing)
suffix = f".{pid}"
return existing if existing.endswith(suffix) else f"{existing}{suffix}"
service = (
resource.attributes.get("service.name")
or os.environ.get("OTEL_SERVICE_NAME")
or "unknown"
)
pod = resource.attributes.get("k8s.pod.name") or "unknown"
return f"{service}.{pod}.{pid}"


def _sync_instance_id_to_env(instance_id: str) -> None:
"""Write service.instance.id into OTEL_RESOURCE_ATTRIBUTES for auto-instrumentation."""
key = "service.instance.id"
parts = [
part.strip()
for part in os.environ.get("OTEL_RESOURCE_ATTRIBUTES", "").split(",")
if part.strip() and not part.strip().startswith(f"{key}=")
]
parts.append(f"{key}={instance_id}")
os.environ["OTEL_RESOURCE_ATTRIBUTES"] = ",".join(parts)


# --- Auto-instrumentation bootstrap ---


def bootstrap_auto_instrumentation() -> bool:
"""Call ``initialize()`` once per process when auto-instrumentation is available.

Call from ``app.py`` before any auto-instrumented library (FastAPI, httpx,
SQLAlchemy, etc.) — instrumentors patch at import time. Each uvicorn spawn
worker imports ``app.py`` fresh, so one call per worker is enough.

Runs when: contrib packages are installed (no ``ImportError``).
Skips when: bootstrap already succeeded in this process.
On ``ImportError`` or ``initialize()`` failure, returns False and leaves
the flag unset so a later call can retry.

Export config, ``OTEL_SDK_DISABLED``, and disabled instrumentations are
handled inside ``initialize()`` — not gated here. Custom app metrics use
``init_otel_metrics()`` separately.

Returns:
True if ``initialize()`` completed; False if skipped or failed.
"""
global _auto_instrumentation_bootstrapped

if _auto_instrumentation_bootstrapped:
return False

try:
from opentelemetry.instrumentation.auto_instrumentation import initialize
except ImportError:
return False
Comment thread
greptile-apps[bot] marked this conversation as resolved.

try:
_sync_instance_id_to_env(_unique_instance_id(_detected_resource()))
initialize()
except Exception:
logger.warning(
"OpenTelemetry auto-instrumentation bootstrap failed; continuing without it",
exc_info=True,
)
return False

_auto_instrumentation_bootstrapped = True
logger.debug(
"OpenTelemetry auto-instrumentation bootstrapped (pid=%s)",
os.getpid(),
)
return True


# --- Custom application metrics ---


def _global_meter_provider() -> MeterProvider | None:
"""Return the global MeterProvider if installed, else None (proxy is ignored)."""
provider = metrics.get_meter_provider()
Expand Down Expand Up @@ -152,6 +270,10 @@ def init_otel_metrics(
"deployment.environment": environment
or os.environ.get("ENVIRONMENT", "development"),
}
).merge(
Resource.create(
{"service.instance.id": _unique_instance_id(_detected_resource())}
)
)
reader = PeriodicExportingMetricReader(
exporter=_create_metric_exporter(endpoint, protocol),
Expand All @@ -167,9 +289,11 @@ def init_otel_metrics(
_meter_provider = provider
_initialized = True
logger.info(
f"OpenTelemetry metrics initialized: endpoint={endpoint}, "
f"protocol={protocol}, service={resolved_service_name}, "
f"interval={resolved_export_interval_ms}ms"
"OpenTelemetry metrics initialized: endpoint=%s, protocol=%s, service=%s, interval=%sms",
endpoint,
protocol,
resolved_service_name,
resolved_export_interval_ms,
)
return _meter_provider

Expand Down Expand Up @@ -209,11 +333,6 @@ def shutdown_otel_metrics() -> None:
except Exception:
logger.exception("OpenTelemetry metrics shutdown failed")
finally:
if _meter_provider is not None:
try:
metrics.set_meter_provider(NoOpMeterProvider())
except Exception:
logger.exception("Failed to reset global MeterProvider after shutdown")
_meter_provider = None
_initialized = False

Expand Down
Loading
Loading