Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/handlers/http/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use actix_web::{
http::header::ContentType,
web::{self, Json},
};
use tracing::instrument;

use crate::rbac::map::roles;
use crate::rbac::role::model::{Role, RoleType, RoleUI};
Expand Down Expand Up @@ -164,11 +165,13 @@ pub async fn delete(

// Handler for PUT /api/v1/role/default
// Delete existing role
#[instrument(name = "PUT /role/default", skip(req, name), fields(role_name))]
pub async fn put_default(
req: HttpRequest,
name: web::Json<String>,
) -> Result<impl Responder, RoleError> {
let name = name.into_inner();
tracing::Span::current().record("role_name", &name.as_str());
let tenant_id = get_tenant_id_from_request(&req);
let mut metadata = get_metadata(&tenant_id).await?;
metadata.default_role = Some(name.clone());
Expand Down Expand Up @@ -211,6 +214,7 @@ pub async fn get_default(req: HttpRequest) -> Result<impl Responder, RoleError>
Ok(web::Json(res))
}

#[instrument(name = "role::get_metadata", skip_all)]
async fn get_metadata(
tenant_id: &Option<String>,
) -> Result<crate::storage::StorageMetadata, ObjectStorageError> {
Expand All @@ -223,6 +227,7 @@ async fn get_metadata(
Ok(serde_json::from_slice::<StorageMetadata>(&metadata)?)
}

#[instrument(name = "role::put_metadata", skip_all)]
async fn put_metadata(
metadata: &StorageMetadata,
tenant_id: &Option<String>,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod static_schema;
mod stats;
pub mod storage;
pub mod sync;
pub mod telemetry;
pub mod tenants;
pub mod users;
pub mod utils;
Expand Down
29 changes: 27 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::process::exit;
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
use opentelemetry::trace::TracerProvider as _;
#[cfg(feature = "kafka")]
use parseable::connectors;
use parseable::{
Expand All @@ -27,13 +28,17 @@ use tokio::signal::ctrl_c;
use tokio::sync::oneshot;
use tracing::Level;
use tracing::{info, warn};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Registry, fmt};

/// Env var to read the logging level of OTel traces. Defaults to `info`.
const OTEL_TRACE_LEVEL: &str = "OTEL_TRACE_LEVEL";

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
init_logger();
let otel_provider = init_logger();
// Install the rustls crypto provider before any TLS operations.
// This is required for rustls 0.23+ which needs an explicit crypto provider.
// If the installation fails, log a warning but continue execution.
Expand Down Expand Up @@ -95,10 +100,16 @@ async fn main() -> anyhow::Result<()> {
parseable_server.await?;
}

if let Some(provider) = otel_provider {
if let Err(e) = provider.shutdown() {
warn!("Failed to shutdown OTel tracer provider: {:?}", e);
}
}

Ok(())
}

pub fn init_logger() {
pub fn init_logger() -> Option<opentelemetry_sdk::trace::SdkTracerProvider> {
let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
let default_level = if cfg!(debug_assertions) {
Level::DEBUG
Expand All @@ -116,10 +127,24 @@ pub fn init_logger() {
.with_target(true)
.compact();

let otel_provider = parseable::telemetry::init_tracing();

let otel_layer = otel_provider.as_ref().map(|provider| {
let otel_filter =
EnvFilter::try_from_env(OTEL_TRACE_LEVEL).unwrap_or_else(|_| EnvFilter::new("info"));
let tracer = provider.tracer("parseable");
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(otel_filter)
});

Registry::default()
.with(filter_layer)
.with(fmt_layer)
.with(otel_layer)
.init();

otel_provider
}

#[cfg(windows)]
Expand Down
3 changes: 3 additions & 0 deletions src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
use bytes::Bytes;
use once_cell::sync::OnceCell;
use std::io;
use tracing::instrument;

use crate::{
metastore::metastore_traits::MetastoreObject,
Expand Down Expand Up @@ -319,6 +320,7 @@ pub fn get_staging_metadata(tenant_id: &Option<String>) -> io::Result<Option<Sto
Ok(Some(meta))
}

#[instrument(name = "storage::put_remote_metadata", skip_all)]
pub async fn put_remote_metadata(
metadata: &StorageMetadata,
tenant_id: &Option<String>,
Expand All @@ -330,6 +332,7 @@ pub async fn put_remote_metadata(
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))
}

#[instrument(name = "storage::put_staging_metadata", skip_all)]
pub fn put_staging_metadata(meta: &StorageMetadata, tenant_id: &Option<String>) -> io::Result<()> {
let mut staging_metadata = meta.clone();
staging_metadata.server_mode = PARSEABLE.options.mode;
Expand Down
101 changes: 101 additions & 0 deletions src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use opentelemetry_otlp::SpanExporter;
use opentelemetry_sdk::{
Resource,
propagation::TraceContextPropagator,
trace::{BatchSpanProcessor, SdkTracerProvider},
};

const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";

/// Initialise an OTLP tracer provider.
///
/// **Required env var:**
/// - `OTEL_EXPORTER_OTLP_ENDPOINT` — collector address.
/// For HTTP exporters the SDK appends the signal path automatically:
/// e.g. `http://localhost:4318` → `http://localhost:4318/v1/traces`.
/// Set a signal-specific var `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` to
/// supply a full URL without any path suffix being added.
///
/// **Optional env vars (all read by the SDK automatically):**
/// - `OTEL_EXPORTER_OTLP_PROTOCOL` — transport + serialisation (default: `http/json`):
/// - `grpc` → gRPC / tonic (Jaeger, Tempo, …)
/// - `http/json` → HTTP + JSON (Parseable OSS ingest at `/v1/traces`)
/// - `http/protobuf` → HTTP + protobuf
/// - `OTEL_EXPORTER_OTLP_HEADERS` — comma-separated `key=value` pairs forwarded
/// as gRPC metadata or HTTP headers, e.g.
/// `authorization=Basic <token>,x-p-stream=my-stream,x-p-log-source=otel-traces`
///
/// Returns `None` when `OTEL_EXPORTER_OTLP_ENDPOINT` is not set (OTEL disabled).
/// The caller must call `provider.shutdown()` before process exit.
pub fn init_tracing() -> Option<SdkTracerProvider> {
// Only used to decide whether OTEL is enabled; the SDK reads it again
// from env to build the exporter (which also appends /v1/traces for HTTP).
std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT).ok()?;

let protocol =
std::env::var(OTEL_EXPORTER_OTLP_PROTOCOL).unwrap_or_else(|_| "http/json".to_string());

// Build the exporter using the SDK's env-var-aware builders.
// We intentionally do NOT call .with_endpoint() / .with_headers() /
// .with_metadata() here — the SDK reads OTEL_EXPORTER_OTLP_ENDPOINT and
// OTEL_EXPORTER_OTLP_HEADERS from the environment automatically, which
// preserves correct path-appending behaviour for HTTP exporters.
//
// The HTTP builder reads OTEL_EXPORTER_OTLP_PROTOCOL to select between
// http/json (default) and http/protobuf automatically.
let exporter = match protocol.as_str() {
// ── gRPC ─────────────────────────────────────────────────────────────
"grpc" => SpanExporter::builder().with_tonic().build(),
// ── HTTP/JSON (default) or HTTP/Protobuf ─────────────────────────────
// The SDK reads OTEL_EXPORTER_OTLP_PROTOCOL from the environment
// to select between http/json and http/protobuf automatically.
// Default when OTEL_EXPORTER_OTLP_PROTOCOL is unset is http/json,
// which is required for Parseable OSS — it only accepts application/json.
_ => SpanExporter::builder().with_http().build(),
};

let exporter = exporter
.map_err(|e| tracing::warn!("Failed to build OTEL span exporter: {}", e))
.ok()?;

let resource = Resource::builder_empty()
.with_service_name("parseable")
.build();

let processor = BatchSpanProcessor::builder(exporter).build();

let provider = SdkTracerProvider::builder()
.with_span_processor(processor)
.with_resource(resource)
.build();

opentelemetry::global::set_tracer_provider(provider.clone());

// Register the W3C TraceContext propagator globally.
// This is REQUIRED for:
// - Incoming HTTP header extraction (traceparent/tracestate)
// - Cross-thread channel propagation via inject/extract
// Without this, propagator.extract() returns an empty context.
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

Some(provider)
}
Loading