diff --git a/native/core/src/execution/shuffle/metrics.rs b/native/core/src/execution/shuffle/metrics.rs index 33b51c3cd8..af6edc3574 100644 --- a/native/core/src/execution/shuffle/metrics.rs +++ b/native/core/src/execution/shuffle/metrics.rs @@ -23,10 +23,16 @@ pub(super) struct ShufflePartitionerMetrics { /// metrics pub(super) baseline: BaselineMetrics, - /// Time to perform repartitioning + /// Time to perform repartitioning (computing partition IDs via hash/range) pub(super) repart_time: Time, - /// Time encoding batches to IPC format + /// Time in interleave_record_batch (gathering rows from buffered batches) + pub(super) gather_time: Time, + + /// Time in BatchCoalescer (merging small batches before serialization) + pub(super) coalesce_time: Time, + + /// Time encoding batches to IPC format (includes compression) pub(super) encode_time: Time, /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. @@ -50,6 +56,8 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + gather_time: MetricBuilder::new(metrics).subset_time("gather_time", partition), + coalesce_time: MetricBuilder::new(metrics).subset_time("coalesce_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 9c366ad462..2f27894a45 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -39,7 +39,7 @@ use std::fmt::{Debug, Formatter}; use std::fs::{File, OpenOptions}; use std::io::{BufReader, BufWriter, Seek, Write}; use std::sync::Arc; -use tokio::time::Instant; +use std::time::{Duration, Instant}; #[derive(Default)] struct ScratchSpace { @@ -435,15 +435,24 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } + #[allow(clippy::too_many_arguments)] fn shuffle_write_partition( partition_iter: &mut PartitionedBatchIterator, shuffle_block_writer: &mut ShuffleBlockWriter, output_data: &mut BufWriter, + gather_time: &Time, + coalesce_time: &Time, encode_time: &Time, write_time: &Time, write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result<()> { + // Snapshot inner metric values so we can compute gather_time by subtraction + let coalesce_before = coalesce_time.value(); + let encode_before = encode_time.value(); + let write_before = write_time.value(); + let total_start = Instant::now(); + let mut buf_batch_writer = BufBatchWriter::new( shuffle_block_writer, output_data, @@ -452,9 +461,18 @@ impl MultiPartitionShuffleRepartitioner { ); for batch in partition_iter { let batch = batch?; - buf_batch_writer.write(&batch, encode_time, write_time)?; + buf_batch_writer.write(&batch, coalesce_time, encode_time, write_time)?; } - buf_batch_writer.flush(encode_time, write_time)?; + buf_batch_writer.flush(coalesce_time, encode_time, write_time)?; + + // gather_time = total - coalesce - encode - write (avoids per-batch syscalls) + let total_nanos = total_start.elapsed().as_nanos() as usize; + let inner_nanos = (coalesce_time.value() - coalesce_before) + + (encode_time.value() - encode_before) + + (write_time.value() - write_before); + gather_time.add_duration(Duration::from_nanos( + total_nanos.saturating_sub(inner_nanos) as u64, + )); Ok(()) } @@ -595,6 +613,8 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &mut partition_iter, &mut self.shuffle_block_writer, &mut output_data, + &self.metrics.gather_time, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, self.write_buffer_size, diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs b/native/core/src/execution/shuffle/partitioners/single_partition.rs index eeca4458cc..1905b622e7 100644 --- a/native/core/src/execution/shuffle/partitioners/single_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/single_partition.rs @@ -125,6 +125,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { if let Some(batch) = concatenated_batch { self.output_data_writer.write( &batch, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, )?; @@ -134,6 +135,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Write the new batch self.output_data_writer.write( &batch, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, )?; @@ -162,12 +164,16 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { if let Some(batch) = concatenated_batch { self.output_data_writer.write( &batch, + &self.metrics.coalesce_time, &self.metrics.encode_time, &self.metrics.write_time, )?; } - self.output_data_writer - .flush(&self.metrics.encode_time, &self.metrics.write_time)?; + self.output_data_writer.flush( + &self.metrics.coalesce_time, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; // Write index file. It should only contain 2 entries: 0 and the total number of bytes written let index_file = OpenOptions::new() diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index fe1bf0fccf..ed4167988b 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -616,6 +616,7 @@ mod test { .collect(); let codec = CompressionCodec::Lz4Frame; + let coalesce_time = Time::default(); let encode_time = Time::default(); let write_time = Time::default(); @@ -630,9 +631,13 @@ mod test { 8192, ); for batch in &small_batches { - buf_writer.write(batch, &encode_time, &write_time).unwrap(); + buf_writer + .write(batch, &coalesce_time, &encode_time, &write_time) + .unwrap(); } - buf_writer.flush(&encode_time, &write_time).unwrap(); + buf_writer + .flush(&coalesce_time, &encode_time, &write_time) + .unwrap(); } // Write without coalescing (batch_size=1) @@ -646,9 +651,13 @@ mod test { 1, ); for batch in &small_batches { - buf_writer.write(batch, &encode_time, &write_time).unwrap(); + buf_writer + .write(batch, &coalesce_time, &encode_time, &write_time) + .unwrap(); } - buf_writer.flush(&encode_time, &write_time).unwrap(); + buf_writer + .flush(&coalesce_time, &encode_time, &write_time) + .unwrap(); } // Coalesced output should be smaller due to fewer IPC schema blocks diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs index 8d056d7bb0..03a263f2c3 100644 --- a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs +++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs @@ -61,9 +61,11 @@ impl, W: Write> BufBatchWriter { pub(crate) fn write( &mut self, batch: &RecordBatch, + coalesce_time: &Time, encode_time: &Time, write_time: &Time, ) -> datafusion::common::Result { + let mut coalesce_timer = coalesce_time.timer(); let coalescer = self .coalescer .get_or_insert_with(|| BatchCoalescer::new(batch.schema(), self.batch_size)); @@ -75,6 +77,7 @@ impl, W: Write> BufBatchWriter { while let Some(batch) = coalescer.next_completed_batch() { completed.push(batch); } + coalesce_timer.stop(); let mut bytes_written = 0; for batch in &completed { @@ -108,10 +111,12 @@ impl, W: Write> BufBatchWriter { pub(crate) fn flush( &mut self, + coalesce_time: &Time, encode_time: &Time, write_time: &Time, ) -> datafusion::common::Result<()> { // Finish any remaining buffered rows in the coalescer + let mut coalesce_timer = coalesce_time.timer(); let mut remaining = Vec::new(); if let Some(coalescer) = &mut self.coalescer { coalescer.finish_buffered_batch()?; @@ -119,6 +124,7 @@ impl, W: Write> BufBatchWriter { remaining.push(batch); } } + coalesce_timer.stop(); for batch in &remaining { self.write_batch_to_buffer(batch, encode_time, write_time)?; } diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 7c2dbe0444..ce9c5f8d64 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -23,6 +23,7 @@ use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; use std::fs::{File, OpenOptions}; +use std::time::{Duration, Instant}; struct SpillFile { temp_file: RefCountedTempFile, @@ -84,6 +85,12 @@ impl PartitionWriter { if let Some(batch) = iter.next() { self.ensure_spill_file_created(runtime)?; + // Snapshot inner metric values so we can compute gather_time by subtraction + let coalesce_before = metrics.coalesce_time.value(); + let encode_before = metrics.encode_time.value(); + let write_before = metrics.write_time.value(); + let total_start = Instant::now(); + let total_bytes_written = { let mut buf_batch_writer = BufBatchWriter::new( &mut self.shuffle_block_writer, @@ -91,20 +98,37 @@ impl PartitionWriter { write_buffer_size, batch_size, ); - let mut bytes_written = - buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; + let mut bytes_written = buf_batch_writer.write( + &batch?, + &metrics.coalesce_time, + &metrics.encode_time, + &metrics.write_time, + )?; for batch in iter { - let batch = batch?; bytes_written += buf_batch_writer.write( - &batch, + &batch?, + &metrics.coalesce_time, &metrics.encode_time, &metrics.write_time, )?; } - buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; + buf_batch_writer.flush( + &metrics.coalesce_time, + &metrics.encode_time, + &metrics.write_time, + )?; bytes_written }; + // gather_time = total - coalesce - encode - write (avoids per-batch syscalls) + let total_nanos = total_start.elapsed().as_nanos() as usize; + let inner_nanos = (metrics.coalesce_time.value() - coalesce_before) + + (metrics.encode_time.value() - encode_before) + + (metrics.write_time.value() - write_before); + metrics.gather_time.add_duration(Duration::from_nanos( + total_nanos.saturating_sub(inner_nanos) as u64, + )); + Ok(total_bytes_written) } else { Ok(0) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 8c75df1d45..a163a94745 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -248,6 +248,8 @@ object CometMetricNode { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"), "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"), + "gather_time" -> SQLMetrics.createNanoTimingMetric(sc, "gather/interleave time"), + "coalesce_time" -> SQLMetrics.createNanoTimingMetric(sc, "batch coalescing time"), "encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"), "decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"), "spill_count" -> SQLMetrics.createMetric(sc, "number of spills"), diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..50cbf47dbd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -77,8 +77,10 @@ class CometNativeShuffleWriter[K, V]( val detailedMetrics = Seq( "elapsed_compute", - "encode_time", "repart_time", + "gather_time", + "coalesce_time", + "encode_time", "input_batches", "spill_count", "spilled_bytes")