Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ thiserror = "2.0"
ulid = { version = "1.0", features = ["serde"] }
uuid = { version = "1", features = ["v4"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
rustc-hash = "2"
futures-core = "0.3.31"
tempfile = "3.20.0"
lazy_static = "1.4.0"
Expand Down
174 changes: 173 additions & 1 deletion src/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use opentelemetry_proto::tonic::metrics::v1::{
};
use serde_json::{Map, Value};

use rustc_hash::FxHasher;
use std::hash::Hasher;
use tracing::info_span;

use crate::metrics::increment_metrics_collected_by_date;
Expand All @@ -31,7 +33,7 @@ use super::otel_utils::{
convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some,
};

pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 36] = [
pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 37] = [
"metric_name",
"metric_description",
"metric_unit",
Expand Down Expand Up @@ -68,8 +70,58 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 36] = [
"scope_dropped_attributes_count",
"resource_dropped_attributes_count",
"resource_schema_url",
// Precomputed per-sample identity of the physical series. Stable
// u64 hash of `metric_name` + sorted attribute key/value pairs,
// stored as a decimal-encoded string so arrow-json infers Utf8 and
// we get byte-exact roundtrip. Int64/Float64 inference dropped bits
// for hashes near the high range; string sidesteps that entirely.
// Lets the query layer group samples into physical series via a
// single column read instead of decoding every label column and
// hashing per row.
"__series_hash",
];

/// Compute a stable u64 identifier for the physical series a sample
/// belongs to. Hashes `metric_name` plus every attribute key/value pair
/// that survived OTel flattening — everything in the flattened data
/// point that isn't a known sample-level field is treated as a label.
///
/// Hash output must be stable across process restarts and matching at
/// query time. Uses rustc-hash's FxHasher (fast, deterministic,
/// non-cryptographic) and feeds keys in sorted order so the hash
/// doesn't depend on JSON Map iteration order.
fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
let known: std::collections::HashSet<&str> =
OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect();
let mut label_pairs: Vec<(&str, String)> = dp
.iter()
.filter(|(k, _)| !known.contains(k.as_str()))
.map(|(k, v)| {
let v_str = match v {
Value::String(s) => s.clone(),
other => other.to_string(),
};
(k.as_str(), v_str)
})
.collect();
label_pairs.sort_by(|a, b| a.0.cmp(b.0));

let mut hasher = FxHasher::default();
// Include metric_name in the identity. Without it, two different
// metrics with identical label sets would collide into one series.
if let Some(Value::String(name)) = dp.get("metric_name") {
hasher.write(name.as_bytes());
hasher.write(b"\0");
}
for (k, v) in &label_pairs {
hasher.write(k.as_bytes());
hasher.write(b"=");
hasher.write(v.as_bytes());
hasher.write(b"\0");
}
hasher.finish()
}

/// otel metrics event has json array for exemplar
/// this function flatten the exemplar json array
/// and returns a `Map` of the exemplar json
Expand Down Expand Up @@ -564,6 +616,19 @@ fn process_resource_metrics<T, S, M>(
for (k, v) in &envelope {
dp.insert(k.clone(), v.clone());
}
// Compute the physical-series hash AFTER envelope merge
// so resource/scope attributes participate in series
// identity (they're labels from the query layer's
// perspective). Computed once per data point — O(label
// count) per sample, ~200 ns at typical attribute counts.
let series_hash = compute_series_hash(&dp);
// Stored as decimal-encoded string. Arrow-json
// infers Utf8, preserving all 64 bits — Int64/Float64
// inference truncated values near the high range.
dp.insert(
"__series_hash".to_string(),
Value::String(series_hash.to_string()),
);
vec_otel_json.push(Value::Object(dp));
}
}
Expand Down Expand Up @@ -655,3 +720,110 @@ fn flatten_data_point_flags(flags: u32) -> Map<String, Value> {
);
data_point_flags_json
}

#[cfg(test)]
mod tests {
use super::*;

fn make_dp() -> Map<String, Value> {
let mut dp = Map::new();
dp.insert(
"metric_name".to_string(),
Value::String("counter.app.metric_0006".into()),
);
dp.insert(
"time_unix_nano".to_string(),
Value::String("2026-05-19T09:00:00Z".into()),
);
dp.insert("data_point_value".to_string(), Value::Number(1000.into()));
dp.insert("is_monotonic".to_string(), Value::Bool(true));
dp.insert("service.name".to_string(), Value::String("api".into()));
dp.insert("http.method".to_string(), Value::String("GET".into()));
dp.insert("request.id".to_string(), Value::String("req-1".into()));
dp
}

#[test]
fn series_hash_stable_across_runs() {
// Same input → same hash. Locks in the wire contract between
// ingest and query layers; any algorithm change here breaks
// grouping for already-ingested data.
let dp = make_dp();
let h1 = compute_series_hash(&dp);
let h2 = compute_series_hash(&dp);
assert_eq!(h1, h2);
}

#[test]
fn series_hash_independent_of_label_insertion_order() {
// serde_json::Map preserves insertion order; query side may see
// labels in different order. Hash must be insertion-order-agnostic.
let mut a = Map::new();
a.insert("metric_name".to_string(), Value::String("m".into()));
a.insert("service.name".to_string(), Value::String("api".into()));
a.insert("http.method".to_string(), Value::String("GET".into()));

let mut b = Map::new();
b.insert("http.method".to_string(), Value::String("GET".into()));
b.insert("metric_name".to_string(), Value::String("m".into()));
b.insert("service.name".to_string(), Value::String("api".into()));

assert_eq!(compute_series_hash(&a), compute_series_hash(&b));
}

#[test]
fn series_hash_changes_with_label_value() {
let dp = make_dp();
let mut dp2 = dp.clone();
dp2.insert("service.name".to_string(), Value::String("billing".into()));
assert_ne!(compute_series_hash(&dp), compute_series_hash(&dp2));
}

#[test]
fn series_hash_changes_with_metric_name() {
// Two different metrics with identical labels must hash to
// different values, otherwise samples for `requests_total` and
// `latency_seconds` would collide into one logical series.
let dp = make_dp();
let mut dp2 = dp.clone();
dp2.insert(
"metric_name".to_string(),
Value::String("other.metric".into()),
);
assert_ne!(compute_series_hash(&dp), compute_series_hash(&dp2));
}

#[test]
fn series_hash_ignores_sample_level_fields() {
// time_unix_nano and data_point_value belong to the SAMPLE, not
// the series. Hash must be identical across samples of the same
// physical series taken at different times with different values.
let dp = make_dp();
let mut dp_later = dp.clone();
dp_later.insert(
"time_unix_nano".to_string(),
Value::String("2026-05-19T10:00:00Z".into()),
);
dp_later.insert("data_point_value".to_string(), Value::Number(2000.into()));
assert_eq!(compute_series_hash(&dp), compute_series_hash(&dp_later));
}

#[test]
fn series_hash_distinguishes_label_kv_swap() {
// Pathological pair: {a=bc, d=e} vs {a=b, cd=e}. A naive
// concatenation hash would emit identical bytes. Delimiters in
// the hasher input prevent this — verify here so a future
// optimisation can't silently regress collision resistance.
let mut a = Map::new();
a.insert("metric_name".to_string(), Value::String("m".into()));
a.insert("a".to_string(), Value::String("bc".into()));
a.insert("d".to_string(), Value::String("e".into()));

let mut b = Map::new();
b.insert("metric_name".to_string(), Value::String("m".into()));
b.insert("a".to_string(), Value::String("b".into()));
b.insert("cd".to_string(), Value::String("e".into()));

assert_ne!(compute_series_hash(&a), compute_series_hash(&b));
}
}
123 changes: 116 additions & 7 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

use arrow_array::RecordBatch;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_ipc::reader::StreamReader;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
Expand Down Expand Up @@ -589,12 +589,26 @@ impl Stream {
Encoding::DELTA_BINARY_PACKED,
);

// Create sorting columns
let mut sorting_column_vec = vec![SortingColumn {
// Build sorting columns. For OTel-metrics streams, put
// `metric_name` ahead of the time partition so per-page parquet
// min/max stats can prune by metric (PromQL's universal selector
// predicate). The actual row order is enforced at write time
// (`sort_batch_for_metric_pruning`) — this just advertises the
// sort in parquet footer metadata so readers can rely on it.
let is_otel_metrics = self.is_otel_metrics();
let mut sorting_column_vec: Vec<SortingColumn> = Vec::new();
if is_otel_metrics && let Ok(name_idx) = merged_schema.index_of("metric_name") {
sorting_column_vec.push(SortingColumn {
column_idx: name_idx as i32,
descending: false,
nulls_first: false,
});
}
sorting_column_vec.push(SortingColumn {
column_idx: time_partition_idx as i32,
descending: true,
nulls_first: true,
}];
nulls_first: false,
});

// Describe custom partition column encodings and sorting
if let Some(custom_partition) = custom_partition {
Expand All @@ -616,6 +630,66 @@ impl Stream {
props.set_sorting_columns(Some(sorting_column_vec)).build()
}

/// True if this stream's log_source carries the OTel-metrics
/// format. Determines whether per-batch sort and metric_name-first
/// SortingColumn metadata get applied at write time.
fn is_otel_metrics(&self) -> bool {
self.get_log_source()
.iter()
.any(|s| matches!(s.log_source_format, LogSource::OtelMetrics))
}

/// Permute a `RecordBatch` so rows are ordered by
/// `(metric_name ASC, time_partition DESC)`. Required for parquet
/// page-index pruning to be effective on PromQL's
/// `metric_name = 'X'` selector — without this, pages within a row
/// group hold interleaved metrics and per-page min/max stats span
/// every metric in the stream, killing pruning.
///
/// Bails out without sorting when either source column is missing
/// (non-metric stream, schema drift) so the caller can write the
/// batch unchanged.
fn sort_batch_for_metric_pruning(
batch: &RecordBatch,
time_partition_field: &str,
) -> Result<RecordBatch, StagingError> {
use arrow::compute::{SortColumn, kernels::sort::SortOptions, lexsort_to_indices, take};
let schema = batch.schema();
let Some(name_idx) = schema.index_of("metric_name").ok() else {
return Ok(batch.clone());
};
let Some(time_idx) = schema.index_of(time_partition_field).ok() else {
return Ok(batch.clone());
};
if batch.num_rows() < 2 {
return Ok(batch.clone());
}

let sort_cols = vec![
SortColumn {
values: batch.column(name_idx).clone(),
options: Some(SortOptions {
descending: false,
nulls_first: false,
}),
},
SortColumn {
values: batch.column(time_idx).clone(),
options: Some(SortOptions {
descending: true,
nulls_first: false,
}),
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
];
let indices = lexsort_to_indices(&sort_cols, None)?;
let columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|c| take(c.as_ref(), &indices, None))
.collect::<Result<_, _>>()?;
Ok(RecordBatch::try_new(schema, columns)?)
}

fn reset_staging_metrics(&self, tenant_id: &Option<String>) {
let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
metrics::STAGING_FILES
Expand Down Expand Up @@ -739,8 +813,43 @@ impl Stream {
.open(part_path)
.map_err(|_| StagingError::Create)?;
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
writer.write(record)?;
let sort_for_metric_pruning = self.is_otel_metrics();
let time_partition_field = time_partition.map_or_else(
|| DEFAULT_TIMESTAMP_KEY.to_string(),
|s| s.as_str().to_string(),
);

if sort_for_metric_pruning {
// Buffer batches up to the row-group target, then
// concat + sort + write as a single contiguous batch. The
// ArrowWriter splits the sorted batch into row groups at the
// same boundary, so the row order survives intact and
// per-page (metric_name min, max) stats narrow to the slice
// each page actually carries.
let target = self.options.row_group_size;
let mut buffer: Vec<RecordBatch> = Vec::new();
let mut buffered_rows: usize = 0;
for record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
buffered_rows += record.num_rows();
buffer.push(record);
if buffered_rows >= target {
let combined = arrow::compute::concat_batches(schema, &buffer)?;
let sorted =
Self::sort_batch_for_metric_pruning(&combined, &time_partition_field)?;
writer.write(&sorted)?;
buffer.clear();
buffered_rows = 0;
}
}
if !buffer.is_empty() {
let combined = arrow::compute::concat_batches(schema, &buffer)?;
let sorted = Self::sort_batch_for_metric_pruning(&combined, &time_partition_field)?;
writer.write(&sorted)?;
}
} else {
for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
writer.write(record)?;
}
}
writer.close()?;

Expand Down
Loading
Loading