Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,23 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
return Ok(());
}

// `mixed_prefix_ok` matches the operation's promotion mode:
// promote-legacy operations bundle inputs from different
// `rg_partition_prefix_len` buckets (the adapter normalizes
// them at read time), so the input-side equality check in
// `merge_parquet_split_metadata` would spuriously fail. Regular
// merges keep the strict check.
let mixed_prefix_ok = scratch
.merge_operation
.target_prefix_len_override
.is_some();

let mut merged_splits = Vec::with_capacity(outputs.len());
for output in &outputs {
let mut metadata = merge_parquet_split_metadata(input_splits, output)
.context("failed to build merge output metadata")
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;
let mut metadata =
merge_parquet_split_metadata(input_splits, output, mixed_prefix_ok)
.context("failed to build merge output metadata")
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;

// Use the split ID that was assigned when the merge operation was
// planned, rather than the one generated inside
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,17 @@ use crate::split::{ParquetSplitId, ParquetSplitMetadata};
/// # Preconditions
///
/// All input splits must share the same kind, index_uid, partition_id,
/// sort_fields, and window.
/// sort_fields, and window. In the default case (`mixed_prefix_ok = false`)
/// they must also share `rg_partition_prefix_len`. In legacy-promotion
/// mode (`mixed_prefix_ok = true`) the prefix-len equality check is
/// skipped because inputs come from different prefix buckets — the
/// output's prefix_len is taken from the writer's KV stamp via
/// `output.output_rg_partition_prefix_len` (CS-1), so the input-side
/// equality is no longer load-bearing for the metastore record.
pub fn merge_parquet_split_metadata(
inputs: &[ParquetSplitMetadata],
output: &MergeOutputFile,
mixed_prefix_ok: bool,
) -> Result<ParquetSplitMetadata> {
if inputs.is_empty() {
bail!("merge_parquet_split_metadata requires at least one input split");
Expand Down Expand Up @@ -93,10 +100,11 @@ pub fn merge_parquet_split_metadata(
first.window
);
}
if input.rg_partition_prefix_len != first.rg_partition_prefix_len {
if !mixed_prefix_ok && input.rg_partition_prefix_len != first.rg_partition_prefix_len {
bail!(
"input {} has rg_partition_prefix_len {}, expected {} — splits with different \
prefix lengths must not appear in the same merge",
prefix lengths must not appear in the same regular merge (legacy-promotion \
operations bypass this check)",
i,
input.rg_partition_prefix_len,
first.rg_partition_prefix_len
Expand Down Expand Up @@ -248,7 +256,7 @@ mod tests {
make_test_split("s1", (1200, 2000), 0),
];
let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&inputs, &output).unwrap();
let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap();

// Invariant fields come from inputs.
assert_eq!(result.kind, ParquetSplitKind::Metrics);
Expand All @@ -267,7 +275,7 @@ mod tests {
make_test_split("s1", (1200, 2000), 0),
];
let output = make_output_with_metadata(200, 9000, (1000, 2000), &["cpu.usage", "mem.used"]);
let result = merge_parquet_split_metadata(&inputs, &output).unwrap();
let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap();

// Data-dependent fields come from the output, not inputs.
assert_eq!(result.time_range.start_secs, 1000);
Expand Down Expand Up @@ -302,7 +310,7 @@ mod tests {
.or_default()
.insert("api".to_string());

let result = merge_parquet_split_metadata(&inputs, &output).unwrap();
let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap();

let service_values = result.low_cardinality_tags.get("service").unwrap();
assert_eq!(service_values.len(), 2);
Expand All @@ -323,7 +331,7 @@ mod tests {
.insert(format!("host-{i}"));
}

let result = merge_parquet_split_metadata(&inputs, &output).unwrap();
let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap();

assert!(result.high_cardinality_tag_keys.contains("host"));
assert!(!result.low_cardinality_tags.contains_key("host"));
Expand All @@ -337,15 +345,15 @@ mod tests {
make_test_split("s2", (1000, 2000), 2),
];
let output = make_output(300, 12000);
let result = merge_parquet_split_metadata(&inputs, &output).unwrap();
let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap();

assert_eq!(result.num_merge_ops, 3); // max(2,2,2) + 1
}

#[test]
fn test_empty_inputs_error() {
let output = make_output(0, 0);
let result = merge_parquet_split_metadata(&[], &output);
let result = merge_parquet_split_metadata(&[], &output, false);
assert!(result.is_err());
assert!(
result
Expand All @@ -362,7 +370,7 @@ mod tests {
s1.kind = ParquetSplitKind::Sketches;

let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&[s0, s1], &output);
let result = merge_parquet_split_metadata(&[s0, s1], &output, false);
assert!(result.is_err());
}

Expand All @@ -373,7 +381,7 @@ mod tests {
s1.index_uid = "other-index:00000000000000000000000002".to_string();

let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&[s0, s1], &output);
let result = merge_parquet_split_metadata(&[s0, s1], &output, false);
assert!(result.is_err());
}

Expand All @@ -384,7 +392,7 @@ mod tests {
s1.partition_id = 99;

let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&[s0, s1], &output);
let result = merge_parquet_split_metadata(&[s0, s1], &output, false);
assert!(result.is_err());
}

Expand All @@ -395,7 +403,7 @@ mod tests {
s1.sort_fields = "different|schema/V2".to_string();

let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&[s0, s1], &output);
let result = merge_parquet_split_metadata(&[s0, s1], &output, false);
assert!(result.is_err());
}

Expand All @@ -406,7 +414,7 @@ mod tests {
s1.window = Some(2000..5600);

let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&[s0, s1], &output);
let result = merge_parquet_split_metadata(&[s0, s1], &output, false);
assert!(result.is_err());
}

Expand All @@ -417,7 +425,7 @@ mod tests {
s1.rg_partition_prefix_len = 1;

let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&[s0, s1], &output);
let result = merge_parquet_split_metadata(&[s0, s1], &output, false);
let err = result.expect_err("merge must reject mismatched prefix lengths");
let msg = err.to_string();
assert!(
Expand All @@ -442,7 +450,7 @@ mod tests {
// num_row_groups = 2 + writer reports demoted prefix_len = 0
// (the legacy writer's choice for a row-count-driven multi-RG).
let output = make_output_full_with_prefix(200, 9000, 2, 0, (1000, 2000), &["cpu.usage"]);
let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap();
let result = merge_parquet_split_metadata(&[s0, s1], &output, false).unwrap();
assert_eq!(result.rg_partition_prefix_len, 0);
}

Expand All @@ -457,10 +465,42 @@ mod tests {
s1.rg_partition_prefix_len = 3;

let output = make_output_full_with_prefix(200, 9000, 1, 3, (1000, 2000), &["cpu.usage"]);
let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap();
let result = merge_parquet_split_metadata(&[s0, s1], &output, false).unwrap();
assert_eq!(result.rg_partition_prefix_len, 3);
}

#[test]
fn test_mixed_prefix_ok_skips_input_equality_check() {
    // Promotion scenario: the operation bundles splits from different
    // prefix buckets — a prefix_len = 0 legacy split alongside a
    // prefix_len = 2 aligned one. With `mixed_prefix_ok = true` the
    // aggregator has to accept the pair and take the output prefix
    // from whatever the writer stamped.
    let mut low_prefix = make_test_split("s0", (1000, 2000), 0);
    low_prefix.rg_partition_prefix_len = 0;
    let mut high_prefix = make_test_split("s1", (1000, 2000), 0);
    high_prefix.rg_partition_prefix_len = 2;
    let inputs = vec![low_prefix, high_prefix];

    // The writer stamped prefix_len = 2 on this multi-RG output
    // (a streaming-engine output that successfully promoted the
    // legacy input).
    let output = make_output_full_with_prefix(300, 12000, 3, 2, (1000, 2000), &["cpu.usage"]);

    let promoted = merge_parquet_split_metadata(&inputs, &output, true)
        .expect("mixed-prefix inputs must be accepted in promotion mode");
    assert_eq!(
        promoted.rg_partition_prefix_len, 2,
        "output prefix matches the writer's stamp (CS-1)",
    );

    // The very same inputs must still be rejected when the flag is off.
    let err = merge_parquet_split_metadata(&inputs, &output, false)
        .expect_err("strict mode must reject mixed-prefix inputs");
    assert!(
        err.to_string().contains("rg_partition_prefix_len"),
        "error should mention the prefix-len mismatch, got: {err}",
    );
}

#[test]
fn test_output_prefix_len_preserved_on_multi_rg_streaming_engine() {
// CS-1 regression for F1: the streaming engine produces
Expand All @@ -479,7 +519,7 @@ mod tests {
// num_row_groups = 3 (multi-RG) AND writer reports prefix_len = 2
// (the streaming engine's stamp because it verified alignment).
let output = make_output_full_with_prefix(300, 12000, 3, 2, (1000, 2000), &["cpu.usage"]);
let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap();
let result = merge_parquet_split_metadata(&[s0, s1], &output, false).unwrap();
assert_eq!(
result.rg_partition_prefix_len, 2,
"metastore must mirror the writer's KV (CS-1); multi-RG aligned output keeps its \
Expand All @@ -494,7 +534,7 @@ mod tests {
make_test_split("s1", (1000, 2000), 0),
];
let output = make_output(200, 9000);
let result = merge_parquet_split_metadata(&inputs, &output).unwrap();
let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap();

assert_ne!(result.split_id.as_str(), "s0");
assert_ne!(result.split_id.as_str(), "s1");
Expand All @@ -510,7 +550,7 @@ mod tests {
output.row_keys_proto = None;
output.zonemap_regexes = HashMap::new();

let result = merge_parquet_split_metadata(&inputs, &output).unwrap();
let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap();

assert!(result.row_keys_proto.is_none());
assert!(result.zonemap_regexes.is_empty());
Expand Down
81 changes: 79 additions & 2 deletions quickwit/quickwit-parquet-engine/src/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod writer;
mod tests;

use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{Context, Result, bail};
use arrow::array::RecordBatch;
Expand All @@ -41,8 +42,9 @@ pub use self::merge_order::MergeRun;
use crate::sort_fields::{equivalent_schemas_for_compaction, parse_sort_fields};
use crate::sorted_series::SORTED_SERIES_COLUMN;
use crate::storage::{
PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_SORT_FIELDS,
PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, ParquetWriterConfig,
ColumnPageStream, LegacyInputAdapter, PARQUET_META_NUM_MERGE_OPS,
PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION,
PARQUET_META_WINDOW_START, ParquetWriterConfig, RemoteByteSource, StreamingParquetReader,
};

/// Configuration for a merge operation.
Expand Down Expand Up @@ -483,3 +485,78 @@ fn extract_and_validate_input_metadata(paths: &[PathBuf]) -> Result<InputMetadat
rg_partition_prefix_len: consensus_prefix_len.unwrap_or(0),
})
}

/// Execute a [`policy::ParquetMergeOperation`]: open every input split
/// through the appropriate `ColumnPageStream` implementation, then hand
/// the resulting streams to the streaming merge engine.
///
/// Per-input routing:
/// - **Regular merge** (`op.target_prefix_len_override == None`): each split is opened directly
///   with [`StreamingParquetReader`]. MP-3 already guarantees that regular-merge inputs share
///   `rg_partition_prefix_len`, so the streaming engine observes uniform metadata.
/// - **Promotion merge** (`op.target_prefix_len_override == Some(target)`): any split whose
///   `rg_partition_prefix_len` is below `target` goes through [`LegacyInputAdapter`] with that
///   target — the adapter re-encodes the file in memory as prefix-aligned and rewrites the
///   `qh.rg_partition_prefix_len` KV. Splits already at `target` are opened directly. The
///   streaming engine then sees a homogeneous set of inputs all advertising
///   `prefix_len = target`.
///
/// `sources` is positionally parallel to `op.splits`: `sources[i]`
/// serves byte-range reads for `op.splits[i].parquet_file`. Materializing
/// one [`RemoteByteSource`] per split for the relevant storage backend
/// (S3, local FS, …) is the caller's job — typically the executor wrapper
/// living outside this crate. A split whose file cannot be opened by its
/// source surfaces as `LegacyAdapterError::Io` or `ParquetReadError`.
///
/// Returns the engine's [`MergeOutputFile`]s. Converting those into
/// `ParquetSplitMetadata` for the metastore is left to the caller — use
/// [`metadata_aggregation::merge_parquet_split_metadata`] with
/// `mixed_prefix_ok = op.target_prefix_len_override.is_some()`.
pub async fn execute_merge_operation(
    op: &policy::ParquetMergeOperation,
    sources: Vec<Arc<dyn RemoteByteSource>>,
    output_dir: &Path,
    config: &MergeConfig,
) -> Result<Vec<MergeOutputFile>> {
    if sources.len() != op.splits.len() {
        bail!(
            "execute_merge_operation: sources.len() ({}) != op.splits.len() ({})",
            sources.len(),
            op.splits.len(),
        );
    }

    let mut page_streams: Vec<Box<dyn ColumnPageStream>> = Vec::with_capacity(op.splits.len());
    for (input, byte_source) in op.splits.iter().zip(sources) {
        let parquet_path = PathBuf::from(&input.parquet_file);

        // `Some(target)` iff this split needs promotion: a target is set
        // AND the split's current prefix falls short of it.
        let promote_target = op
            .target_prefix_len_override
            .filter(|&target| input.rg_partition_prefix_len < target);

        let stream: Box<dyn ColumnPageStream> = match promote_target {
            Some(target) => {
                // Legacy input: the adapter re-encodes it in memory and
                // presents a prefix_len = target single-RG stream to the
                // merge engine.
                let adapter = LegacyInputAdapter::try_open(byte_source, parquet_path, target)
                    .await
                    .with_context(|| {
                        format!(
                            "opening legacy adapter for split {} with target_prefix_len = {target}",
                            input.split_id,
                        )
                    })?;
                Box::new(adapter)
            }
            None => {
                // Direct path: a regular merge, or a promotion merge where
                // this input already meets the target prefix.
                let reader = StreamingParquetReader::try_open(byte_source, parquet_path)
                    .await
                    .with_context(|| {
                        format!("opening streaming reader for split {}", input.split_id)
                    })?;
                Box::new(reader)
            }
        };
        page_streams.push(stream);
    }

    streaming::streaming_merge_sorted_parquet_files(page_streams, output_dir, config).await
}
Loading
Loading