perf: Replace Arrow IPC with more efficient shuffle format [WIP]#3733
Draft
andygrove wants to merge 33 commits into apache:main from
Conversation
Adds a design document for bypassing Arrow FFI in the shuffle read path when both the shuffle writer and downstream operator are native.
Add a new ShuffleScanExec operator that pulls compressed shuffle blocks from the JVM via CometShuffleBlockIterator and decodes them natively using read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch is called externally before poll_next) to avoid JNI calls on tokio threads.
Fix two bugs discovered during testing:
- ClassCastException: the factory closure incorrectly cast Partition to CometExecPartition before extracting ShuffledRowRDDPartition; the partition passed to the factory is already the unwrapped partition from the input RDD.
- NoSuchElementException in SQLShuffleReadMetricsReporter: the metrics field in CometShuffledBatchRDD was not exposed as a val, causing Map.empty to be used instead of the real shuffle metrics map.

Add a Scala integration test that runs a repartition+aggregate query with direct read enabled and disabled to verify result parity. Add a Rust unit test for the read_ipc_compressed codec round-trip.
- Remove redundant getCurrentBlockLength() JNI call (reuse the hasNext() return value)
- Make readAsRawStream() lazy instead of materializing all streams into a List
- Remove pointless DirectByteBuffer re-allocation in close()
- Remove dead sparkPlanToInputIdx map
Skip test_read_compressed_ipc_block under Miri since it calls foreign zstd functions that Miri cannot execute.
Replace Arrow IPC StreamWriter/StreamReader with a lightweight raw buffer format that writes Arrow ArrayData buffers directly. The new format has minimal per-block overhead (~16 bytes per column vs ~200-800 bytes for IPC schema flatbuffers). The outer block header (compressed_length + field_count) is unchanged for JVM compatibility.

Key changes:
- write_array_data: recursively serializes ArrayData (validity + buffers + children)
- read_array_data: reconstructs ArrayData from raw buffers using the known schema
- Dictionary arrays are cast to their value type before writing
- read_shuffle_block replaces read_ipc_compressed (takes a schema parameter)
- read_ipc_compressed retained temporarily for callers not yet migrated
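The recursive buffer framing described above can be sketched as follows. This is an illustrative, dependency-free model, not the Comet write_array_data/read_array_data code: RawColumn, write_column, and read_column are hypothetical names, and the dictionary tag bytes and outer block header are omitted for brevity.

```rust
// Sketch of a per-column frame: validity bitmap + N data buffers +
// recursively framed children, each length-prefixed with a u32.
struct RawColumn {
    null_count: u32,
    bitmap: Vec<u8>,
    buffers: Vec<Vec<u8>>,
    children: Vec<(u32, RawColumn)>, // (child_num_rows, child)
}

fn write_u32(out: &mut Vec<u8>, v: u32) {
    out.extend_from_slice(&v.to_le_bytes());
}

fn read_u32(buf: &[u8], pos: &mut usize) -> u32 {
    let v = u32::from_le_bytes(buf[*pos..*pos + 4].try_into().unwrap());
    *pos += 4;
    v
}

fn write_column(out: &mut Vec<u8>, col: &RawColumn) {
    write_u32(out, col.null_count);
    write_u32(out, col.bitmap.len() as u32);
    out.extend_from_slice(&col.bitmap);
    write_u32(out, col.buffers.len() as u32);
    for buf in &col.buffers {
        write_u32(out, buf.len() as u32);
        out.extend_from_slice(buf);
    }
    write_u32(out, col.children.len() as u32);
    for (rows, child) in &col.children {
        write_u32(out, *rows);
        write_column(out, child); // recurse, mirroring write_array_data
    }
}

fn read_column(buf: &[u8], pos: &mut usize) -> RawColumn {
    let null_count = read_u32(buf, pos);
    let bitmap_len = read_u32(buf, pos) as usize;
    let bitmap = buf[*pos..*pos + bitmap_len].to_vec();
    *pos += bitmap_len;
    let num_buffers = read_u32(buf, pos) as usize;
    let mut buffers = Vec::with_capacity(num_buffers);
    for _ in 0..num_buffers {
        let len = read_u32(buf, pos) as usize;
        buffers.push(buf[*pos..*pos + len].to_vec());
        *pos += len;
    }
    let num_children = read_u32(buf, pos) as usize;
    let mut children = Vec::with_capacity(num_children);
    for _ in 0..num_children {
        let rows = read_u32(buf, pos);
        children.push((rows, read_column(buf, pos)));
    }
    RawColumn { null_count, bitmap, buffers, children }
}

fn main() {
    let col = RawColumn {
        null_count: 1,
        bitmap: vec![0b1111_1101],
        buffers: vec![vec![1, 2, 3, 4]],
        children: vec![],
    };
    let mut out = Vec::new();
    write_column(&mut out, &col);
    let mut pos = 0;
    let back = read_column(&out, &mut pos);
    assert_eq!(back.null_count, 1);
    assert_eq!(back.buffers[0], vec![1, 2, 3, 4]);
    assert_eq!(pos, out.len());
    println!("roundtrip ok, {} bytes", out.len());
}
```

The framing carries no schema: the reader is expected to already know the field types, which is what lets the per-column overhead stay near a few length prefixes instead of a flatbuffer schema message.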
Use RecordBatch::try_new_with_options with explicit row_count instead of try_new so that zero-column batches (produced by Spark when query results are unused) do not fail with "must either specify a row count or at least one column".
CometColumnarShuffle was not setting outputAttributes on the CometShuffleDependency, leaving it as Seq.empty. This caused the shuffle reader to pass an empty schema to the native decodeShuffleBlock, resulting in "Output column count mismatch: expected N, got 0" errors.
The null bitmap in Arrow arrays can have a non-zero bit offset even when ArrayData.offset() is 0 (e.g. after RecordBatch::slice). The raw shuffle writer was copying the bitmap bytes verbatim, but the reader assumes bits start at offset 0. This caused shifted null bitmaps, corrupting data during shuffle and producing wrong query results (e.g. TPC-DS q6 counts off by 1). Fix by detecting non-zero bitmap offsets and emitting a re-aligned copy. Add a roundtrip test with sliced batches to cover this case.
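The re-alignment step can be illustrated with a small dependency-free sketch. realign_bitmap is a hypothetical name, not the Comet function; the only assumption carried over from Arrow is that validity bitmaps are LSB-first within each byte.

```rust
// Minimal sketch: copy a validity bitmap whose bits begin at a non-zero
// bit offset into a fresh buffer whose bits begin at offset 0, which is
// what the shuffle reader assumes. Bit i of the output is bit
// (bit_offset + i) of the input, LSB-first per Arrow's layout.
fn realign_bitmap(src: &[u8], bit_offset: usize, num_bits: usize) -> Vec<u8> {
    let mut out = vec![0u8; (num_bits + 7) / 8];
    for i in 0..num_bits {
        let src_bit = bit_offset + i;
        let set = (src[src_bit / 8] >> (src_bit % 8)) & 1 == 1;
        if set {
            out[i / 8] |= 1 << (i % 8);
        }
    }
    out
}

fn main() {
    // Byte 0b1010_0110 holds bits (LSB-first) 0,1,1,0,0,1,0,1. With
    // bit_offset=2 and 6 valid bits, the surviving bits are 1,0,0,1,0,1,
    // which re-pack into the single byte 0b0010_1001.
    let aligned = realign_bitmap(&[0b1010_0110], 2, 6);
    assert_eq!(aligned, vec![0b0010_1001]);
    println!("{:#010b}", aligned[0]);
}
```

A production version would shift whole bytes rather than loop per bit, but the per-bit form makes the offset bug easy to see: copying the source bytes verbatim would have handed the reader the pre-shift byte 0b1010_0110.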
Arrays from RecordBatch::slice can have non-zero offsets in both the ArrayData and the null bitmap. The raw shuffle format writes buffers verbatim assuming offset 0, causing data corruption when offsets are present. Use take() to produce zero-offset copies when needed, similar to prepare_output in jni_api.rs. This fixes TPC-DS q64 failures where the debug_assert fired and data mismatch errors from shifted null bitmaps.
- Replace take() with MutableArrayData::extend in normalize_array for zero-offset copies. MutableArrayData does a direct memcpy instead of building an index array and doing per-element lookups.
- Cache the parsed shuffle schema in thread-local storage. The schema bytes are identical for every decodeShuffleBlock call within a shuffle reader, so caching avoids repeated protobuf decoding and Field/Schema allocation on every batch.
- Change read_shuffle_block to accept Arc<Schema> instead of &Schema, eliminating a full Schema clone (all field names and types) on every batch decode.
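The thread-local schema cache follows a pattern like the sketch below. It is a dependency-free stand-in, not the Comet code: a plain String substitutes for the parsed Arc<Schema>, and cached_schema/parse_schema are hypothetical names.

```rust
use std::cell::RefCell;
use std::sync::Arc;

// Stand-in for the parsed schema; the real code would cache an
// Arc<Schema> decoded from protobuf bytes.
type ParsedSchema = Arc<String>;

thread_local! {
    // One-slot cache keyed by the raw schema bytes. Within a single
    // shuffle reader the bytes are identical for every block, so the
    // expensive parse runs once per thread instead of once per batch.
    static SCHEMA_CACHE: RefCell<Option<(Vec<u8>, ParsedSchema)>> =
        RefCell::new(None);
}

fn parse_schema(bytes: &[u8]) -> ParsedSchema {
    // Stand-in for the protobuf decode + Field/Schema allocation.
    Arc::new(String::from_utf8_lossy(bytes).into_owned())
}

fn cached_schema(bytes: &[u8]) -> ParsedSchema {
    SCHEMA_CACHE.with(|cell| {
        let mut slot = cell.borrow_mut();
        if let Some((cached_bytes, schema)) = slot.as_ref() {
            if cached_bytes.as_slice() == bytes {
                // Cache hit: hand back a cheap Arc clone, no re-parse.
                return Arc::clone(schema);
            }
        }
        let schema = parse_schema(bytes);
        *slot = Some((bytes.to_vec(), Arc::clone(&schema)));
        schema
    })
}

fn main() {
    let a = cached_schema(b"schema-v1");
    let b = cached_schema(b"schema-v1");
    // The second call returns the same Arc, proving the hit path.
    assert!(Arc::ptr_eq(&a, &b));
    println!("cache hit: {}", Arc::ptr_eq(&a, &b));
}
```

Thread-local storage sidesteps locking: each JNI/reader thread keeps its own slot, which is enough because the repeated calls that matter all happen on one thread.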
Previously, dictionary arrays were cast to their value types before writing to the shuffle format. This expanded the data (e.g. 100 unique strings × 8192 rows became 8192 full string copies) and reduced compression effectiveness. Now dictionary arrays are written natively with a per-column tag byte indicating the encoding (plain=0, dictionary=1) and key type. The reader reconstructs DictionaryArrays from these tags. For the ShuffleScanExec path, dictionary arrays flow directly to DataFusion which handles them natively. For the JNI decodeShuffleBlock path, dictionary columns are cast to value types after decode since the JVM expects plain Arrow types. The cast cost is the same but the serialized data is much smaller, saving IO and compression time.
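The per-column tag byte can be pictured with a minimal sketch. The tag values (plain=0, dictionary=1) come from the description above; everything else (ColumnEncoding, write_tag, read_tag, the key-type code) is hypothetical naming, not the Comet wire format.

```rust
// One tag byte per column marks the encoding; dictionary columns carry
// one extra byte identifying the key type.
const TAG_PLAIN: u8 = 0;
const TAG_DICT: u8 = 1;

#[derive(Debug, PartialEq)]
enum ColumnEncoding {
    Plain,
    Dictionary { key_type: u8 }, // e.g. a small code standing for Int32 keys
}

fn write_tag(out: &mut Vec<u8>, enc: &ColumnEncoding) {
    match enc {
        ColumnEncoding::Plain => out.push(TAG_PLAIN),
        ColumnEncoding::Dictionary { key_type } => {
            out.push(TAG_DICT);
            out.push(*key_type);
        }
    }
}

// Returns the decoded encoding and how many bytes the tag consumed.
fn read_tag(buf: &[u8]) -> (ColumnEncoding, usize) {
    match buf[0] {
        TAG_PLAIN => (ColumnEncoding::Plain, 1),
        TAG_DICT => (ColumnEncoding::Dictionary { key_type: buf[1] }, 2),
        other => panic!("unknown column tag {other}"),
    }
}

fn main() {
    let mut buf = Vec::new();
    write_tag(&mut buf, &ColumnEncoding::Dictionary { key_type: 3 });
    write_tag(&mut buf, &ColumnEncoding::Plain);
    let (first, consumed) = read_tag(&buf);
    let (second, _) = read_tag(&buf[consumed..]);
    assert_eq!(first, ColumnEncoding::Dictionary { key_type: 3 });
    assert_eq!(second, ColumnEncoding::Plain);
    println!("{first:?} {second:?}");
}
```

The point of the tag is that it costs one or two bytes per column yet lets the serialized payload stay in the compact dictionary form (keys + small value set) instead of 8192 expanded copies.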
Two medium-priority optimizations:
1. Write the uncompressed size as a u32 header before the compressed data. The reader uses this to pre-allocate the decompression buffer to the exact size, eliminating ~18 reallocations (doubling strategy) per 256KB block. For Zstd, use bulk decompression, which is also faster than streaming.
2. Serialize the raw batch to an intermediate buffer first, then compress in one shot. This avoids creating a streaming compression encoder per batch (Zstd allocates ~128KB of internal state per encoder). For Zstd, use bulk::compress, which reuses an internal context. This also batches many small write_all calls into a single buffer, reducing overhead in the compression codec state machines.
LZ4 is the default shuffle compression codec. Switch from lz4_flex FrameEncoder/FrameDecoder (streaming, per-batch encoder allocation) to lz4_flex::compress/decompress (block-level, no encoder state). Combined with the uncompressed size header, the decompressor allocates exactly once to the right size.
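The size-header framing works roughly as in the dependency-free sketch below. frame_compressed and read_frame are hypothetical names; a real reader would hand the recovered length to the lz4/zstd block decompressor instead of merely pre-sizing a Vec.

```rust
// Writer side: prepend the uncompressed length as a little-endian u32
// so the reader can size its output buffer once, exactly.
fn frame_compressed(uncompressed_len: u32, compressed: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + compressed.len());
    out.extend_from_slice(&uncompressed_len.to_le_bytes());
    out.extend_from_slice(compressed);
    out
}

// Reader side: peel off the header and return (uncompressed_len, payload).
fn read_frame(block: &[u8]) -> (usize, &[u8]) {
    let len = u32::from_le_bytes(block[..4].try_into().unwrap()) as usize;
    (len, &block[4..])
}

fn main() {
    let framed = frame_compressed(8192, b"...compressed bytes...");
    let (uncompressed_len, payload) = read_frame(&framed);
    // Single, exact-size allocation instead of ~18 doubling reallocations.
    let dst: Vec<u8> = Vec::with_capacity(uncompressed_len);
    assert_eq!(uncompressed_len, 8192);
    assert!(dst.capacity() >= 8192);
    assert_eq!(payload, &b"...compressed bytes..."[..]);
    println!("uncompressed_len={uncompressed_len}");
}
```

Block-level codecs pair naturally with this header: lz4_flex's block decompress and zstd's bulk decompress both want the destination size up front, which the streaming APIs never had.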
Which issue does this PR close?
Performance optimization for native shuffle.
Includes changes from #3731
I see 8% speedup in local TPC-H @ 100GB.
Rationale for this change
The Arrow IPC format used for Comet's native shuffle serialization includes overhead that is unnecessary for the shuffle use case: FlatBuffer schema metadata per batch, per-message headers, and dictionary encoding expansion. Since the shuffle writer and reader are both controlled by Comet, we can use a simpler raw buffer format tailored to the shuffle path.
What changes are included in this PR?
Raw buffer shuffle format
Replaces Arrow IPC with a lightweight raw buffer format for shuffle write/read. The format writes Arrow array buffers directly with minimal framing:
[block_length: u64][field_count: u64][codec_tag: 4B][uncompressed_len: u32][compressed_data]

Per column, inside the compressed payload:

[dict_tag: u8][key_type: u8 (if dict)][null_count: u32][bitmap_len: u32][bitmap][num_buffers: u32][buf_len + buf_data ...][num_children: u32][child_num_rows + recursive ...]

Dictionary encoding preservation
Dictionary arrays (e.g. from string columns) are written natively instead of being cast to value types. A per-column tag byte indicates plain (0) or dictionary (1) encoding. On the ShuffleScanExec path, dictionary arrays flow directly to DataFusion. On the JNI decode path, dictionary columns are cast to value types after decode since the JVM expects plain Arrow types. This preserves the compact dictionary representation during serialization and compression.
Compression optimizations
- lz4_flex::compress/decompress (block API)
- zstd::bulk::compress/decompress

Other optimizations
- Thread-local schema cache on the decodeShuffleBlock JNI path (avoids repeated protobuf decode per batch)
- Arc<Schema> passed through the read path to avoid cloning field names/types per batch
- MutableArrayData::extend for zero-offset array normalization instead of take() with an index array
- Arrays with non-zero offsets (from RecordBatch::slice) and non-zero null bitmap offsets are properly handled via physical copy before serialization

How are these changes tested?