From 06ae69941d9dceea7f00ed42546125fa50b443b8 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Mon, 11 May 2026 11:24:35 -0400 Subject: [PATCH 1/9] feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder. The streaming merge engine now keeps body-col memory bounded by output page size (not column-chunk size) while preserving caller-specified M:N output splitting at sorted_series boundaries. Architecture (Husky multi-input → multi-output sorted merge): Phase 0 (async) — drain sort cols from each input. With Husky column ordering, sort cols + sorted_series are the prefix of each row group's body bytes, so the decoder can stop after they are fully decoded; the remaining body col pages stay un-read in the input stream, ready for phase 3. Phase 1 — compute_merge_order over the per-input sort-col RecordBatches using the existing k-way (sorted_series, timestamp_secs) heap. Phase 2 — compute_output_boundaries with the caller's num_outputs, splitting at sorted_series transitions. Phase 3 (blocking + block_on bridges) — streaming write. All M output writers are alive for the duration. For each column in Husky order, every output's col K is written in turn: - Sort col / sorted_series: applied via arrow::interleave from the already-buffered phase-0 data. - Body col: each output page is assembled via arrow::interleave from input page slices, with decoders advanced page-by-page via handle.block_on from inside the sync iterator passed to write_next_column_arrays. Pages flush to the writer's sink as SerializedColumnWriter's page-size threshold trips — memory stays bounded by the in-flight output page plus a small number of in-flight input pages. After all M outputs' col K is done, every input decoder is at the start of col K+1 in its single row group. Move to col K+1. PR-6b.2 only handles single-row-group inputs (real or PR-5- adapter-presented). Multi-RG metric-aligned inputs are rejected with a clear error message; supporting them requires either consuming + discarding body cols of RG[i-1] from the stream to reach RG[i]'s sort cols, or a second body GET — both are larger scope changes that land in a follow-up. Page-bounded contract verified by test_body_col_streams_many_pages_per_column_chunk: with data_page_row_count_limit=1000 on an 8000-row merge, the output value column spans ≥ 2 pages, demonstrating that body col writes respect data_page_size and do not materialise whole column chunks. Tests (9, all passing): two-input merge, single-RG output for single-metric_name input, total-rows-preserved across M:N, sort-schema mismatch rejection, KV metadata propagation, all-empty-inputs no-output, output drainable by StreamDecoder, multi-RG input rejection, page-bounded body col streaming. Also exposes existing helpers in merge/writer.rs as pub(super) (apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns, resolve_sort_field_names, verify_sort_order) so streaming.rs can reuse the same MC-3 / KV / sorting-columns construction the non-streaming engine uses. PR-7 will fold the non-streaming engine away. PR-6c.2 will add file-size monitoring on top: close the current output at the next sorted_series transition when an in-progress file approaches the size cap, producing additional splits beyond the caller's N. 
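Illustrative single-RG input layout under Husky column ordering (column
names are examples, not a full schema):

    [metric_name][timestamp_secs][sorted_series] | [metric_type][service][value]
     '-------- phase 0 drains these --------'      '-- phase 3 streams these --'

Phase 0 parks each input decoder at the | boundary; every page to the
right stays un-read until its column's turn in the phase-3 column-major
loop.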
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../quickwit-parquet-engine/src/merge/mod.rs  |    1 +
 .../src/merge/streaming.rs                    | 2235 +++++++++++++++++
 .../src/merge/writer.rs                       |   13 +-
 3 files changed, 2244 insertions(+), 5 deletions(-)
 create mode 100644 quickwit/quickwit-parquet-engine/src/merge/streaming.rs

diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs
index 008c456208a..fcc353992a9 100644
--- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs
+++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs
@@ -24,6 +24,7 @@ mod merge_order;
 pub mod metadata_aggregation;
 pub mod policy;
 mod schema;
+pub mod streaming;
 mod writer;
 
 #[cfg(test)]
diff --git a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs
new file mode 100644
index 00000000000..d7e74e8428f
--- /dev/null
+++ b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs
@@ -0,0 +1,2235 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Streaming column-major merge engine with page-bounded body cols.
+//!
+//! Architecture (Husky multi-input → multi-output sorted merge):
+//!
+//! 1. **Phase 0 (async): drain sort cols** from each input. With Husky column ordering, sort cols +
+//!    `sorted_series` are the prefix of each row group's body bytes, so we can stop the decoder
+//!    after those are fully decoded. The remaining body col pages stay un-read in the input stream,
+//!    ready for phase 3.
+//! 2. **Phase 1: compute merge order** via the existing k-way merge on `(sorted_series,
+//!    timestamp_secs)` from the per-input sort col [`RecordBatch`]es. Produces a run-length-encoded
+//!    merge plan over input row positions.
+//! 3. **Phase 2: compute output boundaries** with the caller's `num_outputs`, splitting at
+//!    `sorted_series` transitions so each output file's key range is non-overlapping with adjacent
+//!    files.
+//! 4. **Phase 3 (blocking + block_on bridges): streaming write**. All output writers are alive for
+//!    the duration. For each column in Husky order, every output's col K is written in turn:
+//!    - Sort col / `sorted_series`: applied via [`arrow::compute::interleave`] from the
+//!      already-buffered phase-0 data.
+//!    - Body col: each output page is assembled via [`arrow::compute::interleave`] from input page
+//!      slices, with decoders advanced page-by-page via `Handle::block_on` from inside a sync
+//!      iterator. Pages flush to the writer's sink as [`SerializedColumnWriter`]'s page-size
+//!      threshold trips — memory stays bounded by the in-flight output page plus a small number of
+//!      in-flight input pages.
+//!
+//! After all M outputs' col K is done, every input decoder is at the
+//! start of col K+1 in its single row group. Move to col K+1.
+//!
+//! ## Single-RG inputs assumption
+//!
+//! PR-6b.2 only handles **single-row-group inputs**. With multi-RG
+//! inputs the body bytes interleave with successive RGs' sort cols
+//! (`sort_cols_RG0`, `body_cols_RG0`, `sort_cols_RG1`, ...), so
+//! draining sort cols from RG1 onwards requires either consuming +
+//! discarding body cols of RG0 from the stream or buffering them.
+//! Neither fits the page-bounded contract; multi-RG-input streaming
+//! lands in a follow-up. Today's real inputs are: (a) post-PR-3
+//! single-RG ingest splits, or (b) PR-5's legacy adapter that
+//! presents arbitrary multi-RG splits as one synthetic RG. Both
+//! satisfy the assumption.
+//!
+//! [`SerializedColumnWriter`]: parquet::file::writer::SerializedColumnWriter
+
+#![allow(dead_code)]
+
+use std::collections::{HashMap, HashSet};
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use anyhow::{Context, Result, anyhow, bail};
+use arrow::array::{Array, ArrayRef, RecordBatch, new_null_array};
+use arrow::compute::interleave;
+use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef};
+use parquet::file::metadata::ParquetMetaData;
+use tokio::runtime::Handle;
+use tracing::info;
+use ulid::Ulid;
+
+use super::merge_order::{MergeRun, compute_merge_order, compute_output_boundaries};
+use super::schema::{align_inputs_to_union_schema, optimize_output_batch};
+use super::writer::{
+    apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns,
+    resolve_sort_field_names, verify_sort_order,
+};
+use super::{InputMetadata, MergeConfig, MergeOutputFile};
+use crate::row_keys;
+use crate::sort_fields::{
+    equivalent_schemas_for_compaction, is_timestamp_column_name, parse_sort_fields,
+};
+use crate::sorted_series::SORTED_SERIES_COLUMN;
+use crate::split::TAG_SERVICE;
+use crate::storage::page_decoder::{DecodedPage, StreamDecoder};
+use crate::storage::split_writer::{extract_metric_names, extract_time_range};
+use crate::storage::streaming_writer::StreamingParquetWriter;
+use crate::storage::{
+    ColumnPageStream, PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN,
+    PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START,
+};
+use crate::zonemap::{self, ZonemapOptions};
+
+/// Output page size in rows for body-col assembly. Each call to the
+/// sync iterator passed to [`write_next_column_arrays`] yields one
+/// `ArrayRef` of up to this many rows; the parquet writer flushes
+/// physical pages independently as encoded bytes cross
+/// `data_page_size_limit`. 1024 keeps assembled arrays small enough
+/// to bound per-output memory but large enough to amortise per-page
+/// fixed costs.
+///
+/// [`write_next_column_arrays`]: crate::storage::streaming_writer::RowGroupBuilder::write_next_column_arrays
+const OUTPUT_PAGE_ROWS: usize = 1024;
+
+/// Streaming N-input → M-output column-major merge.
+///
+/// See module docs for the four phases. Returns one
+/// [`MergeOutputFile`] per output file produced (zero-row outputs are
+/// dropped). Caller's `config.num_outputs` is the upper bound on the
+/// number of files; fewer are returned when there are not enough
+/// `sorted_series` transitions to split at.
+pub async fn streaming_merge_sorted_parquet_files(
+    inputs: Vec<Box<dyn ColumnPageStream>>,
+    output_dir: &Path,
+    config: &MergeConfig,
+) -> Result<Vec<MergeOutputFile>> {
+    if inputs.is_empty() {
+        bail!("merge requires at least one input");
+    }
+    if config.num_outputs == 0 {
+        bail!("num_outputs must be at least 1");
+    }
+
+    // Validate that all inputs are single-RG (or zero-RG, which means
+    // the file has no data). PR-6b.2 simplification — see module docs.
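+    // e.g. a legacy three-row-group split streamed in directly fails the
+    // check below, while the same split routed through the PR-5 adapter
+    // presents as one synthetic row group and passes. (Illustrative; the
+    // adapter lives outside this module.)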
+    for (idx, stream) in inputs.iter().enumerate() {
+        let num_rgs = stream.metadata().num_row_groups();
+        if num_rgs > 1 {
+            bail!(
+                "streaming merge requires single-row-group inputs in PR-6b.2 (input {idx} has \
+                 {num_rgs} row groups); multi-RG metric-aligned inputs land in a follow-up. \
+                 Legacy multi-RG (rg_partition_prefix_len=0) inputs must go through the PR-5 \
+                 adapter, which presents them as a single synthetic row group."
+            );
+        }
+    }
+
+    let input_meta = extract_and_validate_input_metadata(&inputs)?;
+
+    info!(
+        num_inputs = inputs.len(),
+        num_outputs = config.num_outputs,
+        sort_fields = %input_meta.sort_fields,
+        "starting streaming sorted parquet merge"
+    );
+
+    let output_dir = output_dir.to_path_buf();
+    let writer_config = config.writer_config.clone();
+    let num_outputs = config.num_outputs;
+
+    // Move everything onto a blocking task. Inside, the decoders need to
+    // make async I/O calls (page fetches over the network); we drive
+    // those via `handle.block_on(...)` from inside sync iterators that
+    // feed the parquet writer's column-write methods. The writer is
+    // sync; this single-task pattern avoids the lifetime complexity of
+    // moving borrowed `RowGroupBuilder`s across tokio tasks.
+    let result = tokio::task::spawn_blocking(move || -> Result<Vec<MergeOutputFile>> {
+        let handle = Handle::current();
+
+        let mut inputs = inputs;
+        let mut decoders_state = build_input_decoders_state(&mut inputs)?;
+
+        // Phase 0
+        let sort_col_batches =
+            drain_sort_cols_all_inputs(&handle, &mut decoders_state, &input_meta.sort_fields)?;
+
+        if sort_col_batches.iter().all(|b| b.num_rows() == 0) {
+            info!("all inputs empty, producing no output");
+            return Ok(Vec::new());
+        }
+
+        // Phase 1: align inputs to a union sort-col schema so the merge-order
+        // comparator sees uniformly-typed `sorted_series` + `timestamp_secs`.
+        let (sort_union_schema, aligned_sort_batches) =
+            align_inputs_to_union_schema(&sort_col_batches, &input_meta.sort_fields)?;
+        let merge_order = compute_merge_order(&aligned_sort_batches, &input_meta.sort_fields)?;
+
+        // Phase 2: split merge order into M outputs at sorted_series boundaries.
+        let boundaries =
+            compute_output_boundaries(&merge_order, &aligned_sort_batches, num_outputs)?;
+
+        let total_rows: usize = aligned_sort_batches.iter().map(|b| b.num_rows()).sum();
+        info!(
+            total_rows,
+            num_outputs = boundaries.len(),
+            "streaming merge order computed"
+        );
+
+        // Pre-compute per-input row → (output_idx, output_position) destination map.
+        // Used by every column write to slice take/interleave indices per page.
+        let destinations =
+            build_input_row_destinations(&aligned_sort_batches, &merge_order, &boundaries);
+
+        // Phase 3
+        let outputs = write_streaming_outputs(
+            &handle,
+            &mut decoders_state,
+            &aligned_sort_batches,
+            &sort_union_schema,
+            &merge_order,
+            &boundaries,
+            &destinations,
+            &input_meta,
+            &writer_config,
+            &output_dir,
+        )?;
+
+        // MC-1: total row count preserved.
+        let output_total: usize = outputs.iter().map(|o| o.num_rows).sum();
+        quickwit_dst::check_invariant!(
+            quickwit_dst::invariants::InvariantId::MC1,
+            output_total == total_rows,
+            ": streaming merge input rows={}, output rows={}",
+            total_rows,
+            output_total,
+        );
+
+        Ok(outputs)
+    })
+    .await
+    .context("streaming merge blocking task panicked")??;
+
+    Ok(result)
+}
+
+/// Per-input state held across phase 0 → phase 3 inside the blocking
+/// task. The decoder borrows mutably from its input stream; both live
+/// here so the borrow checker is happy with one struct per input.
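+///
+/// Both phases re-anchor a short-lived decoder on the owned stream; a
+/// minimal sketch of the pattern used throughout this module:
+///
+/// ```ignore
+/// let mut decoder = StreamDecoder::new(&mut *state.stream);
+/// let page = handle.block_on(decoder.decode_next_page())?;
+/// ```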
+struct InputDecoderState {
+    /// Owned stream (so the decoder's `&mut dyn ColumnPageStream` borrow
+    /// is anchored to a stable address inside this struct).
+    stream: Box<dyn ColumnPageStream>,
+    metadata: Arc<ParquetMetaData>,
+    /// Arrow schema of this input (from parquet → arrow conversion).
+    arrow_schema: SchemaRef,
+}
+
+/// Build per-input state. The streams are moved in from the caller.
+fn build_input_decoders_state(
+    inputs: &mut Vec<Box<dyn ColumnPageStream>>,
+) -> Result<Vec<InputDecoderState>> {
+    let mut states = Vec::with_capacity(inputs.len());
+    for stream in inputs.drain(..) {
+        let metadata = Arc::clone(stream.metadata());
+        let parquet_schema = metadata.file_metadata().schema_descr();
+        let arrow_schema = parquet::arrow::parquet_to_arrow_schema(parquet_schema, None)
+            .context("converting parquet schema → arrow")?;
+        states.push(InputDecoderState {
+            stream,
+            metadata,
+            arrow_schema: Arc::new(arrow_schema),
+        });
+    }
+    Ok(states)
+}
+
+/// Extract sort schema, window, and merge-ops metadata from each
+/// input stream and validate consistency across inputs. Reads
+/// `qh.*` KVs from [`ColumnPageStream::metadata`].
+fn extract_and_validate_input_metadata(
+    inputs: &[Box<dyn ColumnPageStream>],
+) -> Result<InputMetadata> {
+    let mut consensus_sort_fields: Option<String> = None;
+    let mut consensus_window_start: Option<Option<i64>> = None;
+    let mut consensus_window_duration: Option<u64> = None;
+    let mut consensus_prefix_len: Option<usize> = None;
+    let mut max_merge_ops: u32 = 0;
+
+    for (idx, stream) in inputs.iter().enumerate() {
+        let metadata = stream.metadata();
+        let kv_metadata = metadata.file_metadata().key_value_metadata();
+
+        let find_kv = |key: &str| -> Option<String> {
+            kv_metadata.and_then(|kvs| {
+                kvs.iter()
+                    .find(|kv| kv.key == key)
+                    .and_then(|kv| kv.value.clone())
+            })
+        };
+
+        let file_sort_fields = match find_kv(PARQUET_META_SORT_FIELDS) {
+            Some(s) => s,
+            None => bail!(
+                "input {idx} is missing {} metadata",
+                PARQUET_META_SORT_FIELDS,
+            ),
+        };
+
+        match &consensus_sort_fields {
+            Some(expected) => {
+                let expected_schema = parse_sort_fields(expected)?;
+                let file_schema = parse_sort_fields(&file_sort_fields).with_context(|| {
+                    format!("parsing sort schema from input {idx}: '{file_sort_fields}'")
+                })?;
+                if !equivalent_schemas_for_compaction(&expected_schema, &file_schema) {
+                    bail!(
+                        "sort schema mismatch in input {idx}: expected '{expected}', found \
+                         '{file_sort_fields}'",
+                    );
+                }
+            }
+            None => {
+                parse_sort_fields(&file_sort_fields).with_context(|| {
+                    format!("parsing sort schema from input {idx}: '{file_sort_fields}'")
+                })?;
+                consensus_sort_fields = Some(file_sort_fields.clone());
+            }
+        }
+
+        let file_window_start = find_kv(PARQUET_META_WINDOW_START)
+            .map(|s| s.parse::<i64>())
+            .transpose()
+            .with_context(|| format!("parsing window_start from input {idx}"))?;
+        match &consensus_window_start {
+            Some(expected) if file_window_start != *expected => {
+                bail!(
+                    "window_start mismatch in input {idx}: expected {:?}, found {:?}",
+                    expected,
+                    file_window_start,
+                );
+            }
+            Some(_) => {}
+            None => consensus_window_start = Some(file_window_start),
+        }
+
+        let file_window_duration = find_kv(PARQUET_META_WINDOW_DURATION)
+            .map(|s| s.parse::<u64>())
+            .transpose()
+            .with_context(|| format!("parsing window_duration from input {idx}"))?
+            .unwrap_or(0);
+        match &consensus_window_duration {
+            Some(expected) if file_window_duration != *expected => {
+                bail!(
+                    "window_duration_secs mismatch in input {idx}: expected {expected}, found \
+                     {file_window_duration}",
+                );
+            }
+            Some(_) => {}
+            None => consensus_window_duration = Some(file_window_duration),
+        }
+
+        let file_merge_ops = find_kv(PARQUET_META_NUM_MERGE_OPS)
+            .map(|s| s.parse::<u32>())
+            .transpose()
+            .with_context(|| format!("parsing num_merge_ops from input {idx}"))?
+            .unwrap_or(0);
+        if file_merge_ops > max_merge_ops {
+            max_merge_ops = file_merge_ops;
+        }
+
+        let file_prefix_len = find_kv(PARQUET_META_RG_PARTITION_PREFIX_LEN)
+            .map(|s| s.parse::<usize>())
+            .transpose()
+            .with_context(|| format!("parsing rg_partition_prefix_len from input {idx}"))?
+            .unwrap_or(0);
+        match &consensus_prefix_len {
+            Some(expected) if file_prefix_len != *expected => {
+                bail!(
+                    "rg_partition_prefix_len mismatch in input {idx}: expected {expected}, found \
+                     {file_prefix_len}",
+                );
+            }
+            Some(_) => {}
+            None => consensus_prefix_len = Some(file_prefix_len),
+        }
+    }
+
+    let sort_fields = match consensus_sort_fields {
+        Some(s) => s,
+        None => bail!("at least one input is required"),
+    };
+
+    Ok(InputMetadata {
+        sort_fields,
+        window_start_secs: consensus_window_start.unwrap_or(None),
+        window_duration_secs: consensus_window_duration.unwrap_or(0),
+        num_merge_ops: max_merge_ops + 1,
+        rg_partition_prefix_len: consensus_prefix_len.unwrap_or(0),
+    })
+}
+
+// ============================================================================
+// Phase 0: drain sort cols from each input
+// ============================================================================
+
+/// Drive each input's decoder via `block_on` until its sort cols +
+/// `sorted_series` are fully decoded for the (single) row group.
+/// Returns one [`RecordBatch`] per input with just those columns; the
+/// rest of each input's body bytes stay un-read in the stream, ready
+/// for phase 3 to consume page-by-page.
+fn drain_sort_cols_all_inputs(
+    handle: &Handle,
+    decoders_state: &mut [InputDecoderState],
+    sort_fields_str: &str,
+) -> Result<Vec<RecordBatch>> {
+    let mut batches = Vec::with_capacity(decoders_state.len());
+    for (idx, state) in decoders_state.iter_mut().enumerate() {
+        let batch = drain_sort_cols_one_input(handle, state, sort_fields_str, idx)?;
+        if batch.num_columns() > 0 && batch.schema().index_of(SORTED_SERIES_COLUMN).is_err() {
+            bail!(
+                "input {idx} is missing the '{}' column required for merge",
+                SORTED_SERIES_COLUMN,
+            );
+        }
+        batches.push(batch);
+    }
+    Ok(batches)
+}
+
+fn drain_sort_cols_one_input(
+    handle: &Handle,
+    state: &mut InputDecoderState,
+    sort_fields_str: &str,
+    input_idx: usize,
+) -> Result<RecordBatch> {
+    if state.metadata.num_row_groups() == 0 {
+        // Empty input — no rows to drain. Return a zero-row batch with the
+        // sort cols' fields preserved so downstream merge order code sees a
+        // uniform schema across inputs.
+        return empty_sort_col_record_batch(state, sort_fields_str);
+    }
+    let sort_field_schema = parse_sort_fields(sort_fields_str)?;
+
+    // The set of column names we treat as "sort columns" for drain
+    // purposes: every sort-schema column name that is present in this
+    // input's arrow schema, plus `sorted_series` (always required).
+    let sort_col_names: HashSet<String> =
+        sort_col_names_for_input(&sort_field_schema, state.arrow_schema.as_ref());
+
+    // Map each sort col name → its parquet leaf column index.
+    // The page decoder reports pages by parquet column index (matches arrow
+    // top-level field index when there are no nested types).
+    let parquet_schema = state.metadata.file_metadata().schema_descr();
+    let mut sort_col_parquet_indices: HashMap<usize, String> = HashMap::new();
+    for (col_idx, col) in parquet_schema.columns().iter().enumerate() {
+        // For flat schemas (one leaf per top-level field), the parquet
+        // column index equals the arrow top-level field index. We
+        // match by name: parquet `column_path` root → arrow field name.
+        let name = col.path().parts()[0].to_string();
+        if sort_col_names.contains(&name) {
+            sort_col_parquet_indices.insert(col_idx, name);
+        }
+    }
+
+    if sort_col_parquet_indices.is_empty() {
+        // No sort cols present in this input — return an empty batch
+        // with the input's arrow schema. Downstream merge order check
+        // will catch the missing `sorted_series`.
+        return Ok(RecordBatch::new_empty(Arc::clone(&state.arrow_schema)));
+    }
+
+    // Target row count per sort col (from the row group's column chunk metadata).
+    let rg_meta = state.metadata.row_group(0);
+    let mut target_rows_per_col: HashMap<usize, usize> = HashMap::new();
+    for &col_idx in sort_col_parquet_indices.keys() {
+        target_rows_per_col.insert(col_idx, rg_meta.column(col_idx).num_values() as usize);
+    }
+
+    // Drain pages into per-col buffers until all sort cols are fully
+    // decoded. With Husky storage ordering, sort col pages come before
+    // any body col page within the row group, so we should never see a
+    // non-sort page while sort cols are incomplete.
+    let mut per_col_pages: HashMap<usize, Vec<ArrayRef>> = HashMap::new();
+    let mut rows_done_per_col: HashMap<usize, usize> =
+        sort_col_parquet_indices.keys().map(|&i| (i, 0)).collect();
+    let mut sort_cols_finished = 0usize;
+    let sort_col_target = sort_col_parquet_indices.len();
+
+    let mut decoder = StreamDecoder::new(&mut *state.stream);
+
+    while sort_cols_finished < sort_col_target {
+        let decoded = handle
+            .block_on(decoder.decode_next_page())
+            .with_context(|| format!("decoding sort col page (input {input_idx})"))?;
+        let page = match decoded {
+            Some(p) => p,
+            None => bail!(
+                "stream ended before sort cols fully drained for input {input_idx}: \
+                 {sort_cols_finished}/{sort_col_target} cols complete",
+            ),
+        };
+
+        if !sort_col_parquet_indices.contains_key(&page.col_idx) {
+            bail!(
+                "input {input_idx} returned a non-sort page (col {}) before all sort cols were \
+                 drained — this violates Husky storage ordering",
+                page.col_idx,
+            );
+        }
+        if page.rg_idx != 0 {
+            bail!(
+                "input {input_idx} returned a page from rg {} during sort col drain — only \
+                 single-RG inputs are supported in PR-6b.2",
+                page.rg_idx,
+            );
+        }
+
+        let array_len = page.array.len();
+        let rows_done = rows_done_per_col.get_mut(&page.col_idx).unwrap();
+        *rows_done += array_len;
+        per_col_pages
+            .entry(page.col_idx)
+            .or_default()
+            .push(page.array);
+
+        if *rows_done == target_rows_per_col[&page.col_idx] {
+            sort_cols_finished += 1;
+        } else if *rows_done > target_rows_per_col[&page.col_idx] {
+            bail!(
+                "input {input_idx} col {} decoded more rows ({}) than expected ({})",
+                page.col_idx,
+                rows_done,
+                target_rows_per_col[&page.col_idx],
+            );
+        }
+    }
+
+    // Build a RecordBatch holding just the sort cols. Field order
+    // matches the arrow schema's order (so downstream consumers see
+    // the same field order whether or not body cols are present).
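+    // e.g. an input whose arrow fields are [metric_name, timestamp_secs,
+    // sorted_series, metric_type, service, value] yields a three-column
+    // batch over [metric_name, timestamp_secs, sorted_series]; the body
+    // col pages after them are still un-read in the stream.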
+    let mut fields: Vec<Arc<Field>> = Vec::new();
+    let mut columns: Vec<ArrayRef> = Vec::new();
+    for (field_idx, field) in state.arrow_schema.fields().iter().enumerate() {
+        let Some(_name) = sort_col_parquet_indices.get(&field_idx) else {
+            continue;
+        };
+        let pages = per_col_pages.remove(&field_idx).expect("col drained");
+        let concatenated = concat_arrays(&pages).with_context(|| {
+            format!(
+                "concatenating sort col '{}' pages for input {input_idx}",
+                field.name(),
+            )
+        })?;
+        fields.push(Arc::clone(field));
+        columns.push(concatenated);
+    }
+
+    let schema = Arc::new(ArrowSchema::new(fields));
+    RecordBatch::try_new(schema, columns)
+        .with_context(|| format!("building sort col record batch for input {input_idx}"))
+}
+
+/// Set of column names treated as "sort cols" for phase 0 drain.
+fn sort_col_names_for_input(
+    sort_field_schema: &quickwit_proto::sortschema::SortSchema,
+    arrow_schema: &ArrowSchema,
+) -> HashSet<String> {
+    let mut names: HashSet<String> = HashSet::new();
+    for sf in &sort_field_schema.column {
+        if arrow_schema.field_with_name(&sf.name).is_ok() {
+            names.insert(sf.name.clone());
+        }
+        // Legacy schemas may declare `timestamp` but the column is named
+        // `timestamp_secs`. The merge order code already handles this
+        // alias; we want both candidates drained, whichever matches.
+        if is_timestamp_column_name(&sf.name)
+            && arrow_schema.field_with_name("timestamp_secs").is_ok()
+        {
+            names.insert("timestamp_secs".to_string());
+        }
+    }
+    if arrow_schema.field_with_name(SORTED_SERIES_COLUMN).is_ok() {
+        names.insert(SORTED_SERIES_COLUMN.to_string());
+    }
+    names
+}
+
+/// Build a zero-row RecordBatch with the input's sort cols + `sorted_series`.
+/// Used when an input file has zero rows (no row groups) so that downstream
+/// k-way merge sees a consistent schema shape across inputs.
+fn empty_sort_col_record_batch(
+    state: &InputDecoderState,
+    sort_fields_str: &str,
+) -> Result<RecordBatch> {
+    let sort_field_schema = parse_sort_fields(sort_fields_str)?;
+    let sort_col_names = sort_col_names_for_input(&sort_field_schema, state.arrow_schema.as_ref());
+    let mut fields: Vec<Arc<Field>> = Vec::new();
+    let mut columns: Vec<ArrayRef> = Vec::new();
+    for field in state.arrow_schema.fields() {
+        if !sort_col_names.contains(field.name()) {
+            continue;
+        }
+        fields.push(Arc::clone(field));
+        columns.push(new_null_array(field.data_type(), 0));
+    }
+    let schema = Arc::new(ArrowSchema::new(fields));
+    RecordBatch::try_new(schema, columns).context("building empty sort col record batch")
+}
+
+fn concat_arrays(arrays: &[ArrayRef]) -> Result<ArrayRef> {
+    if arrays.len() == 1 {
+        return Ok(Arc::clone(&arrays[0]));
+    }
+    let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect();
+    Ok(arrow::compute::concat(&refs)?)
+}
+
+// ============================================================================
+// Pre-compute input row → output destination map
+// ============================================================================
+
+/// `destinations[input_idx][input_row] = Some((output_idx, output_pos))`
+/// if that input row contributes to output `output_idx` at position
+/// `output_pos` within that output's row range. `None` means the row
+/// is not in any output (only possible for rows beyond the merge
+/// plan's coverage; shouldn't happen with our merge order).
+#[derive(Debug)]
+struct InputRowDestinations {
+    /// One Vec per input. Length = input's sort-col row count.
+    per_input: Vec<Vec<Option<(usize, usize)>>>,
+    /// Total rows per output index (cumulative writer "expected" rows).
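+    /// e.g. two outputs receiving 3 and 2 merged rows give
+    /// `rows_per_output == [3, 2]`.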
+    rows_per_output: Vec<usize>,
+}
+
+fn build_input_row_destinations(
+    aligned_sort_batches: &[RecordBatch],
+    merge_order: &[MergeRun],
+    boundaries: &[Range<usize>],
+) -> InputRowDestinations {
+    let mut per_input: Vec<Vec<Option<(usize, usize)>>> = aligned_sort_batches
+        .iter()
+        .map(|b| vec![None; b.num_rows()])
+        .collect();
+    let mut rows_per_output: Vec<usize> = vec![0; boundaries.len()];
+
+    for (out_idx, boundary) in boundaries.iter().enumerate() {
+        let runs = &merge_order[boundary.clone()];
+        for run in runs {
+            for r in 0..run.row_count {
+                let input_row = run.start_row + r;
+                per_input[run.input_index][input_row] = Some((out_idx, rows_per_output[out_idx]));
+                rows_per_output[out_idx] += 1;
+            }
+        }
+    }
+
+    InputRowDestinations {
+        per_input,
+        rows_per_output,
+    }
+}
+
+// ============================================================================
+// Phase 3: streaming write with one writer per output
+// ============================================================================
+
+/// Per-output state owned across phase 3 (writer + bookkeeping).
+/// The row group lives in a parallel Vec so its borrow into `writer`
+/// is tracked by the compiler instead of through a `'static`
+/// transmute.
+struct OutputWriterStorage {
+    output_idx: usize,
+    output_path: PathBuf,
+    writer: StreamingParquetWriter<std::fs::File>,
+    /// Service-name set built during the body col write of "service"
+    /// (or empty if no service col).
+    service_names: HashSet<String>,
+    /// Per-output total row count = sum of merge runs in this output's boundary.
+    num_rows: usize,
+}
+
+#[allow(clippy::too_many_arguments)]
+fn write_streaming_outputs(
+    handle: &Handle,
+    decoders_state: &mut [InputDecoderState],
+    aligned_sort_batches: &[RecordBatch],
+    sort_union_schema: &SchemaRef,
+    merge_order: &[MergeRun],
+    boundaries: &[Range<usize>],
+    destinations: &InputRowDestinations,
+    input_meta: &InputMetadata,
+    writer_config: &crate::storage::ParquetWriterConfig,
+    output_dir: &Path,
+) -> Result<Vec<MergeOutputFile>> {
+    // 1. Build the union schema across full input arrow schemas (so the output covers every column
+    //    that appears in any input). The sort union schema covers only sort cols.
+    let input_arrow_schemas: Vec<SchemaRef> = decoders_state
+        .iter()
+        .map(|s| Arc::clone(&s.arrow_schema))
+        .collect();
+    let full_union_schema =
+        build_full_union_schema_from_arrow_schemas(&input_arrow_schemas, &input_meta.sort_fields)?;
+
+    // 2. Build per-output metadata (KV entries, row keys, zonemaps) up front from sort col data —
+    //    these are what the schema + writer props depend on.
+    let per_output_static = boundaries
+        .iter()
+        .enumerate()
+        .map(|(out_idx, boundary)| {
+            build_per_output_static(
+                out_idx,
+                boundary,
+                aligned_sort_batches,
+                sort_union_schema,
+                merge_order,
+                input_meta,
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // 3. Decide per-output schema: optimise based on each output's sort col data (which determines
+    //    metric_name cardinality, etc.). Body cols stay as declared by the union schema; we don't
+    //    probe their cardinality here since we haven't read them yet. This is a slight regression
+    //    vs. the non-streaming engine — it would dict-encode low-cardinality string body cols too.
+    //    PR-6c.2 or later can revisit by gathering body-col cardinality during the streaming pass.
+    let per_output_schemas: Vec<SchemaRef> = per_output_static
+        .iter()
+        .map(|s| derive_output_schema(&full_union_schema, &s.sort_optimised))
+        .collect::<Result<Vec<_>>>()?;
+
+    // 4. Open M writers, one per output.
+    //    Writers + bookkeeping live in `writer_states`; the row group borrows mutably from each
+    //    writer and is held in a parallel `row_groups` Vec for the col loop.
+    let mut writer_states: Vec<OutputWriterStorage> = Vec::with_capacity(boundaries.len());
+    for (out_idx, (schema, static_meta)) in per_output_schemas
+        .iter()
+        .zip(per_output_static.iter())
+        .enumerate()
+    {
+        if destinations.rows_per_output[out_idx] == 0 {
+            continue;
+        }
+        writer_states.push(open_output_writer(
+            out_idx,
+            output_dir,
+            Arc::clone(schema),
+            static_meta,
+            input_meta,
+            writer_config,
+        )?);
+    }
+
+    // Snapshot the (output_idx, num_rows) for each storage entry BEFORE
+    // calling `start_row_group`, which borrows `writer_states` mutably
+    // for the rest of phase 3's col loop.
+    let writer_index_view = writer_states_index_view(&writer_states);
+    let num_storages = writer_states.len();
+
+    let mut row_groups: Vec<crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>> =
+        writer_states
+            .iter_mut()
+            .map(|s| {
+                s.writer
+                    .start_row_group()
+                    .with_context(|| format!("opening row group for output {}", s.output_idx))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+    // 5. Stream every column, all outputs in lockstep per column.
+    //    Service names are collected into a separate Vec<HashSet<String>>
+    //    parallel to `row_groups`; we can't write into `writer_states` here
+    //    because it is already borrowed mutably by `row_groups`. We merge
+    //    these back into `writer_states` after dropping the row groups.
+    let mut service_names_per_output: Vec<HashSet<String>> =
+        (0..num_storages).map(|_| HashSet::new()).collect();
+    write_all_columns(
+        handle,
+        &mut row_groups,
+        &mut service_names_per_output,
+        &writer_index_view,
+        decoders_state,
+        aligned_sort_batches,
+        sort_union_schema,
+        merge_order,
+        boundaries,
+        destinations,
+        &per_output_schemas,
+    )?;
+
+    // 6. Finish all row groups (drops the borrows on writers).
+    for rg in row_groups {
+        rg.finish().context("finishing row group")?;
+    }
+
+    // 7. Merge collected service names + close writers + build MergeOutputFiles.
+    let mut outputs = Vec::with_capacity(writer_states.len());
+    for (mut state, services) in writer_states
+        .into_iter()
+        .zip(service_names_per_output.into_iter())
+    {
+        state.service_names.extend(services);
+        outputs.push(finalize_output_writer(state, &per_output_static)?);
+    }
+    Ok(outputs)
+}
+
+/// Static per-output state computed once from sort col data. Holds
+/// the per-output sort-col-only batch (used for metadata extraction)
+/// and the per-output schema-optimisation hints.
+struct PerOutputStatic {
+    /// Sort-cols-only batch in output sort order — used by row_keys /
+    /// zonemap / metric_names / time_range extractors.
+    sort_optimised: RecordBatch,
+    row_keys_proto: Option<Vec<u8>>,
+    zonemap_regexes: HashMap<String, String>,
+    metric_names: HashSet<String>,
+    time_range: crate::split::TimeRange,
+    /// Number of rows that go into this output.
+    num_rows: usize,
+}
+
+fn build_per_output_static(
+    out_idx: usize,
+    boundary: &Range<usize>,
+    aligned_sort_batches: &[RecordBatch],
+    sort_union_schema: &SchemaRef,
+    merge_order: &[MergeRun],
+    input_meta: &InputMetadata,
+) -> Result<PerOutputStatic> {
+    let runs = &merge_order[boundary.clone()];
+    let sort_batch = apply_merge_permutation(aligned_sort_batches, sort_union_schema, runs)
+        .with_context(|| format!("applying merge permutation for output {out_idx} sort cols"))?;
+    let num_rows = sort_batch.num_rows();
+
+    // MC-3 sort order on the sort-col-only batch (same check the
+    // non-streaming engine does, just restricted to columns we have).
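+    // e.g. the merged rows must be non-decreasing on (sorted_series,
+    // timestamp_secs): (s=1, t=10), (s=1, t=12), (s=2, t=3) is in order;
+    // swapping the last two rows would violate MC-3.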
+    verify_sort_order(&sort_batch, &input_meta.sort_fields);
+    let sort_optimised = optimize_output_batch(&sort_batch);
+
+    let row_keys_proto = row_keys::extract_row_keys(&input_meta.sort_fields, &sort_optimised)
+        .with_context(|| format!("extracting row keys for output {out_idx}"))?
+        .map(|rk| row_keys::encode_row_keys_proto(&rk));
+
+    let zonemap_opts = ZonemapOptions::default();
+    let zonemap_regexes =
+        zonemap::extract_zonemap_regexes(&input_meta.sort_fields, &sort_optimised, &zonemap_opts)
+            .with_context(|| format!("extracting zonemap regexes for output {out_idx}"))?;
+
+    let metric_names = extract_metric_names(&sort_optimised)
+        .with_context(|| format!("extracting metric names for output {out_idx}"))?;
+    let time_range = extract_time_range(&sort_optimised)
+        .with_context(|| format!("extracting time range for output {out_idx}"))?;
+
+    Ok(PerOutputStatic {
+        sort_optimised,
+        row_keys_proto,
+        zonemap_regexes,
+        metric_names,
+        time_range,
+        num_rows,
+    })
+}
+
+/// Build the full union schema across all inputs' arrow schemas
+/// (NOT just sort cols). Reuses the same algorithm as
+/// [`align_inputs_to_union_schema`] but takes pre-extracted arrow
+/// schemas — phase 3 doesn't have full input batches.
+fn build_full_union_schema_from_arrow_schemas(
+    arrow_schemas: &[SchemaRef],
+    sort_fields_str: &str,
+) -> Result<SchemaRef> {
+    // Build zero-row batches with the right schemas; that lets us
+    // reuse `align_inputs_to_union_schema`'s field-merge / Husky-order
+    // logic unchanged.
+    let empty_batches: Vec<RecordBatch> = arrow_schemas
+        .iter()
+        .map(|s| RecordBatch::new_empty(Arc::clone(s)))
+        .collect();
+    let (schema, _) = align_inputs_to_union_schema(&empty_batches, sort_fields_str)?;
+    Ok(schema)
+}
+
+/// Compute the per-output schema. For PR-6b.2 we use the
+/// (string-normalised) union schema as the output schema directly —
+/// fields stay Utf8/LargeUtf8 rather than being re-dict-encoded.
+/// Reason: streaming-decoded input arrays come out of the page
+/// decoder as plain `StringArray`/`BinaryArray` (not Dictionary), and
+/// dict re-encoding per output page would add a per-page CPU cost we
+/// don't want to take in the page-bounded path. Re-introducing
+/// dict-encoded output strings can be done later by tracking
+/// cardinality during the streaming pass — call site is here.
+///
+/// We would also like to drop columns that are all-null *for this
+/// output* (e.g., a column only present in inputs that don't
+/// contribute any rows to this output's range). That, too, is
+/// deferred: sort cols could be detected from the `sort_optimised`
+/// schema today, but body cols can't be probed before the streaming
+/// pass, and the column loop needs one schema shape per output to keep
+/// decoder advancement in lockstep. PR-6b.2 therefore keeps the full
+/// union for every output; PR-6c.2 will track per-output column
+/// presence.
+fn derive_output_schema(
+    full_union_schema: &SchemaRef,
+    sort_optimised: &RecordBatch,
+) -> Result<SchemaRef> {
+    // `sort_optimised` is reserved for per-output all-null drops (PR-6c.2);
+    // see the doc comment above.
+    let _ = sort_optimised;
+    Ok(Arc::clone(full_union_schema))
+}
+
+fn open_output_writer(
+    out_idx: usize,
+    output_dir: &Path,
+    schema: SchemaRef,
+    static_meta: &PerOutputStatic,
+    input_meta: &InputMetadata,
+    writer_config: &crate::storage::ParquetWriterConfig,
+) -> Result<OutputWriterStorage> {
+    let output_prefix_len = input_meta.rg_partition_prefix_len;
+    let kv_entries = build_merge_kv_metadata(
+        input_meta,
+        &static_meta.row_keys_proto,
+        &static_meta.zonemap_regexes,
+        output_prefix_len,
+    );
+    let sorting_cols = build_sorting_columns(&static_meta.sort_optimised, &input_meta.sort_fields)?;
+    let sort_field_names = resolve_sort_field_names(&input_meta.sort_fields)?;
+
+    let props = writer_config.to_writer_properties_with_metadata(
+        &schema,
+        sorting_cols,
+        Some(kv_entries),
+        &sort_field_names,
+    );
+
+    let output_filename = format!("merge_output_{}.parquet", Ulid::new());
+    let output_path = output_dir.join(&output_filename);
+    let file = std::fs::File::create(&output_path)
+        .with_context(|| format!("creating output file: {}", output_path.display()))?;
+    let writer = StreamingParquetWriter::try_new(file, Arc::clone(&schema), props)
+        .with_context(|| format!("opening streaming writer for output {out_idx}"))?;
+
+    Ok(OutputWriterStorage {
+        output_idx: out_idx,
+        output_path,
+        writer,
+        service_names: HashSet::new(),
+        num_rows: static_meta.num_rows,
+    })
+}
+
+/// Index view used inside the col loop to find the writer's
+/// `output_idx` and `num_rows` without needing a mutable borrow on
+/// `writer_states` (which is already mutably borrowed by `row_groups`).
+fn writer_states_index_view(writer_states: &[OutputWriterStorage]) -> Vec<(usize, usize)> {
+    writer_states
+        .iter()
+        .map(|s| (s.output_idx, s.num_rows))
+        .collect()
+}
+
+#[allow(clippy::too_many_arguments)]
+fn write_all_columns(
+    handle: &Handle,
+    row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>],
+    service_names_per_output: &mut [HashSet<String>],
+    writer_index_view: &[(usize, usize)],
+    decoders_state: &mut [InputDecoderState],
+    aligned_sort_batches: &[RecordBatch],
+    sort_union_schema: &SchemaRef,
+    merge_order: &[MergeRun],
+    boundaries: &[Range<usize>],
+    destinations: &InputRowDestinations,
+    per_output_schemas: &[SchemaRef],
+) -> Result<()> {
+    // Iterate cols in the union (per-output) schema order. We need
+    // ONE pass through Husky-ordered cols of an output to feed its
+    // writer. The writer's expected col order is each output's own
+    // schema. All outputs share Husky col order; in PR-6b.2 every
+    // output keeps the full union schema, and the skip branch below is
+    // defensive for when per-output all-null drops land (PR-6c.2).
+    //
+    // Strategy: process the FULL union schema's cols in order. For
+    // each col K in the union schema, for each output:
+    //   - If the output's schema includes col K: write col K's data (sort col → from buffer,
+    //     body col → from decoder).
+    //   - Else: skip (col was dropped as all-null for this output).
+    // This keeps decoder advancement in lockstep across outputs: a
+    // body col K's pages are consumed once across all outputs in turn.
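+    //
+    // e.g. with union cols [metric_name, timestamp_secs, sorted_series,
+    // service, value]: the three sort-side cols replay from the phase-0
+    // buffers; then `service` is streamed for output 0, then output 1, ...;
+    // only after every output's `service` chunk is written does any input
+    // decoder touch its first `value` page.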
+
+    // We need the parent union schema to drive iteration. Recompute
+    // here (cheap) from the per-output schemas + sort union schema.
+    let parent_union_schema = build_parent_union_schema(per_output_schemas);
+
+    // For each union-schema col K:
+    for parent_col_idx in 0..parent_union_schema.fields().len() {
+        let parent_field = parent_union_schema.field(parent_col_idx);
+        let parent_name = parent_field.name();
+
+        // Is this a sort col (in memory) or a body col (streamed)?
+        let is_sort_col = sort_union_schema.index_of(parent_name).is_ok();
+
+        if is_sort_col {
+            write_sort_col_for_all_outputs(
+                row_groups,
+                writer_index_view,
+                parent_name,
+                aligned_sort_batches,
+                sort_union_schema,
+                merge_order,
+                boundaries,
+                destinations,
+                per_output_schemas,
+            )?;
+        } else {
+            write_body_col_for_all_outputs(
+                handle,
+                row_groups,
+                service_names_per_output,
+                writer_index_view,
+                decoders_state,
+                parent_name,
+                destinations,
+                per_output_schemas,
+            )?;
+        }
+    }
+
+    Ok(())
+}
+
+fn build_parent_union_schema(per_output_schemas: &[SchemaRef]) -> SchemaRef {
+    // All per-output schemas have the same column order (Husky), with
+    // some cols possibly missing per output (all-null drops — none yet
+    // in PR-6b.2, where every output keeps the full union). Pick the
+    // schema with the most fields as the parent driver, ties broken by
+    // first occurrence. This is safe because all schemas share Husky
+    // ordering as a subsequence.
+    let mut best = Arc::clone(&per_output_schemas[0]);
+    for s in per_output_schemas.iter().skip(1) {
+        if s.fields().len() > best.fields().len() {
+            best = Arc::clone(s);
+        }
+    }
+    best
+}
+
+#[allow(clippy::too_many_arguments)]
+fn write_sort_col_for_all_outputs(
+    row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>],
+    writer_index_view: &[(usize, usize)],
+    col_name: &str,
+    aligned_sort_batches: &[RecordBatch],
+    sort_union_schema: &SchemaRef,
+    merge_order: &[MergeRun],
+    boundaries: &[Range<usize>],
+    destinations: &InputRowDestinations,
+    per_output_schemas: &[SchemaRef],
+) -> Result<()> {
+    let _ = sort_union_schema;
+
+    let mut storage_idx = 0;
+    for (out_idx, boundary) in boundaries.iter().enumerate() {
+        if destinations.rows_per_output[out_idx] == 0 {
+            continue;
+        }
+        debug_assert_eq!(writer_index_view[storage_idx].0, out_idx);
+
+        // Skip this col if the output's schema doesn't include it.
+        let out_schema = &per_output_schemas[out_idx];
+        if out_schema.index_of(col_name).is_err() {
+            storage_idx += 1;
+            continue;
+        }
+
+        let runs = &merge_order[boundary.clone()];
+        let arrays = build_sort_col_pages_for_output(col_name, aligned_sort_batches, runs)?;
+        row_groups[storage_idx]
+            .write_next_column_arrays(arrays.into_iter())
+            .with_context(|| format!("writing sort col '{col_name}' to output {out_idx}"))?;
+        storage_idx += 1;
+    }
+    Ok(())
+}
+
+/// Build per-output-page arrays for one sort col. The col is already
+/// in memory across all inputs (`aligned_sort_batches`); for this
+/// output we walk its merge runs, flatten them to (input, row) index
+/// pairs, and split the result into `OUTPUT_PAGE_ROWS`-sized
+/// interleave chunks.
+fn build_sort_col_pages_for_output(
+    col_name: &str,
+    aligned_sort_batches: &[RecordBatch],
+    runs: &[MergeRun],
+) -> Result<Vec<ArrayRef>> {
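+    // e.g. runs [(input 0, rows 0..2), (input 1, rows 0..1), (input 0, row 2)]
+    // flatten to [(0,0), (0,1), (1,0), (0,2)]; with OUTPUT_PAGE_ROWS = 1024
+    // that is a single interleave call producing one 4-row ArrayRef.
+    // (Illustrative run values.)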
+    // Collect references to each input's column array.
+    let mut input_arrays: Vec<&dyn Array> = Vec::with_capacity(aligned_sort_batches.len());
+    for batch in aligned_sort_batches {
+        let idx = batch.schema().index_of(col_name).map_err(|_| {
+            anyhow!("input is missing sort col '{col_name}' that the union schema expected",)
+        })?;
+        input_arrays.push(batch.column(idx).as_ref());
+    }
+
+    let mut indices: Vec<(usize, usize)> =
+        Vec::with_capacity(runs.iter().map(|r| r.row_count).sum());
+    for run in runs {
+        for r in 0..run.row_count {
+            indices.push((run.input_index, run.start_row + r));
+        }
+    }
+
+    // Split into OUTPUT_PAGE_ROWS-sized chunks; each chunk → one
+    // arrow::interleave call → one ArrayRef.
+    let mut pages = Vec::with_capacity(indices.len().div_ceil(OUTPUT_PAGE_ROWS));
+    for chunk in indices.chunks(OUTPUT_PAGE_ROWS) {
+        let arr = interleave(&input_arrays, chunk)
+            .with_context(|| format!("interleaving sort col '{col_name}' pages"))?;
+        pages.push(arr);
+    }
+    Ok(pages)
+}
+
+#[allow(clippy::too_many_arguments)]
+fn write_body_col_for_all_outputs(
+    handle: &Handle,
+    row_groups: &mut [crate::storage::streaming_writer::RowGroupBuilder<'_, std::fs::File>],
+    service_names_per_output: &mut [HashSet<String>],
+    writer_index_view: &[(usize, usize)],
+    decoders_state: &mut [InputDecoderState],
+    col_name: &str,
+    destinations: &InputRowDestinations,
+    per_output_schemas: &[SchemaRef],
+) -> Result<()> {
+    // Find this col's per-input parquet leaf index (one per input; arrow
+    // top-level index == parquet leaf index for flat schemas). Inputs whose
+    // schema doesn't have this col contribute null rows and don't advance
+    // their decoder for this col; zero-RG (empty) inputs have no pages at
+    // all and contribute nothing.
+    let mut input_col_indices: Vec<Option<usize>> = Vec::with_capacity(decoders_state.len());
+    for state in decoders_state.iter() {
+        if state.metadata.num_row_groups() == 0 {
+            input_col_indices.push(None);
+            continue;
+        }
+        input_col_indices.push(state.arrow_schema.index_of(col_name).ok());
+    }
+
+    // For each output sequentially: build output pages, feed to writer.
+    let mut storage_idx = 0;
+    for (out_idx, &row_count) in destinations.rows_per_output.iter().enumerate() {
+        if row_count == 0 {
+            continue;
+        }
+        debug_assert_eq!(writer_index_view[storage_idx].0, out_idx);
+
+        // Skip this col if the output's schema doesn't include it.
+        let out_schema = &per_output_schemas[out_idx];
+        let Ok(out_field_idx) = out_schema.index_of(col_name) else {
+            storage_idx += 1;
+            continue;
+        };
+        let out_field = out_schema.field(out_field_idx);
+
+        // Build per-output assembler. Feeds one output page per
+        // `Iterator::next()` call.
+        let assembler = BodyColOutputPageAssembler::new(
+            handle,
+            decoders_state,
+            &input_col_indices,
+            destinations,
+            out_idx,
+            col_name,
+            out_field,
+        );
+
+        // Track service names while streaming the service col.
+        let track_service = col_name == "service";
+
+        // Drive the assembler through the writer one page at a time, so
+        // memory stays bounded by the in-flight page rather than the whole
+        // column chunk. `write_next_column_arrays` takes a plain `ArrayRef`
+        // iterator, so assembly errors are parked in a side slot and
+        // re-raised (with precedence over any writer error) after the write
+        // call returns.
+        let mut assembly_error: Option<anyhow::Error> = None;
+        let services = &mut service_names_per_output[storage_idx];
+        let page_iter = assembler.into_iter().map_while(|page| match page {
+            Ok(page) => {
+                if track_service {
+                    if let Err(e) = collect_service_names_from_page(page.as_ref(), services) {
+                        assembly_error = Some(e);
+                        return None;
+                    }
+                }
+                Some(page)
+            }
+            Err(e) => {
+                assembly_error = Some(e);
+                None
+            }
+        });
+
+        let write_result = row_groups[storage_idx]
+            .write_next_column_arrays(page_iter)
+            .with_context(|| format!("writing body col '{col_name}' to output {out_idx}"));
+        if let Some(e) = assembly_error {
+            return Err(e.context(format!(
+                "assembling body col '{col_name}' for output {out_idx}"
+            )));
+        }
+        write_result?;
+        storage_idx += 1;
+    }
+
+    Ok(())
+}
+
+/// Per-page service name collector. Used during the streaming write
+/// of the "service" body col to populate per-output service_names.
+fn collect_service_names_from_page(arr: &dyn Array, out: &mut HashSet<String>) -> Result<()> {
+    use arrow::array::{AsArray, LargeStringArray, StringArray};
+    use arrow::datatypes::{
+        ArrowDictionaryKeyType, ArrowNativeType, Int8Type, Int16Type, Int32Type, Int64Type,
+    };
+
+    fn extend_from_strings(strings: &StringArray, out: &mut HashSet<String>) {
+        for i in 0..strings.len() {
+            if strings.is_valid(i) {
+                out.insert(strings.value(i).to_string());
+            }
+        }
+    }
+
+    // Insert the dictionary values referenced by valid (non-null) keys.
+    // One generic body replaces four copies of the same loop, one per
+    // supported key width.
+    fn extend_from_dict<K: ArrowDictionaryKeyType>(arr: &dyn Array, out: &mut HashSet<String>) {
+        let dict = arr.as_dictionary::<K>();
+        let Some(strings) = dict.values().as_any().downcast_ref::<StringArray>() else {
+            return;
+        };
+        for i in 0..dict.len() {
+            if dict.is_valid(i) {
+                let key = dict.keys().value(i).as_usize();
+                if key < strings.len() && strings.is_valid(key) {
+                    out.insert(strings.value(key).to_string());
+                }
+            }
+        }
+    }
+
+    match arr.data_type() {
+        DataType::Utf8 => {
+            let strings = arr
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(|| anyhow!("expected StringArray for service col page"))?;
+            extend_from_strings(strings, out);
+        }
+        DataType::LargeUtf8 => {
+            let strings = arr
+                .as_any()
+                .downcast_ref::<LargeStringArray>()
+                .ok_or_else(|| anyhow!("expected LargeStringArray for service col page"))?;
+            for i in 0..strings.len() {
+                if strings.is_valid(i) {
+                    out.insert(strings.value(i).to_string());
+                }
+            }
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::Utf8) =>
+        {
+            match key_type.as_ref() {
+                DataType::Int8 => extend_from_dict::<Int8Type>(arr, out),
+                DataType::Int16 => extend_from_dict::<Int16Type>(arr, out),
+                DataType::Int32 => extend_from_dict::<Int32Type>(arr, out),
+                DataType::Int64 => extend_from_dict::<Int64Type>(arr, out),
+                _ => {}
+            }
+        }
+        _ => {
+            // Skip non-string types — service col is expected to be
+            // string-like; if it isn't, just don't collect names.
+ } + } + Ok(()) +} + +// ============================================================================ +// Body col output page assembler — the page-bounded streaming core +// ============================================================================ + +/// Assembles output pages for one (output_idx, body_col) by: +/// 1. Walking the destinations table forward through this output's row range, accumulating +/// `(input_idx, input_row)` index pairs. +/// 2. When the index buffer hits `OUTPUT_PAGE_ROWS`, advancing each contributing input's decoder +/// until its decoded pages cover the needed input rows, then calling +/// `arrow::compute::interleave`. +/// 3. Emitting one `ArrayRef` per iter step until the row range is exhausted; then `Ok(None)`. +/// +/// Memory per `next()` call: one in-progress output page (P rows) + +/// up to ~2 in-flight decoded pages per input (kept until all their +/// rows are consumed). Bounded by page sizes, not column-chunk sizes. +struct BodyColOutputPageAssembler<'a> { + handle: &'a Handle, + decoders_state: &'a mut [InputDecoderState], + input_col_indices: &'a [Option], + destinations: &'a InputRowDestinations, + out_idx: usize, + col_name: &'a str, + out_field: &'a Field, + /// Cursor into `destinations.per_input[*]` — next input row index per input. + input_cursors: Vec, + /// Per-input decoded page cache. Pages are kept in order; the + /// front page's `row_start` is the smallest input row we still + /// have decoded. + page_cache: Vec>, + /// Total rows written so far for this output's col. + rows_emitted: usize, + /// Total rows expected = destinations.rows_per_output[out_idx]. + expected_rows: usize, + /// EOF flag (returns None on subsequent calls once true). + done: bool, +} + +impl<'a> BodyColOutputPageAssembler<'a> { + #[allow(clippy::too_many_arguments)] + fn new( + handle: &'a Handle, + decoders_state: &'a mut [InputDecoderState], + input_col_indices: &'a [Option], + destinations: &'a InputRowDestinations, + out_idx: usize, + col_name: &'a str, + out_field: &'a Field, + ) -> Self { + let num_inputs = decoders_state.len(); + let mut page_cache: Vec> = Vec::with_capacity(num_inputs); + for _ in 0..num_inputs { + page_cache.push(Vec::new()); + } + Self { + handle, + decoders_state, + input_col_indices, + destinations, + out_idx, + col_name, + out_field, + input_cursors: vec![0; num_inputs], + page_cache, + rows_emitted: 0, + expected_rows: destinations.rows_per_output[out_idx], + done: false, + } + } + + fn into_iter(self) -> BodyColOutputPageIter<'a> { + BodyColOutputPageIter { inner: self } + } +} + +struct BodyColOutputPageIter<'a> { + inner: BodyColOutputPageAssembler<'a>, +} + +impl Iterator for BodyColOutputPageIter<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.inner.done || self.inner.rows_emitted >= self.inner.expected_rows { + self.inner.done = true; + return None; + } + match assemble_one_output_page(&mut self.inner) { + Ok(Some(arr)) => Some(Ok(arr)), + Ok(None) => { + self.inner.done = true; + None + } + Err(e) => { + self.inner.done = true; + Some(Err(e)) + } + } + } +} + +fn assemble_one_output_page(s: &mut BodyColOutputPageAssembler) -> Result> { + let remaining = s.expected_rows - s.rows_emitted; + if remaining == 0 { + return Ok(None); + } + let page_size = remaining.min(OUTPUT_PAGE_ROWS); + + // Walk this output's row positions and figure out which (input, input_row) + // contributes each one. 
+    //
+    // The destinations table gives, for input i and input row r,
+    // `per_input[i][r] = Some((out_idx, pos))`. The positions this page
+    // needs are `rows_emitted .. rows_emitted + page_size`, in order. For
+    // each target position we scan forward from each input's cursor for the
+    // row that maps to (out_idx, target_pos); within an input, the rows
+    // assigned to this output form a contiguous slice with strictly
+    // increasing positions, so each scan stays short. (A pre-computed
+    // per-(output, input) row-range table could replace the scan entirely;
+    // left for a follow-up.)
+    let mut indices_per_input: Vec<Vec<usize>> = vec![Vec::new(); s.decoders_state.len()];
+    let mut interleave_indices: Vec<(usize, usize)> = Vec::with_capacity(page_size);
+    let mut total_picked = 0usize;
+
+    while total_picked < page_size {
+        let target_pos = s.rows_emitted + total_picked;
+        let mut found = false;
+        for (input_idx, dests) in s.destinations.per_input.iter().enumerate() {
+            let cursor = s.input_cursors[input_idx];
+            for (input_row, dest) in dests.iter().enumerate().skip(cursor) {
+                if let Some((o, p)) = dest {
+                    if *o == s.out_idx && *p == target_pos {
+                        interleave_indices.push((input_idx, input_row));
+                        indices_per_input[input_idx].push(input_row);
+                        // Don't advance the cursor past this row yet — this
+                        // page may still need later rows from the same input.
+                        // Cursors are bumped once the whole page is collected.
+                        found = true;
+                        break;
+                    }
+                }
+            }
+            if found {
+                break;
+            }
+        }
+        if !found {
+            // Shouldn't happen — every output position should be reachable.
+            bail!(
+                "merge plan inconsistency: output {} position {target_pos} not found in any input",
+                s.out_idx,
+            );
+        }
+        total_picked += 1;
+    }
+
+    // Now ensure each input's decoder has decoded pages covering all
+    // `indices_per_input[i]` rows. Advance decoders as needed.
+    for (input_idx, input_rows) in indices_per_input.iter().enumerate() {
+        if input_rows.is_empty() {
+            continue;
+        }
+        let col_parquet_idx = match s.input_col_indices[input_idx] {
+            Some(c) => c,
+            None => {
+                // This input lacks this col entirely — null contributions.
+
+    // Now ensure each input's decoder has decoded pages covering all
+    // `indices_per_input[i]` rows. Advance decoders as needed.
+    for (input_idx, input_rows) in indices_per_input.iter().enumerate() {
+        if input_rows.is_empty() {
+            continue;
+        }
+        let col_parquet_idx = match s.input_col_indices[input_idx] {
+            Some(c) => c,
+            None => {
+                // This input lacks this col entirely — null contributions.
+                // We'll handle null-filling in the interleave step below.
+                continue;
+            }
+        };
+        let max_needed_row = *input_rows.iter().max().expect("non-empty");
+        advance_decoder_to_row(
+            s.handle,
+            &mut s.decoders_state[input_idx],
+            &mut s.page_cache[input_idx],
+            col_parquet_idx,
+            max_needed_row,
+        )?;
+    }
+
+    // Build the per-(input, row) value array by:
+    // 1. Concatenating each input's cached pages into one ArrayRef (they cover a contiguous input
+    //    row range from cache_start to cursor_max).
+    // 2. Computing local indices = input_row - cache_start.
+    // 3. Calling arrow::compute::interleave across N input arrays.
+    //
+    // For inputs without this col, we substitute a single null page of the
+    // out_field's type.
+    let mut input_array_refs: Vec<ArrayRef> = Vec::with_capacity(s.decoders_state.len());
+    let mut input_cache_starts: Vec<usize> = Vec::with_capacity(s.decoders_state.len());
+
+    for input_idx in 0..s.decoders_state.len() {
+        match s.input_col_indices[input_idx] {
+            Some(_) => {
+                let pages = &s.page_cache[input_idx];
+                if pages.is_empty() {
+                    // No pages decoded for this input (no rows from this input go to this output).
+                    // Use a zero-row placeholder; we won't index into it.
+                    input_array_refs.push(new_null_array(s.out_field.data_type(), 0));
+                    input_cache_starts.push(0);
+                } else {
+                    let cache_start = pages[0].row_start;
+                    let arrays: Vec<&dyn Array> = pages.iter().map(|p| p.array.as_ref()).collect();
+                    let concatenated = arrow::compute::concat(&arrays).with_context(|| {
+                        format!(
+                            "concatenating cached pages for input {input_idx} col '{}'",
+                            s.col_name,
+                        )
+                    })?;
+                    input_array_refs.push(concatenated);
+                    input_cache_starts.push(cache_start);
+                }
+            }
+            None => {
+                // Null-fill stand-in. We never reference real rows from this
+                // input (it lacks the column), so a 1-row null array suffices:
+                // every interleave index for this input is routed to position
+                // 0 below.
+                let null_arr = new_null_array(s.out_field.data_type(), 1);
+                input_array_refs.push(null_arr);
+                input_cache_starts.push(0);
+            }
+        }
+    }
+
+    let interleave_local: Vec<(usize, usize)> = interleave_indices
+        .iter()
+        .map(|&(i_idx, i_row)| match s.input_col_indices[i_idx] {
+            Some(_) => (i_idx, i_row - input_cache_starts[i_idx]),
+            None => (i_idx, 0),
+        })
+        .collect();
+
+    let array_refs_ref: Vec<&dyn Array> = input_array_refs.iter().map(|a| a.as_ref()).collect();
+    let assembled = interleave(&array_refs_ref, &interleave_local).with_context(|| {
+        format!(
+            "interleaving body col '{}' for output {}",
+            s.col_name, s.out_idx,
+        )
+    })?;
+
+    // Bump input cursors past rows we just consumed and drop pages
+    // whose rows are fully consumed.
+    for (input_idx, input_rows) in indices_per_input.iter().enumerate() {
+        if input_rows.is_empty() {
+            continue;
+        }
+        let max_row = *input_rows.iter().max().expect("non-empty");
+        s.input_cursors[input_idx] = max_row + 1;
+
+        // Drop pages whose last row is < cursor.
+        if s.input_col_indices[input_idx].is_some() {
+            let pages = &mut s.page_cache[input_idx];
+            while let Some(front) = pages.first() {
+                let front_end = front.row_start + front.array.len();
+                if front_end <= s.input_cursors[input_idx] {
+                    pages.remove(0);
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    s.rows_emitted += page_size;
+    Ok(Some(assembled))
+}
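+
+// Worked example of the advance/evict cycle (toy numbers, for orientation):
+// with 4-row input pages and an output page needing input rows {2, 3, 4},
+// `advance_decoder_to_row(.., target_row = 4)` leaves pages [0..4) and
+// [4..8) cached. After the interleave the cursor moves to 5, so page
+// [0..4) (end 4 <= 5) is evicted while [4..8) stays for the next page.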
+
+/// Drive `state.stream`'s decoder forward via `block_on` until the
+/// cached pages for `col_parquet_idx` cover up through `target_row`
+/// (inclusive). Stops as soon as the latest cached page ends past
+/// `target_row`.
+fn advance_decoder_to_row(
+    handle: &Handle,
+    state: &mut InputDecoderState,
+    page_cache: &mut Vec<DecodedPage>,
+    col_parquet_idx: usize,
+    target_row: usize,
+) -> Result<()> {
+    // If cache already covers target_row, nothing to do.
+    if let Some(last) = page_cache.last() {
+        let last_end = last.row_start + last.array.len();
+        if target_row < last_end {
+            return Ok(());
+        }
+    }
+
+    let mut decoder = StreamDecoder::new(&mut *state.stream);
+    loop {
+        let decoded = handle
+            .block_on(decoder.decode_next_page())
+            .context("decoding body col page")?;
+        let page = match decoded {
+            Some(p) => p,
+            None => bail!(
+                "stream EOF while advancing to row {target_row} for parquet col {col_parquet_idx}",
+            ),
+        };
+        if page.col_idx != col_parquet_idx {
+            bail!(
+                "expected col {col_parquet_idx} page, got col {} — Husky col ordering violated",
+                page.col_idx,
+            );
+        }
+        let end = page.row_start + page.array.len();
+        page_cache.push(page);
+        if target_row < end {
+            return Ok(());
+        }
+    }
+}
+
+fn finalize_output_writer(
+    state: OutputWriterStorage,
+    per_output_static: &[PerOutputStatic],
+) -> Result<MergeOutputFile> {
+    let OutputWriterStorage {
+        output_idx,
+        output_path,
+        writer,
+        service_names,
+        num_rows,
+    } = state;
+
+    let _metadata = writer
+        .close()
+        .with_context(|| format!("closing writer for output {output_idx}"))?;
+
+    let size_bytes = std::fs::metadata(&output_path)
+        .with_context(|| format!("stat output file: {}", output_path.display()))?
+        .len();
+
+    let static_meta = &per_output_static[output_idx];
+
+    let mut low_cardinality_tags: HashMap<String, HashSet<String>> = HashMap::new();
+    if !service_names.is_empty() {
+        low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names);
+    }
+
+    Ok(MergeOutputFile {
+        path: output_path,
+        num_rows,
+        num_row_groups: 1,
+        size_bytes,
+        row_keys_proto: static_meta.row_keys_proto.clone(),
+        zonemap_regexes: static_meta.zonemap_regexes.clone(),
+        metric_names: static_meta.metric_names.clone(),
+        time_range: static_meta.time_range,
+        low_cardinality_tags,
+    })
+}
+
+// ============================================================================
+// Tests
+// ============================================================================
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+    use std::sync::Arc;
+
+    use arrow::array::{
+        ArrayRef, BinaryArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt8Array,
+        UInt64Array,
+    };
+    use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema};
+    use bytes::Bytes;
+    use parquet::arrow::ArrowWriter;
+    use parquet::file::metadata::KeyValue;
+    use parquet::file::properties::WriterProperties;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
+    use tempfile::TempDir;
+    use tokio::io::AsyncRead;
+
+    use super::*;
+    use crate::storage::page_decoder::StreamDecoder;
+    use crate::storage::streaming_reader::{RemoteByteSource, StreamingParquetReader};
+    use crate::storage::{Compression, ParquetWriterConfig};
+
+    // -------- Fixtures --------
+
+    /// Build a sorted metrics RecordBatch with `num_rows` rows in
+    /// **Husky column order**: sort cols (metric_name, timestamp_secs)
+    /// → sorted_series → remaining body cols lexicographic
+    /// (metric_type, service, timeseries_id, value). All rows share
+    /// the single metric_name "cpu.usage". `sorted_series` is monotonic
+    /// from `start_series_idx`. `service` carries nulls every 5th row.
+    fn make_sorted_batch(num_rows: usize, start_series_idx: u64) -> RecordBatch {
+        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+        let schema = Arc::new(ArrowSchema::new(vec![
+            // sort cols (in sort schema order)
+            Field::new("metric_name", dict_type.clone(), false),
+            Field::new("timestamp_secs", DataType::UInt64, false),
+            // sorted_series marker
+            Field::new("sorted_series", DataType::Binary, false),
+            // body cols lexicographic
+            Field::new("metric_type", DataType::UInt8, false),
+            Field::new("service", dict_type, true),
+            Field::new("timeseries_id", DataType::Int64, false),
+            Field::new("value", DataType::Float64, false),
+        ]));
+
+        let metric_keys: Vec<i32> = (0..num_rows as i32).map(|_| 0).collect();
+        let metric_values = StringArray::from(vec!["cpu.usage", "memory.used"]);
+        let metric_name: ArrayRef = Arc::new(
+            DictionaryArray::<Int32Type>::try_new(
+                arrow::array::Int32Array::from(metric_keys),
+                Arc::new(metric_values),
+            )
+            .expect("test dict array"),
+        );
+        let metric_type: ArrayRef = Arc::new(UInt8Array::from(vec![0u8; num_rows]));
+        let timestamps: Vec<u64> = (0..num_rows as u64)
+            .map(|i| 1_700_000_000 + (num_rows as u64 - i))
+            .collect();
+        let timestamp_secs: ArrayRef = Arc::new(UInt64Array::from(timestamps));
+        let values: Vec<f64> = (0..num_rows).map(|i| i as f64).collect();
+        let value: ArrayRef = Arc::new(Float64Array::from(values));
+        let tsids: Vec<i64> = (0..num_rows as i64).map(|i| 1000 + i).collect();
+        let timeseries_id: ArrayRef = Arc::new(Int64Array::from(tsids));
+        let svc_keys: Vec<Option<i32>> = (0..num_rows as i32)
+            .map(|i| if i % 5 == 0 { None } else { Some(i % 3) })
+            .collect();
+        let svc_values = StringArray::from(vec!["api", "db", "cache"]);
+        let service: ArrayRef = Arc::new(
+            DictionaryArray::<Int32Type>::try_new(
+                arrow::array::Int32Array::from(svc_keys),
+                Arc::new(svc_values),
+            )
+            .expect("test dict array"),
+        );
+        let mut series_bytes: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
+        for i in 0..num_rows as u64 {
+            let id = start_series_idx + i;
+            series_bytes.push(id.to_be_bytes().to_vec());
+        }
+        let series_refs: Vec<&[u8]> = series_bytes.iter().map(|v| v.as_slice()).collect();
+        let sorted_series: ArrayRef = Arc::new(BinaryArray::from(series_refs));
+
+        RecordBatch::try_new(
+            schema,
+            vec![
+                metric_name,
+                timestamp_secs,
+                sorted_series,
+                metric_type,
+                service,
+                timeseries_id,
+                value,
+            ],
+        )
+        .expect("test batch")
+    }
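+
+    // Sort-fields string convention, as these fixtures read it (an
+    // interpretation of the format parse_sort_fields consumes, stated here
+    // for orientation): fields are '|'-separated, a leading '-' marks a
+    // descending field, and '/V2' is a trailing schema-version suffix. So
+    //     "metric_name|-timestamp_secs/V2"
+    // encodes (metric_name ASC, timestamp_secs DESC) — matching the
+    // SortingColumn entries built in write_input_parquet below.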
+
+    /// Write a fixture parquet file with the standard `qh.*` KVs that the
+    /// streaming merge engine validates.
+    fn write_input_parquet(batches: &[RecordBatch], extra_kvs: &[(&str, &str)]) -> Bytes {
+        let schema = batches[0].schema();
+        let cfg = ParquetWriterConfig {
+            compression: Compression::Snappy,
+            ..ParquetWriterConfig::default()
+        };
+        let sort_fields = "metric_name|-timestamp_secs/V2";
+        let sort_field_names = vec!["metric_name".to_string(), "timestamp_secs".to_string()];
+        let mut kvs = vec![
+            KeyValue::new(
+                PARQUET_META_SORT_FIELDS.to_string(),
+                sort_fields.to_string(),
+            ),
+            KeyValue::new(
+                PARQUET_META_WINDOW_START.to_string(),
+                "1700000000".to_string(),
+            ),
+            KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()),
+            KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()),
+        ];
+        for (k, v) in extra_kvs {
+            kvs.push(KeyValue::new(k.to_string(), v.to_string()));
+        }
+        let sorting_cols = vec![
+            parquet::file::metadata::SortingColumn {
+                column_idx: schema.index_of("metric_name").expect("test schema") as i32,
+                descending: false,
+                nulls_first: false,
+            },
+            parquet::file::metadata::SortingColumn {
+                column_idx: schema.index_of("timestamp_secs").expect("test schema") as i32,
+                descending: true,
+                nulls_first: false,
+            },
+        ];
+        let props: WriterProperties = cfg.to_writer_properties_with_metadata(
+            &schema,
+            sorting_cols,
+            Some(kvs),
+            &sort_field_names,
+        );
+        let mut buf: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).expect("arrow writer");
+        for b in batches {
+            writer.write(b).expect("test write");
+        }
+        writer.close().expect("test close");
+        Bytes::from(buf)
+    }
+
+    // -------- In-memory byte source --------
+
+    struct InMemorySource {
+        bytes: Bytes,
+    }
+
+    #[async_trait::async_trait]
+    impl RemoteByteSource for InMemorySource {
+        async fn file_size(&self, _path: &std::path::Path) -> std::io::Result<u64> {
+            Ok(self.bytes.len() as u64)
+        }
+        async fn get_slice(
+            &self,
+            _path: &std::path::Path,
+            range: std::ops::Range<u64>,
+        ) -> std::io::Result<Bytes> {
+            Ok(self.bytes.slice(range.start as usize..range.end as usize))
+        }
+        async fn get_slice_stream(
+            &self,
+            _path: &std::path::Path,
+            range: std::ops::Range<u64>,
+        ) -> std::io::Result<Box<dyn AsyncRead + Send + Unpin>> {
+            let slice = self.bytes.slice(range.start as usize..range.end as usize);
+            Ok(Box::new(std::io::Cursor::new(slice.to_vec())))
+        }
+    }
+
+    async fn open_stream(bytes: Bytes) -> Box<dyn ColumnPageStream> {
+        let source = Arc::new(InMemorySource { bytes });
+        let reader = StreamingParquetReader::try_open(source, PathBuf::from("test.parquet"))
+            .await
+            .expect("open reader");
+        Box::new(reader)
+    }
+
+    /// Read an output parquet file back into a single concatenated RecordBatch.
+    fn read_output_to_record_batch(path: &Path) -> RecordBatch {
+        let bytes = std::fs::read(path).expect("read output");
+        let builder = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(
+            Bytes::from(bytes),
+        )
+        .expect("read output builder");
+        let schema = builder.schema().clone();
+        let reader = builder.build().expect("read output build");
+        let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().expect("read output");
+        if batches.is_empty() {
+            RecordBatch::new_empty(schema)
+        } else {
+            arrow::compute::concat_batches(&schema, &batches).expect("concat")
+        }
+    }
+
+    fn merge_config(num_outputs: usize) -> MergeConfig {
+        MergeConfig {
+            num_outputs,
+            writer_config: ParquetWriterConfig {
+                compression: Compression::Snappy,
+                ..ParquetWriterConfig::default()
+            },
+        }
+    }
+
+    // -------- Tests --------
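+
+    // Harness flow shared by every test below: fixture batches → in-memory
+    // parquet bytes (write_input_parquet) → open_stream over an
+    // InMemorySource → streaming_merge_sorted_parquet_files → read the
+    // produced files back and assert.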
+
+    /// Two inputs → one output: row count and sort order preserved.
+    #[tokio::test]
+    async fn test_two_inputs_simple_merge() {
+        let batch_a = make_sorted_batch(50, 0);
+        let batch_b = make_sorted_batch(50, 50);
+        let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]);
+        let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]);
+
+        let inputs: Vec<Box<dyn ColumnPageStream>> =
+            vec![open_stream(bytes_a).await, open_stream(bytes_b).await];
+
+        let tmp = TempDir::new().expect("tmpdir");
+        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
+            .await
+            .expect("merge");
+        assert_eq!(outputs.len(), 1);
+        assert_eq!(outputs[0].num_rows, 100);
+
+        let merged = read_output_to_record_batch(&outputs[0].path);
+        assert_eq!(merged.num_rows(), 100);
+        let ss_array = merged.column(merged.schema().index_of("sorted_series").expect("col"));
+        let ss = ss_array
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .expect("binary");
+        for i in 0..ss_array.len().saturating_sub(1) {
+            assert!(
+                ss.value(i) <= ss.value(i + 1),
+                "row {i}: sorted_series not ascending",
+            );
+        }
+    }
+
+    /// Single-metric_name input + num_outputs=1 → output is single row group.
+    #[tokio::test]
+    async fn test_output_is_single_row_group() {
+        let batch_a = make_sorted_batch(200, 0);
+        let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]);
+        let inputs: Vec<Box<dyn ColumnPageStream>> = vec![open_stream(bytes_a).await];
+
+        let tmp = TempDir::new().expect("tmpdir");
+        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
+            .await
+            .expect("merge");
+        assert_eq!(outputs.len(), 1);
+
+        let bytes = std::fs::read(&outputs[0].path).expect("read");
+        let reader = SerializedFileReader::new(Bytes::from(bytes)).expect("ser reader");
+        assert_eq!(
+            reader.metadata().num_row_groups(),
+            1,
+            "single-metric_name single-output merge must produce single row group",
+        );
+    }
+
+    /// N inputs → M outputs: total row count preserved (MC-1).
+    #[tokio::test]
+    async fn test_total_rows_preserved() {
+        let batch_a = make_sorted_batch(75, 0);
+        let batch_b = make_sorted_batch(50, 100);
+        let batch_c = make_sorted_batch(25, 200);
+        let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]);
+        let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]);
+        let bytes_c = write_input_parquet(std::slice::from_ref(&batch_c), &[]);
+
+        let inputs: Vec<Box<dyn ColumnPageStream>> = vec![
+            open_stream(bytes_a).await,
+            open_stream(bytes_b).await,
+            open_stream(bytes_c).await,
+        ];
+        let tmp = TempDir::new().expect("tmpdir");
+        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(2))
+            .await
+            .expect("merge");
+
+        let total: usize = outputs.iter().map(|o| o.num_rows).sum();
+        assert_eq!(total, 150);
+    }
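+
+    // The 2-output split above lands on a sorted_series transition, so the
+    // per-output row counts need not be equal — e.g. a transition at merged
+    // position 80 would yield an 80/70 split. Only the total is contractual
+    // (MC-1), hence the sum-only assertion.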
+
+    /// Sort schema mismatch across inputs is rejected.
+    #[tokio::test]
+    async fn test_sort_schema_mismatch_rejected() {
+        let batch_a = make_sorted_batch(20, 0);
+        let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]);
+
+        let cfg = ParquetWriterConfig {
+            compression: Compression::Snappy,
+            ..ParquetWriterConfig::default()
+        };
+        let kvs = vec![
+            KeyValue::new(
+                PARQUET_META_SORT_FIELDS.to_string(),
+                "service|-timestamp_secs/V2".to_string(),
+            ),
+            KeyValue::new(
+                PARQUET_META_WINDOW_START.to_string(),
+                "1700000000".to_string(),
+            ),
+            KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()),
+            KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()),
+        ];
+        let props: WriterProperties = cfg.to_writer_properties_with_metadata(
+            &batch_a.schema(),
+            Vec::new(),
+            Some(kvs),
+            &["service".to_string(), "timestamp_secs".to_string()],
+        );
+        let mut buf: Vec<u8> = Vec::new();
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, batch_a.schema(), Some(props)).expect("arrow writer");
+        writer.write(&batch_a).expect("write");
+        writer.close().expect("close");
+        let bytes_b = Bytes::from(buf);
+
+        let inputs: Vec<Box<dyn ColumnPageStream>> =
+            vec![open_stream(bytes_a).await, open_stream(bytes_b).await];
+        let tmp = TempDir::new().expect("tmpdir");
+        let err = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
+            .await
+            .expect_err("must reject mismatched sort schema");
+        let s = err.to_string();
+        assert!(
+            s.contains("sort schema mismatch"),
+            "expected 'sort schema mismatch', got: {s}",
+        );
+    }
+
+    /// qh.* KV metadata is propagated to the output; num_merge_ops increments.
+    #[tokio::test]
+    async fn test_kv_metadata_propagated_to_output() {
+        let batch_a = make_sorted_batch(40, 0);
+        let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]);
+        let inputs: Vec<Box<dyn ColumnPageStream>> = vec![open_stream(bytes_a).await];
+
+        let tmp = TempDir::new().expect("tmpdir");
+        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
+            .await
+            .expect("merge");
+        let bytes = std::fs::read(&outputs[0].path).expect("read");
+        let reader = SerializedFileReader::new(Bytes::from(bytes)).expect("ser reader");
+        let kvs = reader
+            .metadata()
+            .file_metadata()
+            .key_value_metadata()
+            .cloned()
+            .unwrap_or_default();
+        let find = |k: &str| -> Option<String> {
+            kvs.iter()
+                .find(|kv| kv.key == k)
+                .and_then(|kv| kv.value.clone())
+        };
+        assert_eq!(
+            find(PARQUET_META_SORT_FIELDS).as_deref(),
+            Some("metric_name|-timestamp_secs/V2"),
+        );
+        assert_eq!(
+            find(PARQUET_META_WINDOW_START).as_deref(),
+            Some("1700000000")
+        );
+        assert_eq!(find(PARQUET_META_WINDOW_DURATION).as_deref(), Some("60"));
+        assert_eq!(
+            find(PARQUET_META_NUM_MERGE_OPS).as_deref(),
+            Some("1"),
+            "num_merge_ops must increment by 1 over input's max",
+        );
+    }
+
+    /// All-empty inputs produce no output.
+    #[tokio::test]
+    async fn test_all_empty_inputs_no_output() {
+        let empty = make_sorted_batch(0, 0);
+        let bytes = write_input_parquet(std::slice::from_ref(&empty), &[]);
+        let inputs: Vec<Box<dyn ColumnPageStream>> = vec![open_stream(bytes).await];
+
+        let tmp = TempDir::new().expect("tmpdir");
+        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
+            .await
+            .expect("merge");
+        assert!(outputs.is_empty());
+    }
+
+    /// The streaming engine's output can be drained back via the new
+    /// page-bounded decoder. End-to-end sanity check.
+    #[tokio::test]
+    async fn test_output_drainable_by_stream_decoder() {
+        let batch_a = make_sorted_batch(40, 0);
+        let batch_b = make_sorted_batch(40, 40);
+        let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]);
+        let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]);
+
+        let inputs: Vec<Box<dyn ColumnPageStream>> =
+            vec![open_stream(bytes_a).await, open_stream(bytes_b).await];
+        let tmp = TempDir::new().expect("tmpdir");
+        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
+            .await
+            .expect("merge");
+
+        let bytes = std::fs::read(&outputs[0].path).expect("read");
+        let mut output_stream = open_stream(Bytes::from(bytes)).await;
+        let mut decoder = StreamDecoder::new(&mut *output_stream);
+        let mut total_decoded = 0usize;
+        while let Some(page) = decoder.decode_next_page().await.expect("decode") {
+            // Count only sort col 0 (col_idx 0) pages to get a row count.
+            if page.col_idx == 0 {
+                total_decoded += page.array.len();
+            }
+        }
+        assert_eq!(total_decoded, 80);
+    }
+
+    /// Page-bounded contract sanity: with a row group large enough to
+    /// require many parquet pages per col, body col writes go through
+    /// the page-by-page assembler instead of materialising column
+    /// chunks. We can't directly observe peak memory from a test, but
+    /// we *can* assert that the merge completes correctly with input
+    /// data whose body cols span many pages, and that the output is
+    /// itself multi-page (no whole-column buffering happened on the
+    /// output side either).
+    #[tokio::test]
+    async fn test_body_col_streams_many_pages_per_column_chunk() {
+        // Force multiple pages per column chunk by setting a small
+        // data_page_row_count_limit. With 8000 rows and a 1000-row
+        // page limit, the output value col chunk must span ≥ 8 pages.
+        let batch = make_sorted_batch(8000, 0);
+        let bytes = write_input_parquet(std::slice::from_ref(&batch), &[]);
+        let inputs: Vec<Box<dyn ColumnPageStream>> = vec![open_stream(bytes).await];
+
+        let writer_config = ParquetWriterConfig {
+            compression: Compression::Snappy,
+            data_page_row_count_limit: 1000,
+            ..ParquetWriterConfig::default()
+        };
+        let config = MergeConfig {
+            num_outputs: 1,
+            writer_config,
+        };
+
+        let tmp = TempDir::new().expect("tmpdir");
+        let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &config)
+            .await
+            .expect("merge");
+        assert_eq!(outputs.len(), 1);
+        assert_eq!(outputs[0].num_rows, 8000);
+
+        // Verify the output is itself multi-page-per-column (which is
+        // what page-bounded writing should produce, given the default
+        // data_page_size). Read via the page-bounded decoder and count
+        // pages for the value column.
+        let out_bytes = std::fs::read(&outputs[0].path).expect("read");
+        let mut output_stream = open_stream(Bytes::from(out_bytes)).await;
+        // Find the "value" col index in the output's arrow schema BEFORE
+        // borrowing output_stream mutably for the decoder.
+        let arrow_schema = parquet::arrow::parquet_to_arrow_schema(
+            output_stream.metadata().file_metadata().schema_descr(),
+            None,
+        )
+        .expect("arrow schema");
+        let value_col_idx = arrow_schema.index_of("value").expect("value col");
+        let mut decoder = StreamDecoder::new(&mut *output_stream);
+
+        let mut value_pages = 0;
+        while let Some(page) = decoder.decode_next_page().await.expect("decode") {
+            if page.col_idx == value_col_idx {
+                value_pages += 1;
+            }
+        }
+        assert!(
+            value_pages >= 2,
+            "expected output 'value' col to span multiple pages (got {value_pages}); body col \
+             writes should respect data_page_size",
+        );
+    }
+
+    /// Multi-RG input is rejected (PR-6b.2 simplification).
+    #[tokio::test]
+    async fn test_multi_rg_input_rejected() {
+        // Force a 2-RG file by writing two 50-row batches with a
+        // row_group_size small enough to trip RG rollover.
+        let batch_a = make_sorted_batch(50, 0);
+        let batch_b = make_sorted_batch(50, 50);
+
+        let cfg = ParquetWriterConfig {
+            compression: Compression::Snappy,
+            row_group_size: 50, // force one RG per 50-row batch
+            ..ParquetWriterConfig::default()
+        };
+        let kvs = vec![
+            KeyValue::new(
+                PARQUET_META_SORT_FIELDS.to_string(),
+                "metric_name|-timestamp_secs/V2".to_string(),
+            ),
+            KeyValue::new(
+                PARQUET_META_WINDOW_START.to_string(),
+                "1700000000".to_string(),
+            ),
+            KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()),
+            KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()),
+        ];
+        let props: WriterProperties = cfg.to_writer_properties_with_metadata(
+            &batch_a.schema(),
+            Vec::new(),
+            Some(kvs),
+            &["metric_name".to_string(), "timestamp_secs".to_string()],
+        );
+        let mut buf: Vec<u8> = Vec::new();
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, batch_a.schema(), Some(props)).expect("arrow writer");
+        writer.write(&batch_a).expect("write");
+        writer.write(&batch_b).expect("write");
+        writer.close().expect("close");
+        let bytes = Bytes::from(buf);
+
+        let inputs: Vec<Box<dyn ColumnPageStream>> = vec![open_stream(bytes).await];
+        let tmp = TempDir::new().expect("tmpdir");
+        let err = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
+            .await
+            .expect_err("multi-RG input must be rejected");
+        let s = err.to_string();
+        assert!(
+            s.contains("single-row-group"),
+            "expected 'single-row-group' error, got: {s}",
+        );
+    }
+}
diff --git a/quickwit/quickwit-parquet-engine/src/merge/writer.rs b/quickwit/quickwit-parquet-engine/src/merge/writer.rs
index 3ac908dab3c..071726a3a7d 100644
--- a/quickwit/quickwit-parquet-engine/src/merge/writer.rs
+++ b/quickwit/quickwit-parquet-engine/src/merge/writer.rs
@@ -185,7 +185,7 @@ pub fn write_merge_outputs(
 ///
 /// Takes the relevant row ranges from each input according to the merge runs,
 /// concatenates into a single batch, and applies the permutation via `take`.
-fn apply_merge_permutation(
+pub(super) fn apply_merge_permutation(
     inputs: &[RecordBatch],
     union_schema: &SchemaRef,
     runs: &[MergeRun],
@@ -254,7 +254,7 @@ fn predict_num_row_groups(num_rows: usize, row_group_size: usize) -> usize {
 /// `qh.rg_partition_prefix_len` KV — caller computes this based on
 /// whether the file is going to be single-RG (preserve input prefix)
 /// or multi-RG (must be 0).
-fn build_merge_kv_metadata(
+pub(super) fn build_merge_kv_metadata(
     input_meta: &InputMetadata,
     row_keys_proto: &Option<Vec<u8>>,
     zonemap_regexes: &std::collections::HashMap<String, String>,
@@ -324,7 +324,10 @@
 }
 
 /// Build `SortingColumn` entries for Parquet file metadata.
-fn build_sorting_columns(batch: &RecordBatch, sort_fields_str: &str) -> Result<Vec<SortingColumn>> {
+pub(super) fn build_sorting_columns(
+    batch: &RecordBatch,
+    sort_fields_str: &str,
+) -> Result<Vec<SortingColumn>> {
     let sort_schema = parse_sort_fields(sort_fields_str)?;
     let schema = batch.schema();
 
@@ -347,7 +350,7 @@ fn build_sorting_columns(batch: &RecordBatch, sort_fields_str: &str) -> Result<Vec<SortingColumn>> {
-fn resolve_sort_field_names(sort_fields_str: &str) -> Result<Vec<String>> {
+pub(super) fn resolve_sort_field_names(sort_fields_str: &str) -> Result<Vec<String>> {
     let sort_schema = parse_sort_fields(sort_fields_str)?;
     Ok(sort_schema
         .column
@@ -361,7 +364,7 @@ fn resolve_sort_field_names(sort_fields_str: &str) -> Result<Vec<String>> {
 ///
 /// Checks that sorted_series values are non-decreasing, and within equal
 /// sorted_series values, timestamp_secs respects the schema's sort direction.
-fn verify_sort_order(batch: &RecordBatch, sort_fields_str: &str) {
+pub(super) fn verify_sort_order(batch: &RecordBatch, sort_fields_str: &str) {
     if batch.num_rows() <= 1 {
         return;
     }

From 6cf577de8f4a85d1350e269dc2f8ad97b3316816 Mon Sep 17 00:00:00 2001
From: George Talbot
Date: Tue, 12 May 2026 12:40:32 -0400
Subject: [PATCH 2/9] fix: persist decoder + page cache across body-col passes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Address two Codex review findings on PR-6b.2 (#6409):

* P1 — Preserve decoder/page cache across output chunks.

  The merge engine was constructing a fresh `StreamDecoder` for every
  `advance_decoder_to_row` call, which reset the per-column
  `rows_decoded` counter so the second decoded page reported
  `row_start = 0` after the stream had already advanced. The page
  cache also lived on the per-output assembler, so pages whose row
  range straddled two outputs were dropped when the first output
  finished even though the stream couldn't be rewound. Both scenarios
  produced silently wrong rows or out-of-bounds panics on any input
  large enough to require multi-page advances per output or
  multi-output coverage of a single page.

  The decoder now lives on `InputDecoderState` (owned via the new
  `StreamDecoder::from_owned` constructor), and the per-input body-
  col page cache + cursor are reset only at the start of each body
  column.

* P2 — Stream body pages instead of collecting `Vec<ArrayRef>`.

  The per-output body-col write now feeds `write_next_column_arrays`
  one page at a time via `StreamingBodyColIter`, which captures
  assembly errors in a side cell so memory stays bounded by output-
  page size rather than column-chunk size.

Two regression tests cover the bug shapes — multi-page body col
within one output (2500 rows × 50-row pages) and multi-output input
where pages span output boundaries.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../src/merge/streaming.rs      | 436 +++++++++++++++---
 .../src/storage/page_decoder.rs |  63 ++-
 2 files changed, 433 insertions(+), 66 deletions(-)

diff --git a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs
index d7e74e8428f..16e3785f92e 100644
--- a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs
+++ b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs
@@ -224,18 +224,49 @@ pub async fn streaming_merge_sorted_parquet_files(
 }
 
 /// Per-input state held across phase 0 → phase 3 inside the blocking
-/// task. The decoder borrows mutably from its input stream; both live
-/// here so the borrow checker is happy with one struct per input.
+/// task. The decoder owns its stream so it persists across phases and
+/// across all output writes for a given input — critical for body
+/// columns whose pages may need to be visited multiple times (one page
+/// can supply rows for more than one output, or one output may need
+/// rows from more than one page). Reconstructing the decoder mid-pass
+/// would reset the per-column `rows_decoded` counter (so `row_start`
+/// becomes wrong) and discard cached dictionary / queued pages.
 struct InputDecoderState {
-    /// Owned stream (so the decoder's `&mut dyn ColumnPageStream` borrow
-    /// is anchored to a stable address inside this struct).
-    stream: Box<dyn ColumnPageStream>,
+    decoder: StreamDecoder<'static>,
     metadata: Arc<ParquetMetaData>,
     /// Arrow schema of this input (from parquet → arrow conversion).
     arrow_schema: SchemaRef,
+    /// Per-input page cache for the *currently active* body column.
+    /// Pages are appended as the decoder produces them and evicted from
+    /// the front once their last row is below `body_col_cursor`. The
+    /// cache must persist across all outputs that consume rows from
+    /// this column so that a page whose range straddles two outputs is
+    /// not re-decoded (the stream has already advanced past it). At
+    /// the start of each body column [`reset_body_col_state`] clears
+    /// this cache and zeroes the cursor.
+    body_col_page_cache: Vec<DecodedPage>,
+    /// Next unconsumed input row for the active body column. Advances
+    /// monotonically across outputs because the merge plan assigns each
+    /// input's rows to outputs in input-row order.
+    body_col_cursor: usize,
 }
 
-/// Build per-input state. The streams are moved in from the caller.
+impl InputDecoderState {
+    /// Clear the per-input body-column cache. Called at the start of
+    /// each new body column so leftover pages from the previous column
+    /// (which have a different `col_idx`) don't poison the new column's
+    /// row-start arithmetic. The decoder itself is *not* reset — its
+    /// per-(rg, col) `rows_decoded` counters and queued pages must
+    /// survive so subsequent decode calls return correct row offsets.
+    fn reset_body_col_state(&mut self) {
+        self.body_col_page_cache.clear();
+        self.body_col_cursor = 0;
+    }
+}
+
+/// Build per-input state. The streams are moved in from the caller and
+/// installed in long-lived [`StreamDecoder`]s so per-column state
+/// survives every phase of the merge.
 fn build_input_decoders_state(
     inputs: &mut Vec<Box<dyn ColumnPageStream>>,
 ) -> Result<Vec<InputDecoderState>> {
@@ -245,10 +276,13 @@ fn build_input_decoders_state(
         let parquet_schema = metadata.file_metadata().schema_descr();
         let arrow_schema = parquet::arrow::parquet_to_arrow_schema(parquet_schema, None)
             .context("converting parquet schema → arrow")?;
+        let decoder = StreamDecoder::from_owned(stream);
         states.push(InputDecoderState {
-            stream,
+            decoder,
             metadata,
             arrow_schema: Arc::new(arrow_schema),
+            body_col_page_cache: Vec::new(),
+            body_col_cursor: 0,
         });
     }
     Ok(states)
@@ -466,11 +500,9 @@ fn drain_sort_cols_one_input(
     let mut sort_cols_finished = 0usize;
     let sort_col_target = sort_col_parquet_indices.len();
 
-    let mut decoder = StreamDecoder::new(&mut *state.stream);
-
     while sort_cols_finished < sort_col_target {
         let decoded = handle
-            .block_on(decoder.decode_next_page())
+            .block_on(state.decoder.decode_next_page())
             .with_context(|| format!("decoding sort col page (input {input_idx})"))?;
         let page = match decoded {
             Some(p) => p,
@@ -1133,7 +1165,25 @@ fn write_body_col_for_all_outputs(
         }
     }
 
-    // For each output sequentially: build output pages, feed to writer.
+    // Reset each input's body-col cache + cursor at the start of this
+    // column. The persistent `StreamDecoder` retains its per-(rg, col)
+    // state for *every* column it has touched so the next page from
+    // this column has the correct `row_start`; only the cached pages
+    // (which belong to the previous column) need to be discarded.
+    for state in decoders_state.iter_mut() {
+        state.reset_body_col_state();
+    }
+
+    // Track service names while streaming the service col.
+    let track_service = col_name == "service";
+
+    // For each output sequentially: build output pages, feed to writer
+    // one page at a time. We must NOT collect the whole column into a
+    // Vec — that would defeat the page-bounded merge path and scale
+    // memory with column-chunk size on production splits. Instead we
+    // hand `write_next_column_arrays` a streaming iterator that
+    // captures the first error in a side cell so the writer stops as
+    // soon as assembly fails.
     let mut storage_idx = 0;
     for (out_idx, &row_count) in destinations.rows_per_output.iter().enumerate() {
         if row_count == 0 {
@@ -1149,8 +1199,6 @@
         let out_field_idx = out_schema.index_of(col_name)?;
         let out_field = out_schema.field(out_field_idx);
 
-        // Build per-output assembler. Feeds one output page per
-        // `Iterator::next()` call.
         let assembler = BodyColOutputPageAssembler::new(
             handle,
             decoders_state,
@@ -1161,28 +1209,30 @@
             out_field,
         );
 
-        // Track service names while streaming the service col.
-        let track_service = col_name == "service";
-
-        // Drive the assembler via a sync iterator. We must `?`-propagate
-        // assembly errors out of the iterator; collect into a Vec and
-        // return on first error.
-        let pages: Vec<ArrayRef> = assembler
-            .into_iter()
-            .collect::<Result<Vec<_>>>()
-            .with_context(|| format!("assembling body col '{col_name}' for output {out_idx}"))?;
-
-        if track_service {
-            for page in &pages {
-                collect_service_names_from_page(
-                    page.as_ref(),
-                    &mut service_names_per_output[storage_idx],
-                )?;
-            }
-        }
+        let mut error_slot: Option<anyhow::Error> = None;
+        let service_collector: Option<&mut HashSet<String>> = if track_service {
+            Some(&mut service_names_per_output[storage_idx])
+        } else {
+            None
+        };
 
-        row_groups[storage_idx]
-            .write_next_column_arrays(pages.into_iter())
+        let stream_iter = StreamingBodyColIter {
+            inner: assembler.into_iter(),
+            error_slot: &mut error_slot,
+            service_collector,
+        };
+
+        let write_result = row_groups[storage_idx].write_next_column_arrays(stream_iter);
+
+        // Assembly errors are reported via `error_slot`; surface them
+        // first because a downstream write error is usually a
+        // consequence (the writer stops on `None` and reports a
+        // row-count mismatch otherwise).
+        if let Some(err) = error_slot {
+            return Err(err)
+                .with_context(|| format!("assembling body col '{col_name}' for output {out_idx}"));
+        }
+        write_result
            .with_context(|| format!("writing body col '{col_name}' to output {out_idx}"))?;
         storage_idx += 1;
     }
@@ -1190,6 +1240,47 @@
     Ok(())
 }
 
+/// Adapts a `Result<ArrayRef>` page assembler into the
+/// `Iterator<Item = ArrayRef>` shape `write_next_column_arrays` expects.
+/// The first assembly error is captured in `error_slot` and iteration
+/// ends; the caller MUST check the slot after the writer returns. If
+/// `service_collector` is `Some`, every yielded page is scanned for
+/// service names and added to the set; collection failures also stop
+/// the iterator and populate `error_slot`.
+struct StreamingBodyColIter<'a, I> {
+    inner: I,
+    error_slot: &'a mut Option<anyhow::Error>,
+    service_collector: Option<&'a mut HashSet<String>>,
+}
+
+impl<I> Iterator for StreamingBodyColIter<'_, I>
+where I: Iterator<Item = Result<ArrayRef>>
+{
+    type Item = ArrayRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.error_slot.is_some() {
+            return None;
+        }
+        match self.inner.next() {
+            Some(Ok(arr)) => {
+                if let Some(out) = self.service_collector.as_deref_mut()
+                    && let Err(e) = collect_service_names_from_page(arr.as_ref(), out)
+                {
+                    *self.error_slot = Some(e);
+                    return None;
+                }
+                Some(arr)
+            }
+            Some(Err(e)) => {
+                *self.error_slot = Some(e);
+                None
+            }
+            None => None,
+        }
+    }
+}
+
 /// Per-page service name collector. Used during the streaming write
 /// of the "service" body col to populate per-output service_names.
 fn collect_service_names_from_page(arr: &dyn Array, out: &mut HashSet<String>) -> Result<()> {
@@ -1323,6 +1414,14 @@
 /// Memory per `next()` call: one in-progress output page (P rows) +
 /// up to ~2 in-flight decoded pages per input (kept until all their
 /// rows are consumed). Bounded by page sizes, not column-chunk sizes.
+///
+/// **Cross-output state**: the per-input `body_col_page_cache` and
+/// `body_col_cursor` live on [`InputDecoderState`], not the assembler.
+/// They must persist across all outputs that consume rows from the
+/// active body column — a page whose row range straddles two outputs
+/// would otherwise be dropped when the first output's assembler ends,
+/// even though the stream has already advanced past it and the next
+/// output still needs rows from inside that page.
 struct BodyColOutputPageAssembler<'a> {
     handle: &'a Handle,
     decoders_state: &'a mut [InputDecoderState],
@@ -1331,12 +1430,6 @@
     out_idx: usize,
     col_name: &'a str,
     out_field: &'a Field,
-    /// Cursor into `destinations.per_input[*]` — next input row index per input.
-    input_cursors: Vec<usize>,
-    /// Per-input decoded page cache. Pages are kept in order; the
-    /// front page's `row_start` is the smallest input row we still
-    /// have decoded.
-    page_cache: Vec<Vec<DecodedPage>>,
     /// Total rows written so far for this output's col.
     rows_emitted: usize,
     /// Total rows expected = destinations.rows_per_output[out_idx].
@@ -1356,11 +1449,6 @@
         col_name: &'a str,
         out_field: &'a Field,
     ) -> Self {
-        let num_inputs = decoders_state.len();
-        let mut page_cache: Vec<Vec<DecodedPage>> = Vec::with_capacity(num_inputs);
-        for _ in 0..num_inputs {
-            page_cache.push(Vec::new());
-        }
         Self {
             handle,
             decoders_state,
@@ -1369,8 +1457,6 @@
             out_idx,
             col_name,
             out_field,
-            input_cursors: vec![0; num_inputs],
-            page_cache,
             rows_emitted: 0,
             expected_rows: destinations.rows_per_output[out_idx],
             done: false,
@@ -1458,7 +1544,7 @@ fn assemble_one_output_page(s: &mut BodyColOutputPageAssembler) -> Result<Option<ArrayRef>>