Skip to content

perf: Replace Arrow IPC with more efficient shuffle format [WIP]#3733

Draft
andygrove wants to merge 33 commits intoapache:mainfrom
andygrove:shuffle-format
Draft

perf: Replace Arrow IPC with more efficient shuffle format [WIP]#3733
andygrove wants to merge 33 commits intoapache:mainfrom
andygrove:shuffle-format

Conversation

@andygrove
Copy link
Member

@andygrove andygrove commented Mar 19, 2026

Which issue does this PR close?

Performance optimization for native shuffle.

Includes changes from #3731

I see 8% speedup in local TPC-H @ 100GB.

Rationale for this change

The Arrow IPC format used for Comet's native shuffle serialization includes overhead that is unnecessary for the shuffle use case: FlatBuffer schema metadata per batch, per-message headers, and dictionary encoding expansion. Since the shuffle writer and reader are both controlled by Comet, we can use a simpler raw buffer format tailored to the shuffle path.

What changes are included in this PR?

Raw buffer shuffle format

Replaces Arrow IPC with a lightweight raw buffer format for shuffle write/read. The format writes Arrow array buffers directly with minimal framing:

  • Per-batch: [block_length: u64][field_count: u64][codec_tag: 4B][uncompressed_len: u32][compressed_data]
  • Per-column: [dict_tag: u8][key_type: u8 (if dict)][null_count: u32][bitmap_len: u32][bitmap][num_buffers: u32][buf_len + buf_data ...][num_children: u32][child_num_rows + recursive ...]

Dictionary encoding preservation

Dictionary arrays (e.g. from string columns) are written natively instead of being cast to value types. A per-column tag byte indicates plain (0) or dictionary (1) encoding. On the ShuffleScanExec path, dictionary arrays flow directly to DataFusion. On the JNI decode path, dictionary columns are cast to value types after decode since the JVM expects plain Arrow types. This preserves the compact dictionary representation during serialization and compression.

Compression optimizations

  • All codecs use bulk/block-level compression instead of streaming encoders, avoiding per-batch encoder state allocation (~128KB for Zstd)
  • LZ4 (default): lz4_flex::compress/decompress block API
  • Zstd: zstd::bulk::compress/decompress
  • Uncompressed size written as header, enabling exact pre-allocation on the read side (eliminates ~18 reallocations per block)

Other optimizations

  • Thread-local cache for parsed shuffle schema in decodeShuffleBlock JNI path (avoids repeated protobuf decode per batch)
  • Arc<Schema> passed through read path to avoid cloning field names/types per batch
  • MutableArrayData::extend for zero-offset array normalization instead of take() with index array
  • Arrays with non-zero offsets (from RecordBatch::slice) and non-zero null bitmap offsets are properly handled via physical copy before serialization

How are these changes tested?

  • Roundtrip unit tests covering all data types: primitives, strings, binary, boolean, null, decimal, date, timestamp, nested types (List, Struct, Map), dictionary arrays, and sliced batches with non-zero offsets
  • Existing shuffle writer integration tests (multi-partition, spilling, coalescing)
  • TPC-DS query result verification (hash and sort_merge join variants)

Adds a design document for bypassing Arrow FFI in the shuffle read
path when both the shuffle writer and downstream operator are native.
Add a new ShuffleScanExec operator that pulls compressed shuffle blocks
from JVM via CometShuffleBlockIterator and decodes them natively using
read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch called
externally before poll_next) to avoid JNI calls on tokio threads.
Fix two bugs discovered during testing:
- ClassCastException: factory closure incorrectly cast Partition to
  CometExecPartition before extracting ShuffledRowRDDPartition; the
  partition passed to the factory is already the unwrapped partition
  from the input RDD
- NoSuchElementException in SQLShuffleReadMetricsReporter: metrics
  field in CometShuffledBatchRDD was not exposed as a val, causing
  Map.empty to be used instead of the real shuffle metrics map

Add Scala integration test that runs a repartition+aggregate query
with direct read enabled and disabled to verify result parity.
Add Rust unit test for read_ipc_compressed codec round-trip.
- Remove redundant getCurrentBlockLength() JNI call (reuse hasNext() return value)
- Make readAsRawStream() lazy instead of materializing all streams to a List
- Remove pointless DirectByteBuffer re-allocation in close()
- Remove dead sparkPlanToInputIdx map
Skip test_read_compressed_ipc_block under Miri since it calls
foreign zstd functions that Miri cannot execute.
Replace Arrow IPC StreamWriter/StreamReader with a lightweight raw buffer
format that writes Arrow ArrayData buffers directly. The new format has
minimal per-block overhead (~16 bytes per column vs ~200-800 bytes for IPC
schema flatbuffers). The outer block header (compressed_length + field_count)
is unchanged for JVM compatibility.

Key changes:
- write_array_data: recursively serializes ArrayData (validity + buffers + children)
- read_array_data: reconstructs ArrayData from raw buffers using known schema
- Dictionary arrays are cast to value type before writing
- read_shuffle_block replaces read_ipc_compressed (takes schema parameter)
- read_ipc_compressed retained temporarily for callers not yet migrated
@andygrove andygrove changed the title Shuffle format perf: Replace Arrow IPC with more efficient shuffle format Mar 19, 2026
Use RecordBatch::try_new_with_options with explicit row_count instead
of try_new so that zero-column batches (produced by Spark when query
results are unused) do not fail with "must either specify a row count
or at least one column".
CometColumnarShuffle was not setting outputAttributes on the
CometShuffleDependency, leaving it as Seq.empty. This caused
the shuffle reader to pass an empty schema to the native
decodeShuffleBlock, resulting in "Output column count mismatch:
expected N, got 0" errors.
@andygrove andygrove changed the title perf: Replace Arrow IPC with more efficient shuffle format perf: Replace Arrow IPC with more efficient shuffle format [WIP] Mar 19, 2026
The null bitmap in Arrow arrays can have a non-zero bit offset even
when ArrayData.offset() is 0 (e.g. after RecordBatch::slice). The
raw shuffle writer was copying the bitmap bytes verbatim, but the
reader assumes bits start at offset 0. This caused shifted null
bitmaps, corrupting data during shuffle and producing wrong query
results (e.g. TPC-DS q6 counts off by 1).

Fix by detecting non-zero bitmap offsets and emitting a re-aligned
copy. Add a roundtrip test with sliced batches to cover this case.
Arrays from RecordBatch::slice can have non-zero offsets in both
the ArrayData and the null bitmap. The raw shuffle format writes
buffers verbatim assuming offset 0, causing data corruption when
offsets are present.

Use take() to produce zero-offset copies when needed, similar to
prepare_output in jni_api.rs. This fixes TPC-DS q64 failures
where the debug_assert fired and data mismatch errors from
shifted null bitmaps.
@andygrove andygrove changed the title perf: Replace Arrow IPC with more efficient shuffle format [WIP] perf: Replace Arrow IPC with more efficient shuffle format [experimental] Mar 19, 2026
- Replace take() with MutableArrayData::extend in normalize_array
  for zero-offset copies. MutableArrayData does a direct memcpy
  instead of building an index array and doing per-element lookups.

- Cache parsed shuffle schema in thread-local storage. The schema
  bytes are identical for every decodeShuffleBlock call within a
  shuffle reader, avoiding repeated protobuf decode and Field/Schema
  allocation on every batch.

- Change read_shuffle_block to accept Arc<Schema> instead of &Schema,
  eliminating a full Schema clone (all field names and types) on
  every batch decode.
Previously, dictionary arrays were cast to their value types before
writing to the shuffle format. This expanded the data (e.g. 100
unique strings × 8192 rows became 8192 full string copies) and
reduced compression effectiveness.

Now dictionary arrays are written natively with a per-column tag
byte indicating the encoding (plain=0, dictionary=1) and key type.
The reader reconstructs DictionaryArrays from these tags.

For the ShuffleScanExec path, dictionary arrays flow directly to
DataFusion which handles them natively. For the JNI decodeShuffleBlock
path, dictionary columns are cast to value types after decode since
the JVM expects plain Arrow types. The cast cost is the same but
the serialized data is much smaller, saving IO and compression time.
Two medium-priority optimizations:

1. Write uncompressed size as a u32 header before compressed data.
   The reader uses this to pre-allocate the decompression buffer to
   exact size, eliminating ~18 reallocations (doubling strategy) per
   256KB block. For Zstd, use bulk decompress which is also faster
   than streaming.

2. Serialize raw batch to intermediate buffer first, then compress
   in one shot. This avoids creating a streaming compression encoder
   per batch (Zstd allocates ~128KB internal state per encoder).
   For Zstd, use bulk::compress which reuses internal context. Also
   batches many small write_all calls into a single buffer, reducing
   overhead through compression codec state machines.
LZ4 is the default shuffle compression codec. Switch from
lz4_flex FrameEncoder/FrameDecoder (streaming, per-batch encoder
allocation) to lz4_flex::compress/decompress (block-level, no
encoder state). Combined with the uncompressed size header, the
decompressor allocates exactly once to the right size.
@andygrove andygrove changed the title perf: Replace Arrow IPC with more efficient shuffle format [experimental] perf: Replace Arrow IPC with more efficient shuffle format Mar 19, 2026
@andygrove andygrove changed the title perf: Replace Arrow IPC with more efficient shuffle format perf: Replace Arrow IPC with more efficient shuffle format [WIP] Mar 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant