feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2) #6409

g-talbot wants to merge 7 commits into
Conversation
… (PR-6b.2)
Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder. The
streaming merge engine now keeps body-col memory bounded by output
page size (not column-chunk size) while preserving caller-specified
M:N output splitting at sorted_series boundaries.
Architecture (Husky multi-input → multi-output sorted merge):
Phase 0 (async) — drain sort cols from each input. With Husky
column ordering, sort cols + sorted_series are the prefix of each
row group's body bytes, so the decoder can stop after they are
fully decoded; the remaining body col pages stay un-read in the
input stream, ready for phase 3.
Phase 1 — compute_merge_order over the per-input sort-col
RecordBatches using the existing k-way (sorted_series,
timestamp_secs) heap.
Phase 2 — compute_output_boundaries with the caller's
num_outputs, splitting at sorted_series transitions.
Phase 3 (blocking + block_on bridges) — streaming write. All M
output writers are alive for the duration. For each column in
Husky order, every output's col K is written in turn:
- Sort col / sorted_series: applied via arrow::interleave from
the already-buffered phase-0 data.
- Body col: each output page is assembled via arrow::interleave
from input page slices, with decoders advanced page-by-page via
handle.block_on from inside the sync iterator passed to
write_next_column_arrays. Pages flush to the writer's sink as
SerializedColumnWriter's page-size threshold trips — memory
stays bounded by the in-flight output page plus a small number
of in-flight input pages.
After all M outputs' col K is done, every input decoder is at the
start of col K+1 in its single row group. Move to col K+1.
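A minimal sketch of the phase-3 column-major driving loop this describes — the helper names (`is_sort_col`, `write_sort_col`, `write_body_col`) and signatures are illustrative assumptions, not the PR's actual streaming.rs code:

```rust
use arrow::datatypes::Schema;

/// Sketch of the phase-3 column-major loop. All three closures are
/// hypothetical stand-ins for the real streaming.rs machinery.
fn write_all_columns_sketch(
    full_union_schema: &Schema,
    num_outputs: usize,
    mut is_sort_col: impl FnMut(&str) -> bool,
    mut write_sort_col: impl FnMut(usize, usize) -> anyhow::Result<()>,
    mut write_body_col: impl FnMut(usize, usize) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
    // Outer loop: columns in storage (sort-cols-first) order.
    for (col_idx, field) in full_union_schema.fields().iter().enumerate() {
        // Inner loop: all M outputs finish col K before anyone touches
        // col K+1, so every input decoder advances in lockstep.
        for out_idx in 0..num_outputs {
            if is_sort_col(field.name()) {
                write_sort_col(out_idx, col_idx)?; // buffered phase-0 data
            } else {
                write_body_col(out_idx, col_idx)?; // page-bounded streaming
            }
        }
    }
    Ok(())
}
```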
PR-6b.2 only handles single-row-group inputs (real or PR-5-
adapter-presented). Multi-RG metric-aligned inputs are rejected
with a clear error message; supporting them requires either
consuming + discarding body cols of RG[i-1] from the stream to
reach RG[i]'s sort cols, or a second body GET — both are larger
scope changes that land in a follow-up.
Page-bounded contract verified by
test_body_col_streams_many_pages_per_column_chunk: with
data_page_row_count_limit=1000 on an 8000-row merge, the output
value column spans ≥ 2 pages, demonstrating that body col writes
respect data_page_size and do not materialise whole column chunks.
Tests (9, all passing): two-input merge, single-RG output for
single-metric_name input, total-rows-preserved across M:N,
sort-schema mismatch rejection, KV metadata propagation,
all-empty-inputs no-output, output drainable by StreamDecoder,
multi-RG input rejection, page-bounded body col streaming.
Also exposes existing helpers in merge/writer.rs as pub(super)
(apply_merge_permutation, build_merge_kv_metadata,
build_sorting_columns, resolve_sort_field_names, verify_sort_order)
so streaming.rs can reuse the same MC-3 / KV / sorting-columns
construction the non-streaming engine uses. PR-7 will fold the
non-streaming engine away.
PR-6c.2 will add file-size monitoring on top: close the current
output at the next sorted_series transition when an in-progress
file approaches the size cap, producing additional splits beyond
the caller's N.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@codex review

💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 26b9257544
Address two Codex review findings on PR-6b.2 (#6409):

* P1 — Preserve decoder/page cache across output chunks. The merge
  engine was constructing a fresh `StreamDecoder` for every
  `advance_decoder_to_row` call, which reset the per-column
  `rows_decoded` counter, so the second decoded page reported
  `row_start = 0` after the stream had already advanced. The page
  cache also lived on the per-output assembler, so pages whose row
  range straddled two outputs were dropped when the first output
  finished even though the stream couldn't be rewound. Both scenarios
  produced silently wrong rows or out-of-bounds panics on any input
  large enough to require multi-page advances per output or
  multi-output coverage of a single page. The decoder now lives on
  `InputDecoderState` (owned via the new `StreamDecoder::from_owned`
  constructor), and the per-input body-col page cache + cursor are
  reset only at the start of each body column.

* P2 — Stream body pages instead of collecting `Vec<ArrayRef>`. The
  per-output body-col write now feeds `write_next_column_arrays` one
  page at a time via `StreamingBodyColIter`, which captures assembly
  errors in a side cell so memory stays bounded by output-page size
  rather than column-chunk size.

Two regression tests cover the bug shapes — multi-page body col
within one output (2500 rows × 50-row pages) and multi-output input
where pages span output boundaries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
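A minimal sketch of the `StreamingBodyColIter` error-side-cell pattern the P2 fix describes — the `PageAssembler` type and field layout are assumptions; only the iterator/side-cell shape is taken from the commit:

```rust
use std::cell::RefCell;
use std::rc::Rc;
use arrow::array::{Array, ArrayRef};

/// Hypothetical stand-in for the per-input assembler that interleaves
/// cached input page slices into one output page.
struct PageAssembler;
impl PageAssembler {
    fn assemble_next_page(&mut self) -> anyhow::Result<ArrayRef> {
        unimplemented!("interleave from cached input pages")
    }
}

/// Sketch: Iterator::next can't return Result, so assembly errors are
/// parked in a shared side cell that the caller inspects after
/// write_next_column_arrays returns.
struct StreamingBodyColIter {
    remaining_rows: usize,
    assembler: PageAssembler,
    error: Rc<RefCell<Option<anyhow::Error>>>,
}

impl Iterator for StreamingBodyColIter {
    type Item = ArrayRef;

    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining_rows == 0 {
            return None;
        }
        // Assemble exactly one output page; memory stays page-bounded.
        match self.assembler.assemble_next_page() {
            Ok(page) => {
                self.remaining_rows = self.remaining_rows.saturating_sub(page.len());
                Some(page)
            }
            Err(e) => {
                *self.error.borrow_mut() = Some(e); // surfaced after the write
                None
            }
        }
    }
}
```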
@codex review

💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 81f7414800
Address Codex P1 (third comment) on PR-6b.2 (#6409): phase 0
explicitly accepts inputs with `num_row_groups() == 0` (returning a
zero-row sort batch), but `write_body_col_for_all_outputs`
unconditionally called `state.metadata.row_group(0)` for every input,
panicking with "index out of bounds" before the first body column was
written. Treat zero-RG inputs the same as inputs whose schema lacks
the current column: push `None` into `input_col_indices` and skip
them for this body col. Also drop the unused `input_target_rows` vec
that was being built only for its row-group lookup side effect.

Regression test `test_zero_row_input_mixed_with_non_empty` builds a
0-row + 50-row pair and merges them through the streaming engine;
without this fix the merge blocking task panics inside parquet-rs's
`row_group()` indexing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
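A minimal sketch of the zero-RG guard, using real parquet-rs metadata APIs; the function name and surrounding shape are assumptions:

```rust
use parquet::file::metadata::ParquetMetaData;

/// Sketch: compute each input's column index for `col_name`, treating
/// zero-RG inputs the same as inputs missing the column entirely.
fn body_col_indices(inputs: &[ParquetMetaData], col_name: &str) -> Vec<Option<usize>> {
    inputs
        .iter()
        .map(|meta| {
            // Zero-row inputs have no row groups at all, so row_group(0)
            // would panic with "index out of bounds". Skip them.
            if meta.num_row_groups() == 0 {
                return None;
            }
            meta.row_group(0)
                .columns()
                .iter()
                .position(|c| c.column_path().string() == col_name)
        })
        .collect()
}
```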
@codex review

💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 05127947b1
Address Codex P2 (fourth comment) on PR-6b.2 (#6409): the schema
derivation condition `sort_optimised.has(name) || full_union.has(name)`
was tautologically true for every iterated field — every `field` came
from `full_union_schema`, so the second disjunct was always satisfied
and the intended "drop all-null sort fields" branch never fired.

Pass the sort union schema in explicitly so we can tell sort fields
apart from body fields. Sort field present in `sort_union_schema` →
keep only if `optimize_output_batch` kept it (not all-null for this
output's rows). Body field → keep unconditionally; tracking per-output
body-col presence would require pre-reading every body column for
every output, which is the column-chunk-bounded buffering the
streaming path exists to avoid.

Regression test `test_derive_output_schema_drops_all_null_sort_field`
calls the helper directly with a synthetic union + sort-optimised pair
and asserts an all-null sort field is dropped while a body field with
the same union-schema position is preserved. Verified the test fails
against the pre-fix logic with the expected
`['metric_name', 'env', 'timestamp_secs', 'value']` vs
`['metric_name', 'timestamp_secs', 'value']` mismatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
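A minimal sketch of the fixed retention rule — the function signature is an assumption; only the keep/drop logic mirrors the commit:

```rust
use std::sync::Arc;
use arrow::datatypes::{Field, Schema};

/// Sketch: sort fields are kept only if the sort-optimised batch kept
/// them; body fields are kept unconditionally.
fn derive_output_schema(
    full_union_schema: &Schema,
    sort_union_schema: &Schema,
    sort_optimised_schema: &Schema,
) -> Schema {
    let kept: Vec<Arc<Field>> = full_union_schema
        .fields()
        .iter()
        .filter(|field| {
            let is_sort_field = sort_union_schema.field_with_name(field.name()).is_ok();
            if is_sort_field {
                // Dropped by optimize_output_batch => all-null for this
                // output's rows => omit from the output schema.
                sort_optimised_schema.field_with_name(field.name()).is_ok()
            } else {
                // Body field: always kept (per-output presence tracking
                // would need column-chunk-bounded buffering).
                true
            }
        })
        .cloned()
        .collect();
    Schema::new(kept)
}
```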
@codex review

💡 Codex Review
https://github.com/quickwit-oss/quickwit/blob/1b80d47f2320c95963d569f35e3238e6848a848d/quickwit-parquet-engine/src/merge/streaming.rs#L1072-L1076
Use the full union schema when driving column writes
When different outputs drop different all-null sort fields, selecting one per-output schema as the parent can omit fields that are still present in another output. For example, if one output keeps env and drops region while another keeps region and drops env with the same field count, the first schema wins here, region is never visited, and the second writer either misses that column or writes subsequent columns into the wrong next slot. Drive the column loop from the full union schema instead of a single output schema.
https://github.com/quickwit-oss/quickwit/blob/1b80d47f2320c95963d569f35e3238e6848a848d/quickwit-parquet-engine/src/merge/streaming.rs#L1037-L1041
Preserve service tags when service is a sort column
For sort schemas that include service (e.g. metric_name|service|...), this branch writes service through the sort-column path, so it never reaches write_body_col_for_all_outputs where track_service populates service_names_per_output; finalize_output_writer then emits no TAG_SERVICE low-cardinality metadata even though the column is present. Collect service names from the sort-column data as well, or derive them from the per-output sort batch.
Bundle four pieces:
- **Husky → neutral phrasing.** Replaced the seven "Husky" mentions in
the streaming engine's doc-comments and error messages with neutral
"sort-cols-first storage ordering" / "column ordering" phrasing.
Project is Quickwit; the internal column-ordering scheme didn't
need a separate brand in user-visible error strings.
- **One `.unwrap()` → `.expect()` in lib code.** The hashmap lookup
in `drain_sort_cols_one_input` is guarded by a `contains_key`
check; promote the implicit invariant to a documented panic
message per CODE_STYLE.md.
- **`align_inputs_to_union_schema` nullability fix.** The first-sight
branch unconditionally marked new fields nullable; the existing
comment claims "columns that don't appear in every input must be
nullable" but the code applied that rule to every field. Replaced
with a two-pass scheme: track `any_nullable` and `appears_in` per
field across all inputs, then mark nullable iff some input had it
nullable OR the field is missing from some input. This unblocks
`List<Float64>` columns end-to-end (the writer rejects nullable
List; the previous behaviour forced every list column nullable on
first sight even when every input declared it non-null). A sketch of
the two-pass scheme follows this commit message.
- **MC-2 round-trip integration test.** New
`test_mc2_all_types_round_trip_through_streaming_merge` builds two
inputs covering every parquet physical type the decoder accepts —
Int8/16/32/64, UInt8/16/32/64, Float32/64, Bool, Utf8,
Dictionary<Int32, Utf8>, LargeBinary, and non-nullable
`List<Float64>` — merges them through the streaming engine, and
asserts every `(sorted_series_key → body-col tuple)` pair survives
byte-equal. Storage-encoding transitions (Dict→Utf8, LargeBinary→
Binary) are normalised in the render helper because MC-2 promises
value preservation, not internal representation preservation.
This test caught two real bugs while being written:
1. Body cols must be declared in lexicographic order — the streaming
engine assumes the storage convention and crashes mid-merge if
they aren't. Fixture re-ordered accordingly. (Worth adding
upfront validation in a follow-up; not in scope here.)
2. The schema-union nullability bug above.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
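A minimal sketch of the two-pass nullability scheme from the `align_inputs_to_union_schema` bullet above — the function name and return shape are assumptions:

```rust
use std::collections::HashMap;
use arrow::datatypes::Schema;

/// Sketch: a field in the union schema is nullable iff some input
/// declared it nullable OR some input lacks it entirely.
fn union_field_nullability(input_schemas: &[Schema]) -> HashMap<String, bool> {
    let mut any_nullable: HashMap<String, bool> = HashMap::new();
    let mut appears_in: HashMap<String, usize> = HashMap::new();

    // Pass 1: track nullability and presence counts per field name.
    for schema in input_schemas {
        for field in schema.fields() {
            *any_nullable.entry(field.name().clone()).or_insert(false) |=
                field.is_nullable();
            *appears_in.entry(field.name().clone()).or_insert(0) += 1;
        }
    }

    // Pass 2: nullable iff declared nullable somewhere OR missing somewhere.
    any_nullable
        .into_iter()
        .map(|(name, nullable)| {
            let missing_somewhere = appears_in[&name] < input_schemas.len();
            (name, nullable || missing_somewhere)
        })
        .collect()
}
```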
Add a `#[cfg(test)] static AtomicUsize` PEAK_BODY_COL_PAGE_CACHE_LEN
that records the maximum length any input's `body_col_page_cache`
ever reached during the current merge, bumped on every page push in
`advance_decoder_to_row`. Zero production overhead — the `record_*`
helper compiles to a no-op outside test builds.
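A minimal sketch of the test-only peak tracker just described — the static's name matches the commit; the helper name and the `fetch_max` choice are assumptions:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch of a test-only peak tracker with zero production overhead.
#[cfg(test)]
static PEAK_BODY_COL_PAGE_CACHE_LEN: AtomicUsize = AtomicUsize::new(0);

#[cfg(test)]
fn record_page_cache_len(len: usize) {
    // fetch_max keeps the running maximum across all page pushes.
    PEAK_BODY_COL_PAGE_CACHE_LEN.fetch_max(len, Ordering::Relaxed);
}

#[cfg(not(test))]
#[inline(always)]
fn record_page_cache_len(_len: usize) {} // compiles to a no-op in prod
```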
New test `test_ms7_body_col_page_cache_bounded_regardless_of_input_size`
runs the streaming merge over three input sizes (300 / 3 000 / 30 000
rows at 50-row pages) and asserts:
1. Peak resident pages stays below a fixed ceiling (24, for the
ratio of OUTPUT_PAGE_ROWS=1024 to input page_rows=50, plus a
few-page slack for decoder lookahead + transients).
2. Growth from 3 000 to 30 000 rows (10× more input pages) yields
at most a 2-page increase in peak. The whole MS-7 claim is that
peak does not scale with input size.
Verified the test catches a deliberate regression: removing the per-
output-page eviction loop in `assemble_one_output_page` pushed the
3 000-row peak to 60 (60 > 24) and the test failed with the expected
message.
Fixture support: `write_input_parquet_with_small_pages` now also
sets `write_batch_size` and `data_page_size` proportional to the
requested page row count. Without those, the arrow writer's defaults
(64 KiB / 1 MiB) caused `data_page_row_count_limit` to be silently
ignored and produced one giant page per column. Probed the output
via `get_column_page_reader` — 30 000 rows now produces 600 pages
per col as expected.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
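On the fixture page-size wiring above, a minimal sketch using real parquet-rs `WriterProperties` setters; the proportionality constants are assumptions:

```rust
use parquet::file::properties::WriterProperties;

// Sketch: make data_page_row_count_limit actually bite by shrinking
// write_batch_size (the row-count limit is best-effort, checked per
// batch) and data_page_size. The ~64 bytes/row budget is assumed.
fn small_page_writer_props(page_rows: usize) -> WriterProperties {
    WriterProperties::builder()
        .set_data_page_row_count_limit(page_rows)
        .set_write_batch_size(page_rows)
        .set_data_page_size(page_rows * 64)
        .build()
}
```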
…m sort col

Address two new Codex P2 findings on PR-6b.2 (#6409):

- **Use the full union schema when driving column writes.** The old
  `build_parent_union_schema` picked one per-output schema by field
  count and used it as the column-iteration driver. If two outputs
  drop *different* all-null sort fields and end up with the same
  field count, the first wins — and any column it dropped is never
  iterated, leaving the other output's writer missing a column or
  writing subsequent columns into the wrong slot. The doc comment
  already said "process the FULL union schema's cols in order"; the
  implementation diverged. Drive `write_all_columns` from
  `full_union_schema` directly and delete the broken heuristic.

- **Collect service names from the sort-col path too.** If the sort
  schema places `service` in the sort key (`metric_name|service|...`),
  the streaming engine writes it via the sort-col path and the
  body-col `track_service` branch never runs.
  `MergeOutputFile.low_cardinality_tags[TAG_SERVICE]` came back empty
  even though every row had a service value. Extract service names
  from `static_meta.sort_optimised` at `finalize_output_writer` time
  so the TAG_SERVICE metadata is accurate regardless of which write
  path the column took.

Two regression tests:

- `test_heterogeneous_dropped_fields_drive_from_full_union_schema`
  builds two inputs whose per-output schemas drop different all-null
  sort fields with the same field count. Each kept tag must survive
  to its output. Verified the test fails (panic on missing column)
  against the pre-fix logic.

- `test_service_as_sort_column_still_populates_low_cardinality_tags`
  uses a sort schema `metric_name|service|-timestamp_secs/V2` and
  asserts the output's `low_cardinality_tags[TAG_SERVICE]` covers
  every distinct service value. Verified the test fails against
  pre-fix `finalize_output_writer` with the expected "must contain
  TAG_SERVICE" message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
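A minimal sketch of collecting service names from the per-output sort data at finalize time — the column name "service" and the `RecordBatch`-shaped input are assumptions; the commit reads them from `static_meta.sort_optimised`:

```rust
use std::collections::BTreeSet;
use arrow::array::{Array, StringArray};
use arrow::record_batch::RecordBatch;

/// Sketch: distinct service names from the sort-col data, covering the
/// case where `service` is written via the sort-col path and never
/// reaches the body-col track_service branch.
fn service_names_from_sort_batch(sort_batch: &RecordBatch) -> BTreeSet<String> {
    let mut names = BTreeSet::new();
    let schema = sort_batch.schema();
    if let Some((idx, _)) = schema.column_with_name("service") {
        let col = sort_batch.column(idx);
        if let Some(strings) = col.as_any().downcast_ref::<StringArray>() {
            for i in 0..strings.len() {
                if strings.is_valid(i) {
                    names.insert(strings.value(i).to_string());
                }
            }
        }
    }
    names
}
```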
Re: review #6409 (review) — these two findings landed as review-body Markdown links rather than inline comments, so I can't thread-reply individually. Addressing both here.

P2 — Use the full union schema when driving column writes: confirmed and fixed; regression test added.

P2 — Preserve service tags when service is a sort column: confirmed and fixed in 61163cd (and in PR-6c.2 / #6410 at d93f8e6 via the equivalent code path). Fix: at finalize time, extract service names from the per-output sort data. Regression test added.

Both fixes pushed; PR-6b.2 at 461 tests / PR-6c.2 at 468 tests, clippy + nightly fmt clean.
@codex review

Codex Review: Didn't find any major issues. Another round soon, please!
adamtobey left a comment
I think this all makes sense to me. It does get a bit confusing in ways that I suspect could be factored better, but I think it's reasonable to not worry about that now (or maybe I'm wrong about that). Left some comments, but didn't find anything I'd consider an issue
let mut decoders_state = build_input_decoders_state(&mut inputs)?;

// Phase 0
let sort_col_batches =
To understand: we do have a hard requirement on column ordering across all files? Sort cols must be first, then all other cols must be in consistent order?
window_start_secs: consensus_window_start.unwrap_or(None),
window_duration_secs: consensus_window_duration.unwrap_or(0),
num_merge_ops: max_merge_ops + 1,
rg_partition_prefix_len: consensus_prefix_len.unwrap_or(0),
Are we considering these truly optional? Or should we enforce that we do have them and do output them?
        page.col_idx,
    );
}
if page.rg_idx != 0 {
I'm guessing this will be relaxed later if we can support metric-name rowgroups that have consistent ordering across all input files?
per_input[run.input_index][input_row] = Some((out_idx, rows_per_output[out_idx]));
rows_per_output[out_idx] += 1;
nit: rows_for_current_output could be a register then rows_per_output[out_idx] = rows_for_current_output after the loop?
/// don't want to take in the page-bounded path. Re-introducing
/// dict-encoded output strings can be done later by tracking
/// cardinality during the streaming pass — call site is here.
Reasonable TODO. If we know things are dictionary encoded we can optimize for merging dictionaries and similar cases, better than un-encode -> interleave -> re-encode
    }
};
let max_needed_row = *input_rows.iter().max().expect("non-empty");
advance_decoder_to_row(
If I understand this function correctly I think something like fill_page_cache_to_row would be a clearer name. My initial read on this was "why are we advancing the decoder without reading any result? Is there a case where we're skipping data?" but that's not what the function does as I understand it
} else {
    let cache_start = pages[0].row_start;
    let arrays: Vec<&dyn Array> = pages.iter().map(|p| p.array.as_ref()).collect();
    let concatenated = arrow::compute::concat(&arrays).with_context(|| {
It's a little hard to follow all of this, but is it correct to say that pages here is just double buffering the pages on the stream as it's read? It's not going to actually end up filling with all the pages in the column?
OK actually it seems like the page cache will hold all the input pages for this column that contribute to the current output, dropped at the end of this function, and filled from advance_decoder_to_row?
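A minimal sketch of the fill-then-evict shape this thread describes — all names are hypothetical stand-ins for the real `advance_decoder_to_row` / eviction code in streaming.rs:

```rust
use arrow::array::{Array, ArrayRef};

/// One cached input page plus its absolute row offset within the column.
struct CachedPage {
    row_start: usize,
    array: ArrayRef,
}

/// Sketch: decode pages until `max_needed_row` is covered. The cache
/// only ever holds pages contributing to the output page being
/// assembled, never the whole column chunk.
fn fill_page_cache_to_row(
    cache: &mut Vec<CachedPage>,
    decode_next_page: &mut dyn FnMut() -> Option<(usize, ArrayRef)>,
    max_needed_row: usize,
) {
    let covered = |cache: &Vec<CachedPage>| {
        cache
            .last()
            .map(|p| p.row_start + p.array.len())
            .unwrap_or(0)
    };
    while covered(cache) <= max_needed_row {
        match decode_next_page() {
            Some((row_start, array)) => cache.push(CachedPage { row_start, array }),
            None => break, // stream exhausted
        }
    }
}

/// Sketch: after the output has consumed rows up to `consumed_row`,
/// drop cached pages that end at or before it. Pages straddling an
/// output boundary stay cached for the next output.
fn evict_consumed_pages(cache: &mut Vec<CachedPage>, consumed_row: usize) {
    cache.retain(|p| p.row_start + p.array.len() > consumed_row);
}
```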
Summary
Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder: body-col memory stays bounded by output page size (not column-chunk size), and caller-specified M:N output splitting is preserved at `sorted_series` boundaries — the existing `compute_output_boundaries` logic is unchanged; the merge engine just streams its column writes.

Architecture (Husky multi-input → multi-output sorted merge)

Phase 0 (async) — drain sort cols + `sorted_series` from each input via `StreamDecoder::decode_next_page()`. Husky column ordering puts sort cols first within each row group's body bytes, so the decoder stops after they're fully decoded; body col pages stay un-read in the stream for phase 3.

Phase 1 — `compute_merge_order` over per-input sort-col `RecordBatch`es using the existing k-way `(sorted_series, timestamp_secs)` heap.

Phase 2 — `compute_output_boundaries` with the caller's `num_outputs`, splitting at `sorted_series` transitions. Same algorithm as the non-streaming engine.

Phase 3 (blocking + block_on bridges) — streaming write. All M output writers are alive concurrently inside one `tokio::task::spawn_blocking`. For each column in Husky order, every output's col K is written in turn via `write_next_column_arrays`:

- Sort col / `sorted_series`: `arrow::compute::interleave` from the already-buffered phase-0 data, split into `OUTPUT_PAGE_ROWS`-sized chunks → page-sized arrays.
- Body col: `arrow::compute::interleave` from input page slices. The sync iterator passed to `write_next_column_arrays` drives the async decoder via `tokio::runtime::Handle::block_on(decode_next_page())` from inside the spawn_blocking task. Pages flush to the writer's sink as `SerializedColumnWriter`'s page-size threshold trips — memory stays bounded by the in-flight output page + a few in-flight input pages per input.

After all M outputs' col K is done, every input decoder is at the start of col K+1 in its single row group. Move to col K+1.
Why async/tokio

`ColumnPageStream` is async (S3 reads). The parquet writer is sync. A single `spawn_blocking` + `Handle::block_on` from sync iterators is cleaner than per-output tokio tasks: the writers + borrowed `RowGroupBuilder`s all live in one scope, lifetimes track properly, and merge work is mostly CPU-bound, so parallelising outputs across threads wouldn't help much.
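A minimal sketch of the `spawn_blocking` + `Handle::block_on` bridge, assuming a tokio multi-thread runtime; the stub decoder stands in for `StreamDecoder::decode_next_page()`:

```rust
use tokio::runtime::Handle;

// Sketch: sync parquet writing inside spawn_blocking, driving an async
// page stream via Handle::block_on. Illustrative only.
async fn merge_sketch() -> anyhow::Result<()> {
    let handle = Handle::current();
    tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
        // Sync iterator whose next() pulls pages from the async stream.
        let pages = std::iter::from_fn(|| {
            // block_on is safe here: we're on a blocking thread, not a
            // runtime worker, so we can't starve the executor.
            handle.block_on(decode_next_page_async())
        });
        for page in pages {
            // ... feed page to the sync parquet column writer ...
            let _ = page;
        }
        Ok(())
    })
    .await??;
    Ok(())
}

// Stub standing in for StreamDecoder::decode_next_page().
async fn decode_next_page_async() -> Option<arrow::array::ArrayRef> {
    None
}
```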
Single-RG-input simplification

PR-6b.2 rejects multi-RG inputs with a clear error message. Rationale: with Husky ordering, each RG's body bytes look like `[sort_cols, body_cols]`. For multi-RG inputs the layout is `[sort_cols_RG0, body_cols_RG0, sort_cols_RG1, body_cols_RG1, ...]`. Collecting sort cols across all RGs in phase 0 without buffering RG[i-1]'s body cols would require a second body GET (sparse range request for sort col bytes only), which violates the "two GETs per input" contract.

The fix lives in PR-6c.2 as the per-merge-region restructure: drain sort cols of ONE input RG at a time (= one merge region), produce output for that region, then advance to the next RG. The sort-prefix alignment (`prefix_len ≥ 1`) guarantees that one merge region = at most one RG per input, so the per-region phase 0 never needs to span RGs.

Today's real inputs satisfy the single-RG assumption: post-PR-3 ingest splits will be single-RG by design, and legacy multi-RG splits route through PR-5's `LegacyMultiRGAdapter` (#6408), which buffers the whole file and presents it as a synthetic single-RG stream.
M:N preserved

Caller's `num_outputs` config is honoured. The engine splits at `sorted_series` transitions (the existing `compute_output_boundaries` algorithm), which can occur mid-input-RG since they're at row-key boundaries. The "don't let the file get too big" file-size cap (which adds additional splits beyond the caller's N at any sort-key transition) lands in PR-6c.2.

Tests

9 tests, all passing:

- `test_two_inputs_simple_merge` — 2 inputs → 1 output, sort order preserved.
- `test_output_is_single_row_group` — single-metric_name input + `num_outputs=1` → single output RG.
- `test_total_rows_preserved` — N inputs → M outputs, MC-1 invariant.
- `test_sort_schema_mismatch_rejected` — incompatible sort schemas error.
- `test_kv_metadata_propagated_to_output` — `qh.*` KVs propagate, `num_merge_ops` increments.
- `test_all_empty_inputs_no_output` — zero-row inputs → empty output set.
- `test_output_drainable_by_stream_decoder` — output round-trips through PR-6a.2's decoder.
- `test_multi_rg_input_rejected` — multi-RG input rejected with a clear error.
- `test_body_col_streams_many_pages_per_column_chunk` — page-bounded contract sanity: with `data_page_row_count_limit=1000` on an 8000-row merge, output value col spans ≥ 2 pages.

441/441 crate tests pass; clippy clean; doc, machete, license, fmt all clean.
Helpers exposed

Existing helpers in `merge/writer.rs` are now `pub(super)` (`apply_merge_permutation`, `build_merge_kv_metadata`, `build_sorting_columns`, `resolve_sort_field_names`, `verify_sort_order`) so `streaming.rs` reuses the same MC-3 / KV / sorting-columns construction the non-streaming engine uses. PR-7 will fold the non-streaming engine away.
Stack

Base: `gtt/parquet-page-decoder` (PR-6a.2 #6407).

PR-6c.2 (#6410) extends with: per-merge-region restructure → unlocks multi-RG metric-aligned input support, multi-RG output at metric_name boundaries, and the file-size cap with sort-key-boundary splits.