From a634a4133414c3edde073b7a1f7d4bc741a3af48 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 13:22:32 -0600 Subject: [PATCH 1/4] feat: add gather_time and coalesce_time metrics to shuffle write Pass coalesce_time to BufBatchWriter::write and flush calls across all callers. Add gather_time instrumentation around PartitionedBatchIterator iteration in both multi_partition shuffle_write_partition and partition_writer spill to measure interleave_record_batch time. --- native/core/src/execution/shuffle/metrics.rs | 12 +++++- .../shuffle/partitioners/multi_partition.rs | 21 +++++++-- .../shuffle/partitioners/single_partition.rs | 10 ++++- .../src/execution/shuffle/shuffle_writer.rs | 9 ++-- .../shuffle/writers/buf_batch_writer.rs | 6 +++ .../shuffle/writers/partition_writer.rs | 43 ++++++++++++++----- 6 files changed, 78 insertions(+), 23 deletions(-) diff --git a/native/core/src/execution/shuffle/metrics.rs b/native/core/src/execution/shuffle/metrics.rs index 33b51c3cd8..af6edc3574 100644 --- a/native/core/src/execution/shuffle/metrics.rs +++ b/native/core/src/execution/shuffle/metrics.rs @@ -23,10 +23,16 @@ pub(super) struct ShufflePartitionerMetrics { /// metrics pub(super) baseline: BaselineMetrics, - /// Time to perform repartitioning + /// Time to perform repartitioning (computing partition IDs via hash/range) pub(super) repart_time: Time, - /// Time encoding batches to IPC format + /// Time in interleave_record_batch (gathering rows from buffered batches) + pub(super) gather_time: Time, + + /// Time in BatchCoalescer (merging small batches before serialization) + pub(super) coalesce_time: Time, + + /// Time encoding batches to IPC format (includes compression) pub(super) encode_time: Time, /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. @@ -50,6 +56,8 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + gather_time: MetricBuilder::new(metrics).subset_time("gather_time", partition), + coalesce_time: MetricBuilder::new(metrics).subset_time("coalesce_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 9c366ad462..e7d3b081b7 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -439,6 +439,8 @@ impl MultiPartitionShuffleRepartitioner { partition_iter: &mut PartitionedBatchIterator, shuffle_block_writer: &mut ShuffleBlockWriter, output_data: &mut BufWriter, + gather_time: &Time, + coalesce_time: &Time, encode_time: &Time, write_time: &Time, write_buffer_size: usize, @@ -450,11 +452,20 @@ impl MultiPartitionShuffleRepartitioner { write_buffer_size, batch_size, ); - for batch in partition_iter { - let batch = batch?; - buf_batch_writer.write(&batch, encode_time, write_time)?; + loop { + let gather_start = Instant::now(); + let maybe_batch = partition_iter.next(); + gather_time.add_duration(gather_start.elapsed()); + + match maybe_batch { + Some(batch) => { + let batch = batch?; + buf_batch_writer.write(&batch, coalesce_time, encode_time, write_time)?; + } + None => break, + } } - buf_batch_writer.flush(encode_time, write_time)?; + buf_batch_writer.flush(coalesce_time, encode_time, write_time)?; Ok(()) } @@ -595,6 +606,8 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &mut partition_iter, &mut self.shuffle_block_writer, &mut output_data, + &self.metrics.gather_time, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, self.write_buffer_size, diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs b/native/core/src/execution/shuffle/partitioners/single_partition.rs index eeca4458cc..1905b622e7 100644 --- a/native/core/src/execution/shuffle/partitioners/single_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/single_partition.rs @@ -125,6 +125,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { if let Some(batch) = concatenated_batch { self.output_data_writer.write( &batch, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, )?; @@ -134,6 +135,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Write the new batch self.output_data_writer.write( &batch, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, )?; @@ -162,12 +164,16 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { if let Some(batch) = concatenated_batch { self.output_data_writer.write( &batch, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, )?; } - self.output_data_writer - .flush(&self.metrics.encode_time, &self.metrics.write_time)?; + self.output_data_writer.flush( + &self.metrics.coalesce_time, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; // Write index file. It should only contain 2 entries: 0 and the total number of bytes written let index_file = OpenOptions::new() diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index fe1bf0fccf..534c5ba434 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -616,6 +616,7 @@ mod test { .collect(); let codec = CompressionCodec::Lz4Frame; + let coalesce_time = Time::default(); let encode_time = Time::default(); let write_time = Time::default(); @@ -630,9 +631,9 @@ mod test { 8192, ); for batch in &small_batches { - buf_writer.write(batch, &encode_time, &write_time).unwrap(); + buf_writer.write(batch, &coalesce_time, &encode_time, &write_time).unwrap(); } - buf_writer.flush(&encode_time, &write_time).unwrap(); + buf_writer.flush(&coalesce_time, &encode_time, &write_time).unwrap(); } // Write without coalescing (batch_size=1) @@ -646,9 +647,9 @@ mod test { 1, ); for batch in &small_batches { - buf_writer.write(batch, &encode_time, &write_time).unwrap(); + buf_writer.write(batch, &coalesce_time, &encode_time, &write_time).unwrap(); } - buf_writer.flush(&encode_time, &write_time).unwrap(); + buf_writer.flush(&coalesce_time, &encode_time, &write_time).unwrap(); } // Coalesced output should be smaller due to fewer IPC schema blocks diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs index 8d056d7bb0..03a263f2c3 100644 --- a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs +++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs @@ -61,9 +61,11 @@ impl, W: Write> BufBatchWriter { pub(crate) fn write( &mut self, batch: &RecordBatch, + coalesce_time: &Time, encode_time: &Time, write_time: &Time, ) -> datafusion::common::Result { + let mut coalesce_timer = coalesce_time.timer(); let coalescer = self .coalescer .get_or_insert_with(|| BatchCoalescer::new(batch.schema(), self.batch_size)); @@ -75,6 +77,7 @@ impl, W: Write> BufBatchWriter { while let Some(batch) = coalescer.next_completed_batch() { completed.push(batch); } + coalesce_timer.stop(); let mut bytes_written = 0; for batch in &completed { @@ -108,10 +111,12 @@ impl, W: Write> BufBatchWriter { pub(crate) fn flush( &mut self, + coalesce_time: &Time, encode_time: &Time, write_time: &Time, ) -> datafusion::common::Result<()> { // Finish any remaining buffered rows in the coalescer + let mut coalesce_timer = coalesce_time.timer(); let mut remaining = Vec::new(); if let Some(coalescer) = &mut self.coalescer { coalescer.finish_buffered_batch()?; @@ -119,6 +124,7 @@ impl, W: Write> BufBatchWriter { remaining.push(batch); } } + coalesce_timer.stop(); for batch in &remaining { self.write_batch_to_buffer(batch, encode_time, write_time)?; } diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 7c2dbe0444..2dbd4ee5ca 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -23,6 +23,7 @@ use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; use std::fs::{File, OpenOptions}; +use tokio::time::Instant; struct SpillFile { temp_file: RefCountedTempFile, @@ -81,7 +82,11 @@ impl PartitionWriter { write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result { - if let Some(batch) = iter.next() { + let gather_start = Instant::now(); + let first = iter.next(); + metrics.gather_time.add_duration(gather_start.elapsed()); + + if let Some(batch) = first { self.ensure_spill_file_created(runtime)?; let total_bytes_written = { @@ -91,17 +96,33 @@ impl PartitionWriter { write_buffer_size, batch_size, ); - let mut bytes_written = - buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - for batch in iter { - let batch = batch?; - bytes_written += buf_batch_writer.write( - &batch, - &metrics.encode_time, - &metrics.write_time, - )?; + let mut bytes_written = buf_batch_writer.write( + &batch?, + &metrics.coalesce_time, + &metrics.encode_time, + &metrics.write_time, + )?; + loop { + let gather_start = Instant::now(); + let maybe_batch = iter.next(); + metrics.gather_time.add_duration(gather_start.elapsed()); + match maybe_batch { + Some(batch) => { + bytes_written += buf_batch_writer.write( + &batch?, + &metrics.coalesce_time, + &metrics.encode_time, + &metrics.write_time, + )?; + } + None => break, + } } - buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + buf_batch_writer.flush( + &metrics.coalesce_time, + &metrics.encode_time, + &metrics.write_time, + )?; bytes_written }; From b76bdb876693530a01d3bac417e2d5d759d3febb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Mar 2026 13:27:06 -0600 Subject: [PATCH 2/4] feat: add gather_time and coalesce_time metrics to shuffle write profiling --- .../src/execution/shuffle/partitioners/multi_partition.rs | 1 + .../scala/org/apache/spark/sql/comet/CometMetricNode.scala | 2 ++ .../comet/execution/shuffle/CometNativeShuffleWriter.scala | 4 +++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index e7d3b081b7..49b0e1135a 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -435,6 +435,7 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } + #[allow(clippy::too_many_arguments)] fn shuffle_write_partition( partition_iter: &mut PartitionedBatchIterator, shuffle_block_writer: &mut ShuffleBlockWriter, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 8c75df1d45..a163a94745 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -248,6 +248,8 @@ object CometMetricNode { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"), "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"), + "gather_time" -> SQLMetrics.createNanoTimingMetric(sc, "gather/interleave time"), + "coalesce_time" -> SQLMetrics.createNanoTimingMetric(sc, "batch coalescing time"), "encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"), "decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"), "spill_count" -> SQLMetrics.createMetric(sc, "number of spills"), diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..50cbf47dbd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -77,8 +77,10 @@ class CometNativeShuffleWriter[K, V]( val detailedMetrics = Seq( "elapsed_compute", - "encode_time", "repart_time", + "gather_time", + "coalesce_time", + "encode_time", "input_batches", "spill_count", "spilled_bytes") From cef8b984dcf3e31f9cded9349f6bfa6bd16da4ed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 20 Mar 2026 09:08:36 -0600 Subject: [PATCH 3/4] fmt --- .../core/src/execution/shuffle/shuffle_writer.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 534c5ba434..ed4167988b 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -631,9 +631,13 @@ mod test { 8192, ); for batch in &small_batches { - buf_writer.write(batch, &coalesce_time, &encode_time, &write_time).unwrap(); + buf_writer + .write(batch, &coalesce_time, &encode_time, &write_time) + .unwrap(); } - buf_writer.flush(&coalesce_time, &encode_time, &write_time).unwrap(); + buf_writer + .flush(&coalesce_time, &encode_time, &write_time) + .unwrap(); } // Write without coalescing (batch_size=1) @@ -647,9 +651,13 @@ mod test { 1, ); for batch in &small_batches { - buf_writer.write(batch, &coalesce_time, &encode_time, &write_time).unwrap(); + buf_writer + .write(batch, &coalesce_time, &encode_time, &write_time) + .unwrap(); } - buf_writer.flush(&coalesce_time, &encode_time, &write_time).unwrap(); + buf_writer + .flush(&coalesce_time, &encode_time, &write_time) + .unwrap(); } // Coalesced output should be smaller due to fewer IPC schema blocks From 14c68d97fccf1d54b7706dd2af5a34786ae42a85 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 20 Mar 2026 09:22:43 -0600 Subject: [PATCH 4/4] refactor: compute gather_time by subtraction to avoid per-batch syscalls Instead of calling Instant::now() for every batch in the shuffle write loop, snapshot inner metric values before the loop and compute gather_time = total - coalesce - encode - write afterward. Also replace tokio::time::Instant with std::time::Instant. --- .../shuffle/partitioners/multi_partition.rs | 32 +++++++------ .../shuffle/writers/partition_writer.rs | 45 ++++++++++--------- 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 49b0e1135a..2f27894a45 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -39,7 +39,7 @@ use std::fmt::{Debug, Formatter}; use std::fs::{File, OpenOptions}; use std::io::{BufReader, BufWriter, Seek, Write}; use std::sync::Arc; -use tokio::time::Instant; +use std::time::{Duration, Instant}; #[derive(Default)] struct ScratchSpace { @@ -447,26 +447,32 @@ impl MultiPartitionShuffleRepartitioner { write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result<()> { + // Snapshot inner metric values so we can compute gather_time by subtraction + let coalesce_before = coalesce_time.value(); + let encode_before = encode_time.value(); + let write_before = write_time.value(); + let total_start = Instant::now(); + let mut buf_batch_writer = BufBatchWriter::new( shuffle_block_writer, output_data, write_buffer_size, batch_size, ); - loop { - let gather_start = Instant::now(); - let maybe_batch = partition_iter.next(); - gather_time.add_duration(gather_start.elapsed()); - - match maybe_batch { - Some(batch) => { - let batch = batch?; - buf_batch_writer.write(&batch, coalesce_time, encode_time, write_time)?; - } - None => break, - } + for batch in partition_iter { + let batch = batch?; + buf_batch_writer.write(&batch, coalesce_time, encode_time, write_time)?; } buf_batch_writer.flush(coalesce_time, encode_time, write_time)?; + + // gather_time = total - coalesce - encode - write (avoids per-batch syscalls) + let total_nanos = total_start.elapsed().as_nanos() as usize; + let inner_nanos = (coalesce_time.value() - coalesce_before) + + (encode_time.value() - encode_before) + + (write_time.value() - write_before); + gather_time.add_duration(Duration::from_nanos( + total_nanos.saturating_sub(inner_nanos) as u64, + )); Ok(()) } diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 2dbd4ee5ca..ce9c5f8d64 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -23,7 +23,7 @@ use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; use std::fs::{File, OpenOptions}; -use tokio::time::Instant; +use std::time::{Duration, Instant}; struct SpillFile { temp_file: RefCountedTempFile, @@ -82,13 +82,15 @@ impl PartitionWriter { write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result { - let gather_start = Instant::now(); - let first = iter.next(); - metrics.gather_time.add_duration(gather_start.elapsed()); - - if let Some(batch) = first { + if let Some(batch) = iter.next() { self.ensure_spill_file_created(runtime)?; + // Snapshot inner metric values so we can compute gather_time by subtraction + let coalesce_before = metrics.coalesce_time.value(); + let encode_before = metrics.encode_time.value(); + let write_before = metrics.write_time.value(); + let total_start = Instant::now(); + let total_bytes_written = { let mut buf_batch_writer = BufBatchWriter::new( &mut self.shuffle_block_writer, @@ -102,21 +104,13 @@ impl PartitionWriter { &metrics.encode_time, &metrics.write_time, )?; - loop { - let gather_start = Instant::now(); - let maybe_batch = iter.next(); - metrics.gather_time.add_duration(gather_start.elapsed()); - match maybe_batch { - Some(batch) => { - bytes_written += buf_batch_writer.write( - &batch?, - &metrics.coalesce_time, - &metrics.encode_time, - &metrics.write_time, - )?; - } - None => break, - } + for batch in iter { + bytes_written += buf_batch_writer.write( + &batch?, + &metrics.coalesce_time, + &metrics.encode_time, + &metrics.write_time, + )?; } buf_batch_writer.flush( &metrics.coalesce_time, @@ -126,6 +120,15 @@ impl PartitionWriter { bytes_written }; + // gather_time = total - coalesce - encode - write (avoids per-batch syscalls) + let total_nanos = total_start.elapsed().as_nanos() as usize; + let inner_nanos = (metrics.coalesce_time.value() - coalesce_before) + + (metrics.encode_time.value() - encode_before) + + (metrics.write_time.value() - write_before); + metrics.gather_time.add_duration(Duration::from_nanos( + total_nanos.saturating_sub(inner_nanos) as u64, + )); + Ok(total_bytes_written) } else { Ok(0)