feat(streaming-merge): per-region engine + multi-output sorted_series splitting #6424

Open
g-talbot wants to merge 3 commits into gtt/streaming-merge-engine-merger from gtt/streaming-merge-engine-multi-rg-1-engine

Slice 1 of 3 from the former PR #6410. Stacked underneath #6425 (adapter) and #6426 (hardening).

The streaming merge engine's per-region foundation. Refactors the engine from "one big merge" to "one region per prefix-key value", and adds intra-region splitting at sorted_series transitions so prefix_len=0 multi-output cases honor the requested file count.

What's in here

  • New module merge/streaming/region_grouping.rs: extracts regions from input metadata by composite prefix key (BTreeMap-driven), validates MS-2 (region order matches each input's physical RG order), and verifies PA-3 uniqueness via assert_unique_rg_prefix_keys.
  • New module merge/streaming/body_assembler.rs: page-bounded body-col write driven from per-input page caches.
  • New module merge/streaming/output.rs: per-output writer + finalize that derives row_keys / zonemap / metric_names from the rows that landed in that output.
  • Refactor merge/streaming.rs from a single-region engine to a per-region processor with sub-region splitting.
  • Composite prefix key encoding: escape-encoded bytes (0x00 → 0x00 0x01, terminator 0x00 0x00) with bytewise complement for DESC columns, so BTreeMap iteration matches the declared sort order across any composite of leaf primitives.
  • Up-front rejection of multi-RG legacy inputs (prefix_len=0 + num_RGs>1) — those must go through the adapter (next slice).
  • Up-front rejection of duplicate composite prefix keys within a single input (05dfb).
  • split_region_at_sorted_series: when prefix_len=0 + num_outputs > 1, walks the merge order and splits at sorted_series transitions so even single-prefix inputs honor the file count. Single sorted_series runs are never broken.
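
To make the region extraction and the two up-front checks concrete, here is a minimal sketch. It is not the code in `region_grouping.rs`: the `RowGroupMeta` and `Region` types and the `group_regions` / `validate_rg_order` functions are hypothetical names invented for illustration, and prefix keys are assumed to be already encoded as order-preserving bytes.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-row-group metadata (not the engine's real type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowGroupMeta {
    pub input_idx: usize,
    pub rg_idx: usize,
    /// Order-preserving encoded composite prefix key (assumed precomputed).
    pub prefix_key: Vec<u8>,
    pub num_rows: u64,
}

/// One merge region: at most one row group per input.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Region {
    /// `(input_idx, rg_idx)` pairs contributing to this region.
    pub members: Vec<(usize, usize)>,
    pub total_rows: u64,
}

/// Groups row groups by prefix key. BTreeMap iteration then yields regions
/// in key order. Rejects two RGs from one input sharing a key (the PA-3 idea).
pub fn group_regions(rgs: &[RowGroupMeta]) -> Result<BTreeMap<Vec<u8>, Region>, String> {
    let mut regions: BTreeMap<Vec<u8>, Region> = BTreeMap::new();
    for rg in rgs {
        let region = regions.entry(rg.prefix_key.clone()).or_default();
        if region.members.iter().any(|&(input, _)| input == rg.input_idx) {
            return Err(format!(
                "input {} has more than one row group with prefix key {:?}",
                rg.input_idx, rg.prefix_key
            ));
        }
        region.members.push((rg.input_idx, rg.rg_idx));
        region.total_rows += rg.num_rows;
    }
    Ok(regions)
}

/// Checks that walking regions in key order visits each input's row groups
/// in strictly increasing physical order (the MS-2 idea).
pub fn validate_rg_order(regions: &BTreeMap<Vec<u8>, Region>) -> Result<(), String> {
    let mut last_rg: BTreeMap<usize, usize> = BTreeMap::new();
    for region in regions.values() {
        for &(input_idx, rg_idx) in &region.members {
            if let Some(&prev) = last_rg.get(&input_idx) {
                if rg_idx <= prev {
                    return Err(format!(
                        "input {input_idx}: row group {rg_idx} follows {prev} in region order"
                    ));
                }
            }
            last_rg.insert(input_idx, rg_idx);
        }
    }
    Ok(())
}
```

Running both checks on metadata alone, before any page is decoded, is what lets a malformed input fail with a clear error rather than partway through the merge.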

Test plan

  • cargo test -p quickwit-parquet-engine --all-features — 476 unit tests pass at this slice's HEAD.
  • MS-2 rejection test: a file whose physical RG order disagrees with the BTreeMap-derived region order is rejected up-front, not allowed to crash mid-merge.
  • MS-7 page-cache bound test: the peak number of resident pages stays ≤ a small constant across 300/3000/30000-row fixtures.
  • MC-1 / MC-2 / MC-3 / MC-4 proptests on the regular merge path remain green.
  • Composite-key encoding tests for two byte-array cols, mixed ASC/DESC direction, length-variance.
  • PA-3 duplicate-prefix rejection test.

Out of scope

🤖 Generated with Claude Code

g-talbot and others added 3 commits May 13, 2026 11:21
…PR-6c.2)

Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region
loop. Unlocks multi-RG metric-aligned input support and produces
multi-RG output naturally — one output row group per merge region
(typically one per metric_name when `rg_partition_prefix_len == 1`).

## Architecture

Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge
region has AT MOST one row group per input. That single invariant
unlocks the restructure:

1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read
   each RG's metric_name min stat (must equal max — verifies
   metric-alignment). Group RGs across inputs by prefix_key. Sort
   regions by prefix_key. For `prefix_len == 0` (single-RG inputs
   only, validated earlier), one region covers everything.

2. Assign regions to output files by cumulative row count. Caller's
   `num_outputs` preserved as the upper bound. Each output file gets
   a contiguous slice of the region list, so output files have
   non-overlapping key ranges.

3. Per-region processing: for each region, advance contributing
   inputs' decoders through their RGs (drain sort cols of that RG,
   then stream body cols via the existing page-bounded
   BodyColOutputPageAssembler). Each region becomes one output RG in
   the current writer; when the assignment moves to a new output
   file, close the previous writer and open a new one.
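
A minimal sketch of step 2, assuming a simple greedy policy: move to the next output file once the current one has reached an even share of the total rows. The function name `assign_regions_to_outputs` and the exact threshold rule are assumptions made for illustration; the engine's actual policy may differ.

```rust
/// Returns, for each region (in key order), the index of the output file it
/// lands in. Each output receives a contiguous slice of the region list, and
/// at most `num_outputs` files are used.
pub fn assign_regions_to_outputs(region_rows: &[u64], num_outputs: usize) -> Vec<usize> {
    let num_outputs = num_outputs.max(1) as u64;
    let total_rows: u64 = region_rows.iter().sum();
    // Even share per output, rounded up; at least 1 to avoid a zero target.
    let target = ((total_rows + num_outputs - 1) / num_outputs).max(1);

    let mut assignment = Vec::with_capacity(region_rows.len());
    let mut output_idx: u64 = 0;
    let mut rows_in_output: u64 = 0;
    for &rows in region_rows {
        // Only advance between regions, never inside one, and never past
        // the caller's upper bound on the number of outputs.
        if rows_in_output >= target && output_idx + 1 < num_outputs {
            output_idx += 1;
            rows_in_output = 0;
        }
        assignment.push(output_idx as usize);
        rows_in_output += rows;
    }
    assignment
}
```

Because a region is never split here, a single oversized region can leave some outputs unused; `num_outputs` acts as an upper bound, not a guarantee.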

The streaming body-col mechanism from PR-6b.2 (arrow::compute::
interleave + handle.block_on driven decoder) is unchanged; it just
runs over smaller row ranges (one region instead of one whole
output).

## Single-RG-input restriction lifted

PR-6b.2's check that rejected any multi-RG input is replaced with:
reject only `prefix_len == 0` AND multi-RG (those still need PR-5's
LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now
accepted natively.

## Per-output schema simplification

PR-6b.2 optimised the per-output schema based on per-output sort col
data (drop all-null cols, re-dict-encode low-cardinality strings).
With per-region streaming we don't know each region's content until
we drain it, so PR-6c.2 declares the writer's schema as the full
union schema and leaves output strings as Utf8. Per-output dict
re-encoding can be reintroduced later by tracking cardinality during
the streaming pass.

## Tests

- All 9 PR-6b.2 tests still pass (single-RG input regression —
  behaviour preserved).
- New test_multi_rg_metric_aligned_input_produces_multi_rg_output:
  feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 =
  cpu.usage, RG 1 = memory.used); the streaming engine accepts it
  and produces a 2-RG output (one RG per metric_name region).
- Renamed test_multi_rg_input_rejected →
  test_legacy_multi_rg_input_rejected to reflect the new rejection
  scope (only prefix_len == 0 multi-RG is rejected; metric-aligned
  is accepted).

10/10 streaming tests pass. Clippy, doc, machete, fmt all clean.

## Follow-ups deferred

1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality
   during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained
   M:N control when callers want more outputs than regions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two P1 bugs flagged by Codex on PR-6c.2 (#6410):

1. **Duplicate input row groups silently dropped.** When one input
   contained two RGs with the same composite prefix key,
   `process_region` overwrote `sort_col_batches[input_idx]` while
   `Region::total_rows` still counted both — losing rows and
   misaligning the body-col / sort-col mapping. Now enforce
   at-most-one-RG-per-input-per-prefix as a strong invariant at three
   sites: the merge read path (`extract_regions_from_metadata`), the
   streaming merge output finalize, and the indexing writer
   (`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`).
   The new `assert_unique_rg_prefix_keys` helper is shared.

2. **Byte-array prefix encoding broke lex order across lengths.**
   The 4-byte length prefix made `"b"` sort before `"aa"`, violating
   the declared ASC order. Switched to byte-stuffed escape encoding
   (`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves
   single-column lex order AND retains unambiguous concatenation for
   composite keys (the terminator is the smallest 2-byte sequence
   under escaping, so shorter values still sort before longer ones
   with the same prefix).
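
The escape encoding can be sketched as follows. This is an illustration of the scheme described above rather than the engine's code: `Direction`, `encode_prefix_component`, and `encode_composite_key` are hypothetical names, and complementing the terminator together with the payload for DESC columns is an assumption of this sketch.

```rust
/// Sort direction of one prefix column (hypothetical type for this sketch).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Direction {
    Asc,
    Desc,
}

/// Appends one byte-array value to `key` using byte-stuffed escaping:
/// each 0x00 becomes 0x00 0x01, and the value ends with 0x00 0x00.
/// For DESC columns every emitted byte is complemented (assumption:
/// the terminator is complemented too), which reverses the byte order.
pub fn encode_prefix_component(value: &[u8], direction: Direction, key: &mut Vec<u8>) {
    let start = key.len();
    for &byte in value {
        key.push(byte);
        if byte == 0x00 {
            key.push(0x01);
        }
    }
    key.extend_from_slice(&[0x00, 0x00]);
    if direction == Direction::Desc {
        for byte in &mut key[start..] {
            *byte = !*byte;
        }
    }
}

/// Concatenates the encoded components into one composite key whose
/// bytewise order matches the declared column-by-column sort order.
pub fn encode_composite_key(components: &[(&[u8], Direction)]) -> Vec<u8> {
    let mut key = Vec::new();
    for &(value, direction) in components {
        encode_prefix_component(value, direction, &mut key);
    }
    key
}
```

With this encoding `"aa"` becomes `61 61 00 00` and `"b"` becomes `62 00 00`, so `"aa"` sorts first as ASC order requires. Under the old length-prefixed form the shorter length of `"b"` decided the comparison before any payload byte was reached.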

Tests:
- `test_byte_array_prefix_preserves_lex_order_across_lengths` —
  `"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer,
  null-byte escaping preserves order.
- `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` —
  end-to-end bail with clear error.
- `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned`
  + the `write_to_file` and single-RG positive counterparts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…i-output

When inputs declare rg_partition_prefix_len = 0 (legacy single-RG)
and the caller asks for num_outputs > 1, the engine subdivides the
single region at sorted_series transitions in the merge order so it
can honor the output count. A single sorted_series run is never
broken; if one run exceeds the remaining budget the whole run lands
in one output anyway. The output inherits the input's
rg_partition_prefix_len (=0) — the engine does not synthesize a
prefix it can't unconditionally guarantee.

Also handles the giant-single-metric case (prefix_len=0, one
metric_name, num_outputs > 1): sorted_series transitions still
split the merge order even though there are no metric_name
transitions to drive a prefix synthesis.

Implementation:
- New `split_region_at_sorted_series` in region_grouping: walks the merge order and splits at
  sorted_series transitions when accumulated rows reach the target budget.
- Main engine loop: when num_outputs > current_output_idx + 1 AND region's rows exceed the
  remaining budget, drain sort cols for the region, compute merge order, call
  split_region_at_sorted_series, process sub-regions.
- Per-col page cache + cursor keyed by col_idx so the body-col path can read pages once and re-use
  them across sub-regions within the same top-level region. Resets between top-level regions
  (different RGs).
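
The splitting rule can be sketched as below, assuming the merge order is represented as one sorted_series identifier per row. The function name `split_at_series_transitions`, the `u64` identifier, and the exact budget rule are assumptions of this sketch, not the signature of `split_region_at_sorted_series`.

```rust
use std::ops::Range;

/// Splits a region's merge order into at most `max_parts` contiguous row
/// ranges. `series[i]` is the sorted_series id of the i-th row in merge
/// order. A cut is made only where the id changes AND the current range
/// already holds at least `target_rows` rows, so a run is never broken.
pub fn split_at_series_transitions(
    series: &[u64],
    target_rows: usize,
    max_parts: usize,
) -> Vec<Range<usize>> {
    let mut parts = Vec::new();
    if series.is_empty() {
        return parts;
    }
    let target_rows = target_rows.max(1);
    let max_parts = max_parts.max(1);

    let mut start = 0;
    for i in 1..series.len() {
        let at_transition = series[i] != series[i - 1];
        let budget_reached = i - start >= target_rows;
        // Keep one slot free for the final range pushed after the loop.
        let parts_left = parts.len() + 1 < max_parts;
        if at_transition && budget_reached && parts_left {
            parts.push(start..i);
            start = i;
        }
    }
    parts.push(start..series.len());
    parts
}
```

When a run is longer than the budget, the range simply grows past `target_rows` until the next transition, which matches the rule that an oversized run lands whole in one output.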

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>