131 changes: 107 additions & 24 deletions datafusion/physical-plan/src/topk/mod.rs
@@ -220,7 +220,7 @@ impl TopK {
expr,
row_converter,
scratch_rows,
heap: TopKHeap::new(k, batch_size),
heap: TopKHeap::new(k),
common_sort_prefix_converter: prefix_row_converter,
common_sort_prefix: Arc::from(common_sort_prefix),
finished: false,
@@ -662,8 +662,6 @@ impl TopKMetrics {
struct TopKHeap {
/// The maximum number of elements to store in this heap.
k: usize,
/// The target number of rows for output batches
batch_size: usize,
/// Storage for at most `k` items using a BinaryHeap. Reversed
/// so that the smallest k so far is on the top
inner: BinaryHeap<TopKRow>,
@@ -674,11 +672,10 @@ struct TopKHeap {
}

impl TopKHeap {
fn new(k: usize, batch_size: usize) -> Self {
fn new(k: usize) -> Self {
assert!(k > 0);
Self {
k,
batch_size,
inner: BinaryHeap::new(),
store: RecordBatchStore::new(),
owned_bytes: 0,
@@ -790,16 +787,20 @@ impl TopKHeap {
/// Compact this heap, rewriting all stored batches into a single
/// input batch
pub fn maybe_compact(&mut self) -> Result<()> {
// we compact if the number of "unused" rows in the store is
// past some pre-defined threshold. Target holding up to
// around 20 batches, but handle cases of large k where some
// batches might be partially full
let max_unused_rows = (20 * self.batch_size) + self.k;
let unused_rows = self.store.unused_rows();

// don't compact if the store has one extra batch or
// unused rows is under the threshold
if self.store.len() <= 2 || unused_rows < max_unused_rows {
// Don't compact if there's only one batch (compacting into itself is pointless)
if self.store.len() <= 1 {
return Ok(());
}

let total_rows = self.store.total_rows;

let avg_bytes_per_row = self.store.batches_size / total_rows.max(1); // .max(1) prevents div by zero
let compacted_estimate = avg_bytes_per_row * self.inner.len();

// Compact when current store memory exceeds 2x what the compacted
// result would need. The multiplier avoids compacting when the
// savings would be marginal.
if self.store.batches_size <= compacted_estimate * 2 {
return Ok(());
}
// at first, compact the entire thing always into a new batch
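A minimal standalone sketch of the trigger arithmetic above, separate from the diff. The function name and parameters (`should_compact`, `num_batches`, `rows_in_heap`) are hypothetical and only mirror the fields `maybe_compact` reads; the numbers in `main` roughly match the test added below.

fn should_compact(num_batches: usize, batches_size: usize, total_rows: usize, rows_in_heap: usize) -> bool {
    // Compacting a single batch into itself is pointless.
    if num_batches <= 1 {
        return false;
    }
    // Estimate the bytes the retained rows would occupy after compaction.
    let avg_bytes_per_row = batches_size / total_rows.max(1); // max(1) avoids division by zero
    let compacted_estimate = avg_bytes_per_row * rows_in_heap;
    // Compact only when the store currently uses more than 2x that estimate.
    batches_size > compacted_estimate * 2
}

fn main() {
    // One huge batch (~100k rows) with only k = 5 rows still referenced: compact.
    assert!(should_compact(2, 400_000, 100_002, 5));
    // Most stored rows are still referenced, so the savings would be marginal: skip.
    assert!(!should_compact(2, 4_000, 1_000, 900));
}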
@@ -980,6 +981,8 @@ struct RecordBatchStore {
batches: HashMap<u32, RecordBatchEntry>,
/// total size of all record batches tracked by this store
batches_size: usize,
/// row count of all the batches
total_rows: usize,
}

impl RecordBatchStore {
@@ -988,6 +991,7 @@
next_id: 0,
batches: HashMap::new(),
batches_size: 0,
total_rows: 0,
}
}

@@ -1005,6 +1009,7 @@
// uses of 0 means that none of the rows in the batch were stored in the topk
if entry.uses > 0 {
self.batches_size += get_record_batch_memory_size(&entry.batch);
self.total_rows += entry.batch.num_rows();
self.batches.insert(entry.id, entry);
}
}
@@ -1013,6 +1018,7 @@
fn clear(&mut self) {
self.batches.clear();
self.batches_size = 0;
self.total_rows = 0;
}

fn get(&self, id: u32) -> Option<&RecordBatchEntry> {
@@ -1024,15 +1030,6 @@
self.batches.len()
}

/// Returns the total number of rows in batches minus the number
/// which are in use
fn unused_rows(&self) -> usize {
self.batches
.values()
.map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses)
.sum()
}

/// returns true if the store has nothing stored
fn is_empty(&self) -> bool {
self.batches.is_empty()
@@ -1056,6 +1053,11 @@ impl RecordBatchStore {
.batches_size
.checked_sub(get_record_batch_memory_size(&old_entry.batch))
.unwrap();

self.total_rows = self
.total_rows
.checked_sub(old_entry.batch.num_rows())
.unwrap_or_default();
}
}

@@ -1254,4 +1256,85 @@ mod tests {

Ok(())
}

/// Tests that memory-based compaction triggers when a large batch
/// has very few rows referenced by the top-k heap.
#[tokio::test]
async fn test_topk_memory_compaction() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

let sort_expr = PhysicalSortExpr {
expr: col("a", schema.as_ref())?,
options: SortOptions::default(),
};

let full_expr = LexOrdering::from([sort_expr.clone()]);
let prefix = vec![sort_expr];

let runtime = Arc::new(RuntimeEnv::default());
let metrics = ExecutionPlanMetricsSet::new();

let k = 5;
let mut topk = TopK::try_new(
0,
Arc::clone(&schema),
prefix,
full_expr,
k,
8192,
runtime,
&metrics,
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
)))),
)?;

// Insert a large batch (100,000 rows) with values 1..=100_000.
// Only the smallest 5 values (1..=5) will end up in the heap.
let large_values: Vec<i32> = (1..=100_000).collect();
let array1: ArrayRef = Arc::new(Int32Array::from(large_values));
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array1])?;
topk.insert_batch(batch1)?;

// After the first batch, store has 1 batch — compaction should
// not trigger (guard: store.len() <= 1).
assert_eq!(
topk.heap.store.len(),
1,
"should have 1 batch before second insert"
);

// Insert a second batch whose values (200_000 and 300_000) do NOT
// displace any heap entries, so the five retained rows keep pointing
// at the huge first batch. The store then holds 2 batches, with the
// first one dwarfing the compacted estimate, so this insert should
// trigger compaction.
let array2: ArrayRef = Arc::new(Int32Array::from(vec![200_000, 300_000]));
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array2])?;
topk.insert_batch(batch2)?;

// After inserting a second batch, maybe_compact should have fired
// because the large batch dwarfs the compacted estimate.
// The store should now contain only 1 batch (the compacted one).
assert_eq!(
topk.heap.store.len(),
1,
"store should be compacted to 1 batch"
);

// Verify the emitted results are correct (top 5 ascending).
let results: Vec<_> = topk.emit()?.try_collect().await?;
assert_batches_eq!(
&[
"+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "| 4 |", "| 5 |",
"+---+",
],
&results
);

Ok(())
}
}