
feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2) #6409

Open

g-talbot wants to merge 7 commits into gtt/parquet-page-decoder from gtt/streaming-merge-engine-merger

Conversation

@g-talbot
Contributor

@g-talbot g-talbot commented May 8, 2026

Summary

  • Streaming column-major merge engine on top of PR-6a.2's per-page Arrow decoder. Body col memory is bounded by output page size, not column-chunk size.
  • Preserves caller-specified M:N output splitting at sorted_series boundaries — the existing compute_output_boundaries logic is unchanged; the merge engine just streams its column writes.
  • Single-RG inputs only in this PR (real or PR-5-adapter-presented). Multi-RG metric-aligned input streaming lands in PR-6c.2 alongside the per-merge-region restructure.

Architecture (Husky multi-input → multi-output sorted merge)

Phase 0 (async) — drain sort cols + sorted_series from each input via StreamDecoder::decode_next_page(). Husky column ordering puts sort cols first within each row group's body bytes, so the decoder stops after they're fully decoded; body col pages stay un-read in the stream for phase 3.

Phase 1 — compute_merge_order over per-input sort-col RecordBatches using the existing k-way (sorted_series, timestamp_secs) heap.

Phase 2 — compute_output_boundaries with the caller's num_outputs, splitting at sorted_series transitions. Same algorithm as the non-streaming engine.

Phase 3 (blocking + block_on bridges) — streaming write. All M output writers are alive concurrently inside one tokio::task::spawn_blocking. For each column in Husky order, every output's col K is written in turn via write_next_column_arrays:

  • Sort col / sorted_series: applied via arrow::compute::interleave from the already-buffered phase-0 data, split into OUTPUT_PAGE_ROWS-sized chunks → page-sized arrays.
  • Body col: each output page is assembled via arrow::compute::interleave from input page slices. The sync iterator passed to write_next_column_arrays drives the async decoder via tokio::runtime::Handle::block_on(decode_next_page()) from inside the spawn_blocking task. Pages flush to the writer's sink as SerializedColumnWriter's page-size threshold trips — memory stays bounded by the in-flight output page + a few in-flight input pages per input.

Once col K is done for all M outputs, every input decoder sits at the start of col K+1 in its single row group. Move on to col K+1.
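
The interleave step above, reduced to a self-contained sketch. It assumes the merge order is already available as (input_index, row_index) pairs; pages_for_column and the toy page size are illustrative, while arrow::compute::interleave is the real API named in the bullets above.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::compute::interleave;
use arrow::error::ArrowError;

/// Assemble page-sized output arrays for one column from per-input arrays,
/// following a precomputed merge order of (input_index, row_index) pairs.
fn pages_for_column(
    per_input: &[ArrayRef],
    merge_order: &[(usize, usize)],
    page_rows: usize,
) -> Result<Vec<ArrayRef>, ArrowError> {
    let values: Vec<&dyn Array> = per_input.iter().map(|a| a.as_ref()).collect();
    merge_order
        .chunks(page_rows)
        .map(|chunk| interleave(&values, chunk))
        .collect()
}

fn main() -> Result<(), ArrowError> {
    let a: ArrayRef = Arc::new(Int64Array::from(vec![1, 3, 5]));
    let b: ArrayRef = Arc::new(Int64Array::from(vec![2, 4]));
    // A merge order a k-way heap on the sort key might produce.
    let order = [(0, 0), (1, 0), (0, 1), (1, 1), (0, 2)];
    let pages = pages_for_column(&[a, b], &order, 2)?;
    assert_eq!(pages.len(), 3); // 5 merged rows at 2 rows/page -> 3 pages
    Ok(())
}
```

In the engine itself the per-input values are page slices pulled on demand rather than whole columns, which is what keeps body-col memory page-bounded.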

Why async/tokio

ColumnPageStream is async (S3 reads). The parquet writer is sync. Single spawn_blocking + Handle::block_on from sync iterators is cleaner than per-output tokio tasks: the writers + borrowed RowGroupBuilders all live in one scope, lifetimes track properly, and merge work is mostly CPU-bound so parallelising outputs across threads wouldn't help much.
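
A compile-worthy illustration of that bridge: a sync Iterator drives an async page source through Handle::block_on, and only ever runs inside spawn_blocking. AsyncPageSource, Page, and the closing count() are stand-ins for ColumnPageStream, decoded pages, and write_next_column_arrays; the tokio calls are the real APIs.

```rust
use tokio::runtime::Handle;

struct AsyncPageSource;
struct Page;

impl AsyncPageSource {
    // Stand-in for the per-page decoder; the real one awaits S3 reads and
    // yields decoded Arrow pages.
    async fn decode_next_page(&mut self) -> Option<Page> {
        None
    }
}

// Sync-side adapter: a plain Iterator the parquet writer can consume,
// advancing the async decoder one page at a time.
struct BlockingPageIter {
    handle: Handle,
    source: AsyncPageSource,
}

impl Iterator for BlockingPageIter {
    type Item = Page;

    fn next(&mut self) -> Option<Page> {
        // Blocking here is fine: this iterator only ever runs inside
        // spawn_blocking, never on a runtime worker thread.
        let next_page = self.source.decode_next_page();
        self.handle.block_on(next_page)
    }
}

#[tokio::main]
async fn main() {
    let handle = Handle::current();
    let pages_written = tokio::task::spawn_blocking(move || {
        let iter = BlockingPageIter { handle, source: AsyncPageSource };
        // The real code hands the iterator to write_next_column_arrays;
        // counting stands in for the write loop here.
        iter.count()
    })
    .await
    .expect("blocking merge task panicked");
    assert_eq!(pages_written, 0);
}
```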

Single-RG-input simplification

PR-6b.2 rejects multi-RG inputs with a clear error message. Rationale: with Husky ordering, each RG's body bytes look like [sort_cols, body_cols]. For multi-RG inputs the layout is [sort_cols_RG0, body_cols_RG0, sort_cols_RG1, body_cols_RG1, ...]. To collect sort cols across all RGs in phase 0 without buffering RG[i-1]'s body cols would require a second body GET (sparse range request for sort col bytes only), which violates the "two GETs per input" contract.
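
The guard itself is just a metadata check along these lines; the function name and error wording here are illustrative, not the PR's exact code.

```rust
use parquet::file::metadata::ParquetMetaData;

// Single-RG guard sketch: anything above one row group is rejected until
// the PR-6c.2 per-merge-region restructure lands.
fn ensure_single_row_group(meta: &ParquetMetaData) -> anyhow::Result<()> {
    let n = meta.num_row_groups();
    anyhow::ensure!(
        n <= 1,
        "streaming merge only accepts single-row-group inputs (got {n}); \
         multi-RG splits must go through the PR-5 adapter until PR-6c.2"
    );
    Ok(())
}
```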

The fix lives in PR-6c.2 as the per-merge-region restructure: drain sort cols of ONE input RG at a time (= one merge region), produce output for that region, then advance to the next RG. The sort-prefix alignment (prefix_len ≥ 1) guarantees that one merge region = at most one RG per input, so the per-region phase 0 never needs to span RGs.

Today's real inputs satisfy the single-RG assumption: post-PR-3 ingest splits will be single-RG by design, and legacy multi-RG splits route through PR-5's LegacyMultiRGAdapter (#6408) which buffers the whole file and presents it as a synthetic single-RG stream.

M:N preserved

Caller's num_outputs config is honoured. The engine splits at sorted_series transitions (the existing compute_output_boundaries algorithm), which can occur mid-input-RG since they're at row-key boundaries. The "don't let the file get too big" file-size cap (which adds additional splits beyond the caller's N at any sort-key transition) lands in PR-6c.2.
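
For intuition, a simplified sketch of the splitting constraint: target roughly total_rows / num_outputs rows per output, but only cut at indices where sorted_series changes. This illustrates the rule, not the compute_output_boundaries implementation.

```rust
/// Pick up to num_outputs - 1 split points, each at a sorted_series
/// transition, aiming for evenly sized outputs. Purely illustrative.
fn split_at_series_transitions(
    total_rows: usize,
    num_outputs: usize,
    series_transitions: &[usize], // merged-row indices where sorted_series changes
) -> Vec<usize> {
    let mut boundaries = Vec::new();
    if num_outputs <= 1 || total_rows == 0 {
        return boundaries;
    }
    let target = total_rows.div_ceil(num_outputs);
    let mut next_target = target;
    for &row in series_transitions {
        if boundaries.len() + 1 == num_outputs {
            break; // remaining rows all land in the last output
        }
        if row >= next_target {
            boundaries.push(row);
            next_target = row + target;
        }
    }
    boundaries
}
```

Because cuts only happen at transitions, a single large sorted_series run stays in one output rather than being split mid-series.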

Tests

9 tests, all passing:

  • test_two_inputs_simple_merge — 2 inputs → 1 output, sort order preserved.
  • test_output_is_single_row_group — single-metric_name input + num_outputs=1 → single output RG.
  • test_total_rows_preserved — N inputs → M outputs, MC-1 invariant.
  • test_sort_schema_mismatch_rejected — incompatible sort schemas error.
  • test_kv_metadata_propagated_to_output — qh.* KVs propagate, num_merge_ops increments.
  • test_all_empty_inputs_no_output — zero-row inputs → empty output set.
  • test_output_drainable_by_stream_decoder — output round-trips through PR-6a.2's decoder.
  • test_multi_rg_input_rejected — multi-RG input rejected with clear error.
  • test_body_col_streams_many_pages_per_column_chunk — page-bounded contract sanity: with data_page_row_count_limit=1000 on an 8000-row merge, output value col spans ≥ 2 pages.
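
The writer configuration behind that last test is plain parquet-rs; roughly:

```rust
use parquet::file::properties::WriterProperties;

// Page-bounded test properties: a 1000-row data-page limit means the
// 8000-row merge must span several pages per column chunk. parquet-rs only
// checks the row-count limit between write batches, so the batch size is
// kept at or below the limit (see the fixture note in a later commit).
fn page_bounded_props() -> WriterProperties {
    WriterProperties::builder()
        .set_data_page_row_count_limit(1000)
        .set_write_batch_size(1000)
        .build()
}
```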

441/441 crate tests pass; clippy clean; doc, machete, license, fmt all clean.

Helpers exposed

Existing helpers in merge/writer.rs are now pub(super) (apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns, resolve_sort_field_names, verify_sort_order) so streaming.rs reuses the same MC-3 / KV / sorting-columns construction the non-streaming engine uses. PR-7 will fold the non-streaming engine away.

Stack

Base: gtt/parquet-page-decoder (PR-6a.2 #6407).

PR-6c.2 (#6410) extends with: per-merge-region restructure → unlocks multi-RG metric-aligned input support + multi-RG output at metric_name boundaries + file-size cap with sort-key-boundary splits.

@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 6226032 to add52f0 Compare May 8, 2026 13:33
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 4bc7122 to 5d5c4b1 Compare May 8, 2026 20:49
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from add52f0 to e1990b2 Compare May 8, 2026 20:49
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 5d5c4b1 to e660f78 Compare May 8, 2026 21:28
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from e1990b2 to 2af2fa8 Compare May 8, 2026 21:31
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from e660f78 to fcfb854 Compare May 8, 2026 21:46
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 2af2fa8 to b2eee32 Compare May 8, 2026 21:47
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from fcfb854 to 736ce0e Compare May 9, 2026 00:08
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from b2eee32 to 85b679a Compare May 9, 2026 00:08
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 736ce0e to 7bcf723 Compare May 11, 2026 11:06
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 85b679a to 38d4763 Compare May 11, 2026 11:06
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 7bcf723 to 67ac5b0 Compare May 11, 2026 11:15
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch 3 times, most recently from 3c6e227 to 83153a2 Compare May 11, 2026 16:36
@g-talbot g-talbot changed the title from "feat: streaming column-major single-RG merge engine (PR-6b)" to "feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2)" on May 11, 2026
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 309372e to 898a709 Compare May 11, 2026 18:18
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 83153a2 to f0a2d99 Compare May 11, 2026 18:18
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 898a709 to 9c3e606 Compare May 12, 2026 11:10
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch 3 times, most recently from 618e366 to ecd10a2 Compare May 12, 2026 12:40
… (PR-6b.2)

Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder. The
streaming merge engine now keeps body-col memory bounded by output
page size (not column-chunk size) while preserving caller-specified
M:N output splitting at sorted_series boundaries.

Architecture (Husky multi-input → multi-output sorted merge):

  Phase 0 (async) — drain sort cols from each input. With Husky
  column ordering, sort cols + sorted_series are the prefix of each
  row group's body bytes, so the decoder can stop after they are
  fully decoded; the remaining body col pages stay un-read in the
  input stream, ready for phase 3.

  Phase 1 — compute_merge_order over the per-input sort-col
  RecordBatches using the existing k-way (sorted_series,
  timestamp_secs) heap.

  Phase 2 — compute_output_boundaries with the caller's
  num_outputs, splitting at sorted_series transitions.

  Phase 3 (blocking + block_on bridges) — streaming write. All M
  output writers are alive for the duration. For each column in
  Husky order, every output's col K is written in turn:
   - Sort col / sorted_series: applied via arrow::interleave from
     the already-buffered phase-0 data.
   - Body col: each output page is assembled via arrow::interleave
     from input page slices, with decoders advanced page-by-page via
     handle.block_on from inside the sync iterator passed to
     write_next_column_arrays. Pages flush to the writer's sink as
     SerializedColumnWriter's page-size threshold trips — memory
     stays bounded by the in-flight output page plus a small number
     of in-flight input pages.

After all M outputs' col K is done, every input decoder is at the
start of col K+1 in its single row group. Move to col K+1.

PR-6b.2 only handles single-row-group inputs (real or PR-5-
adapter-presented). Multi-RG metric-aligned inputs are rejected
with a clear error message; supporting them requires either
consuming + discarding body cols of RG[i-1] from the stream to
reach RG[i]'s sort cols, or a second body GET — both are larger
scope changes that land in a follow-up.

Page-bounded contract verified by
test_body_col_streams_many_pages_per_column_chunk: with
data_page_row_count_limit=1000 on an 8000-row merge, the output
value column spans ≥ 2 pages, demonstrating that body col writes
respect data_page_size and do not materialise whole column chunks.

Tests (9, all passing): two-input merge, single-RG output for
single-metric_name input, total-rows-preserved across M:N,
sort-schema mismatch rejection, KV metadata propagation,
all-empty-inputs no-output, output drainable by StreamDecoder,
multi-RG input rejection, page-bounded body col streaming.

Also exposes existing helpers in merge/writer.rs as pub(super)
(apply_merge_permutation, build_merge_kv_metadata,
build_sorting_columns, resolve_sort_field_names, verify_sort_order)
so streaming.rs can reuse the same MC-3 / KV / sorting-columns
construction the non-streaming engine uses. PR-7 will fold the
non-streaming engine away.

PR-6c.2 will add file-size monitoring on top: close the current
output at the next sorted_series transition when an in-progress
file approaches the size cap, producing additional splits beyond
the caller's N.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from ecd10a2 to 26b9257 Compare May 12, 2026 13:48
@g-talbot
Contributor Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 26b9257544

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
Address two Codex review findings on PR-6b.2 (#6409):

* P1 — Preserve decoder/page cache across output chunks. The merge
  engine was constructing a fresh `StreamDecoder` for every
  `advance_decoder_to_row` call, which reset the per-column
  `rows_decoded` counter so the second decoded page reported
  `row_start = 0` after the stream had already advanced. The page
  cache also lived on the per-output assembler, so pages whose row
  range straddled two outputs were dropped when the first output
  finished even though the stream couldn't be rewound. Both
  scenarios produced silently wrong rows or out-of-bounds panics on
  any input large enough to require multi-page advances per output
  or multi-output coverage of a single page.

  The decoder now lives on `InputDecoderState` (owned via the new
  `StreamDecoder::from_owned` constructor), and the per-input body-
  col page cache + cursor are reset only at the start of each body
  column.

* P2 — Stream body pages instead of collecting `Vec<ArrayRef>`. The
  per-output body-col write now feeds `write_next_column_arrays`
  one page at a time via `StreamingBodyColIter`, which captures
  assembly errors in a side cell so memory stays bounded by output-
  page size rather than column-chunk size.
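
A sketch of the side-cell pattern that bullet describes: the writer consumes a plain Iterator<Item = ArrayRef>, so assembly errors are stashed in a shared cell that the caller inspects after the write returns. Names here are illustrative, not the PR's StreamingBodyColIter.

```rust
use std::cell::RefCell;
use std::rc::Rc;

use arrow::array::ArrayRef;

struct PageIter<F> {
    assemble_next: F,                          // builds the next output page, or fails
    error: Rc<RefCell<Option<anyhow::Error>>>, // checked by the caller afterwards
}

impl<F> Iterator for PageIter<F>
where
    F: FnMut() -> anyhow::Result<Option<ArrayRef>>,
{
    type Item = ArrayRef;

    fn next(&mut self) -> Option<ArrayRef> {
        match (self.assemble_next)() {
            Ok(page) => page, // Some(page) to keep going, None when the column is done
            Err(e) => {
                *self.error.borrow_mut() = Some(e); // stash the error and stop early
                None
            }
        }
    }
}
```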

Two regression tests cover the bug shapes — multi-page body col
within one output (2500 rows × 50-row pages) and multi-output input
where pages span output boundaries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot
Contributor Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 81f7414800


Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
Address Codex P1 (third comment) on PR-6b.2 (#6409): phase 0
explicitly accepts inputs with `num_row_groups() == 0` (returning a
zero-row sort batch), but `write_body_col_for_all_outputs`
unconditionally called `state.metadata.row_group(0)` for every
input, panicking with "index out of bounds" before the first body
column was written.

Treat zero-RG inputs the same as inputs whose schema lacks the
current column: push `None` into `input_col_indices` and skip them
for this body col. Also drop the unused `input_target_rows` vec
that was being built only for its row-group lookup side effect.

Regression test `test_zero_row_input_mixed_with_non_empty` builds a
0-row + 50-row pair and merges them through the streaming engine;
without this fix the merge blocking task panics inside parquet-rs's
`row_group()` indexing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot
Contributor Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 05127947b1


Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
Address Codex P2 (fourth comment) on PR-6b.2 (#6409): the schema
derivation condition `sort_optimised.has(name) || full_union.has(name)`
was tautologically true for every iterated field — every `field` came
from `full_union_schema`, so the second disjunct was always satisfied
and the intended "drop all-null sort fields" branch never fired.

Pass the sort union schema in explicitly so we can tell sort fields
apart from body fields. Sort field present in `sort_union_schema` →
keep only if `optimize_output_batch` kept it (not all-null for this
output's rows). Body field → keep unconditionally; tracking per-output
body-col presence would require pre-reading every body column for
every output, which is the column-chunk-bounded buffering the
streaming path exists to avoid.
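
The corrected keep/drop rule in isolation; a sketch, with the real helper in merge/streaming.rs taking more context than this:

```rust
use arrow::datatypes::{Field, Schema};

fn keep_field(field: &Field, sort_union: &Schema, sort_optimised: &Schema) -> bool {
    if sort_union.field_with_name(field.name()).is_ok() {
        // Sort field: keep only if the per-output optimisation kept it,
        // i.e. it is not all-null for this output's rows.
        sort_optimised.field_with_name(field.name()).is_ok()
    } else {
        // Body field: always kept; deciding per-output presence would mean
        // pre-reading every body column, which the streaming path avoids.
        true
    }
}
```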

Regression test `test_derive_output_schema_drops_all_null_sort_field`
calls the helper directly with a synthetic union + sort-optimised
pair and asserts an all-null sort field is dropped while a body
field with the same union-schema position is preserved. Verified
the test fails against the pre-fix logic with the expected
`['metric_name', 'env', 'timestamp_secs', 'value']` vs
`['metric_name', 'timestamp_secs', 'value']` mismatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot
Contributor Author

@codex review


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

https://github.com/quickwit-oss/quickwit/blob/1b80d47f2320c95963d569f35e3238e6848a848d/quickwit-parquet-engine/src/merge/streaming.rs#L1072-L1076
P2 — Use the full union schema when driving column writes

When different outputs drop different all-null sort fields, selecting one per-output schema as the parent can omit fields that are still present in another output. For example, if one output keeps env and drops region while another keeps region and drops env with the same field count, the first schema wins here, region is never visited, and the second writer either misses that column or writes subsequent columns into the wrong next slot. Drive the column loop from the full union schema instead of a single output schema.


https://github.com/quickwit-oss/quickwit/blob/1b80d47f2320c95963d569f35e3238e6848a848d/quickwit-parquet-engine/src/merge/streaming.rs#L1037-L1041
P2 — Preserve service tags when service is a sort column

For sort schemas that include service (e.g. metric_name|service|...), this branch writes service through the sort-column path, so it never reaches write_body_col_for_all_outputs where track_service populates service_names_per_output; finalize_output_writer then emits no TAG_SERVICE low-cardinality metadata even though the column is present. Collect service names from the sort-column data as well, or derive them from the per-output sort batch.


g-talbot and others added 3 commits May 12, 2026 15:08
Bundle four pieces:

- **Husky → neutral phrasing.** Replaced the seven "Husky" mentions in
  the streaming engine's doc-comments and error messages with neutral
  "sort-cols-first storage ordering" / "column ordering" phrasing.
  Project is Quickwit; the internal column-ordering scheme didn't
  need a separate brand in user-visible error strings.

- **One `.unwrap()` → `.expect()` in lib code.** The hashmap lookup
  in `drain_sort_cols_one_input` is guarded by a `contains_key`
  check; promote the implicit invariant to a documented panic
  message per CODE_STYLE.md.

- **`align_inputs_to_union_schema` nullability fix.** The first-sight
  branch unconditionally marked new fields nullable; the existing
  comment claims "columns that don't appear in every input must be
  nullable" but the code applied that rule to every field. Replaced
  with a two-pass scheme: track `any_nullable` and `appears_in` per
  field across all inputs, then mark nullable iff some input had it
  nullable OR the field is missing from some input. This unblocks
  `List<Float64>` columns end-to-end (the writer rejects nullable
  List; the previous behaviour forced every list column nullable on
  first sight even when every input declared it non-null). A sketch
  of this two-pass rule follows below.

- **MC-2 round-trip integration test.** New
  `test_mc2_all_types_round_trip_through_streaming_merge` builds two
  inputs covering every parquet physical type the decoder accepts —
  Int8/16/32/64, UInt8/16/32/64, Float32/64, Bool, Utf8,
  Dictionary<Int32, Utf8>, LargeBinary, and non-nullable
  `List<Float64>` — merges them through the streaming engine, and
  asserts every `(sorted_series_key → body-col tuple)` pair survives
  byte-equal. Storage-encoding transitions (Dict→Utf8, LargeBinary→
  Binary) are normalised in the render helper because MC-2 promises
  value preservation, not internal representation preservation.

  This test caught two real bugs while being written:
  1. Body cols must be declared in lexicographic order — the streaming
     engine assumes the storage convention and crashes mid-merge if
     they aren't. Fixture re-ordered accordingly. (Worth adding
     upfront validation in a follow-up; not in scope here.)
  2. The schema-union nullability bug above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
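
A sketch of the two-pass nullability rule from the commit above, in isolation; the function is an illustrative stand-in, not align_inputs_to_union_schema itself.

```rust
use std::collections::HashMap;

use arrow::datatypes::Schema;

/// A union field is nullable iff some input declared it nullable or some
/// input lacks it entirely; otherwise it stays non-nullable.
fn union_nullability(input_schemas: &[Schema]) -> HashMap<String, bool> {
    let mut any_nullable: HashMap<String, bool> = HashMap::new();
    let mut appears_in: HashMap<String, usize> = HashMap::new();
    for schema in input_schemas {
        for field in schema.fields().iter() {
            *appears_in.entry(field.name().to_string()).or_insert(0) += 1;
            let entry = any_nullable.entry(field.name().to_string()).or_insert(false);
            *entry |= field.is_nullable();
        }
    }
    any_nullable
        .into_iter()
        .map(|(name, nullable)| {
            let missing_somewhere = appears_in[&name] < input_schemas.len();
            (name, nullable || missing_somewhere)
        })
        .collect()
}
```
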
Add a `#[cfg(test)] static AtomicUsize` PEAK_BODY_COL_PAGE_CACHE_LEN
that records the maximum length any input's `body_col_page_cache`
ever reached during the current merge, bumped on every page push in
`advance_decoder_to_row`. Zero production overhead — the `record_*`
helper compiles to a no-op outside test builds.
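
The mechanism, sketched below: PEAK_BODY_COL_PAGE_CACHE_LEN is the commit's own name, while record_page_cache_len is a guessed expansion of its record_* helper.

```rust
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};

#[cfg(test)]
static PEAK_BODY_COL_PAGE_CACHE_LEN: AtomicUsize = AtomicUsize::new(0);

/// Called on every page push into the body-col page cache.
#[inline]
fn record_page_cache_len(len: usize) {
    #[cfg(test)]
    PEAK_BODY_COL_PAGE_CACHE_LEN.fetch_max(len, Ordering::Relaxed);
    #[cfg(not(test))]
    let _ = len; // compiles to nothing outside test builds
}
```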

New test `test_ms7_body_col_page_cache_bounded_regardless_of_input_size`
runs the streaming merge over three input sizes (300 / 3 000 / 30 000
rows at 50-row pages) and asserts:
  1. Peak resident pages stays below a fixed ceiling (24, for the
     ratio of OUTPUT_PAGE_ROWS=1024 to input page_rows=50, plus a
     few-page slack for decoder lookahead + transients).
  2. Growth from 3 000 to 30 000 rows (10× more input pages) yields
     at most a 2-page increase in peak. The whole MS-7 claim is that
     peak does not scale with input size.

Verified the test catches a deliberate regression: removing the per-
output-page eviction loop in `assemble_one_output_page` pushed the
3 000-row peak to 60 (60 > 24) and the test failed with the expected
message.

Fixture support: `write_input_parquet_with_small_pages` now also
sets `write_batch_size` and `data_page_size` proportional to the
requested page row count. Without those, the arrow writer's defaults
(64 KiB / 1 MiB) caused `data_page_row_count_limit` to be silently
ignored and produced one giant page per column. Probed the output
via `get_column_page_reader` — 30 000 rows now produces 600 pages
per col as expected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…m sort col

Address two new Codex P2 findings on PR-6b.2 (#6409):

- **Use the full union schema when driving column writes.** The old
  `build_parent_union_schema` picked one per-output schema by field
  count and used it as the column-iteration driver. If two outputs
  drop *different* all-null sort fields and end up with the same
  field count, the first wins — and any column it dropped is never
  iterated, leaving the other output's writer missing a column or
  writing subsequent columns into the wrong slot. The doc comment
  already said "process the FULL union schema's cols in order"; the
  implementation diverged. Drive `write_all_columns` from
  `full_union_schema` directly and delete the broken heuristic.

- **Collect service names from the sort-col path too.** If the sort
  schema places `service` in the sort key
  (`metric_name|service|...`), the streaming engine writes it via
  the sort-col path and the body-col `track_service` branch never
  runs. `MergeOutputFile.low_cardinality_tags[TAG_SERVICE]` came
  back empty even though every row had a service value. Extract
  service names from `static_meta.sort_optimised` at
  `finalize_output_writer` time so the TAG_SERVICE metadata is
  accurate regardless of which write path the column took.
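
A sketch of the finalize-time fallback that second bullet describes, simplified to Utf8 columns only; the PR's collect_service_names_from_page also handles LargeUtf8 and dictionary encodings.

```rust
use std::collections::BTreeSet;

use arrow::array::{Array, StringArray};
use arrow::record_batch::RecordBatch;

/// Pull distinct `service` values from the per-output sort batch so
/// TAG_SERVICE metadata is populated even when service is a sort column.
fn service_names_from_sort_batch(sort_batch: &RecordBatch) -> BTreeSet<String> {
    let mut names = BTreeSet::new();
    let schema = sort_batch.schema();
    if let Some((idx, _)) = schema.column_with_name("service") {
        if let Some(col) = sort_batch.column(idx).as_any().downcast_ref::<StringArray>() {
            for i in 0..col.len() {
                if col.is_valid(i) {
                    names.insert(col.value(i).to_string());
                }
            }
        }
    }
    names
}
```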

Two regression tests:
- `test_heterogeneous_dropped_fields_drive_from_full_union_schema`
  builds two inputs whose per-output schemas drop different all-null
  sort fields with the same field count. Each kept tag must survive
  to its output. Verified the test fails (panic on missing column)
  against the pre-fix logic.
- `test_service_as_sort_column_still_populates_low_cardinality_tags`
  uses a sort schema `metric_name|service|-timestamp_secs/V2` and
  asserts the output's `low_cardinality_tags[TAG_SERVICE]` covers
  every distinct service value. Verified the test fails against
  pre-fix `finalize_output_writer` with the expected "must contain
  TAG_SERVICE" message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot
Contributor Author

Re: review #6409 (review) — these two findings landed as review-body Markdown links rather than inline comments, so I can't thread-reply individually. Addressing both here.

P2 — Use the full union schema when driving column writes (build_parent_union_schema site). Confirmed and fixed in 61163cd. You diagnosed this precisely: the old heuristic picked one per-output schema by field count, and two outputs that drop different all-null sort fields with the same count would silently lose a column. The doc comment immediately above build_parent_union_schema's call site already said "process the FULL union schema's cols in order"; the implementation diverged. Fix is to thread full_union_schema into write_all_columns and drive iteration from it directly. build_parent_union_schema deleted.

Regression test test_heterogeneous_dropped_fields_drive_from_full_union_schema builds two inputs whose per-output schemas drop tag_a and tag_b respectively with matching field counts. Both kept tag values must survive to their output. Verified the test fails (panic on missing column) against the pre-fix logic before passing.

P2 — Preserve service tags when service is a sort column. Confirmed and fixed in 61163cd (and in PR-6c.2 / #6410 at d93f8e6 via the equivalent code path). You're right: track_service = col_name == "service" lives inside write_body_col_for_all_outputs, so a sort schema metric_name|service|... writes service via the sort-col path and the body-col collection never runs. finalize_output_writer then emits empty TAG_SERVICE metadata.

Fix: at finalize time, check if static_meta.sort_optimised.schema() contains a service column; if so, call collect_service_names_from_page on it and merge into service_names before constructing low_cardinality_tags. Reuses the existing per-page string-extraction helper so Utf8 / LargeUtf8 / Dictionary<Int*, Utf8> all work uniformly.

Regression test test_service_as_sort_column_still_populates_low_cardinality_tags uses sort schema metric_name|service|-timestamp_secs/V2, three distinct service values across 30 rows, and asserts outputs[0].low_cardinality_tags[TAG_SERVICE] covers every distinct value. Verified the test fails against pre-fix finalize_output_writer with the expected "must contain TAG_SERVICE" message.

Both fixes pushed; PR-6b.2 at 461 tests / PR-6c.2 at 468 tests, clippy + nightly fmt clean.

@g-talbot
Contributor Author

@codex review

1 similar comment
@g-talbot
Contributor Author

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Another round soon, please!


@g-talbot g-talbot marked this pull request as ready for review May 13, 2026 01:12
@g-talbot g-talbot requested a review from a team as a code owner May 13, 2026 01:12

@adamtobey adamtobey left a comment


I think this all makes sense to me. It does get a bit confusing in ways that I suspect could be factored better, but I think it's reasonable to not worry about that now (or maybe I'm wrong about that). Left some comments, but didn't find anything I'd consider an issue

let mut decoders_state = build_input_decoders_state(&mut inputs)?;

// Phase 0
let sort_col_batches =


To understand: we do have a hard requirement on column ordering across all files? Sort cols must be first, then all other cols must be in consistent order?

Comment on lines +439 to +442
window_start_secs: consensus_window_start.unwrap_or(None),
window_duration_secs: consensus_window_duration.unwrap_or(0),
num_merge_ops: max_merge_ops + 1,
rg_partition_prefix_len: consensus_prefix_len.unwrap_or(0),


Are we considering these truly optional? Or should we enforce that we do have them and do output them?

page.col_idx,
);
}
if page.rg_idx != 0 {


I'm guessing this will be relaxed later if we can support metric-name rowgroups that have consistent ordering across all input files?

Comment on lines +695 to +696
per_input[run.input_index][input_row] = Some((out_idx, rows_per_output[out_idx]));
rows_per_output[out_idx] += 1;


nit: rows_for_current_output could be a register then rows_per_output[out_idx] = rows_for_current_output after the loop?

Comment on lines +933 to +935
/// don't want to take in the page-bounded path. Re-introducing
/// dict-encoded output strings can be done later by tracking
/// cardinality during the streaming pass — call site is here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasonable TODO. If we know things are dictionary encoded we can optimize for merging dictionaries and similar cases, better than un-encode -> interleave -> re-encode

}
};
let max_needed_row = *input_rows.iter().max().expect("non-empty");
advance_decoder_to_row(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this function correctly I think something like fill_page_cache_to_row would be a clearer name. My initial read on this was "why are we advancing the decoder without reading any result? Is there a case where we're skipping data?" but that's not what the function does as I understand it

} else {
let cache_start = pages[0].row_start;
let arrays: Vec<&dyn Array> = pages.iter().map(|p| p.array.as_ref()).collect();
let concatenated = arrow::compute::concat(&arrays).with_context(|| {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little hard to follow all of this, but is it correct to say that pages here is just double buffering the pages on the stream as it's read? It's not going to actually end up filling with all the pages in the column?


Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK actually it seems like the page cache will hold all the input pages for this column that contribute to the current output, dropped at the end of this function, and filled from advance_decoder_to_row?
