Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,14 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r
prometheus = { version = "0.13.4", default-features = false, features = ["process"] }
prometheus-parse = "0.2.5"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "registry"] }
tracing-opentelemetry = "0.32"
tracing-actix-web = "0.7"

# OpenTelemetry tracing
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["rt-tokio"] }
opentelemetry-otlp = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["grpc-tonic", "http-proto", "http-json"] }

# Time and Date
chrono = "0.4"
Expand Down Expand Up @@ -201,3 +208,7 @@ kafka = [
inherits = "release"
lto = "fat"
codegen-units = 1

[patch.crates-io]
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
4 changes: 4 additions & 0 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,10 @@ pub async fn mark_querier_available(domain_name: &str) {
}
}

#[tracing::instrument(
name = "send_query_request",
skip(auth_token, query_request, tenant_id)
)]
pub async fn send_query_request(
auth_token: Option<HeaderMap>,
query_request: &Query,
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub trait ParseableServer {
.wrap(prometheus.clone())
.configure(|config| Self::configure_routes(config))
.wrap(from_fn(health_check::check_shutdown_middleware))
.wrap(actix_web::middleware::Logger::default())
.wrap(tracing_actix_web::TracingLogger::default())
.wrap(actix_web::middleware::Compress::default())
.wrap(cross_origin_config())
};
Expand Down
43 changes: 32 additions & 11 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tokio::task::JoinSet;
use tracing::{error, warn};
use tracing::{Instrument, error, warn};

use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date};
Expand Down Expand Up @@ -79,6 +79,8 @@ pub struct Query {
/// A function to execute the query and fetch QueryResponse
/// This won't look in the cache
/// TODO: Improve this function and make this a part of the query API
#[tracing::instrument(name = "get_records_and_fields", skip(query_request, creds, tenant_id))]
#[allow(clippy::type_complexity)]
pub async fn get_records_and_fields(
query_request: &Query,
creds: &SessionKey,
Expand Down Expand Up @@ -115,6 +117,7 @@ pub async fn get_records_and_fields(
Ok((Some(records), Some(fields)))
}

#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming))]
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let mut session_state = QUERY_SESSION.get_ctx().state();
let time_range =
Expand Down Expand Up @@ -179,6 +182,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
///
/// # Returns
/// - `HttpResponse` with the count result as JSON, including fields if requested.
#[tracing::instrument(name = "handle_count_query", skip(query_request, time), fields(table = %table_name))]
async fn handle_count_query(
query_request: &Query,
table_name: &str,
Expand Down Expand Up @@ -230,6 +234,10 @@ async fn handle_count_query(
///
/// # Returns
/// - `HttpResponse` with the full query result as a JSON object.
#[tracing::instrument(
name = "handle_non_streaming_query",
skip(query, query_request, time, table_name, tenant_id)
)]
async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand Down Expand Up @@ -283,6 +291,10 @@ async fn handle_non_streaming_query(
///
/// # Returns
/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
#[tracing::instrument(
name = "handle_streaming_query",
skip(query, query_request, time, table_name, tenant_id)
)]
async fn handle_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand Down Expand Up @@ -367,6 +379,7 @@ fn create_batch_processor(
}
}

#[tracing::instrument(name = "get_counts", skip(req, counts_request), fields(otel.kind = "server"))]
pub async fn get_counts(
req: HttpRequest,
counts_request: Json<CountsRequest>,
Expand Down Expand Up @@ -453,6 +466,7 @@ pub async fn update_schema_when_distributed(
/// Create streams for querier if they do not exist
/// get list of streams from memory and storage
/// create streams for memory from storage if they do not exist
#[tracing::instrument(name = "create_streams_for_distributed", skip_all, fields(stream_count = streams.len()))]
pub async fn create_streams_for_distributed(
streams: Vec<String>,
tenant_id: &Option<String>,
Expand All @@ -461,19 +475,25 @@ pub async fn create_streams_for_distributed(
return Ok(());
}
let mut join_set = JoinSet::new();
let parent_span = tracing::Span::current();
for stream_name in streams {
let id = tenant_id.to_owned();
join_set.spawn(async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name, &id)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
let task_span =
tracing::info_span!(parent: &parent_span, "create_stream_task", stream = %stream_name);
join_set.spawn(
async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name, &id)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
}

(stream_name, result)
});
.instrument(task_span),
);
}

while let Some(result) = join_set.join_next().await {
Expand Down Expand Up @@ -516,6 +536,7 @@ impl FromRequest for Query {
}
}

#[tracing::instrument(name = "into_query", skip(query, session_state, time_range))]
pub async fn into_query(
query: &Query,
session_state: &SessionState,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod static_schema;
mod stats;
pub mod storage;
pub mod sync;
pub mod telemetry;
pub mod tenants;
pub mod users;
pub mod utils;
Expand Down
22 changes: 19 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use std::process::exit;
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
#[cfg(feature = "kafka")]
use parseable::connectors;
use parseable::{
IngestServer, ParseableServer, QueryServer, Server, banner, metrics, option::Mode,
parseable::PARSEABLE, rbac, storage,
parseable::PARSEABLE, rbac, storage, telemetry,
};
use tokio::signal::ctrl_c;
use tokio::sync::oneshot;
Expand All @@ -33,7 +35,7 @@ use tracing_subscriber::{EnvFilter, Registry, fmt};

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
init_logger();
let tracer_provider = init_logger();
// Install the rustls crypto provider before any TLS operations.
// This is required for rustls 0.23+ which needs an explicit crypto provider.
// If the installation fails, log a warning but continue execution.
Expand Down Expand Up @@ -95,10 +97,14 @@ async fn main() -> anyhow::Result<()> {
parseable_server.await?;
}

if let Some(provider) = tracer_provider {
let _ = provider.shutdown();
}

Ok(())
}

pub fn init_logger() {
pub fn init_logger() -> Option<SdkTracerProvider> {
let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
let default_level = if cfg!(debug_assertions) {
Level::DEBUG
Expand All @@ -116,10 +122,20 @@ pub fn init_logger() {
.with_target(true)
.compact();

let otel_provider = telemetry::init_otel_tracer();

let otel_layer = otel_provider.as_ref().map(|provider| {
let tracer = provider.tracer("parseable");
tracing_opentelemetry::layer().with_tracer(tracer)
});

Registry::default()
.with(filter_layer)
.with(fmt_layer)
.with(otel_layer)
.init();

otel_provider
}

#[cfg(windows)]
Expand Down
104 changes: 49 additions & 55 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use sysinfo::System;
use tokio::runtime::Runtime;
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use self::error::ExecuteError;
use self::stream_schema_provider::GlobalSchemaProvider;
Expand All @@ -76,6 +78,26 @@ use crate::utils::time::TimeRange;
// pub static QUERY_SESSION: Lazy<SessionContext> =
// Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));

/// Result of executing a query, covering both streaming and non-streaming flows.
///
/// - `Left`: a fully materialized `Vec<RecordBatch>` — returned when the query
///   is executed in non-streaming mode.
/// - `Right`: a pinned, boxed [`RecordBatchStreamAdapter`] over a `select_all`
///   of record-batch streams, yielded incrementally in streaming mode.
///
/// This alias replaces the previously inlined return type of `execute`,
/// keeping the signatures readable (and satisfying clippy's type-complexity lint).
pub type QueryResult = Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<RecordBatch, datafusion::error::DataFusionError>,
> + Send,
>,
>,
>,
>,
>,
>,
>;

pub static QUERY_SESSION_STATE: Lazy<SessionState> =
Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));

Expand Down Expand Up @@ -133,40 +155,35 @@ impl InMemorySessionContext {

/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
#[tracing::instrument(name = "query.execute", skip_all, fields(streaming = is_streaming))]
pub async fn execute(
query: Query,
is_streaming: bool,
tenant_id: &Option<String>,
) -> Result<
(
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
> {
) -> Result<(QueryResult, Vec<String>), ExecuteError> {
let id = tenant_id.clone();

// W3C TraceContext propagation across QUERY_RUNTIME (separate OS-thread runtime).
// tracing::Span alone does NOT carry OTel context across OS threads.
let mut trace_ctx = std::collections::HashMap::new();
let cx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut trace_ctx);
});

QUERY_RUNTIME
.spawn(async move { query.execute(is_streaming, &id).await })
.spawn(async move {
// Extract the propagated context on the QUERY_RUNTIME thread
let parent_cx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&trace_ctx)
});
let span = tracing::info_span!("query.runtime_execute", streaming = is_streaming);
let _ = span.set_parent(parent_cx);

async move { query.execute(is_streaming, &id).await }
.instrument(span)
.await
})
.await
.expect("The Join should have been successful")
}
Expand Down Expand Up @@ -272,37 +289,12 @@ impl Query {
/// this function returns the result of the query
/// if streaming is true, it returns a stream
/// if streaming is false, it returns a vector of record batches
#[tracing::instrument(name = "query.datafusion_execute", skip_all, fields(streaming = is_streaming))]
pub async fn execute(
&self,
is_streaming: bool,
tenant_id: &Option<String>,
) -> Result<
(
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
> {
) -> Result<(QueryResult, Vec<String>), ExecuteError> {
let df = QUERY_SESSION
.get_ctx()
.execute_logical_plan(self.final_logical_plan(tenant_id))
Expand Down Expand Up @@ -526,6 +518,7 @@ impl CountsRequest {
/// This function is supposed to read manifest files for the given stream,
/// get the sum of `num_rows` between the `startTime` and `endTime`,
/// divide that by number of bins and return in a manner acceptable for the console
#[tracing::instrument(name = "get_bin_density", skip_all, fields(stream = %self.stream))]
pub async fn get_bin_density(
&self,
tenant_id: &Option<String>,
Expand Down Expand Up @@ -731,6 +724,7 @@ pub fn resolve_stream_names(sql: &str) -> Result<Vec<String>, anyhow::Error> {
Ok(tables)
}

#[tracing::instrument(name = "get_manifest_list", skip(time_range, tenant_id), fields(stream = %stream_name))]
pub async fn get_manifest_list(
stream_name: &str,
time_range: &TimeRange,
Expand Down
1 change: 1 addition & 0 deletions src/storage/field_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ pub struct QueryRow {
/// API handler to get the field stats for a dataset
/// If `fields` is empty, stats for all fields will be returned
/// If `fields` is provided, stats for those fields will be returned
#[tracing::instrument(name = "get_dataset_stats", skip(req, dataset_stats_request), fields(otel.kind = "server"))]
pub async fn get_dataset_stats(
req: HttpRequest,
dataset_stats_request: Json<DataSetStatsRequest>,
Expand Down
Loading
Loading