From 8878635e1489e2b985388f26a9ee191f965ab6e9 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 28 Feb 2026 13:32:03 -0500 Subject: [PATCH 1/7] perf: Optimize array_distinct --- datafusion/functions-nested/src/set_ops.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 150559111fef3..8f6be94a5910e 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -19,9 +19,11 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array, + Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, new_empty_array, + new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::compute::take; use arrow::datatypes::DataType::{LargeList, List, Null}; use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::row::{RowConverter, SortField}; @@ -539,7 +541,7 @@ fn general_array_distinct( // Convert all values to row format in a single batch for performance let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; let rows = converter.convert_columns(&[Arc::clone(array.values())])?; - let mut final_rows = Vec::with_capacity(rows.num_rows()); + let mut indices = Vec::with_capacity(rows.num_rows()); let mut seen = HashSet::new(); for i in 0..array.len() { let last_offset = *offsets.last().unwrap(); @@ -559,18 +561,17 @@ fn general_array_distinct( for idx in start..end { let row = rows.row(idx); if seen.insert(row) { - final_rows.push(row); + indices.push(idx as u32); } } offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - // Convert all collected distinct rows back - let final_values = if final_rows.is_empty() { + // Gather distinct values in a single pass, using the computed `indices`. + let final_values = if indices.is_empty() { new_empty_array(&dt) } else { - let arrays = converter.convert_rows(final_rows)?; - Arc::clone(&arrays[0]) + take(array.values().as_ref(), &UInt32Array::from(indices), None)? }; Ok(Arc::new(GenericListArray::::try_new( From 986a15370df0ceb2517f669401adc62120811450 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 28 Feb 2026 14:19:25 -0500 Subject: [PATCH 2/7] Add a benchmark for array_except --- .../functions-nested/benches/array_set_ops.rs | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index e3146921d7fe1..087e48d076d4d 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -23,6 +23,7 @@ use criterion::{ }; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::except::ArrayExcept; use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion}; use rand::SeedableRng; use rand::prelude::SliceRandom; @@ -38,6 +39,7 @@ const SEED: u64 = 42; fn criterion_benchmark(c: &mut Criterion) { bench_array_union(c); bench_array_intersect(c); + bench_array_except(c); bench_array_distinct(c); } @@ -98,6 +100,25 @@ fn bench_array_intersect(c: &mut Criterion) { group.finish(); } +fn bench_array_except(c: &mut Criterion) { + let mut group = c.benchmark_group("array_except"); + let udf = ArrayExcept::new(); + + for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { + for &array_size in ARRAY_SIZES { + let (array1, array2) = + create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio); + group.bench_with_input( + BenchmarkId::new(*overlap_label, array_size), + &array_size, + |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)), + ); + } + } + + group.finish(); +} + fn bench_array_distinct(c: &mut Criterion) { let mut group = c.benchmark_group("array_distinct"); let udf = ArrayDistinct::new(); From 593dc6a8a41b38412cf7180f2496594f96024e54 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 28 Feb 2026 14:19:46 -0500 Subject: [PATCH 3/7] Optimize array_except --- datafusion/functions-nested/src/except.rs | 34 +++++++++++++---------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/datafusion/functions-nested/src/except.rs b/datafusion/functions-nested/src/except.rs index 19a4e9573e35b..b413e2177b873 100644 --- a/datafusion/functions-nested/src/except.rs +++ b/datafusion/functions-nested/src/except.rs @@ -19,8 +19,11 @@ use crate::utils::{check_datatypes, make_scalar_function}; use arrow::array::new_null_array; -use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray}; +use arrow::array::{ + Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, cast::AsArray, +}; use arrow::buffer::{NullBuffer, OffsetBuffer}; +use arrow::compute::take; use arrow::datatypes::{DataType, FieldRef}; use arrow::row::{RowConverter, SortField}; use datafusion_common::utils::{ListCoercion, take_function_args}; @@ -179,7 +182,7 @@ fn general_except( let mut offsets = Vec::::with_capacity(l.len() + 1); offsets.push(OffsetSize::usize_as(0)); - let mut rows = Vec::with_capacity(l_values.num_rows()); + let mut indices = Vec::with_capacity(l_values.num_rows()); let mut dedup = HashSet::new(); let nulls = NullBuffer::union(l.nulls(), r.nulls()); @@ -193,7 +196,7 @@ fn general_except( .as_ref() .is_some_and(|nulls| nulls.is_null(list_index)) { - offsets.push(OffsetSize::usize_as(rows.len())); + offsets.push(OffsetSize::usize_as(indices.len())); continue; } @@ -204,22 +207,25 @@ fn general_except( for element_index in l_start.as_usize()..l_end.as_usize() { let left_row = l_values.row(element_index); if dedup.insert(left_row) { - rows.push(left_row); + indices.push(element_index as u32); } } - offsets.push(OffsetSize::usize_as(rows.len())); + offsets.push(OffsetSize::usize_as(indices.len())); dedup.clear(); } - if let Some(values) = converter.convert_rows(rows)?.first() { - Ok(GenericListArray::::new( - field.to_owned(), - OffsetBuffer::new(offsets.into()), - values.to_owned(), - nulls, - )) + // Gather distinct left-side values by index + let values = if indices.is_empty() { + arrow::array::new_empty_array(&l.value_type()) } else { - internal_err!("array_except failed to convert rows") - } + take(l.values().as_ref(), &UInt32Array::from(indices), None)? + }; + + Ok(GenericListArray::::new( + field.to_owned(), + OffsetBuffer::new(offsets.into()), + values, + nulls, + )) } From 53bf5e306c222f967463b8cff33c3c53972dd5e5 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 28 Feb 2026 14:19:57 -0500 Subject: [PATCH 4/7] Optimize array_intersect and array_union --- datafusion/functions-nested/src/set_ops.rs | 42 +++++++++++++--------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 8f6be94a5910e..90509c7e573b5 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -23,7 +23,7 @@ use arrow::array::{ new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; -use arrow::compute::take; +use arrow::compute::{concat, take}; use arrow::datatypes::DataType::{LargeList, List, Null}; use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::row::{RowConverter, SortField}; @@ -375,12 +375,17 @@ fn generic_set_lists( let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + // Combine value arrays so indices from both sides share a single index space. + let combined_values = + concat(&[l.values().as_ref(), r.values().as_ref()])?; + let r_offset = l.values().len(); + match set_op { SetOp::Union => generic_set_loop::( - l, r, &rows_l, &rows_r, field, &converter, + l, r, &rows_l, &rows_r, field, &combined_values, r_offset, ), SetOp::Intersect => generic_set_loop::( - l, r, &rows_l, &rows_r, field, &converter, + l, r, &rows_l, &rows_r, field, &combined_values, r_offset, ), } } @@ -393,7 +398,8 @@ fn generic_set_loop( rows_l: &arrow::row::Rows, rows_r: &arrow::row::Rows, field: Arc, - converter: &RowConverter, + combined_values: &ArrayRef, + r_offset: usize, ) -> Result { let l_offsets = l.value_offsets(); let r_offsets = r.value_offsets(); @@ -408,7 +414,7 @@ fn generic_set_loop( rows_l.num_rows().min(rows_r.num_rows()) }; - let mut final_rows = Vec::with_capacity(initial_capacity); + let mut indices = Vec::with_capacity(initial_capacity); // Reuse hash sets across iterations let mut seen = HashSet::new(); @@ -432,25 +438,27 @@ fn generic_set_loop( for idx in l_start..l_end { let row = rows_l.row(idx); if seen.insert(row) { - final_rows.push(row); + indices.push(idx as u32); } } for idx in r_start..r_end { let row = rows_r.row(idx); if seen.insert(row) { - final_rows.push(row); + indices.push((idx + r_offset) as u32); } } } else { let l_len = l_end - l_start; let r_len = r_end - r_start; - // Select shorter side for lookup, longer side for probing - let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len { - (rows_l, l_start..l_end, rows_r, r_start..r_end) - } else { - (rows_r, r_start..r_end, rows_l, l_start..l_end) - }; + // Select shorter side for lookup, longer side for probing. + // Track the probe side's offset into the combined values array. + let (lookup_rows, lookup_range, probe_rows, probe_range, probe_offset) = + if l_len < r_len { + (rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset) + } else { + (rows_r, r_start..r_end, rows_l, l_start..l_end, 0) + }; lookup_set.clear(); lookup_set.reserve(lookup_range.len()); @@ -463,18 +471,18 @@ fn generic_set_loop( for idx in probe_range { let row = probe_rows.row(idx); if lookup_set.contains(&row) && seen.insert(row) { - final_rows.push(row); + indices.push((idx + probe_offset) as u32); } } } result_offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - let final_values = if final_rows.is_empty() { + // Gather distinct values by index from the combined values array + let final_values = if indices.is_empty() { new_empty_array(&l.value_type()) } else { - let arrays = converter.convert_rows(final_rows)?; - Arc::clone(&arrays[0]) + take(combined_values.as_ref(), &UInt32Array::from(indices), None)? }; let arr = GenericListArray::::try_new( From 5d2cd6e7e59372fedd1c17d72c1070715ec3c202 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 28 Feb 2026 14:28:37 -0500 Subject: [PATCH 5/7] cargo fmt --- datafusion/functions-nested/src/set_ops.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 90509c7e573b5..4bfc06bdde381 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -376,16 +376,27 @@ fn generic_set_lists( let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; // Combine value arrays so indices from both sides share a single index space. - let combined_values = - concat(&[l.values().as_ref(), r.values().as_ref()])?; + let combined_values = concat(&[l.values().as_ref(), r.values().as_ref()])?; let r_offset = l.values().len(); match set_op { SetOp::Union => generic_set_loop::( - l, r, &rows_l, &rows_r, field, &combined_values, r_offset, + l, + r, + &rows_l, + &rows_r, + field, + &combined_values, + r_offset, ), SetOp::Intersect => generic_set_loop::( - l, r, &rows_l, &rows_r, field, &combined_values, r_offset, + l, + r, + &rows_l, + &rows_r, + field, + &combined_values, + r_offset, ), } } From 7c553db1e0f08ea163244303c0dd5494be592804 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 28 Feb 2026 15:53:10 -0500 Subject: [PATCH 6/7] Avoid possible overflow on indices > u32::MAX --- datafusion/functions-nested/src/except.rs | 16 +++++++---- datafusion/functions-nested/src/set_ops.rs | 32 ++++++++++++++-------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/datafusion/functions-nested/src/except.rs b/datafusion/functions-nested/src/except.rs index b413e2177b873..e84cbfa96c50a 100644 --- a/datafusion/functions-nested/src/except.rs +++ b/datafusion/functions-nested/src/except.rs @@ -20,7 +20,8 @@ use crate::utils::{check_datatypes, make_scalar_function}; use arrow::array::new_null_array; use arrow::array::{ - Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, cast::AsArray, + Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array, + cast::AsArray, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::compute::take; @@ -182,7 +183,7 @@ fn general_except( let mut offsets = Vec::::with_capacity(l.len() + 1); offsets.push(OffsetSize::usize_as(0)); - let mut indices = Vec::with_capacity(l_values.num_rows()); + let mut indices: Vec = Vec::with_capacity(l_values.num_rows()); let mut dedup = HashSet::new(); let nulls = NullBuffer::union(l.nulls(), r.nulls()); @@ -207,7 +208,7 @@ fn general_except( for element_index in l_start.as_usize()..l_end.as_usize() { let left_row = l_values.row(element_index); if dedup.insert(left_row) { - indices.push(element_index as u32); + indices.push(element_index); } } @@ -215,11 +216,16 @@ fn general_except( dedup.clear(); } - // Gather distinct left-side values by index + // Gather distinct left-side values by index. + // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX. let values = if indices.is_empty() { arrow::array::new_empty_array(&l.value_type()) + } else if OffsetSize::IS_LARGE { + let indices = UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); + take(l.values().as_ref(), &indices, None)? } else { - take(l.values().as_ref(), &UInt32Array::from(indices), None)? + let indices = UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); + take(l.values().as_ref(), &indices, None)? }; Ok(GenericListArray::::new( diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 4bfc06bdde381..e20f83016851d 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -19,8 +19,8 @@ use crate::utils::make_scalar_function; use arrow::array::{ - Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, new_empty_array, - new_null_array, + Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array, + new_empty_array, new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::compute::{concat, take}; @@ -425,7 +425,7 @@ fn generic_set_loop( rows_l.num_rows().min(rows_r.num_rows()) }; - let mut indices = Vec::with_capacity(initial_capacity); + let mut indices: Vec = Vec::with_capacity(initial_capacity); // Reuse hash sets across iterations let mut seen = HashSet::new(); @@ -449,13 +449,13 @@ fn generic_set_loop( for idx in l_start..l_end { let row = rows_l.row(idx); if seen.insert(row) { - indices.push(idx as u32); + indices.push(idx); } } for idx in r_start..r_end { let row = rows_r.row(idx); if seen.insert(row) { - indices.push((idx + r_offset) as u32); + indices.push(idx + r_offset); } } } else { @@ -482,18 +482,23 @@ fn generic_set_loop( for idx in probe_range { let row = probe_rows.row(idx); if lookup_set.contains(&row) && seen.insert(row) { - indices.push((idx + probe_offset) as u32); + indices.push(idx + probe_offset); } } } result_offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - // Gather distinct values by index from the combined values array + // Gather distinct values by index from the combined values array. + // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX. let final_values = if indices.is_empty() { new_empty_array(&l.value_type()) + } else if OffsetSize::IS_LARGE { + let indices = UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); + take(combined_values.as_ref(), &indices, None)? } else { - take(combined_values.as_ref(), &UInt32Array::from(indices), None)? + let indices = UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); + take(combined_values.as_ref(), &indices, None)? }; let arr = GenericListArray::::try_new( @@ -560,7 +565,7 @@ fn general_array_distinct( // Convert all values to row format in a single batch for performance let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; let rows = converter.convert_columns(&[Arc::clone(array.values())])?; - let mut indices = Vec::with_capacity(rows.num_rows()); + let mut indices: Vec = Vec::with_capacity(rows.num_rows()); let mut seen = HashSet::new(); for i in 0..array.len() { let last_offset = *offsets.last().unwrap(); @@ -580,17 +585,22 @@ fn general_array_distinct( for idx in start..end { let row = rows.row(idx); if seen.insert(row) { - indices.push(idx as u32); + indices.push(idx); } } offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } // Gather distinct values in a single pass, using the computed `indices`. + // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX. let final_values = if indices.is_empty() { new_empty_array(&dt) + } else if OffsetSize::IS_LARGE { + let indices = UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); + take(array.values().as_ref(), &indices, None)? } else { - take(array.values().as_ref(), &UInt32Array::from(indices), None)? + let indices = UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); + take(array.values().as_ref(), &indices, None)? }; Ok(Arc::new(GenericListArray::::try_new( From f5a119e905985d608b38a2fccf46b0027e991791 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 28 Feb 2026 15:54:02 -0500 Subject: [PATCH 7/7] cargo fmt --- datafusion/functions-nested/src/except.rs | 6 ++++-- datafusion/functions-nested/src/set_ops.rs | 12 ++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-nested/src/except.rs b/datafusion/functions-nested/src/except.rs index e84cbfa96c50a..932eecf4b8709 100644 --- a/datafusion/functions-nested/src/except.rs +++ b/datafusion/functions-nested/src/except.rs @@ -221,10 +221,12 @@ fn general_except( let values = if indices.is_empty() { arrow::array::new_empty_array(&l.value_type()) } else if OffsetSize::IS_LARGE { - let indices = UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); + let indices = + UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); take(l.values().as_ref(), &indices, None)? } else { - let indices = UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); + let indices = + UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); take(l.values().as_ref(), &indices, None)? }; diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index e20f83016851d..a3d2573747140 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -494,10 +494,12 @@ fn generic_set_loop( let final_values = if indices.is_empty() { new_empty_array(&l.value_type()) } else if OffsetSize::IS_LARGE { - let indices = UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); + let indices = + UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); take(combined_values.as_ref(), &indices, None)? } else { - let indices = UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); + let indices = + UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); take(combined_values.as_ref(), &indices, None)? }; @@ -596,10 +598,12 @@ fn general_array_distinct( let final_values = if indices.is_empty() { new_empty_array(&dt) } else if OffsetSize::IS_LARGE { - let indices = UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); + let indices = + UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::>()); take(array.values().as_ref(), &indices, None)? } else { - let indices = UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); + let indices = + UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::>()); take(array.values().as_ref(), &indices, None)? };