diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index abe8f3dc9..07f45fa47 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -24,6 +24,7 @@ use actix_web::{ http::header::ContentType, web::{self, Json}, }; +use tracing::instrument; use crate::rbac::map::roles; use crate::rbac::role::model::{Role, RoleType, RoleUI}; @@ -164,11 +165,13 @@ pub async fn delete( // Handler for PUT /api/v1/role/default // Delete existing role +#[instrument(name = "PUT /role/default", skip(req, name), fields(role_name))] pub async fn put_default( req: HttpRequest, name: web::Json, ) -> Result { let name = name.into_inner(); + tracing::Span::current().record("role_name", &name.as_str()); let tenant_id = get_tenant_id_from_request(&req); let mut metadata = get_metadata(&tenant_id).await?; metadata.default_role = Some(name.clone()); @@ -211,6 +214,7 @@ pub async fn get_default(req: HttpRequest) -> Result Ok(web::Json(res)) } +#[instrument(name = "role::get_metadata", skip_all)] async fn get_metadata( tenant_id: &Option, ) -> Result { @@ -223,6 +227,7 @@ async fn get_metadata( Ok(serde_json::from_slice::(&metadata)?) } +#[instrument(name = "role::put_metadata", skip_all)] async fn put_metadata( metadata: &StorageMetadata, tenant_id: &Option, diff --git a/src/lib.rs b/src/lib.rs index 1ec0c4862..2b383cd71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ mod static_schema; mod stats; pub mod storage; pub mod sync; +pub mod telemetry; pub mod tenants; pub mod users; pub mod utils; diff --git a/src/main.rs b/src/main.rs index 42cba34f5..359f0f98e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use std::process::exit; * along with this program. If not, see . * */ +use opentelemetry::trace::TracerProvider as _; #[cfg(feature = "kafka")] use parseable::connectors; use parseable::{ @@ -27,13 +28,17 @@ use tokio::signal::ctrl_c; use tokio::sync::oneshot; use tracing::Level; use tracing::{info, warn}; +use tracing_subscriber::Layer; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry, fmt}; +/// Env var to read the logging level of OTel traces. Defaults to `info`. +const OTEL_TRACE_LEVEL: &str = "OTEL_TRACE_LEVEL"; + #[actix_web::main] async fn main() -> anyhow::Result<()> { - init_logger(); + let otel_provider = init_logger(); // Install the rustls crypto provider before any TLS operations. // This is required for rustls 0.23+ which needs an explicit crypto provider. // If the installation fails, log a warning but continue execution. @@ -95,10 +100,16 @@ async fn main() -> anyhow::Result<()> { parseable_server.await?; } + if let Some(provider) = otel_provider { + if let Err(e) = provider.shutdown() { + warn!("Failed to shutdown OTel tracer provider: {:?}", e); + } + } + Ok(()) } -pub fn init_logger() { +pub fn init_logger() -> Option { let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| { let default_level = if cfg!(debug_assertions) { Level::DEBUG @@ -116,10 +127,24 @@ pub fn init_logger() { .with_target(true) .compact(); + let otel_provider = parseable::telemetry::init_tracing(); + + let otel_layer = otel_provider.as_ref().map(|provider| { + let otel_filter = + EnvFilter::try_from_env(OTEL_TRACE_LEVEL).unwrap_or_else(|_| EnvFilter::new("info")); + let tracer = provider.tracer("parseable"); + tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(otel_filter) + }); + Registry::default() .with(filter_layer) .with(fmt_layer) + .with(otel_layer) .init(); + + otel_provider } #[cfg(windows)] diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index a04008a61..1f9ee7dd9 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -25,6 +25,7 @@ use std::{ use bytes::Bytes; use once_cell::sync::OnceCell; use std::io; +use tracing::instrument; use crate::{ metastore::metastore_traits::MetastoreObject, @@ -319,6 +320,7 @@ pub fn get_staging_metadata(tenant_id: &Option) -> io::Result, @@ -330,6 +332,7 @@ pub async fn put_remote_metadata( .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail()))) } +#[instrument(name = "storage::put_staging_metadata", skip_all)] pub fn put_staging_metadata(meta: &StorageMetadata, tenant_id: &Option) -> io::Result<()> { let mut staging_metadata = meta.clone(); staging_metadata.server_mode = PARSEABLE.options.mode; diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 000000000..a5f3d7860 --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,101 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use opentelemetry_otlp::SpanExporter; +use opentelemetry_sdk::{ + Resource, + propagation::TraceContextPropagator, + trace::{BatchSpanProcessor, SdkTracerProvider}, +}; + +const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT"; +const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL"; + +/// Initialise an OTLP tracer provider. +/// +/// **Required env var:** +/// - `OTEL_EXPORTER_OTLP_ENDPOINT` — collector address. +/// For HTTP exporters the SDK appends the signal path automatically: +/// e.g. `http://localhost:4318` → `http://localhost:4318/v1/traces`. +/// Set a signal-specific var `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` to +/// supply a full URL without any path suffix being added. +/// +/// **Optional env vars (all read by the SDK automatically):** +/// - `OTEL_EXPORTER_OTLP_PROTOCOL` — transport + serialisation (default: `http/json`): +/// - `grpc` → gRPC / tonic (Jaeger, Tempo, …) +/// - `http/json` → HTTP + JSON (Parseable OSS ingest at `/v1/traces`) +/// - `http/protobuf` → HTTP + protobuf +/// - `OTEL_EXPORTER_OTLP_HEADERS` — comma-separated `key=value` pairs forwarded +/// as gRPC metadata or HTTP headers, e.g. +/// `authorization=Basic ,x-p-stream=my-stream,x-p-log-source=otel-traces` +/// +/// Returns `None` when `OTEL_EXPORTER_OTLP_ENDPOINT` is not set (OTEL disabled). +/// The caller must call `provider.shutdown()` before process exit. +pub fn init_tracing() -> Option { + // Only used to decide whether OTEL is enabled; the SDK reads it again + // from env to build the exporter (which also appends /v1/traces for HTTP). + std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT).ok()?; + + let protocol = + std::env::var(OTEL_EXPORTER_OTLP_PROTOCOL).unwrap_or_else(|_| "http/json".to_string()); + + // Build the exporter using the SDK's env-var-aware builders. + // We intentionally do NOT call .with_endpoint() / .with_headers() / + // .with_metadata() here — the SDK reads OTEL_EXPORTER_OTLP_ENDPOINT and + // OTEL_EXPORTER_OTLP_HEADERS from the environment automatically, which + // preserves correct path-appending behaviour for HTTP exporters. + // + // The HTTP builder reads OTEL_EXPORTER_OTLP_PROTOCOL to select between + // http/json (default) and http/protobuf automatically. + let exporter = match protocol.as_str() { + // ── gRPC ───────────────────────────────────────────────────────────── + "grpc" => SpanExporter::builder().with_tonic().build(), + // ── HTTP/JSON (default) or HTTP/Protobuf ───────────────────────────── + // The SDK reads OTEL_EXPORTER_OTLP_PROTOCOL from the environment + // to select between http/json and http/protobuf automatically. + // Default when OTEL_EXPORTER_OTLP_PROTOCOL is unset is http/json, + // which is required for Parseable OSS — it only accepts application/json. + _ => SpanExporter::builder().with_http().build(), + }; + + let exporter = exporter + .map_err(|e| tracing::warn!("Failed to build OTEL span exporter: {}", e)) + .ok()?; + + let resource = Resource::builder_empty() + .with_service_name("parseable") + .build(); + + let processor = BatchSpanProcessor::builder(exporter).build(); + + let provider = SdkTracerProvider::builder() + .with_span_processor(processor) + .with_resource(resource) + .build(); + + opentelemetry::global::set_tracer_provider(provider.clone()); + + // Register the W3C TraceContext propagator globally. + // This is REQUIRED for: + // - Incoming HTTP header extraction (traceparent/tracestate) + // - Cross-thread channel propagation via inject/extract + // Without this, propagator.extract() returns an empty context. + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + + Some(provider) +}