Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ tonic = { version = "0.14.1", features = [
] }
tonic-prost = "0.14.1"
tonic-web = "0.14.1"
tower-http = { version = "0.6.1", features = ["cors"] }
tower-http = { version = "0.6.1", features = ["cors", "trace"] }
url = "2.4.0"

# Connectors dependencies
Expand Down Expand Up @@ -108,6 +108,11 @@ prometheus = { version = "0.13.4", default-features = false, features = ["proces
prometheus-parse = "0.2.5"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] }
opentelemetry = { git = "https://github.com/parmesant/opentelemetry-rust/", rev = "45fb828769e6ade96d56ca1f5fa14cf0986a5341", features = ["trace"] }
opentelemetry_sdk = { git = "https://github.com/parmesant/opentelemetry-rust/", rev = "45fb828769e6ade96d56ca1f5fa14cf0986a5341", features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { git = "https://github.com/parmesant/opentelemetry-rust/", rev = "45fb828769e6ade96d56ca1f5fa14cf0986a5341", features = ["trace", "grpc-tonic", "http-json", "http-proto"] }
tracing-opentelemetry = "0.32"
tracing-actix-web = "0.7"

# Time and Date
chrono = "0.4"
Expand Down Expand Up @@ -197,6 +202,10 @@ kafka = [
"sasl2-sys/vendored",
]

[patch.crates-io]
opentelemetry = { git = "https://github.com/parmesant/opentelemetry-rust/", rev = "45fb828769e6ade96d56ca1f5fa14cf0986a5341" }
opentelemetry_sdk = { git = "https://github.com/parmesant/opentelemetry-rust/", rev = "45fb828769e6ade96d56ca1f5fa14cf0986a5341" }

[profile.release-lto]
inherits = "release"
lto = "fat"
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ pub async fn put_stream(
Ok(("Log stream created", StatusCode::OK))
}

#[tracing::instrument(name = "http.get_retention", skip(req), fields(stream_name = %stream_name), err)]
pub async fn get_retention(
req: HttpRequest,
stream_name: Path<String>,
Expand Down
2 changes: 2 additions & 0 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use serde_json::{Map, Value};
use ssl_acceptor::get_ssl_acceptor;
use tokio::sync::{RwLock, oneshot};
use tracing::{error, info, warn};
use tracing_actix_web::TracingLogger;

use crate::{
alerts::{ALERTS, get_alert_manager, target::TARGETS},
Expand Down Expand Up @@ -111,6 +112,7 @@ pub trait ParseableServer {
// fn that creates the app
let create_app_fn = move || {
App::new()
.wrap(TracingLogger::default())
.wrap(prometheus.clone())
.configure(|config| Self::configure_routes(config))
.wrap(from_fn(health_check::check_shutdown_middleware))
Expand Down
39 changes: 39 additions & 0 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,27 @@ pub async fn get_records_and_fields(
Ok((Some(records), Some(fields)))
}

#[tracing::instrument(
name = "query.handle",
skip(req, query_request),
fields(
streaming = %query_request.streaming,
sql = %query_request.query,
tables = tracing::field::Empty,
tenant = tracing::field::Empty,
)
)]
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let mut session_state = QUERY_SESSION.get_ctx().state();
let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
let tables = resolve_stream_names(&query_request.query)?;
tracing::Span::current().record("tables", tracing::field::display(tables.join(",")));
// check or load streams in memory
create_streams_for_distributed(tables.clone(), &get_tenant_id_from_request(&req)).await?;

let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current().record("tenant", tracing::field::debug(&tenant_id));
session_state
.config_mut()
.options_mut()
Expand Down Expand Up @@ -179,6 +191,11 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
///
/// # Returns
/// - `HttpResponse` with the count result as JSON, including fields if requested.
#[tracing::instrument(
name = "query.count",
skip(query_request, time, tenant_id),
fields(table_name, column_name)
)]
async fn handle_count_query(
query_request: &Query,
table_name: &str,
Expand Down Expand Up @@ -230,6 +247,11 @@ async fn handle_count_query(
///
/// # Returns
/// - `HttpResponse` with the full query result as a JSON object.
#[tracing::instrument(
name = "query.batch",
skip(query, query_request, tenant_id),
fields(table_name = tracing::field::Empty)
)]
async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand All @@ -238,6 +260,7 @@ async fn handle_non_streaming_query(
tenant_id: &Option<String>,
) -> Result<HttpResponse, QueryError> {
let first_table_name = table_name[0].clone();
tracing::Span::current().record("table_name", first_table_name.as_str());
let (records, fields) = execute(query, query_request.streaming, tenant_id).await?;
let records = match records {
Either::Left(rbs) => rbs,
Expand Down Expand Up @@ -283,6 +306,11 @@ async fn handle_non_streaming_query(
///
/// # Returns
/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
#[tracing::instrument(
name = "query.stream",
skip(query, query_request, tenant_id),
fields(table_name = tracing::field::Empty)
)]
async fn handle_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand All @@ -291,6 +319,7 @@ async fn handle_streaming_query(
tenant_id: &Option<String>,
) -> Result<HttpResponse, QueryError> {
let first_table_name = table_name[0].clone();
tracing::Span::current().record("table_name", first_table_name.as_str());
let (records_stream, fields) = execute(query, query_request.streaming, tenant_id).await?;
let records_stream = match records_stream {
Either::Left(_) => {
Expand Down Expand Up @@ -453,6 +482,11 @@ pub async fn update_schema_when_distributed(
/// Create streams for querier if they do not exist
/// get list of streams from memory and storage
/// create streams for memory from storage if they do not exist
#[tracing::instrument(
name = "distributed.create_streams",
skip(streams, tenant_id),
fields(stream_count = streams.len())
)]
pub async fn create_streams_for_distributed(
streams: Vec<String>,
tenant_id: &Option<String>,
Expand Down Expand Up @@ -516,6 +550,11 @@ impl FromRequest for Query {
}
}

#[tracing::instrument(
name = "query.parse_logical_plan",
skip(query, session_state, time_range),
fields(sql = %query.query)
)]
pub async fn into_query(
query: &Query,
session_state: &SessionState,
Expand Down
10 changes: 10 additions & 0 deletions src/handlers/http/rbac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use itertools::Itertools;
use serde::Serialize;
use serde_json::json;
use tokio::sync::Mutex;
use tracing::instrument;

use super::modal::utils::rbac_utils::{get_metadata, put_metadata};

Expand Down Expand Up @@ -69,8 +70,10 @@ impl From<&user::User> for User {

// Handler for GET /api/v1/user
// returns list of all registered users
#[tracing::instrument(name = "list_users", skip(req), fields(tenant_id = tracing::field::Empty))]
pub async fn list_users(req: HttpRequest) -> impl Responder {
let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current().record("tenant_id", tenant_id.as_deref().unwrap_or("default"));
web::Json(Users.collect_user::<User>(&tenant_id))
}

Expand Down Expand Up @@ -217,12 +220,19 @@ pub async fn post_gen_password(

// Handler for GET /api/v1/user/{userid}/role
// returns role for a user if that user exists
#[instrument(
name = "get_role",
skip(req, userid),
fields(user_id = tracing::field::Empty, tenant_id = tracing::field::Empty)
)]
pub async fn get_role(
req: HttpRequest,
userid: web::Path<String>,
) -> Result<impl Responder, RBACError> {
let userid = userid.into_inner();
tracing::Span::current().record("user_id", &userid);
let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current().record("tenant_id", tenant_id.as_deref().unwrap_or("default"));
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
if !Users.contains(&userid, &tenant_id) {
return Err(RBACError::UserDoesNotExist);
Expand Down
16 changes: 16 additions & 0 deletions src/handlers/http/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,14 @@ pub async fn get(req: HttpRequest, name: web::Path<String>) -> Result<impl Respo

// Handler for GET /api/v1/role
// Fetch all roles in the system
#[tracing::instrument(
name = "role.list",
skip(req),
fields(tenant_id = tracing::field::Empty)
)]
pub async fn list(req: HttpRequest) -> Result<impl Responder, RoleError> {
let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current().record("tenant_id", tenant_id.as_deref().unwrap_or("none"));
let metadata = get_metadata(&tenant_id).await?;
let mut roles = HashMap::new();
for (k, r) in metadata.roles.into_iter() {
Expand Down Expand Up @@ -186,9 +192,15 @@ pub async fn put_default(

// Handler for GET /api/v1/role/default
// Fetch the default role
#[tracing::instrument(
name = "role.get_default",
skip(req),
fields(tenant_id = tracing::field::Empty)
)]
pub async fn get_default(req: HttpRequest) -> Result<impl Responder, RoleError> {
let tenant_id = get_tenant_id_from_request(&req);
let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
tracing::Span::current().record("tenant_id", tenant_id);
let res = if let Some(role) = DEFAULT_ROLE
.read()
// .unwrap()
Expand All @@ -211,6 +223,10 @@ pub async fn get_default(req: HttpRequest) -> Result<impl Responder, RoleError>
Ok(web::Json(res))
}

#[tracing::instrument(
name = "role.get_metadata",
fields(tenant_id = ?tenant_id)
)]
async fn get_metadata(
tenant_id: &Option<String>,
) -> Result<crate::storage::StorageMetadata, ObjectStorageError> {
Expand Down
Loading
Loading