From 9e506f3c8bbf7a30c1a6e0adbc95f197daafcf6b Mon Sep 17 00:00:00 2001
From: cetra3
Date: Mon, 16 Feb 2026 11:44:01 +0000
Subject: [PATCH] Compact more aggressively in TopK

---
 datafusion/physical-plan/src/topk/mod.rs | 131 ++++++++++++++++++-----
 1 file changed, 107 insertions(+), 24 deletions(-)

diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 4b93e6a188d57..d95231917c742 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -220,7 +220,7 @@ impl TopK {
             expr,
             row_converter,
             scratch_rows,
-            heap: TopKHeap::new(k, batch_size),
+            heap: TopKHeap::new(k),
             common_sort_prefix_converter: prefix_row_converter,
             common_sort_prefix: Arc::from(common_sort_prefix),
             finished: false,
@@ -662,8 +662,6 @@ impl TopKMetrics {
 struct TopKHeap {
     /// The maximum number of elements to store in this heap.
     k: usize,
-    /// The target number of rows for output batches
-    batch_size: usize,
     /// Storage for up at most `k` items using a BinaryHeap. Reversed
     /// so that the smallest k so far is on the top
     inner: BinaryHeap<TopKRow>,
@@ -674,11 +672,10 @@ struct TopKHeap {
 }
 
 impl TopKHeap {
-    fn new(k: usize, batch_size: usize) -> Self {
+    fn new(k: usize) -> Self {
         assert!(k > 0);
         Self {
             k,
-            batch_size,
             inner: BinaryHeap::new(),
             store: RecordBatchStore::new(),
             owned_bytes: 0,
@@ -790,16 +787,20 @@ impl TopKHeap {
     /// Compact this heap, rewriting all stored batches into a single
     /// input batch
     pub fn maybe_compact(&mut self) -> Result<()> {
-        // we compact if the number of "unused" rows in the store is
-        // past some pre-defined threshold. Target holding up to
-        // around 20 batches, but handle cases of large k where some
-        // batches might be partially full
-        let max_unused_rows = (20 * self.batch_size) + self.k;
-        let unused_rows = self.store.unused_rows();
-
-        // don't compact if the store has one extra batch or
-        // unused rows is under the threshold
-        if self.store.len() <= 2 || unused_rows < max_unused_rows {
+        // Don't compact if there's only one batch (compacting into itself is pointless)
+        if self.store.len() <= 1 {
+            return Ok(());
+        }
+
+        let total_rows = self.store.total_rows;
+
+        let avg_bytes_per_row = self.store.batches_size / total_rows.max(1); // .max(1) prevents div by zero
+        let compacted_estimate = avg_bytes_per_row * self.inner.len();
+
+        // Compact when current store memory exceeds 2x what the compacted
+        // result would need. The multiplier avoids compacting when the
+        // savings would be marginal.
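+        // For example (illustrative numbers only): 100_000 stored rows
+        // occupying 400_000 bytes give avg_bytes_per_row = 4; with 5 heap
+        // rows, compacted_estimate = 20 bytes, so any store larger than
+        // 40 bytes is compacted.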
+        if self.store.batches_size <= compacted_estimate * 2 {
             return Ok(());
         }
         // at first, compact the entire thing always into a new batch
@@ -980,6 +981,8 @@ struct RecordBatchStore {
     batches: HashMap<u32, RecordBatchEntry>,
     /// total size of all record batches tracked by this store
     batches_size: usize,
+    /// total row count of all record batches tracked by this store
+    total_rows: usize,
 }
 
 impl RecordBatchStore {
@@ -988,6 +991,7 @@ impl RecordBatchStore {
             next_id: 0,
             batches: HashMap::new(),
             batches_size: 0,
+            total_rows: 0,
         }
     }
 
@@ -1005,6 +1009,7 @@
         // uses of 0 means that none of the rows in the batch were stored in the topk
         if entry.uses > 0 {
             self.batches_size += get_record_batch_memory_size(&entry.batch);
+            self.total_rows += entry.batch.num_rows();
             self.batches.insert(entry.id, entry);
         }
     }
@@ -1013,6 +1018,7 @@
     fn clear(&mut self) {
         self.batches.clear();
         self.batches_size = 0;
+        self.total_rows = 0;
     }
 
     fn get(&self, id: u32) -> Option<&RecordBatchEntry> {
@@ -1024,15 +1030,6 @@
         self.batches.len()
     }
 
-    /// Returns the total number of rows in batches minus the number
-    /// which are in use
-    fn unused_rows(&self) -> usize {
-        self.batches
-            .values()
-            .map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses)
-            .sum()
-    }
-
     /// returns true if the store has nothing stored
     fn is_empty(&self) -> bool {
         self.batches.is_empty()
@@ -1056,6 +1053,11 @@
             .batches_size
             .checked_sub(get_record_batch_memory_size(&old_entry.batch))
             .unwrap();
+
+            self.total_rows = self
+                .total_rows
+                .checked_sub(old_entry.batch.num_rows())
+                .unwrap_or_default();
         }
     }
 
@@ -1254,4 +1256,85 @@ mod tests {
 
         Ok(())
     }
+
+    /// Tests that memory-based compaction triggers when a large batch
+    /// has very few rows referenced by the top-k heap.
+    #[tokio::test]
+    async fn test_topk_memory_compaction() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+        let sort_expr = PhysicalSortExpr {
+            expr: col("a", schema.as_ref())?,
+            options: SortOptions::default(),
+        };
+
+        let full_expr = LexOrdering::from([sort_expr.clone()]);
+        let prefix = vec![sort_expr];
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        let k = 5;
+        let mut topk = TopK::try_new(
+            0,
+            Arc::clone(&schema),
+            prefix,
+            full_expr,
+            k,
+            8192,
+            runtime,
+            &metrics,
+            Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
+                DynamicFilterPhysicalExpr::new(vec![], lit(true)),
+            )))),
+        )?;
+
+        // Insert a large batch (100,000 rows) with values 1..=100_000.
+        // Only the smallest 5 values (1..=5) will end up in the heap.
+        let large_values: Vec<i32> = (1..=100_000).collect();
+        let array1: ArrayRef = Arc::new(Int32Array::from(large_values));
+        let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array1])?;
+        topk.insert_batch(batch1)?;
+
+        // After the first batch the store holds a single batch, so
+        // compaction cannot trigger (guard: store.len() <= 1).
+        assert_eq!(
+            topk.heap.store.len(),
+            1,
+            "should have 1 batch before second insert"
+        );
+
+        // Insert a second, tiny batch containing a value smaller than the
+        // current top 5 so that it displaces one heap entry. A batch whose
+        // rows never enter the heap has uses == 0 and is dropped from the
+        // store immediately, so the second batch must win at least one
+        // heap slot for the store to hold 2 batches. With 2 batches stored
+        // and the first one huge, compaction should fire.
+        let array2: ArrayRef = Arc::new(Int32Array::from(vec![0]));
+        let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array2])?;
+        topk.insert_batch(batch2)?;
+
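+        // During this insert the heap ends up referencing both batches:
+        // four heap rows (1..=4) still point into the 100_000-row batch
+        // and one (0) points into batch2, so the store must keep both
+        // until maybe_compact (run at the end of insert_batch) rewrites
+        // them into a single 5-row batch.
+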
+        // maybe_compact should have fired during the second insert,
+        // because the large batch dwarfs the compacted estimate. The
+        // store should now contain only 1 batch (the compacted one).
+        assert_eq!(
+            topk.heap.store.len(),
+            1,
+            "store should be compacted to 1 batch"
+        );
+
+        // Verify the emitted results are correct (top 5 ascending).
+        let results: Vec<_> = topk.emit()?.try_collect().await?;
+        assert_batches_eq!(
+            &[
+                "+---+", "| a |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "| 4 |",
+                "+---+",
+            ],
+            &results
+        );
+
+        Ok(())
+    }
 }
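
Illustrative sketch (not part of the patch): the check in `maybe_compact`
reduces to a pure function of the three values the patch tracks, which makes
the 2x multiplier easy to reason about in isolation. The name
`should_compact` below is hypothetical:

    /// Mirrors the patch's compaction test: estimate the compacted size
    /// from the store's average bytes per row, and compact only when the
    /// store currently holds more than twice that.
    fn should_compact(batches_size: usize, total_rows: usize, heap_len: usize) -> bool {
        let avg_bytes_per_row = batches_size / total_rows.max(1);
        let compacted_estimate = avg_bytes_per_row * heap_len;
        batches_size > compacted_estimate * 2
    }

    fn main() {
        // 100_000 rows occupying 400_000 bytes with 5 heap rows:
        // avg = 4 bytes/row, estimate = 20 bytes; 400_000 > 40, so compact.
        assert!(should_compact(400_000, 100_000, 5));
        // A store already close to its compacted size is left alone:
        // avg = 6 bytes/row, estimate = 30 bytes; 30 > 60 is false.
        assert!(!should_compact(30, 5, 5));
    }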