Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agentex/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
"opentelemetry-api>=1.28.0",
"opentelemetry-sdk>=1.28.0",
"opentelemetry-exporter-otlp>=1.28.0",
"opentelemetry-instrumentation-fastapi>=0.49b0",
"pyyaml>=6.0,<7",
]

Expand Down
8 changes: 4 additions & 4 deletions agentex/src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from src.config.environment_variables import EnvVarKeys
from src.domain.exceptions import GenericException
from src.utils.logging import make_logger
from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics
from src.utils.otel_metrics import configure_app_metrics, shutdown_otel_metrics

logger = make_logger(__name__)

Expand Down Expand Up @@ -73,9 +73,6 @@ def __init__(

@asynccontextmanager
async def lifespan(_: FastAPI):
# Initialize OpenTelemetry metrics first (before dependencies register instruments)
init_otel_metrics()

await dependencies.startup_global_dependencies()
configure_statsd()

Expand Down Expand Up @@ -195,6 +192,9 @@ async def handle_unexpected(request, exc):
fastapi_app.include_router(checkpoints.router)
fastapi_app.include_router(task_retention.router)

# Instrument before the first ASGI message; lifespan startup is too late.
configure_app_metrics(fastapi_app)

# Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses.
# This must be the outermost layer to bypass all middleware.
# Export as `app` so existing uvicorn entry points (app:app) work without changes.
Expand Down
238 changes: 205 additions & 33 deletions agentex/src/utils/otel_metrics.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
"""
OpenTelemetry metrics configuration for Agentex.

When auto-instrumentation (e.g. OTel Operator) has already installed a global
MeterProvider, custom app metrics attach to it instead of replacing it.
Otherwise this module creates its own provider with OTLP export when an endpoint
is configured.
The Python OTel SDK exposes a single global MeterProvider (set once). This module
uses two deterministic paths:

1. **Coexistence** — a real SDK MeterProvider is already global (e.g. from OTel
Operator auto-instrumentation). Custom app metrics attach via get_meter();
set_meter_provider() is never called.
2. **Standalone** — global is still the SDK proxy and an OTLP endpoint is
configured. This module creates the first MeterProvider. Auto-instrumentation
and custom metrics then share that same global slot.

Environment Variables:
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics OTLP endpoint (falls back to
Expand All @@ -15,15 +20,26 @@
OTEL_EXPORTER_OTLP_HEADERS: Optional headers for authentication
OTEL_SERVICE_NAME: Service name for metrics (default: agentex)
OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000)
OTEL_RESOURCE_ATTRIBUTES: K8s and service resource attrs (injected by the OTel
Operator on cluster pods; read via OTELResourceDetector). Per-process
service.instance.id is applied via Resource.merge when this module creates
the MeterProvider; env is not modified.
AGENTEX_OTEL_HTTP_METRICS_ENABLED: Opt-in in-process FastAPI HTTP metrics
(default: false). When true, also set
OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics on the pod
so the OTel Operator does not double-instrument FastAPI.
OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: Pod env; must include ``fastapi`` when
using in-process HTTP metrics (see above).
DD_TRACE_FASTAPI_ENABLED: Set to ``false`` when using ddtrace-run so ddtrace
does not claim FastAPI before OpenTelemetry instrumentation.
"""

from __future__ import annotations

import os
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from opentelemetry import metrics
from opentelemetry.metrics import NoOpMeterProvider
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter as OTLPGrpcMetricExporter,
)
Expand All @@ -32,7 +48,11 @@
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource
from opentelemetry.sdk.resources import (
OTELResourceDetector,
Resource,
get_aggregated_resources,
)

from src.utils.logging import make_logger

Expand All @@ -43,20 +63,75 @@
logger = make_logger(__name__)

# Global state
_meter_provider: MeterProvider | None = None # Set only when this module creates the provider
_meter_provider: MeterProvider | None = None # Set only when this module creates the provider
_initialized: bool = False

# Default configuration
DEFAULT_SERVICE_NAME = "agentex"
DEFAULT_EXPORT_INTERVAL_MS = 30000 # 30 seconds


def _per_process_instance_id(resource: Resource) -> str:
    """Build a ``service.instance.id`` that differs per worker process.

    When *resource* already carries a ``service.instance.id``, it is returned
    unchanged if it contains this process's pid as a dot-delimited token
    (either at the very end or followed by another dot); otherwise the pid is
    appended as ``<existing>.<pid>``.

    When no id is present, one is composed as ``<service>.<pod>.<pid>``. The
    service comes from the ``service.name`` attribute, then the
    ``OTEL_SERVICE_NAME`` environment variable; the pod comes from
    ``k8s.pod.name``. Either part falls back to ``"unknown"``.
    """
    attrs = resource.attributes
    pid = os.getpid()

    current = attrs.get("service.instance.id")
    if not current:
        # Nothing detected: synthesize the id from service, pod and pid.
        service_name = (
            attrs.get("service.name")
            or os.environ.get("OTEL_SERVICE_NAME")
            or "unknown"
        )
        pod_name = attrs.get("k8s.pod.name") or "unknown"
        return f"{service_name}.{pod_name}.{pid}"

    current = str(current)
    suffix = f".{pid}"
    # Avoid appending the pid twice when the id was already made per-process.
    already_tagged = current.endswith(suffix) or f"{suffix}." in current
    return current if already_tagged else f"{current}.{pid}"


def _build_resource() -> Resource:
    """Return the detected resource merged with a per-process instance id.

    Attributes are collected through ``OTELResourceDetector``; the
    ``service.instance.id`` computed by ``_per_process_instance_id`` is then
    merged on top via ``Resource.merge``.
    """
    detected = get_aggregated_resources([OTELResourceDetector()])
    instance_attrs = {"service.instance.id": _per_process_instance_id(detected)}
    return detected.merge(Resource.create(instance_attrs))


def _global_meter_provider() -> MeterProvider | None:
    """Return the global provider only when it is an SDK ``MeterProvider``.

    Any other object returned by ``metrics.get_meter_provider()`` (such as the
    API's placeholder/proxy provider) yields ``None``.
    """
    current = metrics.get_meter_provider()
    if isinstance(current, MeterProvider):
        return current
    return None


def _describe_global_provider() -> tuple[str, bool]:
    """Describe the current global meter provider for diagnostics.

    Returns:
        A ``(class_name, is_sdk_meter_provider)`` pair for the object returned
        by ``metrics.get_meter_provider()``.
    """
    current = metrics.get_meter_provider()
    type_name = type(current).__name__
    is_sdk = isinstance(current, MeterProvider)
    return type_name, is_sdk


def _log_provider_state(
    message: str,
    *,
    app_provider: MeterProvider | None = None,
    mode: str | None = None,
) -> None:
    """Log one INFO line summarising the global MeterProvider state.

    Args:
        message: Leading text of the log line.
        app_provider: Provider created by this module, if any; used to report
            whether that exact object is the one installed globally.
        mode: Short label for the init path; logged as ``unknown`` when falsy.
    """
    type_name, is_sdk = _describe_global_provider()
    current_global = _global_meter_provider()
    # Identity with a non-None app_provider already implies the global is set.
    owned_by_app = app_provider is not None and current_global is app_provider
    logger.info(
        ", ".join(
            (
                message,
                f"mode={mode or 'unknown'}",
                f"global_type={type_name}",
                f"global_is_sdk_meter_provider={is_sdk}",
                f"app_owns_global={owned_by_app}",
            )
        )
    )


def _metrics_endpoint(explicit: str | None = None) -> str | None:
if explicit:
return explicit
Expand All @@ -76,9 +151,21 @@ def _metrics_protocol() -> str:
)


def _http_metrics_export_url(endpoint: str) -> str:
    """Return *endpoint* as an OTLP HTTP metrics URL ending in ``/v1/metrics``.

    Trailing slashes are stripped first; the path is appended only when it is
    not already the suffix.

    NOTE(review): this exists because OTLPHttpMetricExporter is understood to
    append the path only for endpoints resolved from environment variables,
    not for an explicit ``endpoint`` argument — confirm against the exporter
    version in use.
    """
    metrics_path = "/v1/metrics"
    base = endpoint.rstrip("/")
    return base if base.endswith(metrics_path) else base + metrics_path


def _create_metric_exporter(endpoint: str, protocol: str) -> MetricExporter:
if protocol in {"http/protobuf", "http"}:
return OTLPHttpMetricExporter(endpoint=endpoint)
return OTLPHttpMetricExporter(endpoint=_http_metrics_export_url(endpoint))

if protocol != "grpc":
logger.warning("Unknown OTEL metrics protocol %r; using grpc", protocol)
Expand All @@ -102,8 +189,9 @@ def init_otel_metrics(
Call once at application startup. Subsequent calls return the active provider
without re-initializing.

If auto-instrumentation already installed a MeterProvider, custom metrics
attach to it. Otherwise, initializes only when an OTLP endpoint is configured.
If a real SDK MeterProvider is already global, custom metrics attach to it
and set_meter_provider() is never called. Otherwise, when an OTLP endpoint
is configured, this module installs the first global provider.

Args:
service_name: Service name for resource attributes
Expand All @@ -120,15 +208,23 @@ def init_otel_metrics(
if _initialized:
return _meter_provider or _global_meter_provider()

_log_provider_state("OpenTelemetry metrics init starting", mode="starting")

if existing := _global_meter_provider():
_initialized = True
logger.info("OpenTelemetry metrics using existing MeterProvider")
_log_provider_state(
"OpenTelemetry metrics using existing MeterProvider",
mode="coexistence",
)
return existing

endpoint = _metrics_endpoint(otlp_endpoint)
if not endpoint:
_initialized = True
logger.info("OpenTelemetry metrics disabled: no OTLP endpoint configured")
_log_provider_state(
"OpenTelemetry metrics disabled: no OTLP endpoint configured",
mode="disabled",
)
return None

protocol = _metrics_protocol()
Expand All @@ -144,15 +240,12 @@ def init_otel_metrics(
)
)
)
resource = Resource.create(
{
SERVICE_NAME: resolved_service_name,
SERVICE_VERSION: service_version
or os.environ.get("SERVICE_VERSION", "0.1.0"),
"deployment.environment": environment
or os.environ.get("ENVIRONMENT", "development"),
}
)
resource = _build_resource()
if not resource.attributes.get("k8s.pod.name"):
logger.warning(
"k8s.pod.name not set on MeterProvider resource; "
"ensure OTEL_RESOURCE_ATTRIBUTES is injected (OTel Operator)."
)
reader = PeriodicExportingMetricReader(
exporter=_create_metric_exporter(endpoint, protocol),
export_interval_millis=resolved_export_interval_ms,
Expand All @@ -164,14 +257,30 @@ def init_otel_metrics(
except Exception:
provider.shutdown()
raise
_meter_provider = provider

global_provider = _global_meter_provider()
if global_provider is provider:
_meter_provider = provider
_initialized = True
_log_provider_state(
"OpenTelemetry metrics standalone init installed global MeterProvider: "
f"endpoint={endpoint}, protocol={protocol}, service={resolved_service_name}, "
f"interval={resolved_export_interval_ms}ms",
app_provider=provider,
mode="standalone",
)
return _meter_provider

# set_meter_provider() was rejected; shut down the orphan to avoid background export noise.
provider.shutdown()
_initialized = True
logger.info(
f"OpenTelemetry metrics initialized: endpoint={endpoint}, "
f"protocol={protocol}, service={resolved_service_name}, "
f"interval={resolved_export_interval_ms}ms"
_log_provider_state(
"OpenTelemetry metrics standalone set_meter_provider rejected; "
"using existing global MeterProvider",
app_provider=provider,
mode="standalone_rejected",
)
return _meter_provider
return global_provider


def get_meter(name: str, version: str = "0.1.0") -> Meter | None:
Expand Down Expand Up @@ -209,15 +318,78 @@ def shutdown_otel_metrics() -> None:
except Exception:
logger.exception("OpenTelemetry metrics shutdown failed")
finally:
if _meter_provider is not None:
try:
metrics.set_meter_provider(NoOpMeterProvider())
except Exception:
logger.exception("Failed to reset global MeterProvider after shutdown")
_meter_provider = None
_initialized = False


def is_otel_configured() -> bool:
    """Report whether a metrics OTLP endpoint resolves to a non-empty value."""
    endpoint = _metrics_endpoint()
    return bool(endpoint)


def _http_metrics_enabled() -> bool:
    """Return whether in-process FastAPI HTTP metrics should be installed.

    The feature is opt-in, so only an explicit truthy value of
    ``AGENTEX_OTEL_HTTP_METRICS_ENABLED`` enables it: ``1``, ``true``, ``yes``
    or ``on`` (case-insensitive, surrounding whitespace ignored).

    An unset variable, an empty string (e.g. a key present but blank in a
    ConfigMap), or any unrecognised value leaves the feature disabled. A
    deny-list would silently turn instrumentation on for such values.
    """
    flag = os.environ.get("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "").strip().lower()
    return flag in {"1", "true", "yes", "on"}
Comment on lines +330 to +333

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Deny-list silently enables opt-in feature on unexpected values

Any value not in {"0", "false", "no", "off"} — including an empty string — returns True and enables HTTP metrics. In Kubernetes it is common to inherit env vars from a ConfigMap where a key exists but its value is empty (e.g. AGENTEX_OTEL_HTTP_METRICS_ENABLED: ""). That would silently enable in-process HTTP instrumentation and trigger double-instrumentation with the OTel Operator, producing duplicate http_server_* timeseries and potential cardinality issues.

An allow-list approach is safer for an opt-in flag: return flag in {"1", "true", "yes", "on"}.

Suggested change
def _http_metrics_enabled() -> bool:
"""Return whether in-process FastAPI HTTP metrics should be installed."""
flag = os.environ.get("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "false").strip().lower()
return flag not in {"0", "false", "no", "off"}
def _http_metrics_enabled() -> bool:
"""Return whether in-process FastAPI HTTP metrics should be installed."""
flag = os.environ.get("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "false").strip().lower()
return flag in {"1", "true", "yes", "on"}
Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/utils/otel_metrics.py
Line: 345-348

Comment:
**Deny-list silently enables opt-in feature on unexpected values**

Any value not in `{"0", "false", "no", "off"}` — including an empty string — returns `True` and enables HTTP metrics. In Kubernetes it is common to inherit env vars from a `ConfigMap` where a key exists but its value is empty (e.g. `AGENTEX_OTEL_HTTP_METRICS_ENABLED: ""`). That would silently enable in-process HTTP instrumentation and trigger double-instrumentation with the OTel Operator, producing duplicate `http_server_*` timeseries and potential cardinality issues.

An allow-list approach is safer for an opt-in flag: `return flag in {"1", "true", "yes", "on"}`.

```suggestion
def _http_metrics_enabled() -> bool:
    """Return whether in-process FastAPI HTTP metrics should be installed."""
    flag = os.environ.get("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "false").strip().lower()
    return flag in {"1", "true", "yes", "on"}
```

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex



def instrument_fastapi_http_metrics(app: Any) -> bool:
    """
    Install in-process FastAPI HTTP server metrics on *app*.

    Prefer :func:`configure_app_metrics`. When calling this directly, do so
    before the ASGI server handles any messages (lifespan startup is too late).

    Requires ``AGENTEX_OTEL_HTTP_METRICS_ENABLED=true`` and
    ``OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics`` on the pod.

    The call is skipped (with an INFO log) when the feature flag is off, when
    *app* is already marked as instrumented by OpenTelemetry, or when there is
    neither an SDK MeterProvider nor a configured OTLP endpoint.

    Returns:
        True when instrumentation was applied, False when skipped or disabled.
    """
    if not _http_metrics_enabled():
        logger.info("FastAPI HTTP metrics disabled via AGENTEX_OTEL_HTTP_METRICS_ENABLED")
        return False

    # Marker attribute checked so the app is not instrumented a second time.
    already_instrumented = getattr(app, "_is_instrumented_by_opentelemetry", False)
    if already_instrumented:
        logger.info(
            "FastAPI already instrumented by OpenTelemetry; skipping in-process HTTP metrics"
        )
        return False

    # Make sure metrics init has run so a provider exists if one can be built.
    if not _initialized:
        init_otel_metrics()

    provider_missing = _global_meter_provider() is None
    if provider_missing and not is_otel_configured():
        logger.info(
            "FastAPI HTTP metrics skipped: no MeterProvider and no OTLP endpoint configured"
        )
        return False

    # Imported lazily so the instrumentation package is only loaded when used.
    from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

    FastAPIInstrumentor.instrument_app(app, meter_provider=_global_meter_provider())
    logger.info("FastAPI in-process HTTP metrics instrumentation enabled")
    return True


def configure_app_metrics(app: Any) -> None:
    """
    Initialize OTLP metrics, then optionally instrument *app* for HTTP metrics.

    Call once at module import, after the FastAPI app is fully configured
    (middleware, routes, handlers) and before wrapping it or serving any ASGI
    messages. Lifespan is too late: Starlette caches ``middleware_stack`` on the
    first ASGI message (usually lifespan startup), before the lifespan handler
    runs.

    HTTP metrics are opt-in via ``AGENTEX_OTEL_HTTP_METRICS_ENABLED`` (default
    false). Beyla/eBPF HTTP metrics are independent and use different label sets
    when present.

    A warning is logged when the flag is on but instrumentation was not applied.
    """
    init_otel_metrics()
    if _http_metrics_enabled() and not instrument_fastapi_http_metrics(app):
        logger.warning(
            "FastAPI HTTP metrics were not applied despite "
            "AGENTEX_OTEL_HTTP_METRICS_ENABLED; see prior log lines for the skip reason"
        )
Loading
Loading