From 09a8630f2f441d8674677d08d48dc887c9e55310 Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 3 Mar 2026 14:40:30 +0000 Subject: [PATCH 1/4] Fix repartition from dropping data when spilling --- .../physical-plan/src/spill/spill_pool.rs | 109 +++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index 1b9d82eaf4506..ad755e13a15f1 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -61,6 +61,10 @@ struct SpillPoolShared { /// Writer's reference to the current file (shared by all cloned writers). /// Has its own lock to allow I/O without blocking queue access. current_write_file: Option>>, + /// Number of active writer clones. Only when this reaches zero should + /// `writer_dropped` be set to true. This prevents premature EOF signaling + /// when one writer clone is dropped while others are still active. + active_writer_count: usize, } impl SpillPoolShared { @@ -72,6 +76,7 @@ impl SpillPoolShared { waker: None, writer_dropped: false, current_write_file: None, + active_writer_count: 1, } } @@ -97,7 +102,6 @@ impl SpillPoolShared { /// The writer automatically manages file rotation based on the `max_file_size_bytes` /// configured in [`channel`]. When the last writer clone is dropped, it finalizes the /// current file so readers can access all written data. -#[derive(Clone)] pub struct SpillPoolWriter { /// Maximum size in bytes before rotating to a new file. /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`. @@ -106,6 +110,18 @@ pub struct SpillPoolWriter { shared: Arc>, } +impl Clone for SpillPoolWriter { + fn clone(&self) -> Self { + // Increment the active writer count so that `writer_dropped` is only + // set to true when the *last* clone is dropped. + self.shared.lock().active_writer_count += 1; + Self { + max_file_size_bytes: self.max_file_size_bytes, + shared: Arc::clone(&self.shared), + } + } +} + impl SpillPoolWriter { /// Spills a batch to the pool, rotating files when necessary. /// @@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter { fn drop(&mut self) { let mut shared = self.shared.lock(); + shared.active_writer_count -= 1; + let is_last_writer = shared.active_writer_count == 0; + + if !is_last_writer { + // Other writer clones are still active; do not finalize or + // signal EOF to readers. + return; + } + // Finalize the current file when the last writer is dropped if let Some(current_file) = shared.current_write_file.take() { // Release shared lock before locking file @@ -1343,6 +1368,88 @@ mod tests { Ok(()) } + /// Regression test for data loss when multiple writer clones are used. + /// + /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning + /// mode all input partition tasks share clones of the same writer. Before + /// the fix, `Drop` unconditionally set `writer_dropped = true` even when + /// other clones were still alive. This caused the `SpillPoolReader` to + /// return EOF prematurely, silently losing every batch written by the + /// remaining writers. + /// + /// The test sequence is: + /// + /// 1. writer1 writes a batch, then is dropped. + /// 2. The reader consumes that batch. + /// 3. The reader polls again — the queue is now empty. + /// - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF). + /// - **Fix**: `active_writer_count > 0` → `Pending` (wait for data). + /// 4. writer2 (still alive) writes a batch. + /// 5. The reader must see that batch — not silently lose it. + #[tokio::test] + async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> { + let (writer1, mut reader) = create_spill_channel(1024 * 1024); + let writer2 = writer1.clone(); + + // Synchronization: tell writer2 when it may proceed. + let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>(); + + // Spawn writer2 — it waits for the signal before writing. + let writer2_handle = SpawnedTask::spawn(async move { + proceed_rx.await.unwrap(); + writer2.push_batch(&create_test_batch(10, 10)).unwrap(); + // writer2 is dropped here (last clone → true EOF) + }); + + // Writer1 writes one batch, then drops. + writer1.push_batch(&create_test_batch(0, 10))?; + drop(writer1); + + // Read writer1's batch. + let batch1 = reader.next().await.unwrap()?; + assert_eq!(batch1.num_rows(), 10); + let col = batch1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 0); + + // Signal writer2 to write its batch. It will execute when the + // current task yields (i.e. when reader.next() returns Pending). + proceed_tx.send(()).unwrap(); + + // With the bug the reader returns None here because it already + // saw writer_dropped=true on an empty queue. With the fix it + // returns Pending, the runtime schedules writer2, and the batch + // becomes available. + let batch2 = + tokio::time::timeout(std::time::Duration::from_secs(5), reader.next()) + .await + .expect("Reader timed out — should not hang"); + + assert!( + batch2.is_some(), + "Reader returned None prematurely — batch from writer2 was lost \ + because dropping writer1 incorrectly signaled EOF" + ); + let batch2 = batch2.unwrap()?; + assert_eq!(batch2.num_rows(), 10); + let col = batch2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 10); + + writer2_handle.await.unwrap(); + + // All writers dropped — reader should see real EOF now. + assert!(reader.next().await.is_none()); + + Ok(()) + } + #[tokio::test] async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> { use datafusion_execution::runtime_env::RuntimeEnvBuilder; From 59cfe3b5eeed79455254a974b28a39e70ec375e3 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 4 Mar 2026 15:28:23 +0000 Subject: [PATCH 2/4] comment --- .../physical-plan/src/spill/spill_pool.rs | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index ad755e13a15f1..2777b753bb37a 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -1368,24 +1368,21 @@ mod tests { Ok(()) } - /// Regression test for data loss when multiple writer clones are used. + /// Verifies that the reader stays alive as long as any writer clone exists. /// /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning - /// mode all input partition tasks share clones of the same writer. Before - /// the fix, `Drop` unconditionally set `writer_dropped = true` even when - /// other clones were still alive. This caused the `SpillPoolReader` to - /// return EOF prematurely, silently losing every batch written by the - /// remaining writers. + /// mode multiple input partition tasks share clones of the same writer. + /// The reader must not see EOF until **all** clones have been dropped, + /// even if the queue is temporarily empty between writes from different + /// clones. /// /// The test sequence is: /// /// 1. writer1 writes a batch, then is dropped. - /// 2. The reader consumes that batch. - /// 3. The reader polls again — the queue is now empty. - /// - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF). - /// - **Fix**: `active_writer_count > 0` → `Pending` (wait for data). - /// 4. writer2 (still alive) writes a batch. - /// 5. The reader must see that batch — not silently lose it. + /// 2. The reader consumes that batch (queue is now empty). + /// 3. writer2 (still alive) writes a batch. + /// 4. The reader must see that batch. + /// 5. EOF is only signalled after writer2 is also dropped. #[tokio::test] async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> { let (writer1, mut reader) = create_spill_channel(1024 * 1024); @@ -1419,10 +1416,7 @@ mod tests { // current task yields (i.e. when reader.next() returns Pending). proceed_tx.send(()).unwrap(); - // With the bug the reader returns None here because it already - // saw writer_dropped=true on an empty queue. With the fix it - // returns Pending, the runtime schedules writer2, and the batch - // becomes available. + // The reader should wait (Pending) for writer2's data, not EOF. let batch2 = tokio::time::timeout(std::time::Duration::from_secs(5), reader.next()) .await @@ -1430,8 +1424,7 @@ mod tests { assert!( batch2.is_some(), - "Reader returned None prematurely — batch from writer2 was lost \ - because dropping writer1 incorrectly signaled EOF" + "Reader must not return EOF while a writer clone is still alive" ); let batch2 = batch2.unwrap()?; assert_eq!(batch2.num_rows(), 10); From 52a06a40889fb20837bbc4ad9083a9b92ba2859f Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 4 Mar 2026 20:52:59 +0000 Subject: [PATCH 3/4] Try encapsulation for the counter --- .../physical-plan/src/spill/spill_pool.rs | 66 ++++++++++++------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index 2777b753bb37a..219ccd02ff65b 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -18,6 +18,7 @@ use futures::{Stream, StreamExt}; use std::collections::VecDeque; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::Waker; use parking_lot::Mutex; @@ -61,10 +62,6 @@ struct SpillPoolShared { /// Writer's reference to the current file (shared by all cloned writers). /// Has its own lock to allow I/O without blocking queue access. current_write_file: Option>>, - /// Number of active writer clones. Only when this reaches zero should - /// `writer_dropped` be set to true. This prevents premature EOF signaling - /// when one writer clone is dropped while others are still active. - active_writer_count: usize, } impl SpillPoolShared { @@ -76,7 +73,6 @@ impl SpillPoolShared { waker: None, writer_dropped: false, current_write_file: None, - active_writer_count: 1, } } @@ -93,6 +89,37 @@ impl SpillPoolShared { } } +/// Tracks the number of live [`SpillPoolWriter`] clones. +/// +/// Cloning increments the count; dropping decrements it. +/// [`WriterCount::is_last`] returns `true` when called from the final clone, +/// which the writer uses to decide whether to finalize the spill pool. +struct WriterCount(Arc); + +impl WriterCount { + fn new() -> Self { + Self(Arc::new(AtomicUsize::new(1))) + } + + /// Returns `true` if this is the only remaining clone. + fn is_last(&self) -> bool { + self.0.load(Ordering::Acquire) == 1 + } +} + +impl Clone for WriterCount { + fn clone(&self) -> Self { + self.0.fetch_add(1, Ordering::Relaxed); + Self(Arc::clone(&self.0)) + } +} + +impl Drop for WriterCount { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::Release); + } +} + /// Writer for a spill pool. Provides coordinated write access with FIFO semantics. /// /// Created by [`channel`]. See that function for architecture diagrams and usage examples. @@ -102,24 +129,16 @@ impl SpillPoolShared { /// The writer automatically manages file rotation based on the `max_file_size_bytes` /// configured in [`channel`]. When the last writer clone is dropped, it finalizes the /// current file so readers can access all written data. +#[derive(Clone)] pub struct SpillPoolWriter { /// Maximum size in bytes before rotating to a new file. /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`. max_file_size_bytes: usize, /// Shared state with readers (includes current_write_file for coordination) shared: Arc>, -} - -impl Clone for SpillPoolWriter { - fn clone(&self) -> Self { - // Increment the active writer count so that `writer_dropped` is only - // set to true when the *last* clone is dropped. - self.shared.lock().active_writer_count += 1; - Self { - max_file_size_bytes: self.max_file_size_bytes, - shared: Arc::clone(&self.shared), - } - } + /// Tracks how many writer clones are alive. The pool is only finalized + /// when the last clone is dropped. + writer_count: WriterCount, } impl SpillPoolWriter { @@ -247,17 +266,15 @@ impl SpillPoolWriter { impl Drop for SpillPoolWriter { fn drop(&mut self) { - let mut shared = self.shared.lock(); - - shared.active_writer_count -= 1; - let is_last_writer = shared.active_writer_count == 0; - - if !is_last_writer { + if !self.writer_count.is_last() { // Other writer clones are still active; do not finalize or - // signal EOF to readers. + // signal EOF to readers. `self.writer_count` is decremented + // automatically when this `Drop` returns. return; } + let mut shared = self.shared.lock(); + // Finalize the current file when the last writer is dropped if let Some(current_file) = shared.current_write_file.take() { // Release shared lock before locking file @@ -468,6 +485,7 @@ pub fn channel( let writer = SpillPoolWriter { max_file_size_bytes, shared: Arc::clone(&shared), + writer_count: WriterCount::new(), }; let reader = SpillPoolReader::new(shared, schema); From 380da74bb74573bd9f075593ad7d03346361de9c Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 4 Mar 2026 23:23:12 +0000 Subject: [PATCH 4/4] comments --- .../physical-plan/src/spill/spill_pool.rs | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index 219ccd02ff65b..841183d254aec 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -91,9 +91,8 @@ impl SpillPoolShared { /// Tracks the number of live [`SpillPoolWriter`] clones. /// -/// Cloning increments the count; dropping decrements it. -/// [`WriterCount::is_last`] returns `true` when called from the final clone, -/// which the writer uses to decide whether to finalize the spill pool. +/// Cloning increments the count. [`WriterCount::decrement`] atomically +/// decrements the count and reports whether the caller was the last clone. struct WriterCount(Arc); impl WriterCount { @@ -101,25 +100,22 @@ impl WriterCount { Self(Arc::new(AtomicUsize::new(1))) } - /// Returns `true` if this is the only remaining clone. - fn is_last(&self) -> bool { - self.0.load(Ordering::Acquire) == 1 + /// Decrements the count and returns `true` if this was the last clone. + /// + /// This is a single atomic operation, so concurrent drops cannot both + /// observe themselves as "last". + fn decrement(&self) -> bool { + self.0.fetch_sub(1, Ordering::SeqCst) == 1 } } impl Clone for WriterCount { fn clone(&self) -> Self { - self.0.fetch_add(1, Ordering::Relaxed); + self.0.fetch_add(1, Ordering::SeqCst); Self(Arc::clone(&self.0)) } } -impl Drop for WriterCount { - fn drop(&mut self) { - self.0.fetch_sub(1, Ordering::Release); - } -} - /// Writer for a spill pool. Provides coordinated write access with FIFO semantics. /// /// Created by [`channel`]. See that function for architecture diagrams and usage examples. @@ -266,10 +262,9 @@ impl SpillPoolWriter { impl Drop for SpillPoolWriter { fn drop(&mut self) { - if !self.writer_count.is_last() { + if !self.writer_count.decrement() { // Other writer clones are still active; do not finalize or - // signal EOF to readers. `self.writer_count` is decremented - // automatically when this `Drop` returns. + // signal EOF to readers. return; }