Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Vectorized [`GroupsAccumulator`]

use arrow::array::{ArrayRef, BooleanArray};
use datafusion_common::{Result, not_impl_err};
use datafusion_common::{Result, not_impl_err, utils::split_vec_min_alloc};

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -45,13 +45,7 @@ impl EmitTo {
// Take the entire vector, leave new (empty) vector
std::mem::take(v)
}
Self::First(n) => {
// get end n+1,.. values into t
let mut t = v.split_off(*n);
// leave n+1,.. in v
std::mem::swap(v, &mut t);
t
}
Self::First(n) => split_vec_min_alloc(v, *n),
}
}
}
Expand Down Expand Up @@ -254,3 +248,53 @@ pub trait GroupsAccumulator: Send + std::any::Any {
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
}

#[cfg(test)]
mod tests {
use super::EmitTo;

/// When `n` is small relative to `len`, the old `split_off(n) + swap` pattern had
/// two allocation problems:
///
/// 1. The returned Vec kept the original large backing allocation even though it
/// only contains `n` elements (wasted capacity on a short-lived value).
/// 2. `split_off` allocated a fresh Vec for the `len - n` remaining elements,
/// even though that side is much larger than `n` — the expensive side to
/// allocate.
///
/// `split_vec_min_alloc` fixes both: when `n * 2 <= len` it uses
/// `drain(0..n).collect()`, allocating only `n` elements for the emitted prefix
/// and keeping the original large backing in the remaining accumulator.
#[test]
fn take_needed_first_small_n_allocates_minimally() {
let mut v: Vec<i32> = Vec::with_capacity(128);
v.extend(0..20i32);
let original_capacity = v.capacity(); // 128

// n=4, n*2=8 <= len=20 -> drain branch in split_vec_min_alloc
let emitted = EmitTo::First(4).take_needed(&mut v);

assert_eq!(emitted, vec![0, 1, 2, 3]);
assert_eq!(v, (4..20i32).collect::<Vec<_>>());

// The emitted prefix must NOT carry the original large allocation.
// Old split_off+swap returned a Vec with capacity=128 for only 4 elements.
assert!(
emitted.capacity() <= 4,
"emitted prefix capacity {} should be ~n=4, not the original {}",
emitted.capacity(),
original_capacity,
);

// The remaining accumulator must retain the original large allocation so
// that incoming groups don't immediately force a realloc.
// Old split_off+swap left the remaining vec with a small fresh allocation.
assert_eq!(
v.capacity(),
original_capacity,
"remaining vec capacity {} should equal original {}",
v.capacity(),
original_capacity,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow::array::{
use arrow::datatypes::{DataType, i256};
use datafusion_common::Result;
use datafusion_common::hash_utils::RandomState;
use datafusion_common::utils::split_vec_min_alloc;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use half::f16;
Expand Down Expand Up @@ -207,9 +208,7 @@ where
Some(_) => self.null_group.take(),
None => None,
};
let mut split = self.values.split_off(n);
std::mem::swap(&mut self.values, &mut split);
build_primitive(split, null_group)
build_primitive(split_vec_min_alloc(&mut self.values, n), null_group)
}
};

Expand All @@ -223,3 +222,58 @@ where
self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
}
}

#[cfg(test)]
impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
fn values_capacity(&self) -> usize {
self.values.capacity()
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::types::Int32Type;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use datafusion_expr::EmitTo;
use std::sync::Arc;

/// Mirror of the `EmitTo::take_needed` regression test, applied to the
/// concrete `GroupValuesPrimitive` accumulator.
///
/// When `n` is small, the old `split_off(n) + swap` pattern used inside
/// `emit(EmitTo::First(n))` left `self.values` with a small fresh allocation
/// and returned the emitted prefix carrying the original large backing.
///
/// With `split_vec_min_alloc` and `n * 2 <= len`, the drain branch is taken:
/// the emitted prefix gets a compact allocation and `self.values` retains the
/// original large one.
#[test]
fn emit_first_small_n_allocates_minimally() -> Result<()> {
let mut gv = GroupValuesPrimitive::<Int32Type>::new(DataType::Int32);

// Intern 20 distinct values; `new()` pre-allocates capacity 128 for `values`.
let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..20i32));
let mut groups = vec![];
gv.intern(&[arr], &mut groups)?;
let capacity_before = gv.values_capacity(); // 128

// n=4, n*2=8 <= len=20 -> drain branch
let emitted = gv.emit(EmitTo::First(4))?;

assert_eq!(emitted[0].len(), 4);

// `self.values` must retain its original large allocation.
// Old split_off+swap left it with a fresh small allocation (~16).
assert_eq!(
gv.values_capacity(),
capacity_before,
"self.values capacity {} should equal original {} after small First(n) emit",
gv.values_capacity(),
capacity_before,
);

Ok(())
}
}
Loading