diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index e3146921d7fe1..087e48d076d4d 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -23,6 +23,7 @@ use criterion::{ }; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::except::ArrayExcept; use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion}; use rand::SeedableRng; use rand::prelude::SliceRandom; @@ -38,6 +39,7 @@ const SEED: u64 = 42; fn criterion_benchmark(c: &mut Criterion) { bench_array_union(c); bench_array_intersect(c); + bench_array_except(c); bench_array_distinct(c); } @@ -98,6 +100,25 @@ fn bench_array_intersect(c: &mut Criterion) { group.finish(); } +fn bench_array_except(c: &mut Criterion) { + let mut group = c.benchmark_group("array_except"); + let udf = ArrayExcept::new(); + + for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { + for &array_size in ARRAY_SIZES { + let (array1, array2) = + create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + group.bench_with_input( + BenchmarkId::new(*overlap_label, array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), + ); + } + } + + group.finish(); +} + fn bench_array_distinct(c: &mut Criterion) { let mut group = c.benchmark_group("array_distinct"); let udf = ArrayDistinct::new(); diff --git a/datafusion/functions-nested/src/except.rs b/datafusion/functions-nested/src/except.rs index 19a4e9573e35b..932eecf4b8709 100644 --- a/datafusion/functions-nested/src/except.rs +++ b/datafusion/functions-nested/src/except.rs @@ -19,8 +19,12 @@ use crate::utils::{check_datatypes, make_scalar_function}; use arrow::array::new_null_array; -use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, 
cast::AsArray}; +use arrow::array::{ + Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array, + cast::AsArray, +}; use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::compute::take; use arrow::datatypes::{DataType, FieldRef}; use arrow::row::{RowConverter, SortField}; use datafusion_common::utils::{ListCoercion, take_function_args}; @@ -179,7 +183,7 @@ fn general_except( let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1); offsets.push(OffsetSize::usize_as(0)); - let mut rows = Vec::with_capacity(l_values.num_rows()); + let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows()); let mut dedup = HashSet::new(); let nulls = NullBuffer::union(l.nulls(), r.nulls()); @@ -193,7 +197,7 @@ fn general_except( .as_ref() .is_some_and(|nulls| nulls.is_null(list_index)) { - offsets.push(OffsetSize::usize_as(rows.len())); + offsets.push(OffsetSize::usize_as(indices.len())); continue; } @@ -204,22 +208,32 @@ fn general_except( for element_index in l_start.as_usize()..l_end.as_usize() { let left_row = l_values.row(element_index); if dedup.insert(left_row) { - rows.push(left_row); + indices.push(element_index); } } - offsets.push(OffsetSize::usize_as(rows.len())); + offsets.push(OffsetSize::usize_as(indices.len())); dedup.clear(); } - if let Some(values) = converter.convert_rows(rows)?.first() { - Ok(GenericListArray::<OffsetSize>::new( - field.to_owned(), - OffsetBuffer::new(offsets.into()), - values.to_owned(), - nulls, - )) + // Gather distinct left-side values by index. + // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX. + let values = if indices.is_empty() { + arrow::array::new_empty_array(&l.value_type()) + } else if OffsetSize::IS_LARGE { + let indices = + UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>()); + take(l.values().as_ref(), &indices, None)? 
} else { - internal_err!("array_except failed to convert rows") - } + let indices = + UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>()); + take(l.values().as_ref(), &indices, None)? + }; + + Ok(GenericListArray::<OffsetSize>::new( + field.to_owned(), + OffsetBuffer::new(offsets.into()), + values, + nulls, + )) } diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 150559111fef3..a3d2573747140 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -19,9 +19,11 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array, + Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array, + new_empty_array, new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::compute::{concat, take}; use arrow::datatypes::DataType::{LargeList, List, Null}; use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::row::{RowConverter, SortField}; @@ -373,12 +375,28 @@ fn generic_set_lists( let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + // Combine value arrays so indices from both sides share a single index space. 
+ let combined_values = concat(&[l.values().as_ref(), r.values().as_ref()])?; + let r_offset = l.values().len(); + match set_op { SetOp::Union => generic_set_loop::<OffsetSize, true>( - l, r, &rows_l, &rows_r, field, &converter, + l, + r, + &rows_l, + &rows_r, + field, + &combined_values, + r_offset, ), SetOp::Intersect => generic_set_loop::<OffsetSize, false>( - l, r, &rows_l, &rows_r, field, &converter, + l, + r, + &rows_l, + &rows_r, + field, + &combined_values, + r_offset, ), } } @@ -391,7 +409,8 @@ fn generic_set_loop( rows_l: &arrow::row::Rows, rows_r: &arrow::row::Rows, field: Arc<Field>, - converter: &RowConverter, + combined_values: &ArrayRef, + r_offset: usize, ) -> Result<ArrayRef> { let l_offsets = l.value_offsets(); let r_offsets = r.value_offsets(); @@ -406,7 +425,7 @@ fn generic_set_loop( rows_l.num_rows().min(rows_r.num_rows()) }; - let mut final_rows = Vec::with_capacity(initial_capacity); + let mut indices: Vec<usize> = Vec::with_capacity(initial_capacity); // Reuse hash sets across iterations let mut seen = HashSet::new(); @@ -430,25 +449,27 @@ fn generic_set_loop( for idx in l_start..l_end { let row = rows_l.row(idx); if seen.insert(row) { - final_rows.push(row); + indices.push(idx); } } for idx in r_start..r_end { let row = rows_r.row(idx); if seen.insert(row) { - final_rows.push(row); + indices.push(idx + r_offset); } } } else { let l_len = l_end - l_start; let r_len = r_end - r_start; - // Select shorter side for lookup, longer side for probing - let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len { - (rows_l, l_start..l_end, rows_r, r_start..r_end) - } else { - (rows_r, r_start..r_end, rows_l, l_start..l_end) - }; + // Select shorter side for lookup, longer side for probing. + // Track the probe side's offset into the combined values array. 
+ let (lookup_rows, lookup_range, probe_rows, probe_range, probe_offset) = + if l_len < r_len { + (rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset) + } else { + (rows_r, r_start..r_end, rows_l, l_start..l_end, 0) + }; lookup_set.clear(); lookup_set.reserve(lookup_range.len()); @@ -461,18 +482,25 @@ fn generic_set_loop( for idx in probe_range { let row = probe_rows.row(idx); if lookup_set.contains(&row) && seen.insert(row) { - final_rows.push(row); + indices.push(idx + probe_offset); } } } result_offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - let final_values = if final_rows.is_empty() { + // Gather distinct values by index from the combined values array. + // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX. + let final_values = if indices.is_empty() { new_empty_array(&l.value_type()) + } else if OffsetSize::IS_LARGE { + let indices = + UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>()); + take(combined_values.as_ref(), &indices, None)? } else { - let arrays = converter.convert_rows(final_rows)?; - Arc::clone(&arrays[0]) + let indices = + UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>()); + take(combined_values.as_ref(), &indices, None)? 
}; let arr = GenericListArray::<OffsetSize>::try_new( @@ -539,7 +567,7 @@ fn general_array_distinct( // Convert all values to row format in a single batch for performance let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; let rows = converter.convert_columns(&[Arc::clone(array.values())])?; - let mut final_rows = Vec::with_capacity(rows.num_rows()); + let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows()); let mut seen = HashSet::new(); for i in 0..array.len() { let last_offset = *offsets.last().unwrap(); @@ -559,18 +587,24 @@ fn general_array_distinct( for idx in start..end { let row = rows.row(idx); if seen.insert(row) { - final_rows.push(row); + indices.push(idx); } } offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - // Convert all collected distinct rows back - let final_values = if final_rows.is_empty() { + // Gather distinct values in a single pass, using the computed `indices`. + // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX. + let final_values = if indices.is_empty() { new_empty_array(&dt) + } else if OffsetSize::IS_LARGE { + let indices = + UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>()); + take(array.values().as_ref(), &indices, None)? } else { - let arrays = converter.convert_rows(final_rows)?; - Arc::clone(&arrays[0]) + let indices = + UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>()); + take(array.values().as_ref(), &indices, None)? }; Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(