Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions datafusion/functions-nested/benches/array_set_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use criterion::{
};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use datafusion_functions_nested::except::ArrayExcept;
use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion};
use rand::SeedableRng;
use rand::prelude::SliceRandom;
Expand All @@ -34,11 +35,19 @@ use std::sync::Arc;
const NUM_ROWS: usize = 1000;
const ARRAY_SIZES: &[usize] = &[10, 50, 100];
const SEED: u64 = 42;
/// Extra rows on each side when building sliced arrays, so the underlying
/// values buffer is much larger than the visible portion.
const SLICE_PADDING: usize = 5000;

fn criterion_benchmark(c: &mut Criterion) {
bench_array_union(c);
bench_array_intersect(c);
bench_array_distinct(c);
bench_array_except(c);
bench_array_union_sliced(c);
bench_array_intersect_sliced(c);
bench_array_distinct_sliced(c);
bench_array_except_sliced(c);
}

fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) {
Expand Down Expand Up @@ -255,5 +264,126 @@ fn create_array_with_duplicates(
)
}

fn bench_array_except(c: &mut Criterion) {
let mut group = c.benchmark_group("array_except");
let udf = ArrayExcept::new();

for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
for &array_size in ARRAY_SIZES {
let (array1, array2) =
create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio);
group.bench_with_input(
BenchmarkId::new(*overlap_label, array_size),
&array_size,
|b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
);
}
}

group.finish();
}

/// Slice a pair of arrays to the middle `NUM_ROWS` rows from a larger array.
fn slice_pair(arrays: &(ArrayRef, ArrayRef)) -> (ArrayRef, ArrayRef) {
let a1 = arrays.0.slice(SLICE_PADDING, NUM_ROWS);
let a2 = arrays.1.slice(SLICE_PADDING, NUM_ROWS);
(a1, a2)
}

fn bench_array_union_sliced(c: &mut Criterion) {
let mut group = c.benchmark_group("array_union_sliced");
let udf = ArrayUnion::new();

for &array_size in ARRAY_SIZES {
let (a1, a2) = slice_pair(&create_arrays_with_overlap(
NUM_ROWS + 2 * SLICE_PADDING,
array_size,
0.5,
));
group.bench_with_input(
BenchmarkId::from_parameter(array_size),
&array_size,
|b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)),
);
}
group.finish();
}

fn bench_array_intersect_sliced(c: &mut Criterion) {
let mut group = c.benchmark_group("array_intersect_sliced");
let udf = ArrayIntersect::new();

for &array_size in ARRAY_SIZES {
let (a1, a2) = slice_pair(&create_arrays_with_overlap(
NUM_ROWS + 2 * SLICE_PADDING,
array_size,
0.5,
));
group.bench_with_input(
BenchmarkId::from_parameter(array_size),
&array_size,
|b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)),
);
}
group.finish();
}

fn bench_array_except_sliced(c: &mut Criterion) {
let mut group = c.benchmark_group("array_except_sliced");
let udf = ArrayExcept::new();

for &array_size in ARRAY_SIZES {
let (a1, a2) = slice_pair(&create_arrays_with_overlap(
NUM_ROWS + 2 * SLICE_PADDING,
array_size,
0.5,
));
group.bench_with_input(
BenchmarkId::from_parameter(array_size),
&array_size,
|b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)),
);
}
group.finish();
}

fn bench_array_distinct_sliced(c: &mut Criterion) {
let mut group = c.benchmark_group("array_distinct_sliced");
let udf = ArrayDistinct::new();

for &array_size in ARRAY_SIZES {
let array =
create_array_with_duplicates(NUM_ROWS + 2 * SLICE_PADDING, array_size, 0.5)
.slice(SLICE_PADDING, NUM_ROWS);
group.bench_with_input(
BenchmarkId::from_parameter(array_size),
&array_size,
|b, _| {
b.iter(|| {
black_box(
udf.invoke_with_args(ScalarFunctionArgs {
args: vec![ColumnarValue::Array(array.clone())],
arg_fields: vec![
Field::new("arr", array.data_type().clone(), false)
.into(),
],
number_rows: NUM_ROWS,
return_field: Field::new(
"result",
array.data_type().clone(),
false,
)
.into(),
config_options: Arc::new(ConfigOptions::default()),
})
.unwrap(),
)
})
},
);
}
group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
79 changes: 73 additions & 6 deletions datafusion/functions-nested/src/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,16 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
) -> Result<GenericListArray<OffsetSize>> {
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;

let l_values = l.values().to_owned();
let r_values = r.values().to_owned();
let l_values = converter.convert_columns(&[l_values])?;
let r_values = converter.convert_columns(&[r_values])?;
// Only convert the visible portion of the values array. For sliced
// ListArrays, values() returns the full underlying array but only
// elements between the first and last offset are referenced.
let l_first = l.offsets()[0].as_usize();
let l_len = l.offsets()[l.len()].as_usize() - l_first;
let l_values = converter.convert_columns(&[l.values().slice(l_first, l_len)])?;

let r_first = r.offsets()[0].as_usize();
let r_len = r.offsets()[r.len()].as_usize() - r_first;
let r_values = converter.convert_columns(&[r.values().slice(r_first, r_len)])?;

let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));
Expand All @@ -197,11 +203,11 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
continue;
}

for element_index in r_start.as_usize()..r_end.as_usize() {
for element_index in r_start.as_usize() - r_first..r_end.as_usize() - r_first {
let right_row = r_values.row(element_index);
dedup.insert(right_row);
}
for element_index in l_start.as_usize()..l_end.as_usize() {
for element_index in l_start.as_usize() - l_first..l_end.as_usize() - l_first {
let left_row = l_values.row(element_index);
if dedup.insert(left_row) {
rows.push(left_row);
Expand All @@ -223,3 +229,64 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
internal_err!("array_except failed to convert rows")
}
}

#[cfg(test)]
mod tests {
use super::ArrayExcept;
use arrow::array::{Array, AsArray, Int32Array, ListArray};
use arrow::datatypes::{Field, Int32Type};
use datafusion_common::{Result, config::ConfigOptions};
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use std::sync::Arc;

#[test]
fn test_array_except_sliced_lists() -> Result<()> {
// l: [[1,2], [3,4], [5,6], [7,8]] → slice(1,2) → [[3,4], [5,6]]
// r: [[3], [5], [6], [8]] → slice(1,2) → [[5], [6]]
// except(l, r) should be [[3,4], [5]]
let l_full = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2)]),
Some(vec![Some(3), Some(4)]),
Some(vec![Some(5), Some(6)]),
Some(vec![Some(7), Some(8)]),
]);
let r_full = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(3)]),
Some(vec![Some(5)]),
Some(vec![Some(6)]),
Some(vec![Some(8)]),
]);
let l_sliced = l_full.slice(1, 2);
let r_sliced = r_full.slice(1, 2);

let list_field = Arc::new(Field::new("item", l_sliced.data_type().clone(), true));
let return_field =
Arc::new(Field::new("return", l_sliced.data_type().clone(), true));

let result = ArrayExcept::new().invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(l_sliced)),
ColumnarValue::Array(Arc::new(r_sliced)),
],
arg_fields: vec![Arc::clone(&list_field), Arc::clone(&list_field)],
number_rows: 2,
return_field,
config_options: Arc::new(ConfigOptions::default()),
})?;

let output = result.into_array(2)?;
let output = output.as_list::<i32>();

// Row 0: [3,4] except [5] = [3,4]
let row0 = output.value(0);
let row0 = row0.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(row0.values().as_ref(), &[3, 4]);

// Row 1: [5,6] except [6] = [5]
let row1 = output.value(1);
let row1 = row1.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(row1.values().as_ref(), &[5]);

Ok(())
}
}
Loading