From ebce99f433eb781257322dfe98f986acd8c4d77f Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 13 May 2026 11:10:47 -0400 Subject: [PATCH] feat(merge): legacy-prefix promotion path + schema-evolution body cols MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two adversarial-review follow-ups grouped because they share the streaming engine's input-routing and union-schema seams. ## (b) Legacy-prefix promotion A new operation type pairs a prefix_len=0 split with prefix_len>0 peers in one merge, so legacy splits can be folded into prefix- aligned buckets instead of aging out via retention. Adds: - `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality unchanged. - `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion target; `None` is the default regular-merge form. - `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality check in promotion mode. The output prefix_len still comes from the writer's KV stamp via `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1). - `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam PR-7 will wire from above. Tests: - `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`, `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`. - `test_mixed_prefix_ok_skips_input_equality_check`. 
- `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1. - `test_executor_mismatched_sources_count_bails`. ## F6 + F13: Schema evolution for body columns The merger now supports MC-4 across heterogeneous body-col schemas: - F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array flavour merge cleanly; before this they hit a "type conflict" at alignment time. - F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from `..._non_nullable_...`) now handles nullable outer `List` / `LargeList`. MC-4 forces the union to be nullable when a List col is present in only some inputs; before this the writer rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 = element present) for nullable outer; non-nullable path unchanged. Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B with the same sort schema but different body cols (Utf8 vs Dict, LargeBinary vs Binary, List in A only, Int32 A-only, Int64 B-only, common Float64). The merge produces the union schema; per-row rendering via `render_cell` matches across flavour boundaries; List cells from B render as nulls. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../parquet_merge_executor.rs | 18 +- .../src/merge/metadata_aggregation.rs | 80 ++- .../quickwit-parquet-engine/src/merge/mod.rs | 81 ++- .../src/merge/policy/mod.rs | 150 +++++- .../src/merge/schema.rs | 29 +- .../src/merge/streaming.rs | 499 ++++++++++++++++++ .../src/storage/streaming_writer.rs | 197 ++++--- 7 files changed, 938 insertions(+), 116 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs index 8d2649973a2..4fc6e2b84f7 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs @@ -231,11 +231,23 @@ impl Handler for ParquetMergeExecutor { return Ok(()); } + // `mixed_prefix_ok` matches the operation's promotion mode: + // promote-legacy operations bundle inputs from different + // `rg_partition_prefix_len` buckets (the adapter normalizes + // them at read time), so the input-side equality check in + // `merge_parquet_split_metadata` would spuriously fail. Regular + // merges keep the strict check. 
+ let mixed_prefix_ok = scratch + .merge_operation + .target_prefix_len_override + .is_some(); + let mut merged_splits = Vec::with_capacity(outputs.len()); for output in &outputs { - let mut metadata = merge_parquet_split_metadata(input_splits, output) - .context("failed to build merge output metadata") - .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; + let mut metadata = + merge_parquet_split_metadata(input_splits, output, mixed_prefix_ok) + .context("failed to build merge output metadata") + .map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?; // Use the split ID that was assigned when the merge operation was // planned, rather than the one generated inside diff --git a/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs index 1da61905b88..0805f6ef35c 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs @@ -40,10 +40,17 @@ use crate::split::{ParquetSplitId, ParquetSplitMetadata}; /// # Preconditions /// /// All input splits must share the same kind, index_uid, partition_id, -/// sort_fields, and window. +/// sort_fields, and window. In the default case (`mixed_prefix_ok = false`) +/// they must also share `rg_partition_prefix_len`. In legacy-promotion +/// mode (`mixed_prefix_ok = true`) the prefix-len equality check is +/// skipped because inputs come from different prefix buckets — the +/// output's prefix_len is taken from the writer's KV stamp via +/// `output.output_rg_partition_prefix_len` (CS-1), so the input-side +/// equality is no longer load-bearing for the metastore record. 
pub fn merge_parquet_split_metadata( inputs: &[ParquetSplitMetadata], output: &MergeOutputFile, + mixed_prefix_ok: bool, ) -> Result<ParquetSplitMetadata> { if inputs.is_empty() { bail!("merge_parquet_split_metadata requires at least one input split"); @@ -93,10 +100,11 @@ pub fn merge_parquet_split_metadata( first.window ); } - if input.rg_partition_prefix_len != first.rg_partition_prefix_len { + if !mixed_prefix_ok && input.rg_partition_prefix_len != first.rg_partition_prefix_len { bail!( "input {} has rg_partition_prefix_len {}, expected {} — splits with different \ - prefix lengths must not appear in the same merge", + prefix lengths must not appear in the same regular merge (legacy-promotion \ + operations bypass this check)", i, input.rg_partition_prefix_len, first.rg_partition_prefix_len @@ -248,7 +256,7 @@ mod tests { make_test_split("s1", (1200, 2000), 0), ]; let output = make_output(200, 9000); - let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap(); // Invariant fields come from inputs. assert_eq!(result.kind, ParquetSplitKind::Metrics); @@ -267,7 +275,7 @@ mod tests { make_test_split("s1", (1200, 2000), 0), ]; let output = make_output_with_metadata(200, 9000, (1000, 2000), &["cpu.usage", "mem.used"]); - let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap(); // Data-dependent fields come from the output, not inputs. 
assert_eq!(result.time_range.start_secs, 1000); @@ -302,7 +310,7 @@ mod tests { .or_default() .insert("api".to_string()); - let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap(); let service_values = result.low_cardinality_tags.get("service").unwrap(); assert_eq!(service_values.len(), 2); @@ -323,7 +331,7 @@ mod tests { .insert(format!("host-{i}")); } - let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap(); assert!(result.high_cardinality_tag_keys.contains("host")); assert!(!result.low_cardinality_tags.contains_key("host")); @@ -337,7 +345,7 @@ mod tests { make_test_split("s2", (1000, 2000), 2), ]; let output = make_output(300, 12000); - let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap(); assert_eq!(result.num_merge_ops, 3); // max(2,2,2) + 1 } @@ -345,7 +353,7 @@ mod tests { #[test] fn test_empty_inputs_error() { let output = make_output(0, 0); - let result = merge_parquet_split_metadata(&[], &output); + let result = merge_parquet_split_metadata(&[], &output, false); assert!(result.is_err()); assert!( result @@ -362,7 +370,7 @@ mod tests { s1.kind = ParquetSplitKind::Sketches; let output = make_output(200, 9000); - let result = merge_parquet_split_metadata(&[s0, s1], &output); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false); assert!(result.is_err()); } @@ -373,7 +381,7 @@ mod tests { s1.index_uid = "other-index:00000000000000000000000002".to_string(); let output = make_output(200, 9000); - let result = merge_parquet_split_metadata(&[s0, s1], &output); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false); assert!(result.is_err()); } @@ -384,7 +392,7 @@ mod tests { s1.partition_id = 99; let output = make_output(200, 9000); - let result = 
merge_parquet_split_metadata(&[s0, s1], &output); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false); assert!(result.is_err()); } @@ -395,7 +403,7 @@ mod tests { s1.sort_fields = "different|schema/V2".to_string(); let output = make_output(200, 9000); - let result = merge_parquet_split_metadata(&[s0, s1], &output); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false); assert!(result.is_err()); } @@ -406,7 +414,7 @@ mod tests { s1.window = Some(2000..5600); let output = make_output(200, 9000); - let result = merge_parquet_split_metadata(&[s0, s1], &output); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false); assert!(result.is_err()); } @@ -417,7 +425,7 @@ mod tests { s1.rg_partition_prefix_len = 1; let output = make_output(200, 9000); - let result = merge_parquet_split_metadata(&[s0, s1], &output); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false); let err = result.expect_err("merge must reject mismatched prefix lengths"); let msg = err.to_string(); assert!( @@ -442,7 +450,7 @@ mod tests { // num_row_groups = 2 + writer reports demoted prefix_len = 0 // (the legacy writer's choice for a row-count-driven multi-RG). 
let output = make_output_full_with_prefix(200, 9000, 2, 0, (1000, 2000), &["cpu.usage"]); - let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false).unwrap(); assert_eq!(result.rg_partition_prefix_len, 0); } @@ -457,10 +465,42 @@ mod tests { s1.rg_partition_prefix_len = 3; let output = make_output_full_with_prefix(200, 9000, 1, 3, (1000, 2000), &["cpu.usage"]); - let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false).unwrap(); assert_eq!(result.rg_partition_prefix_len, 3); } + #[test] + fn test_mixed_prefix_ok_skips_input_equality_check() { + // Promotion mode: inputs come from different prefix buckets + // (e.g. one prefix_len=0 legacy + one prefix_len=2 aligned). + // With `mixed_prefix_ok = true` the aggregator must accept this + // and take the output prefix from the writer's stamp. + let mut legacy = make_test_split("s0", (1000, 2000), 0); + legacy.rg_partition_prefix_len = 0; + let mut aligned = make_test_split("s1", (1000, 2000), 0); + aligned.rg_partition_prefix_len = 2; + + // Writer stamps prefix_len = 2 on the multi-RG output (streaming + // engine output that successfully promoted the legacy input). + let output = make_output_full_with_prefix(300, 12000, 3, 2, (1000, 2000), &["cpu.usage"]); + + let result = + merge_parquet_split_metadata(&[legacy.clone(), aligned.clone()], &output, true) + .expect("mixed-prefix inputs must be accepted in promotion mode"); + assert_eq!( + result.rg_partition_prefix_len, 2, + "output prefix matches the writer's stamp (CS-1)", + ); + + // Same inputs without the mixed_prefix_ok flag must still fail. 
+ let strict = merge_parquet_split_metadata(&[legacy, aligned], &output, false); + let err = strict.expect_err("strict mode must reject mixed-prefix inputs"); + assert!( + err.to_string().contains("rg_partition_prefix_len"), + "error should mention the prefix-len mismatch, got: {err}", + ); + } + #[test] fn test_output_prefix_len_preserved_on_multi_rg_streaming_engine() { // CS-1 regression for F1: the streaming engine produces @@ -479,7 +519,7 @@ mod tests { // num_row_groups = 3 (multi-RG) AND writer reports prefix_len = 2 // (the streaming engine's stamp because it verified alignment). let output = make_output_full_with_prefix(300, 12000, 3, 2, (1000, 2000), &["cpu.usage"]); - let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + let result = merge_parquet_split_metadata(&[s0, s1], &output, false).unwrap(); assert_eq!( result.rg_partition_prefix_len, 2, "metastore must mirror the writer's KV (CS-1); multi-RG aligned output keeps its \ @@ -494,7 +534,7 @@ mod tests { make_test_split("s1", (1000, 2000), 0), ]; let output = make_output(200, 9000); - let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap(); assert_ne!(result.split_id.as_str(), "s0"); assert_ne!(result.split_id.as_str(), "s1"); @@ -510,7 +550,7 @@ mod tests { output.row_keys_proto = None; output.zonemap_regexes = HashMap::new(); - let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + let result = merge_parquet_split_metadata(&inputs, &output, false).unwrap(); assert!(result.row_keys_proto.is_none()); assert!(result.zonemap_regexes.is_empty()); diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index 7ab8d685f7a..14ea051d1c0 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -31,6 +31,7 @@ mod writer; mod tests; use std::path::{Path, 
PathBuf}; +use std::sync::Arc; use anyhow::{Context, Result, bail}; use arrow::array::RecordBatch; @@ -41,8 +42,9 @@ pub use self::merge_order::MergeRun; use crate::sort_fields::{equivalent_schemas_for_compaction, parse_sort_fields}; use crate::sorted_series::SORTED_SERIES_COLUMN; use crate::storage::{ - PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_SORT_FIELDS, - PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, ParquetWriterConfig, + ColumnPageStream, LegacyInputAdapter, PARQUET_META_NUM_MERGE_OPS, + PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, + PARQUET_META_WINDOW_START, ParquetWriterConfig, RemoteByteSource, StreamingParquetReader, }; /// Configuration for a merge operation. @@ -483,3 +485,78 @@ fn extract_and_validate_input_metadata(paths: &[PathBuf]) -> Result>, + output_dir: &Path, + config: &MergeConfig, +) -> Result> { + if sources.len() != op.splits.len() { + bail!( + "execute_merge_operation: sources.len() ({}) != op.splits.len() ({})", + sources.len(), + op.splits.len(), + ); + } + + let mut streams: Vec> = Vec::with_capacity(op.splits.len()); + for (split, source) in op.splits.iter().zip(sources.into_iter()) { + let path = PathBuf::from(&split.parquet_file); + let stream: Box = match op.target_prefix_len_override { + Some(target) if split.rg_partition_prefix_len < target => { + // Promote this legacy input. The adapter re-encodes in + // memory and presents itself as a prefix_len = target + // single-RG stream to the merge engine. + let adapter = LegacyInputAdapter::try_open(source, path, target) + .await + .with_context(|| { + format!( + "opening legacy adapter for split {} with target_prefix_len = {target}", + split.split_id, + ) + })?; + Box::new(adapter) + } + _ => { + // Direct streaming reader: regular merge, or promotion + // where this input already satisfies the target. 
+ let reader = StreamingParquetReader::try_open(source, path) + .await + .with_context(|| { + format!("opening streaming reader for split {}", split.split_id) + })?; + Box::new(reader) + } + }; + streams.push(stream); + } + + streaming::streaming_merge_sorted_parquet_files(streams, output_dir, config).await +} diff --git a/quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs index f373f71bd0c..3a0a72d7da8 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs @@ -40,13 +40,25 @@ use crate::split::{ParquetSplitId, ParquetSplitKind, ParquetSplitMetadata}; pub struct ParquetMergeOperation { /// New split ID for the merged output. pub merge_split_id: ParquetSplitId, - /// The input splits being merged (all from the same compaction scope and - /// `num_merge_ops` level). + /// The input splits being merged. All share the same `num_merge_ops` + /// level and the same windowed compaction scope (sort_fields + + /// window). In the default form they also share `rg_partition_prefix_len`; + /// in promotion mode (`target_prefix_len_override` is `Some`) they + /// may differ in prefix length up to the target — see + /// [`Self::promote_legacy`]. pub splits: Vec<ParquetSplitMetadata>, + /// When `Some(N)`, this operation is a legacy-promotion merge: + /// inputs may carry `rg_partition_prefix_len < N` (they will be + /// opened through [`crate::storage::LegacyInputAdapter`] with + /// `target_prefix_len = N`) and the output will declare + /// `rg_partition_prefix_len = N`. When `None`, the operation is a + /// regular merge and all inputs must agree on prefix_len (default + /// behaviour). + pub target_prefix_len_override: Option<u32>, } impl ParquetMergeOperation { - /// Create a new merge operation consuming the given splits. + /// Create a regular merge operation consuming the given splits. /// /// Generates a fresh split ID for the merged output.
The `kind` is inferred /// from the first split (all splits in a merge share the same kind). @@ -56,7 +68,66 @@ impl ParquetMergeOperation { /// - **MP-1**: all splits share the same `num_merge_ops` level /// - **MP-2**: at least 2 input splits /// - **MP-3**: all splits share the same compaction scope (sort_fields + window) + /// + /// For legacy-promotion operations (inputs at different + /// `rg_partition_prefix_len`), use [`Self::promote_legacy`] instead. pub fn new(splits: Vec<ParquetSplitMetadata>) -> Self { + Self::check_mp1_mp2_mp3(&splits); + let kind = splits + .first() + .map(|s| s.kind) + .unwrap_or(ParquetSplitKind::Metrics); + Self { + merge_split_id: ParquetSplitId::generate(kind), + splits, + target_prefix_len_override: None, + } + } + + /// Create a legacy-promotion merge operation. + /// + /// Inputs may have heterogeneous `rg_partition_prefix_len` as long + /// as every input's value is `<= target_prefix_len`. The executor + /// opens any input with `prefix_len < target_prefix_len` through + /// [`crate::storage::LegacyInputAdapter`] with `target` set to the + /// override; inputs already at the target are opened directly via + /// the streaming reader. + /// + /// All other MP-3 dimensions (sort_fields, window) still must + /// agree — only the prefix-len equality is relaxed. + /// + /// # Panics (debug builds) / metrics (all builds) + /// + /// - **MP-1**: all splits share the same `num_merge_ops` level + /// - **MP-2**: at least 2 input splits + /// - **MP-3 (relaxed)**: all splits share sort_fields + window + /// - All inputs' `rg_partition_prefix_len <= target_prefix_len`. Inputs above the target are a + /// planner bug — they shouldn't be demoted, only promoted. + pub fn promote_legacy(splits: Vec<ParquetSplitMetadata>, target_prefix_len: u32) -> Self { + Self::check_mp1_mp2_mp3(&splits); + // Every input must be promotable: prefix_len <= target. + // Demoting (input > target) is not the adapter's contract.
+ for (i, split) in splits.iter().enumerate() { + assert!( + split.rg_partition_prefix_len <= target_prefix_len, + "promote_legacy: input {i} has rg_partition_prefix_len = {} > target_prefix_len = \ + {target_prefix_len}; the adapter cannot demote a higher prefix to a lower one. \ + Pick a target >= max(inputs' prefix_len) or exclude this input.", + split.rg_partition_prefix_len, + ); + } + let kind = splits + .first() + .map(|s| s.kind) + .unwrap_or(ParquetSplitKind::Metrics); + Self { + merge_split_id: ParquetSplitId::generate(kind), + splits, + target_prefix_len_override: Some(target_prefix_len), + } + } + + fn check_mp1_mp2_mp3(splits: &[ParquetSplitMetadata]) { use quickwit_dst::check_invariant; use quickwit_dst::invariants::{InvariantId, merge_policy}; @@ -90,15 +161,6 @@ impl ParquetMergeOperation { InvariantId::MP3, merge_policy::all_same_compaction_scope(&sort_fields_vec, &windows) ); - - let kind = splits - .first() - .map(|s| s.kind) - .unwrap_or(ParquetSplitKind::Metrics); - Self { - merge_split_id: ParquetSplitId::generate(kind), - splits, - } } /// Returns the input splits as a slice. @@ -251,5 +313,69 @@ mod tests { ]; let op = ParquetMergeOperation::new(splits); assert_eq!(op.splits.len(), 2); + assert!( + op.target_prefix_len_override.is_none(), + "regular merges don't set the override", + ); + } + + /// Legacy-promotion happy path: a prefix_len=0 split + a prefix_len=2 + /// split with target=2. Both inputs share the windowed scope; the + /// operation records `target_prefix_len_override = Some(2)`. 
+ #[test] + fn test_promote_legacy_pairs_legacy_with_aligned_peer() { + let mut legacy = make_split("legacy", 0, "metric_name|service|ts/V2", Some((0, 3600))); + legacy.rg_partition_prefix_len = 0; + + let mut aligned = make_split("aligned", 0, "metric_name|service|ts/V2", Some((0, 3600))); + aligned.rg_partition_prefix_len = 2; + + let op = ParquetMergeOperation::promote_legacy(vec![legacy, aligned], 2); + assert_eq!(op.splits.len(), 2); + assert_eq!(op.target_prefix_len_override, Some(2)); + } + + /// Promotion requires all inputs to have `prefix_len <= target`. + /// Passing an input whose prefix_len exceeds the target is a planner + /// bug — the adapter cannot DEMOTE alignment, only promote. + #[test] + #[should_panic(expected = "cannot demote a higher prefix")] + fn test_promote_legacy_rejects_higher_prefix_input() { + let mut legacy = make_split("legacy", 0, "a|b|ts/V2", Some((0, 3600))); + legacy.rg_partition_prefix_len = 0; + + let mut too_high = make_split("too_high", 0, "a|b|ts/V2", Some((0, 3600))); + too_high.rg_partition_prefix_len = 3; + + // target = 2, but too_high.rg_partition_prefix_len = 3. + ParquetMergeOperation::promote_legacy(vec![legacy, too_high], 2); + } + + /// Promotion still requires MP-3 on the non-prefix scope + /// dimensions: sort_fields + window. Mixed sort_fields must still + /// panic. + #[test] + #[should_panic(expected = "MP-3 violated")] + fn test_promote_legacy_still_enforces_sort_fields() { + let mut a = make_split("a", 0, "metric_name|ts/V2", Some((0, 3600))); + a.rg_partition_prefix_len = 0; + let mut b = make_split("b", 0, "different|schema/V2", Some((0, 3600))); + b.rg_partition_prefix_len = 1; + ParquetMergeOperation::promote_legacy(vec![a, b], 1); + } + + /// All inputs at the target prefix_len (no actual legacy promotion + /// happening) — the constructor still accepts it. The executor + /// will just open every input directly without the adapter. 
+ /// Useful when a planner produces a uniform op that happens to be + /// at the same target. + #[test] + fn test_promote_legacy_all_at_target_is_valid() { + let mut a = make_split("a", 0, "a|ts/V2", Some((0, 3600))); + a.rg_partition_prefix_len = 1; + let mut b = make_split("b", 0, "a|ts/V2", Some((0, 3600))); + b.rg_partition_prefix_len = 1; + let op = ParquetMergeOperation::promote_legacy(vec![a, b], 1); + assert_eq!(op.target_prefix_len_override, Some(1)); } } diff --git a/quickwit/quickwit-parquet-engine/src/merge/schema.rs b/quickwit/quickwit-parquet-engine/src/merge/schema.rs index 68ebf851e59..085a673b463 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/schema.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/schema.rs @@ -300,14 +300,23 @@ fn align_batch_to_schema(batch: &RecordBatch, target_schema: &SchemaRef) -> Resu /// Normalize an Arrow data type for the internal union schema. /// /// All string-like types (Utf8, LargeUtf8, Dictionary(*, Utf8/LargeUtf8)) -/// are normalized to Utf8. This ensures `take` works uniformly across -/// concatenated inputs regardless of their original encoding. +/// are normalized to Utf8. All byte-array-like types (Binary, +/// LargeBinary, Dictionary(*, Binary/LargeBinary)) are normalized to +/// Binary. Parquet stores both string flavours under the same `BYTE_ARRAY` +/// physical type and both binary flavours likewise, so two inputs whose +/// schemas differ only by string/binary flavour represent the same +/// logical data; the union must accept them as one column. /// -/// Non-string types are returned as-is. +/// This ensures `take` works uniformly across concatenated inputs +/// regardless of their original encoding. Non-string/non-binary types +/// are returned as-is. 
fn normalize_type(dt: &DataType) -> DataType { if is_string_type(dt) { return DataType::Utf8; } + if is_byte_array_type(dt) { + return DataType::Binary; + } dt.clone() } @@ -319,3 +328,17 @@ fn is_string_type(dt: &DataType) -> bool { _ => false, } } + +/// Returns true if the data type represents raw byte arrays. +/// +/// Parquet has a single `BYTE_ARRAY` physical type; Binary and +/// LargeBinary (and dict-encoded variants) all map to it on the +/// wire. Schema evolution that changes one to the other across +/// ingester versions must merge cleanly. +fn is_byte_array_type(dt: &DataType) -> bool { + match dt { + DataType::Binary | DataType::LargeBinary => true, + DataType::Dictionary(_, value_type) => is_byte_array_type(value_type), + _ => false, + } +} diff --git a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs index 671579b84b8..2f660ca7885 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs @@ -3739,6 +3739,337 @@ mod tests { } } + /// Build two batches with the SAME sort schema but **different body- + /// col schemas**, exercising the merger's schema-evolution paths: + /// + /// - `body_string`: `Utf8` in A, `Dict` in B (string-flavour variation — + /// `normalize_type` collapses to Utf8). + /// - `body_bytes`: `LargeBinary` in A, `Binary` in B (byte-array-flavour variation — F6 + /// normalizer extension collapses to Binary). + /// - `body_list`: `List` present in A, ABSENT from B (MC-4 column union — B's rows + /// appear as nulls in the merged output; outer becomes nullable in the union, exercising the + /// F13 nullable-outer list write path). + /// - `body_a_only`: `Int32` in A only (MC-4 column union — B's rows null). + /// - `body_b_only`: `Int64` in B only (MC-4 column union — A's rows null). + /// - `body_value`: `Float64` in both (common-typed control column). 
+ /// + /// Each row is keyed by a unique `sorted_series` value, and the cell + /// values are derived from the key + column name so that comparison + /// is byte-stable. + fn make_mixed_schema_input_a(num_rows: usize, key_offset: u64) -> RecordBatch { + use arrow::array::{Int32Array, LargeBinaryArray, ListArray}; + use arrow::buffer::OffsetBuffer; + + let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let schema = Arc::new(ArrowSchema::new(vec![ + // sort cols + Field::new("metric_name", dict_type, false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("sorted_series", DataType::Binary, false), + // body cols in lex order. body_a_only is A-exclusive. + // body_list is non-nullable here (A always has lists) but + // becomes nullable in the union because B lacks the col. + Field::new("body_a_only", DataType::Int32, true), + Field::new("body_bytes", DataType::LargeBinary, true), + Field::new( + "body_list", + DataType::List(Arc::new(Field::new("item", DataType::Float64, false))), + false, + ), + Field::new("body_string", DataType::Utf8, true), + Field::new("body_value", DataType::Float64, true), + ])); + + let metric_keys: Vec<i32> = vec![0; num_rows]; + let metric_values = StringArray::from(vec!["cpu.usage"]); + let metric_name: ArrayRef = Arc::new( + DictionaryArray::<Int32Type>::try_new( + arrow::array::Int32Array::from(metric_keys), + Arc::new(metric_values), + ) + .expect("dict"), + ); + let timestamps: Vec<u64> = (0..num_rows as u64) + .map(|i| 1_700_000_000 + (num_rows as u64 - i)) + .collect(); + let timestamp_secs: ArrayRef = Arc::new(UInt64Array::from(timestamps)); + let key_bytes: Vec<Vec<u8>> = (0..num_rows as u64) + .map(|i| (key_offset + i).to_be_bytes().to_vec()) + .collect(); + let sorted_series: ArrayRef = Arc::new(BinaryArray::from( + key_bytes.iter().map(|v| v.as_slice()).collect::<Vec<_>>(), + )); + + let body_a_only: ArrayRef = Arc::new(Int32Array::from( + (0..num_rows).map(|i| i as i32 * 7 - 5).collect::<Vec<_>>(), + )); + 
let body_bytes_vals: Vec> = (0..num_rows) + .map(|i| format!("a-bytes-{i:04}").into_bytes()) + .collect(); + let body_bytes: ArrayRef = Arc::new(LargeBinaryArray::from( + body_bytes_vals + .iter() + .map(|v| Some(v.as_slice())) + .collect::>(), + )); + let body_string_vals: Vec = + (0..num_rows).map(|i| format!("a-str-{i:04}")).collect(); + let body_string: ArrayRef = Arc::new(StringArray::from(body_string_vals)); + let body_value: ArrayRef = Arc::new(Float64Array::from( + (0..num_rows).map(|i| i as f64 + 0.5).collect::>(), + )); + + // List body col: row i has a list of length (i % 3) + 1. + let mut list_offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut list_values: Vec = Vec::new(); + list_offsets.push(0); + for i in 0..num_rows { + for j in 0..((i % 3) + 1) { + list_values.push((i * 10 + j) as f64 + 0.25); + } + list_offsets.push(list_values.len() as i32); + } + let list_inner: ArrayRef = Arc::new(Float64Array::from(list_values)); + let body_list: ArrayRef = Arc::new(ListArray::new( + Arc::new(Field::new("item", DataType::Float64, false)), + OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(list_offsets)), + list_inner, + None, + )); + + RecordBatch::try_new( + schema, + vec![ + metric_name, + timestamp_secs, + sorted_series, + body_a_only, + body_bytes, + body_list, + body_string, + body_value, + ], + ) + .expect("input A batch") + } + + fn make_mixed_schema_input_b(num_rows: usize, key_offset: u64) -> RecordBatch { + use arrow::array::{BinaryArray as BinArr, Int64Array as I64}; + + let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let schema = Arc::new(ArrowSchema::new(vec![ + // Same sort cols. + Field::new("metric_name", dict_type.clone(), false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("sorted_series", DataType::Binary, false), + // Body cols in lex order. body_string is Dict (flavor + // change from A's Utf8). body_bytes is Binary (flavor + // change from A's LargeBinary). 
No body_list. Adds + // body_b_only. + Field::new("body_b_only", DataType::Int64, true), + Field::new("body_bytes", DataType::Binary, true), + Field::new("body_string", dict_type.clone(), true), + Field::new("body_value", DataType::Float64, true), + ])); + + let metric_keys: Vec = vec![0; num_rows]; + let metric_values = StringArray::from(vec!["cpu.usage"]); + let metric_name: ArrayRef = Arc::new( + DictionaryArray::::try_new( + arrow::array::Int32Array::from(metric_keys), + Arc::new(metric_values), + ) + .expect("dict"), + ); + let timestamps: Vec = (0..num_rows as u64) + .map(|i| 1_700_000_000 + (num_rows as u64 - i)) + .collect(); + let timestamp_secs: ArrayRef = Arc::new(UInt64Array::from(timestamps)); + let key_bytes: Vec> = (0..num_rows as u64) + .map(|i| (key_offset + i).to_be_bytes().to_vec()) + .collect(); + let sorted_series: ArrayRef = Arc::new(BinaryArray::from( + key_bytes.iter().map(|v| v.as_slice()).collect::>(), + )); + + let body_b_only: ArrayRef = Arc::new(I64::from( + (0..num_rows) + .map(|i| (i as i64) * 1_000_003 + 17) + .collect::>(), + )); + let body_bytes_vals: Vec> = (0..num_rows) + .map(|i| format!("b-bytes-{i:04}").into_bytes()) + .collect(); + let body_bytes: ArrayRef = Arc::new(BinArr::from( + body_bytes_vals + .iter() + .map(|v| Some(v.as_slice())) + .collect::>(), + )); + + // Dict-encoded body_string. 
+ let body_string_pool: Vec = + (0..num_rows).map(|i| format!("b-str-{i:04}")).collect(); + let body_string_keys: Vec = (0..num_rows as i32).collect(); + let body_string: ArrayRef = Arc::new( + DictionaryArray::::try_new( + arrow::array::Int32Array::from(body_string_keys), + Arc::new(StringArray::from(body_string_pool)), + ) + .expect("dict"), + ); + + let body_value: ArrayRef = Arc::new(Float64Array::from( + (0..num_rows) + .map(|i| (i as f64) * -1.25 + 7.0) + .collect::>(), + )); + + RecordBatch::try_new( + schema, + vec![ + metric_name, + timestamp_secs, + sorted_series, + body_b_only, + body_bytes, + body_string, + body_value, + ], + ) + .expect("input B batch") + } + + /// F6 — MC-2/MC-4 across heterogeneous body schemas. Inputs A and B + /// share the sort schema but differ in: + /// - body_string flavour (Utf8 vs Dict) + /// - body_bytes flavour (LargeBinary vs Binary) — relies on the F6 `normalize_type` extension + /// - body_list present in A, missing in B + /// - body_a_only / body_b_only present in only one input + /// + /// After merging, the output must contain the union of body cols + /// (MC-4) with B's rows null in body_list / body_a_only and A's + /// rows null in body_b_only. Shared-col values must survive (MC-2) + /// despite the flavour differences. 
+ #[tokio::test] + async fn test_mc2_mixed_schemas_round_trip() { + let batch_a = make_mixed_schema_input_a(40, 0); + let batch_b = make_mixed_schema_input_b(40, 10_000); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge with mixed schemas"); + assert_eq!(outputs.len(), 1); + assert_eq!( + outputs[0].num_rows, 80, + "rows preserved across schema-evolved inputs (MC-1)", + ); + + let merged = read_output_to_record_batch(&outputs[0].path); + let merged_schema = merged.schema(); + + // MC-4: union of body-col names must appear in the output. The + // streaming engine drops all-null sort cols, but body cols + // present in any input must survive (even if half the rows + // are null). + for col_name in [ + "body_a_only", + "body_b_only", + "body_bytes", + "body_list", + "body_string", + "body_value", + ] { + merged_schema.index_of(col_name).unwrap_or_else(|_| { + panic!("merged output is missing body col '{col_name}' — MC-4 violated") + }); + } + + // MC-2: build (sorted_series → tuple) maps for each input and + // the output, then verify every input row's tuple matches the + // output row's tuple. `render_cell` normalizes the same + // string/byte flavour rendering so the type variations don't + // generate spurious diffs. 
+ let series_idx_a = batch_a.schema().index_of("sorted_series").unwrap(); + let series_idx_b = batch_b.schema().index_of("sorted_series").unwrap(); + let body_col_names_union: Vec<&str> = vec![ + "body_a_only", + "body_b_only", + "body_bytes", + "body_list", + "body_string", + "body_value", + ]; + + let render_row = |batch: &RecordBatch, row: usize| -> String { + let mut tuple = String::new(); + for (n, &name) in body_col_names_union.iter().enumerate() { + if n > 0 { + tuple.push('|'); + } + match batch.schema().index_of(name) { + Ok(col_idx) => { + tuple.push_str(&render_cell(batch.column(col_idx).as_ref(), row)) + } + // Col absent from this input → null when seen + // in the merged output. + Err(_) => tuple.push_str(""), + } + } + tuple + }; + + let mut expected: HashMap, String> = HashMap::new(); + let series_col_a = batch_a + .column(series_idx_a) + .as_any() + .downcast_ref::() + .unwrap(); + for row in 0..batch_a.num_rows() { + let key = series_col_a.value(row).to_vec(); + expected.insert(key, render_row(&batch_a, row)); + } + let series_col_b = batch_b + .column(series_idx_b) + .as_any() + .downcast_ref::() + .unwrap(); + for row in 0..batch_b.num_rows() { + let key = series_col_b.value(row).to_vec(); + expected.insert(key, render_row(&batch_b, row)); + } + assert_eq!(expected.len(), 80); + + let series_idx_out = merged_schema.index_of("sorted_series").unwrap(); + let series_col_out = merged + .column(series_idx_out) + .as_any() + .downcast_ref::() + .unwrap(); + let mut observed: HashMap, String> = HashMap::with_capacity(merged.num_rows()); + for row in 0..merged.num_rows() { + let key = series_col_out.value(row).to_vec(); + observed.insert(key, render_row(&merged, row)); + } + assert_eq!(observed.len(), 80, "MC-1 — no rows lost"); + + for (key, want) in &expected { + let got = observed + .get(key) + .unwrap_or_else(|| panic!("missing key {key:?} in merged output")); + assert_eq!( + got, want, + "MC-2 mismatch for sorted_series {key:?}: got {got}, want 
{want}", + ); + } + } + // ============================================================================ // MS-7: page-cache bounded-memory contract. The streaming engine's // raison d'être is that body-col memory stays bounded by ~constant @@ -4017,4 +4348,172 @@ mod tests { "service-name set must cover every distinct value in the sort col", ); } + + // ============================================================================ + // (b) Legacy-promotion executor: an end-to-end test through + // `ParquetMergeOperation::promote_legacy` → + // `execute_merge_operation` → streaming engine output. + // ============================================================================ + + /// Build a minimal `ParquetSplitMetadata` for use by the promotion + /// executor. Only the routing-relevant fields are real — the rest + /// are placeholders sized to match across inputs so MP-3 holds. + fn make_promotion_split_meta( + split_id: &str, + rg_partition_prefix_len: u32, + ) -> crate::split::ParquetSplitMetadata { + crate::split::ParquetSplitMetadata::metrics_builder() + .split_id(crate::split::ParquetSplitId::new(split_id)) + .index_uid("test-index:00000000000000000000000001") + .partition_id(0) + .time_range(crate::split::TimeRange::new(1_700_000_000, 1_700_000_060)) + .sort_fields("metric_name|-timestamp_secs/V2") + .window_start_secs(1_700_000_000) + .window_duration_secs(60) + .num_merge_ops(0) + .rg_partition_prefix_len(rg_partition_prefix_len) + .num_rows(0) + .size_bytes(0) + .build() + } + + /// End-to-end legacy promotion: a prefix_len=0 legacy single-RG + /// input + a prefix_len=1 aligned multi-RG input, merged via + /// `execute_merge_operation` with `target_prefix_len = 1`. The + /// legacy input is routed through `LegacyInputAdapter`, the + /// aligned one goes direct. The streaming engine sees uniform + /// prefix_len=1 inputs and produces aligned multi-RG output that + /// passes `assert_unique_rg_prefix_keys`. 
+ #[tokio::test] + async fn test_promote_legacy_executor_end_to_end() { + use crate::merge::execute_merge_operation; + use crate::merge::policy::ParquetMergeOperation; + use crate::storage::RemoteByteSource; + + // Legacy input: 3 metrics × 20 rows, prefix_len = 0. + let legacy_bytes = make_multi_metric_single_rg_input(&[ + ("cpu.usage", 20), + ("memory.used", 20), + ("net.bytes", 20), + ]); + let legacy_meta = make_promotion_split_meta("legacy_001", 0); + + // Aligned input: 2 metrics × 30 rows in 2 RGs, prefix_len = 1. + // `make_two_metric_aligned_input` uses metric names "cpu.usage" + // and "memory.used", overlapping with the legacy input — the + // merge engine must interleave by metric_name across inputs. + let aligned_bytes = make_two_metric_aligned_input(); + let aligned_meta = make_promotion_split_meta("aligned_001", 1); + + let op = ParquetMergeOperation::promote_legacy(vec![legacy_meta, aligned_meta], 1); + + let sources: Vec> = vec![ + Arc::new(InMemorySource { + bytes: legacy_bytes, + }), + Arc::new(InMemorySource { + bytes: aligned_bytes, + }), + ]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = execute_merge_operation(&op, sources, tmp.path(), &merge_config(1)) + .await + .expect("promote-legacy merge"); + + assert_eq!(outputs.len(), 1, "single output file"); + assert_eq!( + outputs[0].num_rows, + 60 + 60, + "legacy 3×20 + aligned 2×30 rows = 120 total", + ); + + // The streaming engine sees the legacy input as prefix_len=1 + // (adapter rewrote the KV), and the aligned input is already + // prefix_len=1. So the output should advertise prefix_len=1. + assert_eq!( + outputs[0].output_rg_partition_prefix_len, 1, + "executor must produce an output that carries the promoted prefix_len", + ); + + // Read the on-disk KV: must match. (CS-1 inside the file.) 
+ let bytes_out = std::fs::read(&outputs[0].path).expect("read"); + let reader = SerializedFileReader::new(Bytes::from(bytes_out)).expect("ser"); + let prefix_kv = reader + .metadata() + .file_metadata() + .key_value_metadata() + .and_then(|kvs| { + kvs.iter() + .find(|k| k.key == PARQUET_META_RG_PARTITION_PREFIX_LEN) + .and_then(|k| k.value.clone()) + }); + assert_eq!( + prefix_kv.as_deref(), + Some("1"), + "on-disk KV must declare prefix_len = 1 (target)", + ); + + // Output should have one RG per distinct metric_name. The + // union of legacy + aligned metric names is {cpu.usage, + // memory.used, net.bytes} → 3 RGs. + assert_eq!( + reader.metadata().num_row_groups(), + 3, + "output should have one RG per distinct metric_name across inputs", + ); + + // PA-1 + PA-3: each output RG carries a unique, constant + // metric_name. This is the strong chunk-stats check that + // motivated F2. + assert_unique_rg_prefix_keys( + reader.metadata(), + "metric_name|-timestamp_secs/V2", + 1, + "test_promote_legacy_executor_end_to_end output", + ) + .expect("promoted output must satisfy PA-1 + PA-3"); + + // Metastore record (CS-1): builds successfully with + // mixed_prefix_ok = true, and `rg_partition_prefix_len` + // matches the on-disk KV. + let metastore_meta = crate::merge::metadata_aggregation::merge_parquet_split_metadata( + op.splits_as_slice(), + &outputs[0], + /* mixed_prefix_ok */ true, + ) + .expect("metastore aggregation accepts mixed-prefix inputs in promotion mode"); + assert_eq!( + metastore_meta.rg_partition_prefix_len, 1, + "metastore prefix_len must match the on-disk KV (CS-1)", + ); + } + + /// Negative: if the executor is given more sources than splits, or + /// fewer, it bails up-front rather than producing a partial merge. 
+ #[tokio::test] + async fn test_executor_mismatched_sources_count_bails() { + use crate::merge::execute_merge_operation; + use crate::merge::policy::ParquetMergeOperation; + use crate::storage::RemoteByteSource; + + let legacy_meta = make_promotion_split_meta("a", 0); + let aligned_meta = make_promotion_split_meta("b", 1); + let op = ParquetMergeOperation::promote_legacy(vec![legacy_meta, aligned_meta], 1); + + // One source for two splits. + let sources: Vec> = vec![Arc::new(InMemorySource { + bytes: make_multi_metric_single_rg_input(&[("cpu.usage", 10)]), + })]; + + let tmp = TempDir::new().expect("tmpdir"); + let err = execute_merge_operation(&op, sources, tmp.path(), &merge_config(1)) + .await + .expect_err("mismatched source/split count must fail"); + let msg = err.to_string(); + assert!( + msg.contains("sources.len()") && msg.contains("op.splits.len()"), + "error should explain the mismatch, got: {msg}", + ); + } } diff --git a/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs b/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs index 3b83ff1d332..df903cacee2 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/streaming_writer.rs @@ -580,14 +580,15 @@ fn write_array_via_serialized_column_writer( &materialized_ref, ); } - // `List` / `LargeList` with non-nullable outer + inner. - // The DDSketch `keys` (`List`) and `counts` - // (`List`) columns are this shape. We compute Dremel - // def/rep levels from each input array and write them through - // the same `SerializedColumnWriter::write_batch` call the flat + // `List` / `LargeList` with non-nullable inner. The outer + // may be either nullable (schema-evolution case where the col + // is present in only some inputs) or non-nullable (e.g. + // DDSketch `keys` / `counts`). 
We compute Dremel def/rep + // levels from each input array and write them through the + // same `SerializedColumnWriter::write_batch` call the flat // path uses, so memory stays bounded by one in-flight page. DataType::List(_) | DataType::LargeList(_) => { - write_non_nullable_list_via_serialized_column_writer(col_writer, field, array)?; + write_list_via_serialized_column_writer(col_writer, field, array)?; } // Multi-leaf nested (Struct, Map) and other unsupported types. // Single-leaf multi-child Structs are rejected at @@ -607,37 +608,24 @@ fn write_array_via_serialized_column_writer( Ok(()) } -/// Page-bounded write for `List` / `LargeList` where the outer -/// field is non-nullable and the inner field is non-nullable. Computes -/// Dremel def/rep levels (max_def = 1, max_rep = 1) and dispatches the -/// flat inner values through the same typed `write_batch` call the flat -/// arms use. Pages flush as the writer's -/// `data_page_size_limit` / `data_page_row_count_limit` thresholds are -/// reached — same memory-bound contract as the flat path. -fn write_non_nullable_list_via_serialized_column_writer( - col_writer: &mut parquet::file::writer::SerializedColumnWriter<'_>, - field: &arrow::datatypes::Field, - array: &arrow::array::ArrayRef, -) -> Result<(), ParquetWriteError> { - use arrow::array::{Array, LargeListArray, ListArray}; - use arrow::datatypes::DataType; - - if field.is_nullable() { - return Err(ParquetWriteError::SchemaValidation(format!( - "field '{}' is a nullable List; only non-nullable List is supported on the streaming \ - write path", - field.name(), - ))); - } - - // Resolve inner field + values + per-row offsets uniformly across - // List and LargeList. Offsets coerce to i64 so a single - // function body handles both representations. 
- let (inner_field, inner_values, offsets): ( - &arrow::datatypes::Field, - &arrow::array::ArrayRef, +/// Resolve a `ListArray` or `LargeListArray` into a unified +/// `(inner_field, inner_values, offsets)` triple. Offsets always coerce +/// to `i64` so the caller doesn't need to branch on `List` vs +/// `LargeList`. +fn resolve_list_components<'a>( + field: &'a arrow::datatypes::Field, + array: &'a arrow::array::ArrayRef, +) -> Result< + ( + &'a arrow::datatypes::Field, + &'a arrow::array::ArrayRef, Vec, - ) = match field.data_type() { + ), + ParquetWriteError, +> { + use arrow::array::{LargeListArray, ListArray}; + use arrow::datatypes::DataType; + match field.data_type() { DataType::List(inner_field_ref) => { let arr = array.as_any().downcast_ref::().ok_or_else(|| { ParquetWriteError::SchemaValidation(format!( @@ -646,7 +634,7 @@ fn write_non_nullable_list_via_serialized_column_writer( )) })?; let offsets: Vec = arr.value_offsets().iter().map(|&o| o as i64).collect(); - (inner_field_ref.as_ref(), arr.values(), offsets) + Ok((inner_field_ref.as_ref(), arr.values(), offsets)) } DataType::LargeList(inner_field_ref) => { let arr = array @@ -659,14 +647,42 @@ fn write_non_nullable_list_via_serialized_column_writer( )) })?; let offsets: Vec = arr.value_offsets().to_vec(); - (inner_field_ref.as_ref(), arr.values(), offsets) - } - other => { - return Err(ParquetWriteError::SchemaValidation(format!( - "internal: write_non_nullable_list called with non-list type {other:?}", - ))); + Ok((inner_field_ref.as_ref(), arr.values(), offsets)) } - }; + other => Err(ParquetWriteError::SchemaValidation(format!( + "internal: resolve_list_components called with non-list type {other:?}", + ))), + } +} + +/// Page-bounded write for `List` / `LargeList` with non-nullable +/// inner element. Handles both nullable and non-nullable outer fields: +/// +/// - **Non-nullable outer** (e.g. DDSketch `keys` / `counts`): max_def = 1, max_rep = 1. +/// - Empty list at row → def=0, rep=0, no value. 
+/// - N-element list at row → N×(def=1, rep=[0,1,1,…]) plus N values. +/// - **Nullable outer** (schema-evolution case where the col is missing from some inputs): max_def +/// = 2, max_rep = 1. +/// - Outer is null at row → def=0, rep=0, no value. +/// - Empty list at row → def=1, rep=0, no value. +/// - N-element list at row → N×(def=2, rep=[0,1,1,…]) plus N values. +/// +/// Pages flush as the writer's `data_page_size_limit` / +/// `data_page_row_count_limit` thresholds are reached — same +/// memory-bound contract as the flat path. +fn write_list_via_serialized_column_writer( + col_writer: &mut parquet::file::writer::SerializedColumnWriter<'_>, + field: &arrow::datatypes::Field, + array: &arrow::array::ArrayRef, +) -> Result<(), ParquetWriteError> { + use arrow::array::Array; + + let outer_nullable = field.is_nullable(); + + // Resolve inner field + values + per-row offsets uniformly across + // List and LargeList. Offsets coerce to i64 so a single + // function body handles both representations. + let (inner_field, inner_values, offsets) = resolve_list_components(field, array)?; if inner_field.is_nullable() { return Err(ParquetWriteError::SchemaValidation(format!( @@ -676,49 +692,53 @@ fn write_non_nullable_list_via_serialized_column_writer( ))); } - // Walk per-row to build Dremel levels. - // - // Path: required outer group → repeated `list` → required `element`. - // - max_rep_level = 1 (only `list` is repeated). - // - max_def_level = 1 (the repeated `list` group can occur 0 times, which is how parquet - // encodes an empty list; 1 marks "element present"). - // - // Per row: - // - empty list: emit one slot with def = 0, rep = 0, no value - // - list of N elements: emit N slots, def = 1 each, rep = 0 for the first and rep = 1 for the - // rest, plus N values. 
+ let empty_list_def: i16 = if outer_nullable { 1 } else { 0 }; + let element_present_def: i16 = if outer_nullable { 2 } else { 1 }; + let num_rows = array.len(); - let total_present: usize = (0..num_rows) - .map(|row| (offsets[row + 1] - offsets[row]).max(0) as usize) - .sum(); - // Each row contributes either 1 level (empty) or list_len levels. - let total_levels = (0..num_rows) - .map(|row| { - let len = (offsets[row + 1] - offsets[row]).max(0) as usize; - if len == 0 { 1 } else { len } - }) - .sum::(); - let mut def_levels: Vec = Vec::with_capacity(total_levels); - let mut rep_levels: Vec = Vec::with_capacity(total_levels); + // A null outer at row R contributes one def=0 level and no value. + // The inner-values gather (in `write_list_inner_values`) skips + // null rows entirely via this mask. + let null_rows: Option> = if outer_nullable { + Some((0..num_rows).map(|row| array.is_null(row)).collect()) + } else { + None + }; + + let mut def_levels: Vec = Vec::new(); + let mut rep_levels: Vec = Vec::new(); + let mut total_present: usize = 0; for row in 0..num_rows { - let start = offsets[row]; - let end = offsets[row + 1]; - let len = (end - start).max(0) as usize; - if len == 0 { + let is_null = match null_rows.as_ref() { + Some(n) => n[row], + None => false, + }; + if is_null { + // Null outer: one def=0 level, no value. def_levels.push(0); rep_levels.push(0); + continue; + } + let len = (offsets[row + 1] - offsets[row]).max(0) as usize; + if len == 0 { + def_levels.push(empty_list_def); + rep_levels.push(0); } else { for i in 0..len { - def_levels.push(1); + def_levels.push(element_present_def); rep_levels.push(if i == 0 { 0 } else { 1 }); } + total_present += len; } } - // Dispatch the inner primitive through the appropriate typed - // writer. Indexing iterates only the present (non-empty-list) rows - // — start..end ranges, walked once for the whole array — so we - // emit exactly `total_present` values. 
+ // The inner-values dispatcher walks `start..end` per row and + // gathers into a contiguous vec. To do that against the original + // inner_values (which is shared across all rows including the + // null ones), we hand it the ORIGINAL per-row offsets plus a + // `null_rows` mask. Non-null rows contribute their full range; + // null rows are skipped. The dispatcher emits exactly + // `total_present` values. write_list_inner_values( col_writer, field, @@ -728,6 +748,7 @@ fn write_non_nullable_list_via_serialized_column_writer( total_present, &def_levels, &rep_levels, + null_rows.as_deref(), ) } @@ -746,6 +767,11 @@ fn write_list_inner_values( total_present: usize, def_levels: &[i16], rep_levels: &[i16], + // Present for nullable-outer lists: `null_rows[row]` is true when + // row is null on the outer (no inner values to emit). Indexed by + // the same row range as `offsets`. None means "no nullable outer" + // (all rows are present), so the per-row check is skipped. + null_rows: Option<&[bool]>, ) -> Result<(), ParquetWriteError> { use arrow::array::{ Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, UInt8Array, @@ -756,9 +782,19 @@ fn write_list_inner_values( // Walk the per-row [start, end) ranges once and gather the // present-only values into a contiguous Vec for `write_batch`. + // Null outer rows are skipped — their inner range is not emitted. 
+ let is_null_row = |row: usize| -> bool { + match null_rows { + Some(n) => n[row], + None => false, + } + }; let collect_i32 = |extract: &dyn Fn(usize) -> i32| -> Vec { let mut out = Vec::with_capacity(total_present); for row in 0..(offsets.len() - 1) { + if is_null_row(row) { + continue; + } let start = offsets[row].max(0) as usize; let end = offsets[row + 1].max(0) as usize; for i in start..end { @@ -770,6 +806,9 @@ fn write_list_inner_values( let collect_i64 = |extract: &dyn Fn(usize) -> i64| -> Vec { let mut out = Vec::with_capacity(total_present); for row in 0..(offsets.len() - 1) { + if is_null_row(row) { + continue; + } let start = offsets[row].max(0) as usize; let end = offsets[row + 1].max(0) as usize; for i in start..end { @@ -861,6 +900,9 @@ fn write_list_inner_values( .unwrap(); let mut values = Vec::with_capacity(total_present); for row in 0..(offsets.len() - 1) { + if is_null_row(row) { + continue; + } let start = offsets[row].max(0) as usize; let end = offsets[row + 1].max(0) as usize; for i in start..end { @@ -880,6 +922,9 @@ fn write_list_inner_values( .unwrap(); let mut values = Vec::with_capacity(total_present); for row in 0..(offsets.len() - 1) { + if is_null_row(row) { + continue; + } let start = offsets[row].max(0) as usize; let end = offsets[row + 1].max(0) as usize; for i in start..end {