
Fix repartition from dropping data when spilling#20672

Open
xanderbailey wants to merge 3 commits into apache:main from xanderbailey:xb/fix_repartition

Conversation

Contributor

@xanderbailey xanderbailey commented Mar 3, 2026

Which issue does this PR close?

Rationale for this change

In non-preserve-order repartitioning mode, all input partition tasks share clones of the same SpillPoolWriter for each output partition. SpillPoolWriter used #[derive(Clone)] but its Drop implementation unconditionally set writer_dropped = true and finalized the current spill file. This meant that when the first input task finished and its clone was dropped, the SpillPoolReader would see writer_dropped = true on an empty queue and return EOF — silently discarding every batch subsequently written by the still-running input tasks.

This bug requires three conditions to trigger:

  1. Non-preserve-order repartitioning (so spill writers are cloned across input tasks)
  2. Memory pressure causing batches to spill to disk
  3. Input tasks finishing at different times (the common case with varying partition sizes)
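
The failure mode can be sketched with a minimal stand-in for the shared writer state (the type and field names below are illustrative, not DataFusion's actual definitions):

```rust
use std::sync::{Arc, Mutex};

// Illustrative stand-in for the state shared by all writer clones.
#[derive(Default)]
struct Shared {
    queue: Vec<String>,   // spilled batches waiting for the reader
    writer_dropped: bool, // reader treats (empty queue + true) as EOF
}

// The problematic pattern: Clone is derived, so every input task's
// clone shares `shared`, but the Drop impl below fires once per clone.
#[derive(Clone)]
struct Writer {
    shared: Arc<Mutex<Shared>>,
}

impl Drop for Writer {
    fn drop(&mut self) {
        // Buggy: the FIRST clone to finish signals EOF for everyone.
        self.shared.lock().unwrap().writer_dropped = true;
    }
}

// True when the reader would report (possibly premature) EOF.
fn reader_sees_eof(shared: &Arc<Mutex<Shared>>) -> bool {
    let s = shared.lock().unwrap();
    s.queue.is_empty() && s.writer_dropped
}

fn main() {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let w1 = Writer { shared: Arc::clone(&shared) };
    let w2 = w1.clone(); // a second, still-running input task

    drop(w1); // first input task finishes early
    println!("premature EOF while w2 lives: {}", reader_sees_eof(&shared));
    drop(w2);
}
```

Anything w2 writes after this point is silently discarded, since the reader has already returned EOF.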

What changes are included in this PR?

datafusion/physical-plan/src/spill/spill_pool.rs:

  • Added active_writer_count: usize to SpillPoolShared to track the number of live writer clones.
  • Replaced #[derive(Clone)] on SpillPoolWriter with a manual Clone impl that increments active_writer_count under the shared lock.
  • Updated Drop to decrement active_writer_count and only finalize the current file / set writer_dropped = true when the count reaches zero (i.e. the last clone is dropped). Non-last clones now return immediately from Drop.
  • Added regression test test_clone_drop_does_not_signal_eof_prematurely that reproduces the exact failure: writer1 writes and drops, the reader drains the queue, then writer2 (still alive) writes. Without the fix the reader returns premature EOF and the assertion fails; with the fix the reader waits and reads both batches.
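
In miniature, the corrected Clone/Drop pairing looks like this (a sketch with simplified names, not the PR's exact code):

```rust
use std::sync::{Arc, Mutex};

// Simplified shared state; names mirror the description above but are
// not the exact DataFusion definitions.
#[derive(Default)]
struct Shared {
    active_writer_count: usize,
    writer_dropped: bool,
}

struct Writer {
    shared: Arc<Mutex<Shared>>,
}

impl Writer {
    fn new(shared: Arc<Mutex<Shared>>) -> Self {
        shared.lock().unwrap().active_writer_count += 1;
        Self { shared }
    }
}

// Manual Clone impl: bump the live-writer count under the shared lock.
impl Clone for Writer {
    fn clone(&self) -> Self {
        self.shared.lock().unwrap().active_writer_count += 1;
        Self { shared: Arc::clone(&self.shared) }
    }
}

impl Drop for Writer {
    fn drop(&mut self) {
        let mut s = self.shared.lock().unwrap();
        s.active_writer_count -= 1;
        // Only the last live clone signals EOF; earlier drops are no-ops.
        if s.active_writer_count == 0 {
            s.writer_dropped = true;
        }
    }
}

fn eof_signaled(shared: &Arc<Mutex<Shared>>) -> bool {
    shared.lock().unwrap().writer_dropped
}

fn main() {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let w1 = Writer::new(Arc::clone(&shared));
    let w2 = w1.clone();
    drop(w1);
    println!("EOF after first drop: {}", eof_signaled(&shared)); // false
    drop(w2);
    println!("EOF after last drop: {}", eof_signaled(&shared)); // true
}
```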

Are these changes tested?

Yes. A new unit test (test_clone_drop_does_not_signal_eof_prematurely) directly reproduces the bug. It was verified to fail without the fix and pass with the fix.

Are there any user-facing changes?

No.

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Mar 3, 2026
.await
.expect("Reader timed out — should not hang");

assert!(
Contributor Author

Without this fix we fail here.

Contributor

I also verified that this test fails without the code fix

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ cargo test -p datafusion-physical-plan test_clone_drop_does_not_signal_eof_prematurely
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.13s
     Running unittests src/lib.rs (target/debug/deps/datafusion_physical_plan-33977765615826e4)

running 1 test
test spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely ... FAILED

failures:

---- spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely stdout ----

thread 'spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely' (2314602) panicked at datafusion/physical-plan/src/spill/spill_pool.rs:1400:9:
Reader must not return EOF while a writer clone is still alive
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 1266 filtered out; finished in 0.00s

error: test failed, to rerun pass `-p datafusion-physical-plan --lib`

Contributor

@hareshkh hareshkh left a comment

LGTM 🚀

Contributor

@neilconway neilconway left a comment

lgtm! Thanks @xanderbailey

Comment on lines +1373 to +1378
/// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
/// mode all input partition tasks share clones of the same writer. Before
/// the fix, `Drop` unconditionally set `writer_dropped = true` even when
/// other clones were still alive. This caused the `SpillPoolReader` to
/// return EOF prematurely, silently losing every batch written by the
/// remaining writers.
Contributor

[ Nit, optional ] I personally try to avoid comments that are very specific to the previous (buggy) implementation, because that implementation won't exist for future readers, and the current / future state of the code might drift without the comments being updated. So I'd personally remove references to "the bug" or the exact previous behavior, and talk more about what a correct implementation ought to do and the properties of a correct implementation that we're checking for.

Contributor Author

Yep, that makes sense to me.

Contributor Author

Fixed

Contributor

@alamb alamb left a comment

Looks great to me -- thank you @xanderbailey and @neilconway

cc @2010YOUY01 in case you are interested or want a chance to review

fn drop(&mut self) {
let mut shared = self.shared.lock();

shared.active_writer_count -= 1;
Contributor

I think in general this pattern of incrementing / decrementing / checking a counter from some other structure under a lock is error prone (e.g. if someone else changes this code in the future but forgets to update the counter, it may be a hard-to-find bug)

I would personally suggest encapsulating the counter in a small structure that handles the increment/decrement itself

Something maybe like putting an Arc<AtomicUsize> on the shared pool and then passing out objects like

struct WriterLease {
  inner: Arc<AtomicUsize>,
}

impl Drop for WriterLease {
  // decrement the counter
  ...
}

And then just get one of those for the SpillPoolWriter

pub struct SpillPoolWriter {
  lease: WriterLease,
  ...
}

However it is a fair point that this PR is probably more efficient as it uses an already existing lock 🤷

I don't feel strongly about this

Contributor Author

I like that idea; it seems more future-proof. I tried it here: 52a06a4. What do you think?


Contributor

@alamb alamb left a comment

Thanks @xanderbailey -- I am not sure about some part of the new count


impl Clone for WriterCount {
fn clone(&self) -> Self {
self.0.fetch_add(1, Ordering::Relaxed);
Contributor

I think using Ordering::SeqCst or Ordering::Acquire might be safer here. It is probably unlikely to cause problems in practice, but I think it would be important that any writer increment is seen before the is_last check is done

https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html

let is_last_writer = shared.active_writer_count == 0;

if !is_last_writer {
if !self.writer_count.is_last() {
Contributor

I think there is a race condition now -- what prevents a new writer from being added after is_last returns false but before the lock is taken below?
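
One way to address both of the concerns above is to have the count type decide "am I the last clone?" in a single atomic step, using the same counting discipline `Arc` itself uses: a Relaxed increment on clone (a clone can only be made from a live lease, so the count cannot be observed at zero there), and a Release decrement on drop paired with an Acquire fence on the zero path. A hypothetical sketch, not the PR's actual code:

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical lease type; the decrement and the "was last" check
// happen in ONE atomic operation, so there is no window between
// checking the count and acting on the result.
struct WriterLease(Arc<AtomicUsize>);

impl WriterLease {
    fn new() -> Self {
        WriterLease(Arc::new(AtomicUsize::new(1)))
    }

    fn live(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }

    // fetch_sub returns the PREVIOUS value: 1 means we were the last
    // lease. Release publishes this writer's work; the Acquire fence on
    // the zero path makes all prior writers' effects visible before any
    // finalization runs.
    fn release_is_last(counter: &AtomicUsize) -> bool {
        if counter.fetch_sub(1, Ordering::Release) == 1 {
            fence(Ordering::Acquire);
            true
        } else {
            false
        }
    }
}

impl Clone for WriterLease {
    fn clone(&self) -> Self {
        // Relaxed suffices for the increment: it can only race with
        // other increments/decrements while at least one lease is live.
        self.0.fetch_add(1, Ordering::Relaxed);
        WriterLease(Arc::clone(&self.0))
    }
}

impl Drop for WriterLease {
    fn drop(&mut self) {
        if Self::release_is_last(&self.0) {
            // Last clone gone: safe to finalize the spill file / signal EOF.
        }
    }
}

fn main() {
    let l1 = WriterLease::new();
    let l2 = l1.clone();
    println!("live leases: {}", l1.live()); // 2
    drop(l2);
    println!("live leases: {}", l1.live()); // 1
}
```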


Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Repartition drops data when spilling

4 participants