diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs index 9053f7a8eab9f..da5da384c7b4e 100644 --- a/datafusion/expr-common/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -18,7 +18,7 @@ //! Vectorized [`GroupsAccumulator`] use arrow::array::{ArrayRef, BooleanArray}; -use datafusion_common::{Result, not_impl_err}; +use datafusion_common::{Result, not_impl_err, utils::split_vec_min_alloc}; /// Describes how many rows should be emitted during grouping. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -45,13 +45,7 @@ impl EmitTo { // Take the entire vector, leave new (empty) vector std::mem::take(v) } - Self::First(n) => { - // get end n+1,.. values into t - let mut t = v.split_off(*n); - // leave n+1,.. in v - std::mem::swap(v, &mut t); - t - } + Self::First(n) => split_vec_min_alloc(v, *n), } } } @@ -254,3 +248,53 @@ pub trait GroupsAccumulator: Send + std::any::Any { /// compute, not `O(num_groups)` fn size(&self) -> usize; } + +#[cfg(test)] +mod tests { + use super::EmitTo; + + /// When `n` is small relative to `len`, the old `split_off(n) + swap` pattern had + /// two allocation problems: + /// + /// 1. The returned Vec kept the original large backing allocation even though it + /// only contains `n` elements (wasted capacity on a short-lived value). + /// 2. `split_off` allocated a fresh Vec for the `len - n` remaining elements, + /// even though that side is much larger than `n` — the expensive side to + /// allocate. + /// + /// `split_vec_min_alloc` fixes both: when `n * 2 <= len` it uses + /// `drain(0..n).collect()`, allocating only `n` elements for the emitted prefix + /// and keeping the original large backing in the remaining accumulator. 
+ #[test] + fn take_needed_first_small_n_allocates_minimally() { + let mut v: Vec<i32> = Vec::with_capacity(128); + v.extend(0..20i32); + let original_capacity = v.capacity(); // 128 + + // n=4, n*2=8 <= len=20 -> drain branch in split_vec_min_alloc + let emitted = EmitTo::First(4).take_needed(&mut v); + + assert_eq!(emitted, vec![0, 1, 2, 3]); + assert_eq!(v, (4..20i32).collect::<Vec<_>>()); + + // The emitted prefix must NOT carry the original large allocation. + // Old split_off+swap returned a Vec with capacity=128 for only 4 elements. + assert!( + emitted.capacity() <= 4, + "emitted prefix capacity {} should be ~n=4, not the original {}", + emitted.capacity(), + original_capacity, + ); + + // The remaining accumulator must retain the original large allocation so + // that incoming groups don't immediately force a realloc. + // Old split_off+swap left the remaining vec with a small fresh allocation. + assert_eq!( + v.capacity(), + original_capacity, + "remaining vec capacity {} should equal original {}", + v.capacity(), + original_capacity, + ); + } +} diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index efaf7eba0f1b5..e5ec56dfef5db 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -24,6 +24,7 @@ use arrow::array::{ use arrow::datatypes::{DataType, i256}; use datafusion_common::Result; use datafusion_common::hash_utils::RandomState; +use datafusion_common::utils::split_vec_min_alloc; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; use half::f16; @@ -207,9 +208,7 @@ where Some(_) => self.null_group.take(), None => None, }; - let mut split = self.values.split_off(n); - std::mem::swap(&mut self.values, &mut split); - build_primitive(split, null_group) + 
build_primitive(split_vec_min_alloc(&mut self.values, n), null_group) } }; @@ -223,3 +222,58 @@ where self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared } } + +#[cfg(test)] +impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> { + fn values_capacity(&self) -> usize { + self.values.capacity() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::types::Int32Type; + use arrow::array::{ArrayRef, Int32Array}; + use arrow::datatypes::DataType; + use datafusion_expr::EmitTo; + use std::sync::Arc; + + /// Mirror of the `EmitTo::take_needed` regression test, applied to the + /// concrete `GroupValuesPrimitive` accumulator. + /// + /// When `n` is small, the old `split_off(n) + swap` pattern used inside + /// `emit(EmitTo::First(n))` left `self.values` with a small fresh allocation + /// and returned the emitted prefix carrying the original large backing. + /// + /// With `split_vec_min_alloc` and `n * 2 <= len`, the drain branch is taken: + /// the emitted prefix gets a compact allocation and `self.values` retains the + /// original large one. + #[test] + fn emit_first_small_n_allocates_minimally() -> Result<()> { + let mut gv = GroupValuesPrimitive::<Int32Type>::new(DataType::Int32); + + // Intern 20 distinct values; `new()` pre-allocates capacity 128 for `values`. + let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..20i32)); + let mut groups = vec![]; + gv.intern(&[arr], &mut groups)?; + let capacity_before = gv.values_capacity(); // 128 + + // n=4, n*2=8 <= len=20 -> drain branch + let emitted = gv.emit(EmitTo::First(4))?; + + assert_eq!(emitted[0].len(), 4); + + // `self.values` must retain its original large allocation. + // Old split_off+swap left it with a fresh small allocation (~16). + assert_eq!( + gv.values_capacity(), + capacity_before, + "self.values capacity {} should equal original {} after small First(n) emit", + gv.values_capacity(), + capacity_before, + ); + + Ok(()) + } +}