diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index e3146921d7fe1..48b6e5f806dc8 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -23,6 +23,7 @@ use criterion::{ }; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::except::ArrayExcept; use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion}; use rand::SeedableRng; use rand::prelude::SliceRandom; @@ -34,11 +35,19 @@ use std::sync::Arc; const NUM_ROWS: usize = 1000; const ARRAY_SIZES: &[usize] = &[10, 50, 100]; const SEED: u64 = 42; +/// Extra rows on each side when building sliced arrays, so the underlying +/// values buffer is much larger than the visible portion. +const SLICE_PADDING: usize = 5000; fn criterion_benchmark(c: &mut Criterion) { bench_array_union(c); bench_array_intersect(c); bench_array_distinct(c); + bench_array_except(c); + bench_array_union_sliced(c); + bench_array_intersect_sliced(c); + bench_array_distinct_sliced(c); + bench_array_except_sliced(c); } fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { @@ -255,5 +264,126 @@ fn create_array_with_duplicates( ) } +fn bench_array_except(c: &mut Criterion) { + let mut group = c.benchmark_group("array_except"); + let udf = ArrayExcept::new(); + + for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { + for &array_size in ARRAY_SIZES { + let (array1, array2) = + create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + group.bench_with_input( + BenchmarkId::new(*overlap_label, array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), + ); + } + } + + group.finish(); +} + +/// Slice a pair of arrays to the middle `NUM_ROWS` rows from a larger array. +fn slice_pair(arrays: &(ArrayRef, ArrayRef)) -> (ArrayRef, ArrayRef) { + let a1 = arrays.0.slice(SLICE_PADDING, NUM_ROWS); + let a2 = arrays.1.slice(SLICE_PADDING, NUM_ROWS); + (a1, a2) +} + +fn bench_array_union_sliced(c: &mut Criterion) { + let mut group = c.benchmark_group("array_union_sliced"); + let udf = ArrayUnion::new(); + + for &array_size in ARRAY_SIZES { + let (a1, a2) = slice_pair(&create_arrays_with_overlap( + NUM_ROWS + 2 * SLICE_PADDING, + array_size, + 0.5, + )); + group.bench_with_input( + BenchmarkId::from_parameter(array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)), + ); + } + group.finish(); +} + +fn bench_array_intersect_sliced(c: &mut Criterion) { + let mut group = c.benchmark_group("array_intersect_sliced"); + let udf = ArrayIntersect::new(); + + for &array_size in ARRAY_SIZES { + let (a1, a2) = slice_pair(&create_arrays_with_overlap( + NUM_ROWS + 2 * SLICE_PADDING, + array_size, + 0.5, + )); + group.bench_with_input( + BenchmarkId::from_parameter(array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)), + ); + } + group.finish(); +} + +fn bench_array_except_sliced(c: &mut Criterion) { + let mut group = c.benchmark_group("array_except_sliced"); + let udf = ArrayExcept::new(); + + for &array_size in ARRAY_SIZES { + let (a1, a2) = slice_pair(&create_arrays_with_overlap( + NUM_ROWS + 2 * SLICE_PADDING, + array_size, + 0.5, + )); + group.bench_with_input( + BenchmarkId::from_parameter(array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &a1, &a2)), + ); + } + group.finish(); +} + +fn bench_array_distinct_sliced(c: &mut Criterion) { + let mut group = c.benchmark_group("array_distinct_sliced"); + let udf = ArrayDistinct::new(); + + for &array_size in ARRAY_SIZES { + let array = + create_array_with_duplicates(NUM_ROWS + 2 * SLICE_PADDING, array_size, 0.5) + .slice(SLICE_PADDING, NUM_ROWS); + group.bench_with_input( + BenchmarkId::from_parameter(array_size), + &array_size, + |b, _| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(array.clone())], + arg_fields: vec![ + Field::new("arr", array.data_type().clone(), false) + .into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + group.finish(); +} + criterion_group!(benches, criterion_benchmark); criterion_main!(benches); diff --git a/datafusion/functions-nested/src/except.rs b/datafusion/functions-nested/src/except.rs index 19a4e9573e35b..0fdea81a699cd 100644 --- a/datafusion/functions-nested/src/except.rs +++ b/datafusion/functions-nested/src/except.rs @@ -171,10 +171,16 @@ fn general_except( ) -> Result> { let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - let l_values = l.values().to_owned(); - let r_values = r.values().to_owned(); - let l_values = converter.convert_columns(&[l_values])?; - let r_values = converter.convert_columns(&[r_values])?; + // Only convert the visible portion of the values array. For sliced + // ListArrays, values() returns the full underlying array but only + // elements between the first and last offset are referenced. + let l_first = l.offsets()[0].as_usize(); + let l_len = l.offsets()[l.len()].as_usize() - l_first; + let l_values = converter.convert_columns(&[l.values().slice(l_first, l_len)])?; + + let r_first = r.offsets()[0].as_usize(); + let r_len = r.offsets()[r.len()].as_usize() - r_first; + let r_values = converter.convert_columns(&[r.values().slice(r_first, r_len)])?; let mut offsets = Vec::::with_capacity(l.len() + 1); offsets.push(OffsetSize::usize_as(0)); @@ -197,11 +203,11 @@ fn general_except( continue; } - for element_index in r_start.as_usize()..r_end.as_usize() { + for element_index in r_start.as_usize() - r_first..r_end.as_usize() - r_first { let right_row = r_values.row(element_index); dedup.insert(right_row); } - for element_index in l_start.as_usize()..l_end.as_usize() { + for element_index in l_start.as_usize() - l_first..l_end.as_usize() - l_first { let left_row = l_values.row(element_index); if dedup.insert(left_row) { rows.push(left_row); @@ -223,3 +229,64 @@ fn general_except( internal_err!("array_except failed to convert rows") } } + +#[cfg(test)] +mod tests { + use super::ArrayExcept; + use arrow::array::{Array, AsArray, Int32Array, ListArray}; + use arrow::datatypes::{Field, Int32Type}; + use datafusion_common::{Result, config::ConfigOptions}; + use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; + use std::sync::Arc; + + #[test] + fn test_array_except_sliced_lists() -> Result<()> { + // l: [[1,2], [3,4], [5,6], [7,8]] → slice(1,2) → [[3,4], [5,6]] + // r: [[3], [5], [6], [8]] → slice(1,2) → [[5], [6]] + // except(l, r) should be [[3,4], [5]] + let l_full = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4)]), + Some(vec![Some(5), Some(6)]), + Some(vec![Some(7), Some(8)]), + ]); + let r_full = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(3)]), + Some(vec![Some(5)]), + Some(vec![Some(6)]), + Some(vec![Some(8)]), + ]); + let l_sliced = l_full.slice(1, 2); + let r_sliced = r_full.slice(1, 2); + + let list_field = Arc::new(Field::new("item", l_sliced.data_type().clone(), true)); + let return_field = + Arc::new(Field::new("return", l_sliced.data_type().clone(), true)); + + let result = ArrayExcept::new().invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(l_sliced)), + ColumnarValue::Array(Arc::new(r_sliced)), + ], + arg_fields: vec![Arc::clone(&list_field), Arc::clone(&list_field)], + number_rows: 2, + return_field, + config_options: Arc::new(ConfigOptions::default()), + })?; + + let output = result.into_array(2)?; + let output = output.as_list::(); + + // Row 0: [3,4] except [5] = [3,4] + let row0 = output.value(0); + let row0 = row0.as_any().downcast_ref::().unwrap(); + assert_eq!(row0.values().as_ref(), &[3, 4]); + + // Row 1: [5,6] except [6] = [5] + let row1 = output.value(1); + let row1 = row1.as_any().downcast_ref::().unwrap(); + assert_eq!(row1.values().as_ref(), &[5]); + + Ok(()) + } +} diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 150559111fef3..f328a306882b6 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -368,10 +368,18 @@ fn generic_set_lists( "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" ); - // Convert all values to rows in batch for performance. let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; - let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + + // Only convert the visible portion of the values array. For sliced + // ListArrays, values() returns the full underlying array but only + // elements between the first and last offset are referenced. + let l_first = l.offsets()[0].as_usize(); + let l_len = l.offsets()[l.len()].as_usize() - l_first; + let rows_l = converter.convert_columns(&[l.values().slice(l_first, l_len)])?; + + let r_first = r.offsets()[0].as_usize(); + let r_len = r.offsets()[r.len()].as_usize() - r_first; + let rows_r = converter.convert_columns(&[r.values().slice(r_first, r_len)])?; match set_op { SetOp::Union => generic_set_loop::( @@ -395,6 +403,8 @@ fn generic_set_loop( ) -> Result { let l_offsets = l.value_offsets(); let r_offsets = r.value_offsets(); + let l_first = l.offsets()[0].as_usize(); + let r_first = r.offsets()[0].as_usize(); let mut result_offsets = Vec::with_capacity(l.len() + 1); result_offsets.push(OffsetSize::usize_as(0)); @@ -419,10 +429,10 @@ fn generic_set_loop( continue; } - let l_start = l_offsets[i].as_usize(); - let l_end = l_offsets[i + 1].as_usize(); - let r_start = r_offsets[i].as_usize(); - let r_end = r_offsets[i + 1].as_usize(); + let l_start = l_offsets[i].as_usize() - l_first; + let l_end = l_offsets[i + 1].as_usize() - l_first; + let r_start = r_offsets[i].as_usize() - r_first; + let r_end = r_offsets[i + 1].as_usize() - r_first; seen.clear(); @@ -536,9 +546,16 @@ fn general_array_distinct( let mut offsets = Vec::with_capacity(array.len() + 1); offsets.push(OffsetSize::usize_as(0)); - // Convert all values to row format in a single batch for performance let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; - let rows = converter.convert_columns(&[Arc::clone(array.values())])?; + + // Only convert the visible portion of the values array. For sliced + // ListArrays, values() returns the full underlying array but only + // elements between the first and last offset are referenced. + let first_offset = value_offsets[0].as_usize(); + let visible_len = value_offsets[array.len()].as_usize() - first_offset; + let rows = + converter.convert_columns(&[array.values().slice(first_offset, visible_len)])?; + let mut final_rows = Vec::with_capacity(rows.num_rows()); let mut seen = HashSet::new(); for i in 0..array.len() { @@ -550,8 +567,8 @@ fn general_array_distinct( continue; } - let start = value_offsets[i].as_usize(); - let end = value_offsets[i + 1].as_usize(); + let start = value_offsets[i].as_usize() - first_offset; + let end = value_offsets[i + 1].as_usize() - first_offset; seen.clear(); seen.reserve(end - start); @@ -587,14 +604,132 @@ mod tests { use std::sync::Arc; use arrow::{ - array::{Int32Array, ListArray}, + array::{Array, AsArray, Int32Array, ListArray}, buffer::OffsetBuffer, - datatypes::{DataType, Field}, + datatypes::{DataType, Field, Int32Type}, }; - use datafusion_common::{DataFusionError, config::ConfigOptions}; - use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion_common::{DataFusionError, Result, config::ConfigOptions}; + use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; + + use crate::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion, array_distinct_udf}; + + /// Build two sliced ListArrays and return them along with the shared list + /// field. + /// + /// l: [[1,2], [3,4], [5,6], [7,8]] → slice(1,2) → [[3,4], [5,6]] + /// r: [[1,3], [3,5], [5,7], [7,1]] → slice(1,2) → [[3,5], [5,7]] + fn make_sliced_pair() -> (ListArray, ListArray, Arc) { + let l = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4)]), + Some(vec![Some(5), Some(6)]), + Some(vec![Some(7), Some(8)]), + ]); + let r = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(3)]), + Some(vec![Some(3), Some(5)]), + Some(vec![Some(5), Some(7)]), + Some(vec![Some(7), Some(1)]), + ]); + let field = Arc::new(Field::new("item", l.data_type().clone(), true)); + (l.slice(1, 2), r.slice(1, 2), field) + } + + fn collect_i32_list(list: &ListArray) -> Vec> { + (0..list.len()) + .map(|i| { + let arr = list.value(i); + arr.as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec() + }) + .collect() + } + + #[test] + fn test_array_union_sliced_lists() -> Result<()> { + let (l, r, field) = make_sliced_pair(); + + let result = ArrayUnion::new().invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(l)), + ColumnarValue::Array(Arc::new(r)), + ], + arg_fields: vec![Arc::clone(&field), Arc::clone(&field)], + number_rows: 2, + return_field: Arc::clone(&field), + config_options: Arc::new(ConfigOptions::default()), + })?; + + let output = result.into_array(2)?; + let output = output.as_list::(); + let rows = collect_i32_list(output); + + // Row 0: union([3,4], [3,5]) = [3,4,5] + assert_eq!(rows[0], vec![3, 4, 5]); + // Row 1: union([5,6], [5,7]) = [5,6,7] + assert_eq!(rows[1], vec![5, 6, 7]); + Ok(()) + } + + #[test] + fn test_array_intersect_sliced_lists() -> Result<()> { + let (l, r, field) = make_sliced_pair(); + + let result = ArrayIntersect::new().invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(l)), + ColumnarValue::Array(Arc::new(r)), + ], + arg_fields: vec![Arc::clone(&field), Arc::clone(&field)], + number_rows: 2, + return_field: Arc::clone(&field), + config_options: Arc::new(ConfigOptions::default()), + })?; + + let output = result.into_array(2)?; + let output = output.as_list::(); + let rows = collect_i32_list(output); + + // Row 0: intersect([3,4], [3,5]) = [3] + assert_eq!(rows[0], vec![3]); + // Row 1: intersect([5,6], [5,7]) = [5] + assert_eq!(rows[1], vec![5]); + Ok(()) + } - use crate::set_ops::array_distinct_udf; + #[test] + fn test_array_distinct_sliced_list() -> Result<()> { + // [[1,1], [3,3,4], [5,5,6], [7,7]] → slice(1,2) → [[3,3,4], [5,5,6]] + let list = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(1)]), + Some(vec![Some(3), Some(3), Some(4)]), + Some(vec![Some(5), Some(5), Some(6)]), + Some(vec![Some(7), Some(7)]), + ]); + let sliced = list.slice(1, 2); + let field = Arc::new(Field::new("item", sliced.data_type().clone(), true)); + + let result = ArrayDistinct::new().invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(sliced))], + arg_fields: vec![Arc::clone(&field)], + number_rows: 2, + return_field: field, + config_options: Arc::new(ConfigOptions::default()), + })?; + + let output = result.into_array(2)?; + let output = output.as_list::(); + let rows = collect_i32_list(output); + + // Row 0: distinct([3,3,4]) = [3,4] + assert_eq!(rows[0], vec![3, 4]); + // Row 1: distinct([5,5,6]) = [5,6] + assert_eq!(rows[1], vec![5, 6]); + Ok(()) + } #[test] fn test_array_distinct_inner_nullability_result_type_match_return_type()