From 9750b2607a4dba2f8896f54f2747453e58f60137 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 8 Apr 2026 09:50:50 -0600 Subject: [PATCH 1/6] feat: add immediate mode option for native shuffle Add ImmediateModePartitioner that partitions incoming batches immediately using per-partition builders, flushing compressed IPC blocks when they reach target batch size. This reduces memory overhead compared to the buffered approach that stores all uncompressed rows before writing. Includes documentation and config option (spark.comet.exec.shuffle.partitionerMode). Default is buffered. --- .../scala/org/apache/comet/CometConf.scala | 12 + .../contributor-guide/native_shuffle.md | 98 +- docs/source/user-guide/latest/tuning.md | 11 + native/core/src/execution/planner.rs | 1 + native/proto/src/proto/operator.proto | 4 + native/shuffle/README.md | 34 +- native/shuffle/benches/shuffle_writer.rs | 1 + native/shuffle/src/bin/shuffle_bench.rs | 17 +- .../src/partitioners/immediate_mode.rs | 1089 +++++++++++++++++ native/shuffle/src/partitioners/mod.rs | 2 + native/shuffle/src/shuffle_writer.rs | 25 +- .../shuffle/CometNativeShuffleWriter.scala | 2 + 12 files changed, 1247 insertions(+), 49 deletions(-) create mode 100644 native/shuffle/src/partitioners/immediate_mode.rs diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..3c07ff6c2e 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -523,6 +523,18 @@ object CometConf extends ShimCometConf { "Should not be larger than batch size `spark.comet.batchSize`") .createWithDefault(8192) + val COMET_SHUFFLE_PARTITIONER_MODE: ConfigEntry[String] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.partitionerMode") + .category(CATEGORY_SHUFFLE) + .doc( + "The partitioner mode used by the native shuffle writer. " + + "'immediate' writes partitioned IPC blocks immediately as batches arrive, " + + "reducing memory usage. 'buffered' buffers all rows before writing, which may " + + "improve performance for small datasets but uses more memory.") + .stringConf + .checkValues(Set("immediate", "buffered")) + .createWithDefault("buffered") + val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize") .category(CATEGORY_SHUFFLE) diff --git a/docs/source/contributor-guide/native_shuffle.md b/docs/source/contributor-guide/native_shuffle.md index 18e80a90c8..f2a59523a0 100644 --- a/docs/source/contributor-guide/native_shuffle.md +++ b/docs/source/contributor-guide/native_shuffle.md @@ -81,10 +81,18 @@ Native shuffle (`CometExchange`) is selected when all of the following condition └─────────────────────────────────────────────────────────────────────────────┘ │ │ ▼ ▼ -┌───────────────────────────────────┐ ┌───────────────────────────────────┐ -│ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │ -│ (hash/range partitioning) │ │ (single partition case) │ -└───────────────────────────────────┘ └───────────────────────────────────┘ +┌───────────────────────────────────────────────────────────────────────┐ +│ Partitioner Selection │ +│ Controlled by spark.comet.exec.shuffle.partitionerMode │ +├───────────────────────────┬───────────────────────────────────────────┤ +│ immediate (default) │ buffered │ +│ ImmediateModePartitioner │ MultiPartitionShuffleRepartitioner │ +│ (hash/range/round-robin) │ (hash/range/round-robin) │ +│ Writes IPC blocks as │ Buffers all rows in memory │ +│ batches arrive │ before writing │ +├───────────────────────────┴───────────────────────────────────────────┤ +│ SinglePartitionShufflePartitioner (single partition case) │ +└───────────────────────────────────────────────────────────────────────┘ │ ▼ ┌───────────────────────────────────┐ @@ -113,11 +121,13 @@ Native shuffle (`CometExchange`) is selected when all of the following condition ### Rust Side -| File | Location | Description | -| ----------------------- | ------------------------------------ | ------------------------------------------------------------------------------------ | -| `shuffle_writer.rs` | `native/core/src/execution/shuffle/` | `ShuffleWriterExec` plan and partitioners. Main shuffle logic. | -| `codec.rs` | `native/core/src/execution/shuffle/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. | -| `comet_partitioning.rs` | `native/core/src/execution/shuffle/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). | +| File | Location | Description | +| ----------------------- | ---------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- | +| `shuffle_writer.rs` | `native/shuffle/src/` | `ShuffleWriterExec` plan. Selects partitioner based on `immediate_mode` flag. | +| `immediate_mode.rs` | `native/shuffle/src/partitioners/` | `ImmediateModePartitioner`. Scatter-writes rows into per-partition Arrow builders and flushes IPC blocks to in-memory buffers eagerly. | +| `multi_partition.rs` | `native/shuffle/src/partitioners/` | `MultiPartitionShuffleRepartitioner`. Buffers all rows in memory, then writes partitions. | +| `codec.rs` | `native/shuffle/src/` | `ShuffleBlockWriter` for Arrow IPC encoding with compression. Also handles decoding. | +| `comet_partitioning.rs` | `native/shuffle/src/` | `CometPartitioning` enum defining partition schemes (Hash, Range, Single). | ## Data Flow @@ -129,23 +139,33 @@ Native shuffle (`CometExchange`) is selected when all of the following condition 2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust. -3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner: - - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning - - `SinglePartitionShufflePartitioner`: For single partition (simpler path) +3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner + based on the `partitionerMode` configuration: + - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning. + As each batch arrives, rows are scattered into per-partition Arrow array builders. When a + partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC + block to an in-memory buffer. Under memory pressure, these buffers are spilled to + per-partition temporary files. This keeps memory usage much lower than buffered mode since + data is encoded into compact IPC format eagerly rather than held as raw Arrow arrays. -4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure - exceeds the threshold, partitions spill to temporary files. + - **Buffered mode** (`MultiPartitionShuffleRepartitioner`): For hash/range/round-robin + partitioning. Buffers all input `RecordBatch`es in memory, then partitions and writes + them in a single pass. When memory pressure exceeds the threshold, partitions spill to + temporary files. -5. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC: + - `SinglePartitionShufflePartitioner`: For single partition (simpler path, used regardless + of partitioner mode). + +4. **Encoding**: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC: - Writes compression type header - Writes field count header - Writes compressed IPC stream -6. **Output files**: Two files are produced: +5. **Output files**: Two files are produced: - **Data file**: Concatenated partition data - **Index file**: Array of 8-byte little-endian offsets marking partition boundaries -7. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition +6. **Commit**: Back in JVM, `CometNativeShuffleWriter` reads the index file to get partition lengths and commits via Spark's `IndexShuffleBlockResolver`. ### Read Path @@ -201,10 +221,31 @@ sizes. ## Memory Management -Native shuffle uses DataFusion's memory management with spilling support: +Native shuffle uses DataFusion's memory management. The memory characteristics differ +between the two partitioner modes: + +### Immediate Mode + +Immediate mode keeps memory usage low by partitioning and encoding data eagerly as it arrives, +rather than buffering all input rows before writing: + +- **Per-partition builders**: Each partition has a set of Arrow array builders sized to the + target batch size. When a builder fills up, it is flushed as a compressed IPC block to an + in-memory buffer. +- **Memory footprint**: Proportional to `num_partitions × batch_size` for the builders, plus + the accumulated IPC buffers. This is typically much smaller than buffered mode since IPC + encoding is more compact than raw Arrow arrays. +- **Spilling**: When memory pressure is detected via DataFusion's `MemoryConsumer` trait, + partition builders are flushed and all IPC buffers are drained to per-partition temporary + files on disk. + +### Buffered Mode + +Buffered mode holds all input data in memory before writing: -- **Memory pool**: Tracks memory usage across the shuffle operation. -- **Spill threshold**: When buffered data exceeds the threshold, partitions spill to disk. +- **Buffered batches**: All incoming `RecordBatch`es are accumulated in a `Vec`. +- **Spill threshold**: When buffered data exceeds the memory threshold, partitions spill to + temporary files on disk. - **Per-partition spilling**: Each partition has its own spill file. Multiple spills for a partition are concatenated when writing the final output. - **Scratch space**: Reusable buffers for partition ID computation to reduce allocations. @@ -232,14 +273,15 @@ independently compressed, allowing parallel decompression during reads. ## Configuration -| Config | Default | Description | -| ------------------------------------------------- | ------- | ---------------------------------------- | -| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle | -| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` | -| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec | -| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level | -| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size | -| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch | +| Config | Default | Description | +| ------------------------------------------------- | ----------- | ------------------------------------------- | +| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle | +| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` | +| `spark.comet.exec.shuffle.partitionerMode` | `immediate` | Partitioner mode: `immediate` or `buffered` | +| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec | +| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level | +| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size | +| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch | ## Comparison with JVM Shuffle diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..c47fe0a644 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -144,6 +144,17 @@ Comet provides a fully native shuffle implementation, which generally provides t supports `HashPartitioning`, `RangePartitioning` and `SinglePartitioning` but currently only supports primitive type partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays. +Native shuffle has two partitioner modes, configured via +`spark.comet.exec.shuffle.partitionerMode`: + +- **`immediate`** (default): Writes partitioned Arrow IPC blocks to disk immediately as each batch + arrives. This mode uses less memory because it does not need to buffer the entire input before + writing. It is recommended for most workloads, especially large datasets. + +- **`buffered`**: Buffers all input rows in memory before partitioning and writing to disk. This + may improve performance for small datasets that fit in memory, but uses significantly more + memory. + #### Columnar (JVM) Shuffle Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ac35925ace..6206dd1b6b 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1379,6 +1379,7 @@ impl PhysicalPlanner { writer.output_index_file.clone(), writer.tracing_enabled, write_buffer_size, + writer.immediate_mode, )?); Ok(( diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index fb438b26a4..0402401825 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -294,6 +294,10 @@ message ShuffleWriter { // Size of the write buffer in bytes used when writing shuffle data to disk. // Larger values may improve write performance but use more memory. int32 write_buffer_size = 8; + // Whether to use immediate mode partitioner. When true, partitioned IPC blocks + // are written immediately as batches arrive. When false, rows are buffered + // before writing (the original behavior). + bool immediate_mode = 9; } message ParquetWriter { diff --git a/native/shuffle/README.md b/native/shuffle/README.md index 0f53604fa3..7484cc1093 100644 --- a/native/shuffle/README.md +++ b/native/shuffle/README.md @@ -35,32 +35,32 @@ performance outside of Spark. It streams input data directly from Parquet files. cargo run --release --features shuffle-bench --bin shuffle_bench -- \ --input /data/tpch-sf100/lineitem/ \ --partitions 200 \ - --codec lz4 \ + --codec zstd --zstd-level 1 \ --hash-columns 0,3 ``` ### Options -| Option | Default | Description | -| --------------------- | -------------------------- | ------------------------------------------------------ | -| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files | -| `--partitions` | `200` | Number of output shuffle partitions | -| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | -| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) | -| `--codec` | `lz4` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | -| `--zstd-level` | `1` | Zstd compression level (1–22) | -| `--batch-size` | `8192` | Batch size for reading Parquet data | -| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | -| `--write-buffer-size` | `1048576` | Write buffer size in bytes | -| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) | -| `--iterations` | `1` | Number of timed iterations | -| `--warmup` | `0` | Number of warmup iterations before timing | -| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files | +| Option | Default | Description | +| ------------------------ | -------------------------- | ------------------------------------------------------------ | +| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files | +| `--partitions` | `200` | Number of output shuffle partitions | +| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | +| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) | +| `--codec` | `zstd` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | +| `--zstd-level` | `1` | Zstd compression level (1–22) | +| `--batch-size` | `8192` | Batch size for reading Parquet data | +| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | +| `--write-buffer-size` | `1048576` | Write buffer size in bytes | +| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) | +| `--iterations` | `1` | Number of timed iterations | +| `--warmup` | `0` | Number of warmup iterations before timing | +| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files | ### Profiling with flamegraph ```sh cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \ --input /data/tpch-sf100/lineitem/ \ - --partitions 200 --codec lz4 + --partitions 200 --codec zstd --zstd-level 1 ``` diff --git a/native/shuffle/benches/shuffle_writer.rs b/native/shuffle/benches/shuffle_writer.rs index 27abd919fa..873e872adf 100644 --- a/native/shuffle/benches/shuffle_writer.rs +++ b/native/shuffle/benches/shuffle_writer.rs @@ -153,6 +153,7 @@ fn create_shuffle_writer_exec( "/tmp/index.out".to_string(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap() } diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs index bb8c2a0380..17999b482a 100644 --- a/native/shuffle/src/bin/shuffle_bench.rs +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -24,7 +24,7 @@ //! cargo run --release --bin shuffle_bench -- \ //! --input /data/tpch-sf100/lineitem/ \ //! --partitions 200 \ -//! --codec lz4 \ +//! --codec zstd --zstd-level 1 \ //! --hash-columns 0,3 //! ``` //! @@ -32,7 +32,7 @@ //! ```sh //! cargo flamegraph --release --bin shuffle_bench -- \ //! --input /data/tpch-sf100/lineitem/ \ -//! --partitions 200 --codec lz4 +//! --partitions 200 --codec zstd --zstd-level 1 //! ``` use arrow::datatypes::{DataType, SchemaRef}; @@ -79,7 +79,7 @@ struct Args { hash_columns: String, /// Compression codec: none, lz4, zstd, snappy - #[arg(long, default_value = "lz4")] + #[arg(long, default_value = "zstd")] codec: String, /// Zstd compression level (1-22) @@ -114,6 +114,11 @@ struct Args { /// Each task reads the same input and writes to its own output files. #[arg(long, default_value_t = 1)] concurrent_tasks: usize, + + /// Shuffle mode: 'immediate' writes IPC blocks per batch as they arrive, + /// 'buffered' buffers all rows before writing (original behavior). + #[arg(long, default_value = "immediate")] + mode: String, } fn main() { @@ -141,6 +146,7 @@ fn main() { println!("Partitioning: {}", args.partitioning); println!("Partitions: {}", args.partitions); println!("Codec: {:?}", codec); + println!("Mode: {}", args.mode); println!("Hash columns: {:?}", hash_col_indices); if let Some(mem_limit) = args.memory_limit { println!("Memory limit: {}", format_bytes(mem_limit)); @@ -413,6 +419,7 @@ fn run_shuffle_write( args.limit, data_file.to_string(), index_file.to_string(), + args.mode == "immediate", ) .await .unwrap(); @@ -436,6 +443,7 @@ async fn execute_shuffle_write( limit: usize, data_file: String, index_file: String, + immediate_mode: bool, ) -> datafusion::common::Result<(MetricsSet, MetricsSet)> { let config = SessionConfig::new().with_batch_size(batch_size); let mut runtime_builder = RuntimeEnvBuilder::new(); @@ -477,6 +485,7 @@ async fn execute_shuffle_write( index_file, false, write_buffer_size, + immediate_mode, ) .expect("Failed to create ShuffleWriterExec"); @@ -541,6 +550,7 @@ fn run_concurrent_shuffle_writes( let memory_limit = args.memory_limit; let write_buffer_size = args.write_buffer_size; let limit = args.limit; + let immediate_mode = args.mode == "immediate"; handles.push(tokio::spawn(async move { execute_shuffle_write( @@ -553,6 +563,7 @@ fn run_concurrent_shuffle_writes( limit, data_file, index_file, + immediate_mode, ) .await .unwrap() diff --git a/native/shuffle/src/partitioners/immediate_mode.rs b/native/shuffle/src/partitioners/immediate_mode.rs new file mode 100644 index 0000000000..ae039d16ef --- /dev/null +++ b/native/shuffle/src/partitioners/immediate_mode.rs @@ -0,0 +1,1089 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::ShufflePartitioner; +use crate::{comet_partitioning, CometPartitioning, CompressionCodec}; +use arrow::array::builder::{ + make_builder, ArrayBuilder, BinaryBuilder, BinaryViewBuilder, BooleanBuilder, + LargeBinaryBuilder, LargeStringBuilder, NullBuilder, PrimitiveBuilder, StringBuilder, + StringViewBuilder, +}; +use arrow::array::{ + Array, ArrayRef, AsArray, BinaryViewArray, RecordBatch, StringViewArray, UInt32Array, +}; +use arrow::compute::take; +use arrow::datatypes::{ + DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Float32Type, Float64Type, + Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, +}; +use arrow::ipc::writer::StreamWriter; +use datafusion::common::{DataFusionError, Result}; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryLimit, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Seek, Write}; +use std::sync::Arc; +use tokio::time::Instant; + +macro_rules! scatter_byte_array { + ($builder:expr, $source:expr, $indices:expr, $offset_type:ty, $builder_type:ty, $cast:ident) => {{ + let src = $source.$cast::<$offset_type>(); + let dst = $builder + .as_any_mut() + .downcast_mut::<$builder_type>() + .expect("builder type mismatch"); + if src.null_count() == 0 { + for &idx in $indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in $indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + }}; +} + +macro_rules! scatter_byte_view { + ($builder:expr, $source:expr, $indices:expr, $array_type:ty, $builder_type:ty) => {{ + let src = $source + .as_any() + .downcast_ref::<$array_type>() + .expect("array type mismatch"); + let dst = $builder + .as_any_mut() + .downcast_mut::<$builder_type>() + .expect("builder type mismatch"); + if src.null_count() == 0 { + for &idx in $indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in $indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + }}; +} + +macro_rules! scatter_primitive { + ($builder:expr, $source:expr, $indices:expr, $arrow_type:ty) => {{ + let src = $source.as_primitive::<$arrow_type>(); + let dst = $builder + .as_any_mut() + .downcast_mut::>() + .expect("builder type mismatch"); + if src.null_count() == 0 { + for &idx in $indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in $indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + }}; +} + +/// Scatter-append selected rows from `source` into `builder`. +fn scatter_append( + builder: &mut dyn ArrayBuilder, + source: &dyn Array, + indices: &[usize], +) -> Result<()> { + use DataType::*; + match source.data_type() { + Boolean => { + let src = source.as_boolean(); + let dst = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + if src.null_count() == 0 { + for &idx in indices { + dst.append_value(src.value(idx)); + } + } else { + for &idx in indices { + dst.append_option(src.is_valid(idx).then(|| src.value(idx))); + } + } + } + Int8 => scatter_primitive!(builder, source, indices, Int8Type), + Int16 => scatter_primitive!(builder, source, indices, Int16Type), + Int32 => scatter_primitive!(builder, source, indices, Int32Type), + Int64 => scatter_primitive!(builder, source, indices, Int64Type), + UInt8 => scatter_primitive!(builder, source, indices, UInt8Type), + UInt16 => scatter_primitive!(builder, source, indices, UInt16Type), + UInt32 => scatter_primitive!(builder, source, indices, UInt32Type), + UInt64 => scatter_primitive!(builder, source, indices, UInt64Type), + Float32 => scatter_primitive!(builder, source, indices, Float32Type), + Float64 => scatter_primitive!(builder, source, indices, Float64Type), + Date32 => scatter_primitive!(builder, source, indices, Date32Type), + Date64 => scatter_primitive!(builder, source, indices, Date64Type), + Timestamp(TimeUnit::Second, _) => { + scatter_primitive!(builder, source, indices, TimestampSecondType) + } + Timestamp(TimeUnit::Millisecond, _) => { + scatter_primitive!(builder, source, indices, TimestampMillisecondType) + } + Timestamp(TimeUnit::Microsecond, _) => { + scatter_primitive!(builder, source, indices, TimestampMicrosecondType) + } + Timestamp(TimeUnit::Nanosecond, _) => { + scatter_primitive!(builder, source, indices, TimestampNanosecondType) + } + Decimal128(_, _) => scatter_primitive!(builder, source, indices, Decimal128Type), + Decimal256(_, _) => scatter_primitive!(builder, source, indices, Decimal256Type), + Utf8 => scatter_byte_array!(builder, source, indices, i32, StringBuilder, as_string), + LargeUtf8 => { + scatter_byte_array!(builder, source, indices, i64, LargeStringBuilder, as_string) + } + Binary => scatter_byte_array!(builder, source, indices, i32, BinaryBuilder, as_binary), + LargeBinary => { + scatter_byte_array!(builder, source, indices, i64, LargeBinaryBuilder, as_binary) + } + Utf8View => { + scatter_byte_view!(builder, source, indices, StringViewArray, StringViewBuilder) + } + BinaryView => { + scatter_byte_view!(builder, source, indices, BinaryViewArray, BinaryViewBuilder) + } + Null => { + let dst = builder.as_any_mut().downcast_mut::().unwrap(); + dst.append_nulls(indices.len()); + } + dt => { + return Err(DataFusionError::NotImplemented(format!( + "Scatter append not implemented for {dt}" + ))); + } + } + Ok(()) +} + +/// Per-column strategy: scatter-write via builder for primitive/string types, +/// or accumulate taken sub-arrays for complex types (List, Map, Struct, etc.). +enum ColumnBuffer { + /// Fast path: direct scatter into a pre-allocated builder. + Builder(Box), + /// Fallback for complex types: accumulate `take`-produced sub-arrays, + /// concatenate at flush time. + Accumulator(Vec), +} + +/// Returns true if `scatter_append` can handle this data type directly. +fn has_scatter_support(dt: &DataType) -> bool { + use DataType::*; + matches!( + dt, + Boolean + | Int8 + | Int16 + | Int32 + | Int64 + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Float32 + | Float64 + | Date32 + | Date64 + | Timestamp(_, _) + | Decimal128(_, _) + | Decimal256(_, _) + | Utf8 + | LargeUtf8 + | Binary + | LargeBinary + | Utf8View + | BinaryView + | Null + ) +} + +struct PartitionBuffer { + columns: Vec, + schema: SchemaRef, + num_rows: usize, + target_batch_size: usize, +} + +impl PartitionBuffer { + fn new(schema: &SchemaRef, target_batch_size: usize) -> Self { + let columns = schema + .fields() + .iter() + .map(|f| { + if has_scatter_support(f.data_type()) { + ColumnBuffer::Builder(make_builder(f.data_type(), target_batch_size)) + } else { + ColumnBuffer::Accumulator(Vec::new()) + } + }) + .collect(); + Self { + columns, + schema: Arc::clone(schema), + num_rows: 0, + target_batch_size, + } + } + + fn is_full(&self) -> bool { + self.num_rows >= self.target_batch_size + } + + /// Finish all columns into a RecordBatch. Builders are reset (retaining + /// capacity); accumulators are concatenated and cleared. + fn flush(&mut self) -> Result { + let arrays: Vec = self + .columns + .iter_mut() + .map(|col| match col { + ColumnBuffer::Builder(b) => b.finish(), + ColumnBuffer::Accumulator(chunks) => { + let refs: Vec<&dyn Array> = chunks.iter().map(|a| a.as_ref()).collect(); + let result = arrow::compute::concat(&refs) + .expect("concat failed for accumulated arrays"); + chunks.clear(); + result + } + }) + .collect(); + let batch = RecordBatch::try_new(Arc::clone(&self.schema), arrays) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + self.num_rows = 0; + Ok(batch) + } + + fn has_data(&self) -> bool { + self.num_rows > 0 + } +} + +pub(crate) struct PartitionOutputStream { + schema: SchemaRef, + codec: CompressionCodec, + buffer: Vec, +} + +impl PartitionOutputStream { + pub(crate) fn try_new(schema: SchemaRef, codec: CompressionCodec) -> Result { + Ok(Self { + schema, + codec, + buffer: Vec::new(), + }) + } + + fn write_ipc_block(&mut self, batch: &RecordBatch) -> Result { + let start_pos = self.buffer.len(); + + self.buffer.extend_from_slice(&0u64.to_le_bytes()); + let field_count = self.schema.fields().len(); + self.buffer + .extend_from_slice(&(field_count as u64).to_le_bytes()); + let codec_tag: &[u8; 4] = match &self.codec { + CompressionCodec::Snappy => b"SNAP", + CompressionCodec::Lz4Frame => b"LZ4_", + CompressionCodec::Zstd(_) => b"ZSTD", + CompressionCodec::None => b"NONE", + }; + self.buffer.extend_from_slice(codec_tag); + + match &self.codec { + CompressionCodec::None => { + let mut w = StreamWriter::try_new(&mut self.buffer, &batch.schema())?; + w.write(batch)?; + w.finish()?; + w.into_inner()?; + } + CompressionCodec::Lz4Frame => { + let mut wtr = lz4_flex::frame::FrameEncoder::new(&mut self.buffer); + let mut w = StreamWriter::try_new(&mut wtr, &batch.schema())?; + w.write(batch)?; + w.finish()?; + wtr.finish().map_err(|e| { + DataFusionError::Execution(format!("lz4 compression error: {e}")) + })?; + } + CompressionCodec::Zstd(level) => { + let enc = zstd::Encoder::new(&mut self.buffer, *level)?; + let mut w = StreamWriter::try_new(enc, &batch.schema())?; + w.write(batch)?; + w.finish()?; + w.into_inner()?.finish()?; + } + CompressionCodec::Snappy => { + let mut wtr = snap::write::FrameEncoder::new(&mut self.buffer); + let mut w = StreamWriter::try_new(&mut wtr, &batch.schema())?; + w.write(batch)?; + w.finish()?; + wtr.into_inner().map_err(|e| { + DataFusionError::Execution(format!("snappy compression error: {e}")) + })?; + } + } + + let end_pos = self.buffer.len(); + let ipc_length = (end_pos - start_pos - 8) as u64; + if ipc_length > i32::MAX as u64 { + return Err(DataFusionError::Execution(format!( + "Shuffle block size {ipc_length} exceeds maximum size of {}", + i32::MAX + ))); + } + self.buffer[start_pos..start_pos + 8].copy_from_slice(&ipc_length.to_le_bytes()); + + Ok(end_pos - start_pos) + } + + fn drain_buffer(&mut self) -> Vec { + std::mem::take(&mut self.buffer) + } + + #[cfg(test)] + fn finish(self) -> Result> { + Ok(self.buffer) + } +} + +struct SpillFile { + _temp_file: datafusion::execution::disk_manager::RefCountedTempFile, + file: File, +} + +/// A partitioner that scatter-writes incoming rows directly into pre-allocated +/// per-partition column builders. When a partition's builders reach +/// `target_batch_size`, the batch is flushed to a compressed IPC block. +/// No intermediate sub-batches or coalescers are created. +pub(crate) struct ImmediateModePartitioner { + output_data_file: String, + output_index_file: String, + partition_buffers: Vec, + streams: Vec, + spill_files: Vec>, + partitioning: CometPartitioning, + runtime: Arc, + reservation: MemoryReservation, + metrics: ShufflePartitionerMetrics, + hashes_buf: Vec, + partition_ids: Vec, + /// Reusable per-partition row index scratch space. + partition_row_indices: Vec>, + /// Maximum bytes this partitioner will reserve from the memory pool. + /// Computed as memory_pool_size * memory_fraction at construction. + memory_limit: usize, +} + +impl ImmediateModePartitioner { + #[allow(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition: usize, + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + partitioning: CometPartitioning, + metrics: ShufflePartitionerMetrics, + runtime: Arc, + batch_size: usize, + codec: CompressionCodec, + ) -> Result { + let num_output_partitions = partitioning.partition_count(); + + let partition_buffers = (0..num_output_partitions) + .map(|_| PartitionBuffer::new(&schema, batch_size)) + .collect(); + + let streams = (0..num_output_partitions) + .map(|_| PartitionOutputStream::try_new(Arc::clone(&schema), codec.clone())) + .collect::>>()?; + + let spill_files: Vec> = + (0..num_output_partitions).map(|_| None).collect(); + + let hashes_buf = match &partitioning { + CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { + vec![0u32; batch_size] + } + _ => vec![], + }; + + let memory_limit = match runtime.memory_pool.memory_limit() { + MemoryLimit::Finite(pool_size) => pool_size, + _ => usize::MAX, + }; + + let reservation = MemoryConsumer::new(format!("ImmediateModePartitioner[{partition}]")) + .with_can_spill(true) + .register(&runtime.memory_pool); + + let partition_row_indices = (0..num_output_partitions).map(|_| Vec::new()).collect(); + + Ok(Self { + output_data_file, + output_index_file, + partition_buffers, + streams, + spill_files, + partitioning, + runtime, + reservation, + metrics, + hashes_buf, + partition_ids: vec![0u32; batch_size], + partition_row_indices, + memory_limit, + }) + } + + fn compute_partition_ids(&mut self, batch: &RecordBatch) -> Result { + let num_rows = batch.num_rows(); + + // Ensure scratch buffers are large enough for this batch + if self.hashes_buf.len() < num_rows { + self.hashes_buf.resize(num_rows, 0); + } + if self.partition_ids.len() < num_rows { + self.partition_ids.resize(num_rows, 0); + } + + match &self.partitioning { + CometPartitioning::Hash(exprs, num_output_partitions) => { + let num_output_partitions = *num_output_partitions; + let arrays = exprs + .iter() + .map(|expr| expr.evaluate(batch)?.into_array(num_rows)) + .collect::>>()?; + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&arrays, hashes_buf)?; + let partition_ids = &mut self.partition_ids[..num_rows]; + for (idx, hash) in hashes_buf.iter().enumerate() { + partition_ids[idx] = + comet_partitioning::pmod(*hash, num_output_partitions) as u32; + } + Ok(num_output_partitions) + } + CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { + let num_output_partitions = *num_output_partitions; + let max_hash_columns = *max_hash_columns; + let num_columns_to_hash = if max_hash_columns == 0 { + batch.num_columns() + } else { + max_hash_columns.min(batch.num_columns()) + }; + let columns_to_hash: Vec = (0..num_columns_to_hash) + .map(|i| Arc::clone(batch.column(i))) + .collect(); + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&columns_to_hash, hashes_buf)?; + let partition_ids = &mut self.partition_ids[..num_rows]; + for (idx, hash) in hashes_buf.iter().enumerate() { + partition_ids[idx] = + comet_partitioning::pmod(*hash, num_output_partitions) as u32; + } + Ok(num_output_partitions) + } + CometPartitioning::RangePartitioning( + lex_ordering, + num_output_partitions, + row_converter, + bounds, + ) => { + let num_output_partitions = *num_output_partitions; + let arrays = lex_ordering + .iter() + .map(|expr| expr.expr.evaluate(batch)?.into_array(num_rows)) + .collect::>>()?; + let row_batch = row_converter.convert_columns(arrays.as_slice())?; + let partition_ids = &mut self.partition_ids[..num_rows]; + for (row_idx, row) in row_batch.iter().enumerate() { + partition_ids[row_idx] = bounds + .as_slice() + .partition_point(|bound| bound.row() <= row) + as u32; + } + Ok(num_output_partitions) + } + other => Err(DataFusionError::NotImplemented(format!( + "Unsupported shuffle partitioning scheme {other:?}" + ))), + } + } + + /// Scatter-write rows from batch into per-partition builders, flushing + /// any partition that reaches target_batch_size. Returns + /// `(flushed_builder_bytes, ipc_bytes_written)`. + /// + /// Uses column-first iteration so each column's type dispatch happens once + /// per batch (num_columns times) rather than once per partition per column + /// (num_columns × num_partitions times). + fn repartition_batch(&mut self, batch: &RecordBatch) -> Result<(usize, usize)> { + let num_partitions = self.partition_buffers.len(); + let num_rows = batch.num_rows(); + + // Build per-partition row indices, reusing scratch vecs + for indices in self.partition_row_indices.iter_mut() { + indices.clear(); + } + for row_idx in 0..num_rows { + let pid = self.partition_ids[row_idx] as usize; + self.partition_row_indices[pid].push(row_idx); + } + + // Column-first scatter: resolve each column's type once, then + // scatter across all partitions with the same typed path. + for col_idx in 0..batch.num_columns() { + let source = batch.column(col_idx); + for pid in 0..num_partitions { + let indices = &self.partition_row_indices[pid]; + if indices.is_empty() { + continue; + } + match &mut self.partition_buffers[pid].columns[col_idx] { + ColumnBuffer::Builder(builder) => { + scatter_append(builder.as_mut(), source.as_ref(), indices)?; + } + ColumnBuffer::Accumulator(chunks) => { + let idx_array = + UInt32Array::from_iter_values(indices.iter().map(|&i| i as u32)); + let taken = take(source.as_ref(), &idx_array, None) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + chunks.push(taken); + } + } + } + } + + // Update row counts and flush full partitions + let mut flushed_builder_bytes = 0usize; + let mut ipc_bytes = 0usize; + for pid in 0..num_partitions { + let added = self.partition_row_indices[pid].len(); + if added == 0 { + continue; + } + self.partition_buffers[pid].num_rows += added; + if self.partition_buffers[pid].is_full() { + let (builder_bytes, written) = self.flush_partition(pid)?; + flushed_builder_bytes += builder_bytes; + ipc_bytes += written; + } + } + + Ok((flushed_builder_bytes, ipc_bytes)) + } + + /// Flush a partition's builders to an IPC block in its output stream. + /// Returns `(flushed_batch_memory, ipc_bytes_written)`. + fn flush_partition(&mut self, pid: usize) -> Result<(usize, usize)> { + let output_batch = self.partition_buffers[pid].flush()?; + let batch_mem = output_batch.get_array_memory_size(); + let mut encode_timer = self.metrics.encode_time.timer(); + let ipc_bytes = self.streams[pid].write_ipc_block(&output_batch)?; + encode_timer.stop(); + Ok((batch_mem, ipc_bytes)) + } + + /// Spill all partition IPC buffers to per-partition temp files. + fn spill_all(&mut self) -> Result<()> { + let mut spilled_bytes = 0usize; + + // Flush any partially-filled partition builders + for pid in 0..self.partition_buffers.len() { + if self.partition_buffers[pid].has_data() { + self.flush_partition(pid)?; + } + } + + // Drain IPC buffers to disk + for pid in 0..self.streams.len() { + let buf = self.streams[pid].drain_buffer(); + if buf.is_empty() { + continue; + } + + if self.spill_files[pid].is_none() { + let temp_file = self + .runtime + .disk_manager + .create_tmp_file(&format!("imm_shuffle_p{pid}"))?; + let path = temp_file.path().to_owned(); + let file = OpenOptions::new().append(true).open(&path).map_err(|e| { + DataFusionError::Execution(format!("Failed to open spill file: {e}")) + })?; + self.spill_files[pid] = Some(SpillFile { + _temp_file: temp_file, + file, + }); + } + + if let Some(spill) = &mut self.spill_files[pid] { + spill.file.write_all(&buf).map_err(|e| { + DataFusionError::Execution(format!("Failed to write spill: {e}")) + })?; + spilled_bytes += buf.len(); + } + } + + for spill in self.spill_files.iter_mut().flatten() { + spill.file.flush()?; + } + + self.reservation.free(); + if spilled_bytes > 0 { + self.metrics.spill_count.add(1); + self.metrics.spilled_bytes.add(spilled_bytes); + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for ImmediateModePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + let start_time = Instant::now(); + + let batch_mem = batch.get_array_memory_size(); + self.metrics.data_size.add(batch_mem); + self.metrics.baseline.record_output(batch.num_rows()); + + let repart_start = Instant::now(); + self.compute_partition_ids(&batch)?; + self.metrics + .repart_time + .add_duration(repart_start.elapsed()); + + let (flushed_builder_bytes, ipc_growth) = self.repartition_batch(&batch)?; + let builder_growth = batch_mem; + + // Net memory change: data entered builders, some was flushed to IPC + let net_growth = (builder_growth + ipc_growth).saturating_sub(flushed_builder_bytes); + + if net_growth > 0 { + // Use our own memory limit rather than relying solely on the pool, + // since the pool doesn't see builder allocations directly. + if self.reservation.size() + net_growth > self.memory_limit + || self.reservation.try_grow(net_growth).is_err() + { + self.spill_all()?; + } + } + + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + } + + fn shuffle_write(&mut self) -> Result<()> { + let start_time = Instant::now(); + let num_output_partitions = self.streams.len(); + let mut offsets = vec![0i64; num_output_partitions + 1]; + + let mut output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + #[allow(clippy::needless_range_loop)] + for pid in 0..num_output_partitions { + offsets[pid] = output_data.stream_position()? as i64; + + if let Some(spill) = &self.spill_files[pid] { + let path = spill._temp_file.path().to_owned(); + let spill_reader = File::open(&path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to open spill file for reading: {e}" + )) + })?; + let mut write_timer = self.metrics.write_time.timer(); + std::io::copy(&mut &spill_reader, &mut output_data)?; + write_timer.stop(); + } + + if self.partition_buffers[pid].has_data() { + self.flush_partition(pid)?; + } + + let buf = self.streams[pid].drain_buffer(); + if !buf.is_empty() { + let mut write_timer = self.metrics.write_time.timer(); + output_data.write_all(&buf)?; + write_timer.stop(); + } + } + + for spill in self.spill_files.iter_mut() { + *spill = None; + } + + offsets[num_output_partitions] = output_data.stream_position()? as i64; + + let mut write_timer = self.metrics.write_time.timer(); + let mut output_index = BufWriter::new( + File::create(&self.output_index_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, + ); + for offset in &offsets { + output_index.write_all(&offset.to_le_bytes())?; + } + output_index.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::read_ipc_compressed; + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::execution::memory_pool::GreedyMemoryPool; + use datafusion::execution::runtime_env::RuntimeEnvBuilder; + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; + + fn make_test_batch(values: &[i32]) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let array = Int32Array::from(values.to_vec()); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() + } + + #[test] + fn test_scatter_append_primitives() { + let array: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])); + let mut builder = make_builder(&DataType::Int32, 8); + scatter_append(builder.as_mut(), array.as_ref(), &[0, 2, 4]).unwrap(); + let result = builder.finish(); + let result = result.as_primitive::(); + assert_eq!(result.values().as_ref(), &[10, 30, 50]); + } + + #[test] + fn test_scatter_append_strings() { + let array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); + let mut builder = make_builder(&DataType::Utf8, 4); + scatter_append(builder.as_mut(), array.as_ref(), &[1, 3]).unwrap(); + let result = builder.finish(); + let result = result.as_string::(); + assert_eq!(result.value(0), "b"); + assert_eq!(result.value(1), "d"); + } + + #[test] + fn test_scatter_append_nulls() { + let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); + let mut builder = make_builder(&DataType::Int32, 4); + scatter_append(builder.as_mut(), array.as_ref(), &[0, 1, 2]).unwrap(); + let result = builder.finish(); + let result = result.as_primitive::(); + assert!(result.is_valid(0)); + assert!(result.is_null(1)); + assert!(result.is_valid(2)); + } + + #[test] + fn test_partition_buffer_flush_reuse() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batch = make_test_batch(&[1, 2, 3, 4, 5]); + + let mut buf = PartitionBuffer::new(&schema, 3); + match &mut buf.columns[0] { + ColumnBuffer::Builder(b) => { + scatter_append(b.as_mut(), batch.column(0).as_ref(), &[0, 1, 2]).unwrap() + } + _ => panic!("expected Builder"), + } + buf.num_rows += 3; + assert!(buf.is_full()); + + let flushed = buf.flush().unwrap(); + assert_eq!(flushed.num_rows(), 3); + assert_eq!(buf.num_rows, 0); + + // Builders are reused after flush + match &mut buf.columns[0] { + ColumnBuffer::Builder(b) => { + scatter_append(b.as_mut(), batch.column(0).as_ref(), &[3, 4]).unwrap() + } + _ => panic!("expected Builder"), + } + buf.num_rows += 2; + assert_eq!(buf.num_rows, 2); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_partition_output_stream_write_and_read() { + let batch = make_test_batch(&[1, 2, 3, 4, 5]); + let schema = batch.schema(); + + for codec in [ + CompressionCodec::None, + CompressionCodec::Lz4Frame, + CompressionCodec::Zstd(1), + CompressionCodec::Snappy, + ] { + let mut stream = PartitionOutputStream::try_new(Arc::clone(&schema), codec).unwrap(); + stream.write_ipc_block(&batch).unwrap(); + + let buf = stream.finish().unwrap(); + assert!(!buf.is_empty()); + + let ipc_length = u64::from_le_bytes(buf[0..8].try_into().unwrap()) as usize; + assert!(ipc_length > 0); + + let block_end = 8 + ipc_length; + let ipc_data = &buf[16..block_end]; + let batch2 = read_ipc_compressed(ipc_data).unwrap(); + assert_eq!(batch2.num_rows(), 5); + } + } + + fn make_hash_partitioning(col_name: &str, num_partitions: usize) -> CometPartitioning { + use datafusion::physical_expr::expressions::Column; + let expr: Arc = + Arc::new(Column::new(col_name, 0)); + CometPartitioning::Hash(vec![expr], num_partitions) + } + + #[tokio::test] + async fn test_immediate_mode_partitioner_hash() { + let batch = make_test_batch(&[1, 2, 3, 4, 5, 6, 7, 8]); + let schema = batch.schema(); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path, + index_path, + schema, + make_hash_partitioning("a", 4), + metrics, + runtime, + 8192, + CompressionCodec::None, + ) + .unwrap(); + + partitioner.insert_batch(batch).await.unwrap(); + + let total_rows: usize = partitioner + .partition_buffers + .iter() + .map(|b| b.num_rows) + .sum(); + assert_eq!(total_rows, 8); + } + + #[tokio::test] + async fn test_immediate_mode_shuffle_write() { + let batch1 = make_test_batch(&[1, 2, 3, 4, 5, 6]); + let batch2 = make_test_batch(&[7, 8, 9, 10, 11, 12]); + let schema = batch1.schema(); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let num_partitions = 3; + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path.clone(), + index_path.clone(), + schema, + make_hash_partitioning("a", num_partitions), + metrics, + runtime, + 8192, + CompressionCodec::None, + ) + .unwrap(); + + partitioner.insert_batch(batch1).await.unwrap(); + partitioner.insert_batch(batch2).await.unwrap(); + partitioner.shuffle_write().unwrap(); + + let index_data = std::fs::read(&index_path).unwrap(); + assert_eq!(index_data.len(), (num_partitions + 1) * 8); + + let first_offset = i64::from_le_bytes(index_data[0..8].try_into().unwrap()); + assert_eq!(first_offset, 0); + + let data_file_size = std::fs::metadata(&data_path).unwrap().len(); + let last_offset = i64::from_le_bytes( + index_data[num_partitions * 8..(num_partitions + 1) * 8] + .try_into() + .unwrap(), + ); + assert_eq!(last_offset as u64, data_file_size); + assert!(data_file_size > 0); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] // spill uses std::io::copy which triggers copy_file_range + async fn test_immediate_mode_spill() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let num_partitions = 2; + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(GreedyMemoryPool::new(256))) + .build() + .unwrap(), + ); + + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path.clone(), + index_path.clone(), + Arc::clone(&schema), + make_hash_partitioning("a", num_partitions), + metrics, + runtime, + 8192, + CompressionCodec::None, + ) + .unwrap(); + + for i in 0..10 { + let values: Vec = ((i * 10)..((i + 1) * 10)).collect(); + let batch = make_test_batch(&values); + partitioner.insert_batch(batch).await.unwrap(); + } + + partitioner.shuffle_write().unwrap(); + + let index_data = std::fs::read(&index_path).unwrap(); + assert_eq!(index_data.len(), (num_partitions + 1) * 8); + + let data_file_size = std::fs::metadata(&data_path).unwrap().len(); + let last_offset = i64::from_le_bytes( + index_data[num_partitions * 8..(num_partitions + 1) * 8] + .try_into() + .unwrap(), + ); + assert_eq!(last_offset as u64, data_file_size); + assert!(data_file_size > 0); + } + + #[tokio::test] + async fn test_block_format_compatible_with_read_ipc_compressed() { + let batch = make_test_batch(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let schema = batch.schema(); + let dir = tempfile::tempdir().unwrap(); + let data_path = dir.path().join("data").to_str().unwrap().to_string(); + let index_path = dir.path().join("index").to_str().unwrap().to_string(); + + let num_partitions = 2; + let metrics = ShufflePartitionerMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap()); + + // Small target to trigger flush during insert + let mut partitioner = ImmediateModePartitioner::try_new( + 0, + data_path.clone(), + index_path.clone(), + Arc::clone(&schema), + make_hash_partitioning("a", num_partitions), + metrics, + runtime, + 4, + CompressionCodec::Lz4Frame, + ) + .unwrap(); + + partitioner.insert_batch(batch).await.unwrap(); + partitioner.shuffle_write().unwrap(); + + let index_data = std::fs::read(&index_path).unwrap(); + let mut offsets = Vec::new(); + for i in 0..=num_partitions { + let offset = i64::from_le_bytes(index_data[i * 8..(i + 1) * 8].try_into().unwrap()); + offsets.push(offset as usize); + } + + let data = std::fs::read(&data_path).unwrap(); + let mut total_rows = 0; + for pid in 0..num_partitions { + let (start, end) = (offsets[pid], offsets[pid + 1]); + if start == end { + continue; + } + let mut pos = start; + while pos < end { + let payload_len = + u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize; + assert!(payload_len > 0); + let block_end = pos + 8 + payload_len; + let ipc_data = &data[pos + 16..block_end]; + let decoded = read_ipc_compressed(ipc_data).unwrap(); + assert_eq!(decoded.num_columns(), 1); + assert!(decoded.num_rows() > 0); + let col = decoded + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..col.len() { + assert!((1..=10).contains(&col.value(i))); + } + total_rows += decoded.num_rows(); + pos = block_end; + } + assert_eq!(pos, end); + } + assert_eq!(total_rows, 10); + } +} diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index a0bc652b4b..4a15f09bdc 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -16,12 +16,14 @@ // under the License. mod empty_schema; +mod immediate_mode; mod multi_partition; mod partitioned_batch_iterator; mod single_partition; mod traits; pub(crate) use empty_schema::EmptySchemaShufflePartitioner; +pub(crate) use immediate_mode::ImmediateModePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index 4ac4fc287b..b8000c9840 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -19,7 +19,8 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ - EmptySchemaShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, + EmptySchemaShufflePartitioner, ImmediateModePartitioner, MultiPartitionShuffleRepartitioner, + ShufflePartitioner, SinglePartitionShufflePartitioner, }; use crate::{CometPartitioning, CompressionCodec}; @@ -68,6 +69,8 @@ pub struct ShuffleWriterExec { tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, + /// When true, use ImmediateModePartitioner instead of MultiPartitionShuffleRepartitioner + immediate_mode: bool, } impl ShuffleWriterExec { @@ -81,6 +84,7 @@ impl ShuffleWriterExec { output_index_file: String, tracing_enabled: bool, write_buffer_size: usize, + immediate_mode: bool, ) -> Result { let cache = Arc::new(PlanProperties::new( EquivalenceProperties::new(Arc::clone(&input.schema())), @@ -99,6 +103,7 @@ impl ShuffleWriterExec { codec, tracing_enabled, write_buffer_size, + immediate_mode, }) } } @@ -159,6 +164,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.output_index_file.clone(), self.tracing_enabled, self.write_buffer_size, + self.immediate_mode, )?)), _ => panic!("ShuffleWriterExec wrong number of children"), } @@ -186,6 +192,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.codec.clone(), self.tracing_enabled, self.write_buffer_size, + self.immediate_mode, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -206,6 +213,7 @@ async fn external_shuffle( codec: CompressionCodec, tracing_enabled: bool, write_buffer_size: usize, + immediate_mode: bool, ) -> Result { with_trace_async("external_shuffle", tracing_enabled, || async { let schema = input.schema(); @@ -233,6 +241,17 @@ async fn external_shuffle( write_buffer_size, )?) } + _ if immediate_mode => Box::new(ImmediateModePartitioner::try_new( + partition, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + )?), _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, output_data_file, @@ -473,6 +492,7 @@ mod test { "/tmp/index.out".to_string(), false, 1024 * 1024, // write_buffer_size: 1MB default + false, // immediate_mode ) .unwrap(); @@ -532,6 +552,7 @@ mod test { index_file.clone(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap(); @@ -736,6 +757,7 @@ mod test { index_file.to_str().unwrap().to_string(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap(); @@ -824,6 +846,7 @@ mod test { index_file.to_str().unwrap().to_string(), false, 1024 * 1024, + false, // immediate_mode ) .unwrap(); diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..96c140300b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -192,6 +192,8 @@ class CometNativeShuffleWriter[K, V]( CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get) shuffleWriterBuilder.setWriteBufferSize( CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().max(Int.MaxValue).toInt) + shuffleWriterBuilder.setImmediateMode( + CometConf.COMET_SHUFFLE_PARTITIONER_MODE.get() == "immediate") outputPartitioning match { case p if isSinglePartitioning(p) => From c3dd88068df9e7f00ae1737b10d38a12dcd0fff4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 8 Apr 2026 09:56:16 -0600 Subject: [PATCH 2/6] refactor: extract shared partition ID computation to utility module Address PR review feedback: - Extract hash and range partition ID assignment into shared partition_id module, removing duplication between immediate and buffered partitioners - Fix docs table to show buffered as default mode --- .../contributor-guide/native_shuffle.md | 2 +- .../src/partitioners/immediate_mode.rs | 20 ++------ native/shuffle/src/partitioners/mod.rs | 1 + .../src/partitioners/multi_partition.rs | 34 ++++++-------- .../shuffle/src/partitioners/partition_id.rs | 46 +++++++++++++++++++ 5 files changed, 67 insertions(+), 36 deletions(-) create mode 100644 native/shuffle/src/partitioners/partition_id.rs diff --git a/docs/source/contributor-guide/native_shuffle.md b/docs/source/contributor-guide/native_shuffle.md index f2a59523a0..bd76537a34 100644 --- a/docs/source/contributor-guide/native_shuffle.md +++ b/docs/source/contributor-guide/native_shuffle.md @@ -85,7 +85,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition │ Partitioner Selection │ │ Controlled by spark.comet.exec.shuffle.partitionerMode │ ├───────────────────────────┬───────────────────────────────────────────┤ -│ immediate (default) │ buffered │ +│ immediate │ buffered (default) │ │ ImmediateModePartitioner │ MultiPartitionShuffleRepartitioner │ │ (hash/range/round-robin) │ (hash/range/round-robin) │ │ Writes IPC blocks as │ Buffers all rows in memory │ diff --git a/native/shuffle/src/partitioners/immediate_mode.rs b/native/shuffle/src/partitioners/immediate_mode.rs index ae039d16ef..a6070dffc3 100644 --- a/native/shuffle/src/partitioners/immediate_mode.rs +++ b/native/shuffle/src/partitioners/immediate_mode.rs @@ -16,8 +16,9 @@ // under the License. use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::partition_id::{assign_hash_partition_ids, assign_range_partition_ids}; use crate::partitioners::ShufflePartitioner; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec}; +use crate::{CometPartitioning, CompressionCodec}; use arrow::array::builder::{ make_builder, ArrayBuilder, BinaryBuilder, BinaryViewBuilder, BooleanBuilder, LargeBinaryBuilder, LargeStringBuilder, NullBuilder, PrimitiveBuilder, StringBuilder, @@ -479,10 +480,7 @@ impl ImmediateModePartitioner { hashes_buf.fill(42_u32); create_murmur3_hashes(&arrays, hashes_buf)?; let partition_ids = &mut self.partition_ids[..num_rows]; - for (idx, hash) in hashes_buf.iter().enumerate() { - partition_ids[idx] = - comet_partitioning::pmod(*hash, num_output_partitions) as u32; - } + assign_hash_partition_ids(hashes_buf, partition_ids, num_output_partitions); Ok(num_output_partitions) } CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { @@ -500,10 +498,7 @@ impl ImmediateModePartitioner { hashes_buf.fill(42_u32); create_murmur3_hashes(&columns_to_hash, hashes_buf)?; let partition_ids = &mut self.partition_ids[..num_rows]; - for (idx, hash) in hashes_buf.iter().enumerate() { - partition_ids[idx] = - comet_partitioning::pmod(*hash, num_output_partitions) as u32; - } + assign_hash_partition_ids(hashes_buf, partition_ids, num_output_partitions); Ok(num_output_partitions) } CometPartitioning::RangePartitioning( @@ -519,12 +514,7 @@ impl ImmediateModePartitioner { .collect::>>()?; let row_batch = row_converter.convert_columns(arrays.as_slice())?; let partition_ids = &mut self.partition_ids[..num_rows]; - for (row_idx, row) in row_batch.iter().enumerate() { - partition_ids[row_idx] = bounds - .as_slice() - .partition_point(|bound| bound.row() <= row) - as u32; - } + assign_range_partition_ids(&row_batch, partition_ids, bounds); Ok(num_output_partitions) } other => Err(DataFusionError::NotImplemented(format!( diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index 4a15f09bdc..a6fee92414 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -18,6 +18,7 @@ mod empty_schema; mod immediate_mode; mod multi_partition; +mod partition_id; mod partitioned_batch_iterator; mod single_partition; mod traits; diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 7de9314f54..99d1ffb2f9 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -19,9 +19,10 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::partitioned_batch_iterator::{ PartitionedBatchIterator, PartitionedBatchesProducer, }; +use crate::partitioners::partition_id::{assign_hash_partition_ids, assign_range_partition_ids}; use crate::partitioners::ShufflePartitioner; use crate::writers::{BufBatchWriter, PartitionWriter}; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; @@ -237,15 +238,13 @@ impl MultiPartitionShuffleRepartitioner { // Generate partition ids for every row. { - // Hash arrays and compute partition ids based on number of partitions. let partition_ids = &mut scratch.partition_ids[..num_rows]; - create_murmur3_hashes(&arrays, hashes_buf)? - .iter() - .enumerate() - .for_each(|(idx, hash)| { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - }); + create_murmur3_hashes(&arrays, hashes_buf)?; + assign_hash_partition_ids( + hashes_buf, + partition_ids, + *num_output_partitions, + ); } // We now have partition ids for every input row, map that to partition starts @@ -292,13 +291,7 @@ impl MultiPartitionShuffleRepartitioner { { let row_batch = row_converter.convert_columns(arrays.as_slice())?; let partition_ids = &mut scratch.partition_ids[..num_rows]; - - row_batch.iter().enumerate().for_each(|(row_idx, row)| { - partition_ids[row_idx] = bounds - .as_slice() - .partition_point(|bound| bound.row() <= row) - as u32 - }); + assign_range_partition_ids(&row_batch, partition_ids, bounds); } // We now have partition ids for every input row, map that to partition starts @@ -356,10 +349,11 @@ impl MultiPartitionShuffleRepartitioner { // Assign partition IDs based on hash (same as hash partitioning) let partition_ids = &mut scratch.partition_ids[..num_rows]; - hashes_buf.iter().enumerate().for_each(|(idx, hash)| { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - }); + assign_hash_partition_ids( + hashes_buf, + partition_ids, + *num_output_partitions, + ); // We now have partition ids for every input row, map that to partition starts // and partition indices to eventually write these rows to partition buffers. diff --git a/native/shuffle/src/partitioners/partition_id.rs b/native/shuffle/src/partitioners/partition_id.rs new file mode 100644 index 0000000000..0c3395254f --- /dev/null +++ b/native/shuffle/src/partitioners/partition_id.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared partition ID computation used by both immediate and buffered shuffle modes. + +use crate::comet_partitioning; +use arrow::row::Rows; + +/// Assign partition IDs from pre-computed hash values using Spark-compatible pmod. +pub(crate) fn assign_hash_partition_ids( + hashes: &[u32], + partition_ids: &mut [u32], + num_partitions: usize, +) { + for (idx, hash) in hashes.iter().enumerate() { + partition_ids[idx] = comet_partitioning::pmod(*hash, num_partitions) as u32; + } +} + +/// Assign partition IDs using binary search on range boundaries. +pub(crate) fn assign_range_partition_ids( + rows: &Rows, + partition_ids: &mut [u32], + bounds: &[arrow::row::OwnedRow], +) { + for (row_idx, row) in rows.iter().enumerate() { + partition_ids[row_idx] = bounds + .as_ref() + .partition_point(|bound| bound.row() <= row) + as u32; + } +} From 514ecdbc59261d22d1a1efa3887402c0e849139e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 8 Apr 2026 09:57:13 -0600 Subject: [PATCH 3/6] chore: format README with prettier --- native/shuffle/README.md | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/native/shuffle/README.md b/native/shuffle/README.md index 7484cc1093..4ba5daebda 100644 --- a/native/shuffle/README.md +++ b/native/shuffle/README.md @@ -41,21 +41,21 @@ cargo run --release --features shuffle-bench --bin shuffle_bench -- \ ### Options -| Option | Default | Description | -| ------------------------ | -------------------------- | ------------------------------------------------------------ | -| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files | -| `--partitions` | `200` | Number of output shuffle partitions | -| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | -| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) | -| `--codec` | `zstd` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | -| `--zstd-level` | `1` | Zstd compression level (1–22) | -| `--batch-size` | `8192` | Batch size for reading Parquet data | -| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | -| `--write-buffer-size` | `1048576` | Write buffer size in bytes | -| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) | -| `--iterations` | `1` | Number of timed iterations | -| `--warmup` | `0` | Number of warmup iterations before timing | -| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files | +| Option | Default | Description | +| --------------------- | -------------------------- | ------------------------------------------------------ | +| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files | +| `--partitions` | `200` | Number of output shuffle partitions | +| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | +| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) | +| `--codec` | `zstd` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | +| `--zstd-level` | `1` | Zstd compression level (1–22) | +| `--batch-size` | `8192` | Batch size for reading Parquet data | +| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | +| `--write-buffer-size` | `1048576` | Write buffer size in bytes | +| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) | +| `--iterations` | `1` | Number of timed iterations | +| `--warmup` | `0` | Number of warmup iterations before timing | +| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files | ### Profiling with flamegraph From 185794ac56939b8bee1dd2ceff45e9bb0f5c79ed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 8 Apr 2026 10:00:53 -0600 Subject: [PATCH 4/6] docs: fix tuning guide to accurately describe partitioner mode trade-offs --- docs/source/user-guide/latest/tuning.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index c47fe0a644..016f26ee65 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -147,13 +147,16 @@ partitioning keys. Columns that are not partitioning keys may contain complex ty Native shuffle has two partitioner modes, configured via `spark.comet.exec.shuffle.partitionerMode`: -- **`immediate`** (default): Writes partitioned Arrow IPC blocks to disk immediately as each batch - arrives. This mode uses less memory because it does not need to buffer the entire input before - writing. It is recommended for most workloads, especially large datasets. - -- **`buffered`**: Buffers all input rows in memory before partitioning and writing to disk. This - may improve performance for small datasets that fit in memory, but uses significantly more - memory. +- **`buffered`** (default): Buffers input batches in a shared memory pool before partitioning and + writing to disk. Per-partition overhead is minimal, so this mode scales well to large numbers + of partitions (1000+). It spills to disk when memory pressure is detected. + +- **`immediate`**: Partitions incoming batches immediately using per-partition Arrow array builders, + flushing compressed IPC blocks when they reach the target batch size. This avoids buffering the + entire input in memory, which can reduce peak memory usage for workloads with a moderate number + of partitions. However, because it allocates builders per partition (proportional to + `num_partitions × batch_size`), memory overhead grows with partition count. For workloads with + many partitions (1000+), `buffered` mode is recommended. #### Columnar (JVM) Shuffle From d36e7fff6c21e3502117b99442f265c43c620dd3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 8 Apr 2026 10:04:09 -0600 Subject: [PATCH 5/6] docs: more accurately describe memory trade-offs between partitioner modes --- docs/source/user-guide/latest/tuning.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 016f26ee65..48ca4aa250 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -147,16 +147,16 @@ partitioning keys. Columns that are not partitioning keys may contain complex ty Native shuffle has two partitioner modes, configured via `spark.comet.exec.shuffle.partitionerMode`: -- **`buffered`** (default): Buffers input batches in a shared memory pool before partitioning and - writing to disk. Per-partition overhead is minimal, so this mode scales well to large numbers - of partitions (1000+). It spills to disk when memory pressure is detected. +- **`buffered`** (default): Buffers all input batches in memory, then uses `interleave` to produce + partitioned output one partition at a time. Only one partition's output batch is in memory at + a time during the write phase, so this mode scales well to large numbers of partitions (1000+). + The trade-off is that it must hold all input data in memory (or spill it) before writing begins. - **`immediate`**: Partitions incoming batches immediately using per-partition Arrow array builders, flushing compressed IPC blocks when they reach the target batch size. This avoids buffering the - entire input in memory, which can reduce peak memory usage for workloads with a moderate number - of partitions. However, because it allocates builders per partition (proportional to - `num_partitions × batch_size`), memory overhead grows with partition count. For workloads with - many partitions (1000+), `buffered` mode is recommended. + entire input in memory. However, because it maintains builders for all partitions simultaneously + (proportional to `num_partitions × batch_size × num_columns`), memory overhead grows with + partition count. For workloads with many partitions (1000+), `buffered` mode is recommended. #### Columnar (JVM) Shuffle From 1922ffdc0333c67c2c2ffe635794f00c5be7f305 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 8 Apr 2026 10:55:06 -0600 Subject: [PATCH 6/6] chore: cargo fmt --- native/shuffle/src/partitioners/multi_partition.rs | 8 ++------ native/shuffle/src/partitioners/partition_id.rs | 5 +---- native/shuffle/src/shuffle_writer.rs | 3 +-- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 99d1ffb2f9..a87eb0fa26 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -16,10 +16,10 @@ // under the License. use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::partition_id::{assign_hash_partition_ids, assign_range_partition_ids}; use crate::partitioners::partitioned_batch_iterator::{ PartitionedBatchIterator, PartitionedBatchesProducer, }; -use crate::partitioners::partition_id::{assign_hash_partition_ids, assign_range_partition_ids}; use crate::partitioners::ShufflePartitioner; use crate::writers::{BufBatchWriter, PartitionWriter}; use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; @@ -349,11 +349,7 @@ impl MultiPartitionShuffleRepartitioner { // Assign partition IDs based on hash (same as hash partitioning) let partition_ids = &mut scratch.partition_ids[..num_rows]; - assign_hash_partition_ids( - hashes_buf, - partition_ids, - *num_output_partitions, - ); + assign_hash_partition_ids(hashes_buf, partition_ids, *num_output_partitions); // We now have partition ids for every input row, map that to partition starts // and partition indices to eventually write these rows to partition buffers. diff --git a/native/shuffle/src/partitioners/partition_id.rs b/native/shuffle/src/partitioners/partition_id.rs index 0c3395254f..a574a75daa 100644 --- a/native/shuffle/src/partitioners/partition_id.rs +++ b/native/shuffle/src/partitioners/partition_id.rs @@ -38,9 +38,6 @@ pub(crate) fn assign_range_partition_ids( bounds: &[arrow::row::OwnedRow], ) { for (row_idx, row) in rows.iter().enumerate() { - partition_ids[row_idx] = bounds - .as_ref() - .partition_point(|bound| bound.row() <= row) - as u32; + partition_ids[row_idx] = bounds.as_ref().partition_point(|bound| bound.row() <= row) as u32; } } diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index b8000c9840..5f8c8a6aab 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -20,8 +20,7 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ EmptySchemaShufflePartitioner, ImmediateModePartitioner, MultiPartitionShuffleRepartitioner, - ShufflePartitioner, - SinglePartitionShufflePartitioner, + ShufflePartitioner, SinglePartitionShufflePartitioner, }; use crate::{CometPartitioning, CompressionCodec}; use async_trait::async_trait;