Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions native/core/src/execution/shuffle/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ pub(super) struct ShufflePartitionerMetrics {
/// metrics
pub(super) baseline: BaselineMetrics,

/// Time to perform repartitioning
/// Time to perform repartitioning (computing partition IDs via hash/range)
pub(super) repart_time: Time,

/// Time encoding batches to IPC format
/// Time in interleave_record_batch (gathering rows from buffered batches)
pub(super) gather_time: Time,

/// Time in BatchCoalescer (merging small batches before serialization)
pub(super) coalesce_time: Time,

/// Time encoding batches to IPC format (includes compression)
pub(super) encode_time: Time,

/// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics.
Expand All @@ -50,6 +56,8 @@ impl ShufflePartitionerMetrics {
Self {
baseline: BaselineMetrics::new(metrics, partition),
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
gather_time: MetricBuilder::new(metrics).subset_time("gather_time", partition),
coalesce_time: MetricBuilder::new(metrics).subset_time("coalesce_time", partition),
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
Expand Down
26 changes: 23 additions & 3 deletions native/core/src/execution/shuffle/partitioners/multi_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::fmt::{Debug, Formatter};
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Seek, Write};
use std::sync::Arc;
use tokio::time::Instant;
use std::time::{Duration, Instant};

#[derive(Default)]
struct ScratchSpace {
Expand Down Expand Up @@ -435,15 +435,24 @@ impl MultiPartitionShuffleRepartitioner {
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn shuffle_write_partition(
partition_iter: &mut PartitionedBatchIterator,
shuffle_block_writer: &mut ShuffleBlockWriter,
output_data: &mut BufWriter<File>,
gather_time: &Time,
coalesce_time: &Time,
encode_time: &Time,
write_time: &Time,
write_buffer_size: usize,
batch_size: usize,
) -> datafusion::common::Result<()> {
// Snapshot inner metric values so we can compute gather_time by subtraction
let coalesce_before = coalesce_time.value();
let encode_before = encode_time.value();
let write_before = write_time.value();
let total_start = Instant::now();

let mut buf_batch_writer = BufBatchWriter::new(
shuffle_block_writer,
output_data,
Expand All @@ -452,9 +461,18 @@ impl MultiPartitionShuffleRepartitioner {
);
for batch in partition_iter {
let batch = batch?;
buf_batch_writer.write(&batch, encode_time, write_time)?;
buf_batch_writer.write(&batch, coalesce_time, encode_time, write_time)?;
}
buf_batch_writer.flush(encode_time, write_time)?;
buf_batch_writer.flush(coalesce_time, encode_time, write_time)?;

// gather_time = total - coalesce - encode - write (avoids per-batch syscalls)
let total_nanos = total_start.elapsed().as_nanos() as usize;
let inner_nanos = (coalesce_time.value() - coalesce_before)
+ (encode_time.value() - encode_before)
+ (write_time.value() - write_before);
gather_time.add_duration(Duration::from_nanos(
total_nanos.saturating_sub(inner_nanos) as u64,
));
Ok(())
}

Expand Down Expand Up @@ -595,6 +613,8 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
&mut partition_iter,
&mut self.shuffle_block_writer,
&mut output_data,
&self.metrics.gather_time,
&self.metrics.coalesce_time,
&self.metrics.encode_time,
&self.metrics.write_time,
self.write_buffer_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
if let Some(batch) = concatenated_batch {
self.output_data_writer.write(
&batch,
&self.metrics.coalesce_time,
&self.metrics.encode_time,
&self.metrics.write_time,
)?;
Expand All @@ -134,6 +135,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
// Write the new batch
self.output_data_writer.write(
&batch,
&self.metrics.coalesce_time,
&self.metrics.encode_time,
&self.metrics.write_time,
)?;
Expand Down Expand Up @@ -162,12 +164,16 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
if let Some(batch) = concatenated_batch {
self.output_data_writer.write(
&batch,
&self.metrics.coalesce_time,
&self.metrics.encode_time,
&self.metrics.write_time,
)?;
}
self.output_data_writer
.flush(&self.metrics.encode_time, &self.metrics.write_time)?;
self.output_data_writer.flush(
&self.metrics.coalesce_time,
&self.metrics.encode_time,
&self.metrics.write_time,
)?;

// Write index file. It should only contain 2 entries: 0 and the total number of bytes written
let index_file = OpenOptions::new()
Expand Down
17 changes: 13 additions & 4 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ mod test {
.collect();

let codec = CompressionCodec::Lz4Frame;
let coalesce_time = Time::default();
let encode_time = Time::default();
let write_time = Time::default();

Expand All @@ -630,9 +631,13 @@ mod test {
8192,
);
for batch in &small_batches {
buf_writer.write(batch, &encode_time, &write_time).unwrap();
buf_writer
.write(batch, &coalesce_time, &encode_time, &write_time)
.unwrap();
}
buf_writer.flush(&encode_time, &write_time).unwrap();
buf_writer
.flush(&coalesce_time, &encode_time, &write_time)
.unwrap();
}

// Write without coalescing (batch_size=1)
Expand All @@ -646,9 +651,13 @@ mod test {
1,
);
for batch in &small_batches {
buf_writer.write(batch, &encode_time, &write_time).unwrap();
buf_writer
.write(batch, &coalesce_time, &encode_time, &write_time)
.unwrap();
}
buf_writer.flush(&encode_time, &write_time).unwrap();
buf_writer
.flush(&coalesce_time, &encode_time, &write_time)
.unwrap();
}

// Coalesced output should be smaller due to fewer IPC schema blocks
Expand Down
6 changes: 6 additions & 0 deletions native/core/src/execution/shuffle/writers/buf_batch_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
pub(crate) fn write(
&mut self,
batch: &RecordBatch,
coalesce_time: &Time,
encode_time: &Time,
write_time: &Time,
) -> datafusion::common::Result<usize> {
let mut coalesce_timer = coalesce_time.timer();
let coalescer = self
.coalescer
.get_or_insert_with(|| BatchCoalescer::new(batch.schema(), self.batch_size));
Expand All @@ -75,6 +77,7 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
while let Some(batch) = coalescer.next_completed_batch() {
completed.push(batch);
}
coalesce_timer.stop();

let mut bytes_written = 0;
for batch in &completed {
Expand Down Expand Up @@ -108,17 +111,20 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {

pub(crate) fn flush(
&mut self,
coalesce_time: &Time,
encode_time: &Time,
write_time: &Time,
) -> datafusion::common::Result<()> {
// Finish any remaining buffered rows in the coalescer
let mut coalesce_timer = coalesce_time.timer();
let mut remaining = Vec::new();
if let Some(coalescer) = &mut self.coalescer {
coalescer.finish_buffered_batch()?;
while let Some(batch) = coalescer.next_completed_batch() {
remaining.push(batch);
}
}
coalesce_timer.stop();
for batch in &remaining {
self.write_batch_to_buffer(batch, encode_time, write_time)?;
}
Expand Down
34 changes: 29 additions & 5 deletions native/core/src/execution/shuffle/writers/partition_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::common::DataFusionError;
use datafusion::execution::disk_manager::RefCountedTempFile;
use datafusion::execution::runtime_env::RuntimeEnv;
use std::fs::{File, OpenOptions};
use std::time::{Duration, Instant};

struct SpillFile {
temp_file: RefCountedTempFile,
Expand Down Expand Up @@ -84,27 +85,50 @@ impl PartitionWriter {
if let Some(batch) = iter.next() {
self.ensure_spill_file_created(runtime)?;

// Snapshot inner metric values so we can compute gather_time by subtraction
let coalesce_before = metrics.coalesce_time.value();
let encode_before = metrics.encode_time.value();
let write_before = metrics.write_time.value();
let total_start = Instant::now();

let total_bytes_written = {
let mut buf_batch_writer = BufBatchWriter::new(
&mut self.shuffle_block_writer,
&mut self.spill_file.as_mut().unwrap().file,
write_buffer_size,
batch_size,
);
let mut bytes_written =
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
let mut bytes_written = buf_batch_writer.write(
&batch?,
&metrics.coalesce_time,
&metrics.encode_time,
&metrics.write_time,
)?;
for batch in iter {
let batch = batch?;
bytes_written += buf_batch_writer.write(
&batch,
&batch?,
&metrics.coalesce_time,
&metrics.encode_time,
&metrics.write_time,
)?;
}
buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
buf_batch_writer.flush(
&metrics.coalesce_time,
&metrics.encode_time,
&metrics.write_time,
)?;
bytes_written
};

// gather_time = total - coalesce - encode - write (avoids per-batch syscalls)
let total_nanos = total_start.elapsed().as_nanos() as usize;
let inner_nanos = (metrics.coalesce_time.value() - coalesce_before)
+ (metrics.encode_time.value() - encode_before)
+ (metrics.write_time.value() - write_before);
metrics.gather_time.add_duration(Duration::from_nanos(
total_nanos.saturating_sub(inner_nanos) as u64,
));

Ok(total_bytes_written)
} else {
Ok(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ object CometMetricNode {
Map(
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"),
"repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"),
"gather_time" -> SQLMetrics.createNanoTimingMetric(sc, "gather/interleave time"),
"coalesce_time" -> SQLMetrics.createNanoTimingMetric(sc, "batch coalescing time"),
"encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"),
"decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"),
"spill_count" -> SQLMetrics.createMetric(sc, "number of spills"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ class CometNativeShuffleWriter[K, V](

val detailedMetrics = Seq(
"elapsed_compute",
"encode_time",
"repart_time",
"gather_time",
"coalesce_time",
"encode_time",
"input_batches",
"spill_count",
"spilled_bytes")
Expand Down
Loading