Fix repartition from dropping data when spilling #20672
xanderbailey wants to merge 3 commits into apache:main
Conversation
    .await
    .expect("Reader timed out — should not hang");

assert!(
Without this fix we fail here.
I also verified that this test fails without the code fix
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ cargo test -p datafusion-physical-plan test_clone_drop_does_not_signal_eof_prematurely
Finished `test` profile [unoptimized + debuginfo] target(s) in 0.13s
Running unittests src/lib.rs (target/debug/deps/datafusion_physical_plan-33977765615826e4)
running 1 test
test spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely ... FAILED
failures:
---- spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely stdout ----
thread 'spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely' (2314602) panicked at datafusion/physical-plan/src/spill/spill_pool.rs:1400:9:
Reader must not return EOF while a writer clone is still alive
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 1266 filtered out; finished in 0.00s
error: test failed, to rerun pass `-p datafusion-physical-plan --lib`
Force-pushed from 4925c63 to 09a8630
neilconway
left a comment
lgtm! Thanks @xanderbailey
/// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
/// mode all input partition tasks share clones of the same writer. Before
/// the fix, `Drop` unconditionally set `writer_dropped = true` even when
/// other clones were still alive. This caused the `SpillPoolReader` to
/// return EOF prematurely, silently losing every batch written by the
/// remaining writers.
[ Nit, optional ] I personally try to avoid comments that are very specific to the previous (buggy) implementation, because that implementation won't exist for future readers, and the current / future state of the code might drift without the comments being updated. So I'd personally remove references to "the bug" or the exact previous behavior, and talk more about what a correct implementation ought to do and the properties of a correct implementation that we're checking for.
Yep, that makes sense to me.
alamb
left a comment
Looks great to me -- thank you @xanderbailey and @neilconway
cc @2010YOUY01 in case you are interested or want a chance to review
fn drop(&mut self) {
    let mut shared = self.shared.lock();

    shared.active_writer_count -= 1;
I think in general this pattern of incrementing / decrementing / checking a counter from some other structure under a lock is error prone (e.g. if someone else changes this code in the future but forgets to update the counter, it may be a hard-to-find bug).
I would personally suggest encapsulating the counter into some structure that encapsulates the increment/decrement.
Something maybe like putting an Arc<AtomicUsize> on the shared pool and then passing out objects like

struct WriterLease {
    inner: Arc<AtomicUsize>,
}

impl Drop for WriterLease {
    ...
    // decrement the counter
}

And then just get one of those for the SpillPoolWriter:

pub struct SpillPoolWriter {
    lease: WriterLease,
    ...
}

However it is a fair point that this PR is probably more efficient as it uses an already existing lock 🤷
I don't feel strongly about this
I like that idea, it seems more future-proof. I tried it here: 52a06a4. What do you think?
alamb
left a comment
Thanks @xanderbailey -- I am not sure about some part of the new count
impl Clone for WriterCount {
    fn clone(&self) -> Self {
        self.0.fetch_add(1, Ordering::Relaxed);
I think using Ordering::SeqCst or Ordering::Acquire might be safer here. It probably is unlikely to cause problems in practice, but I think it would be important that any writer increment is seen before the is_last check is done.
https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
- let is_last_writer = shared.active_writer_count == 0;
-
- if !is_last_writer {
+ if !self.writer_count.is_last() {
I think there is a race condition now -- what prevents a new writer from being added after is_last returns false but before the lock is taken below?
Which issue does this PR close?
Rationale for this change
In non-preserve-order repartitioning mode, all input partition tasks share clones of the same `SpillPoolWriter` for each output partition. `SpillPoolWriter` used `#[derive(Clone)]`, but its `Drop` implementation unconditionally set `writer_dropped = true` and finalized the current spill file. This meant that when the first input task finished and its clone was dropped, the `SpillPoolReader` would see `writer_dropped = true` on an empty queue and return EOF, silently discarding every batch subsequently written by the still-running input tasks. This bug requires three conditions to trigger:
What changes are included in this PR?
In `datafusion/physical-plan/src/spill/spill_pool.rs`:
- Add `active_writer_count: usize` to `SpillPoolShared` to track the number of live writer clones.
- Replace `#[derive(Clone)]` on `SpillPoolWriter` with a manual `Clone` impl that increments `active_writer_count` under the shared lock.
- Update `Drop` to decrement `active_writer_count` and only finalize the current file / set `writer_dropped = true` when the count reaches zero (i.e. the last clone is dropped). Non-last clones now return immediately from `Drop`.
- Add `test_clone_drop_does_not_signal_eof_prematurely`, which reproduces the exact failure: writer1 writes and drops, the reader drains the queue, then writer2 (still alive) writes. Without the fix the reader returns premature EOF and the assertion fails; with the fix the reader waits and reads both batches.

Are these changes tested?
Yes. A new unit test (`test_clone_drop_does_not_signal_eof_prematurely`) directly reproduces the bug. It was verified to fail without the fix and pass with the fix.

Are there any user-facing changes?
No.