diff --git a/datafusion/functions-nested/benches/array_has.rs b/datafusion/functions-nested/benches/array_has.rs index 302ef9168650..f5e66d56c0ef 100644 --- a/datafusion/functions-nested/benches/array_has.rs +++ b/datafusion/functions-nested/benches/array_has.rs @@ -51,6 +51,9 @@ fn criterion_benchmark(c: &mut Criterion) { bench_array_has_strings(c); bench_array_has_all_strings(c); bench_array_has_any_strings(c); + + // Benchmark for array_has_any with one scalar arg + bench_array_has_any_scalar(c); } fn bench_array_has(c: &mut Criterion, array_size: usize) { @@ -183,22 +186,24 @@ fn bench_array_has_all(c: &mut Criterion, array_size: usize) { group.finish(); } +const SMALL_ARRAY_SIZE: usize = NEEDLE_SIZE; + fn bench_array_has_any(c: &mut Criterion, array_size: usize) { let mut group = c.benchmark_group("array_has_any"); - let haystack = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY); - let list_type = haystack.data_type().clone(); + let first_arr = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let list_type = first_arr.data_type().clone(); let config_options = Arc::new(ConfigOptions::default()); let return_field: Arc = Field::new("result", DataType::Boolean, true).into(); let arg_fields: Vec> = vec![ - Field::new("haystack", list_type.clone(), false).into(), - Field::new("needle", list_type.clone(), false).into(), + Field::new("first", list_type.clone(), false).into(), + Field::new("second", list_type.clone(), false).into(), ]; // Benchmark: some elements match - let needle_match = create_int64_list_array(NUM_ROWS, NEEDLE_SIZE, 0.0); + let second_match = create_int64_list_array(NUM_ROWS, SMALL_ARRAY_SIZE, 0.0); let args_match = vec![ - ColumnarValue::Array(haystack.clone()), - ColumnarValue::Array(needle_match), + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Array(second_match), ]; group.bench_with_input( BenchmarkId::new("some_match", array_size), @@ -221,11 +226,14 @@ fn bench_array_has_any(c: &mut Criterion, array_size: usize) { ); // Benchmark: no match - let needle_no_match = - create_int64_list_array_with_offset(NUM_ROWS, NEEDLE_SIZE, array_size as i64); + let second_no_match = create_int64_list_array_with_offset( + NUM_ROWS, + SMALL_ARRAY_SIZE, + array_size as i64, + ); let args_no_match = vec![ - ColumnarValue::Array(haystack.clone()), - ColumnarValue::Array(needle_no_match), + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Array(second_no_match), ]; group.bench_with_input( BenchmarkId::new("no_match", array_size), @@ -247,6 +255,59 @@ fn bench_array_has_any(c: &mut Criterion, array_size: usize) { }, ); + // Benchmark: scalar second arg, some match + let scalar_second_match = create_int64_scalar_list(SMALL_ARRAY_SIZE, 0); + let args_scalar_match = vec![ + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Scalar(scalar_second_match), + ]; + group.bench_with_input( + BenchmarkId::new("scalar_some_match", array_size), + &array_size, + |b, _| { + let udf = ArrayHasAny::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args_scalar_match.clone(), + arg_fields: arg_fields.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone(), + config_options: config_options.clone(), + }) + .unwrap(), + ) + }) + }, + ); + + // Benchmark: scalar second arg, no match + let scalar_second_no_match = + create_int64_scalar_list(SMALL_ARRAY_SIZE, array_size as i64); + let args_scalar_no_match = vec![ + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Scalar(scalar_second_no_match), + ]; + group.bench_with_input( + BenchmarkId::new("scalar_no_match", array_size), + &array_size, + |b, _| { + let udf = ArrayHasAny::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args_scalar_no_match.clone(), + arg_fields: arg_fields.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone(), + config_options: config_options.clone(), + }) + .unwrap(), + ) + }) + }, + ); + group.finish(); } @@ -378,17 +439,17 @@ fn bench_array_has_any_strings(c: &mut Criterion) { let sizes = vec![10, 100, 500]; for &size in &sizes { - let haystack = create_string_list_array(NUM_ROWS, size, NULL_DENSITY); - let list_type = haystack.data_type().clone(); + let first_arr = create_string_list_array(NUM_ROWS, size, NULL_DENSITY); + let list_type = first_arr.data_type().clone(); let arg_fields: Vec> = vec![ - Field::new("haystack", list_type.clone(), false).into(), - Field::new("needle", list_type.clone(), false).into(), + Field::new("first", list_type.clone(), false).into(), + Field::new("second", list_type.clone(), false).into(), ]; - let needle_match = create_string_list_array(NUM_ROWS, NEEDLE_SIZE, 0.0); + let second_match = create_string_list_array(NUM_ROWS, SMALL_ARRAY_SIZE, 0.0); let args_match = vec![ - ColumnarValue::Array(haystack.clone()), - ColumnarValue::Array(needle_match), + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Array(second_match), ]; group.bench_with_input(BenchmarkId::new("some_match", size), &size, |b, _| { let udf = ArrayHasAny::new(); @@ -406,11 +467,11 @@ fn bench_array_has_any_strings(c: &mut Criterion) { }) }); - let needle_no_match = - create_string_list_array_with_prefix(NUM_ROWS, NEEDLE_SIZE, "missing_"); + let second_no_match = + create_string_list_array_with_prefix(NUM_ROWS, SMALL_ARRAY_SIZE, "missing_"); let args_no_match = vec![ - ColumnarValue::Array(haystack.clone()), - ColumnarValue::Array(needle_no_match), + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Array(second_no_match), ]; group.bench_with_input(BenchmarkId::new("no_match", size), &size, |b, _| { let udf = ArrayHasAny::new(); @@ -427,6 +488,142 @@ fn bench_array_has_any_strings(c: &mut Criterion) { ) }) }); + + // Benchmark: scalar second arg, some match + let scalar_second_match = create_string_scalar_list(SMALL_ARRAY_SIZE, "value_"); + let args_scalar_match = vec![ + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Scalar(scalar_second_match), + ]; + group.bench_with_input( + BenchmarkId::new("scalar_some_match", size), + &size, + |b, _| { + let udf = ArrayHasAny::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args_scalar_match.clone(), + arg_fields: arg_fields.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone(), + config_options: config_options.clone(), + }) + .unwrap(), + ) + }) + }, + ); + + // Benchmark: scalar second arg, no match + let scalar_second_no_match = + create_string_scalar_list(SMALL_ARRAY_SIZE, "missing_"); + let args_scalar_no_match = vec![ + ColumnarValue::Array(first_arr.clone()), + ColumnarValue::Scalar(scalar_second_no_match), + ]; + group.bench_with_input( + BenchmarkId::new("scalar_no_match", size), + &size, + |b, _| { + let udf = ArrayHasAny::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args_scalar_no_match.clone(), + arg_fields: arg_fields.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone(), + config_options: config_options.clone(), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +/// Benchmarks array_has_any with one scalar arg. Varies the scalar argument +/// size while keeping the columnar array small (3 elements per row). +fn bench_array_has_any_scalar(c: &mut Criterion) { + let mut group = c.benchmark_group("array_has_any_scalar"); + let config_options = Arc::new(ConfigOptions::default()); + let return_field: Arc = Field::new("result", DataType::Boolean, true).into(); + + let array_size = 3; + let scalar_sizes = vec![1, 10, 100, 1000]; + + // i64 benchmarks + let first_arr_i64 = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let list_type_i64 = first_arr_i64.data_type().clone(); + let arg_fields_i64: Vec> = vec![ + Field::new("first", list_type_i64.clone(), false).into(), + Field::new("second", list_type_i64.clone(), false).into(), + ]; + + for &scalar_size in &scalar_sizes { + let scalar_arg = create_int64_scalar_list(scalar_size, array_size as i64); + let args = vec![ + ColumnarValue::Array(first_arr_i64.clone()), + ColumnarValue::Scalar(scalar_arg), + ]; + group.bench_with_input( + BenchmarkId::new("i64_no_match", scalar_size), + &scalar_size, + |b, _| { + let udf = ArrayHasAny::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields_i64.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone(), + config_options: config_options.clone(), + }) + .unwrap(), + ) + }) + }, + ); + } + + // String benchmarks + let first_arr_str = create_string_list_array(NUM_ROWS, array_size, NULL_DENSITY); + let list_type_str = first_arr_str.data_type().clone(); + let arg_fields_str: Vec> = vec![ + Field::new("first", list_type_str.clone(), false).into(), + Field::new("second", list_type_str.clone(), false).into(), + ]; + + for &scalar_size in &scalar_sizes { + let scalar_arg = create_string_scalar_list(scalar_size, "missing_"); + let args = vec![ + ColumnarValue::Array(first_arr_str.clone()), + ColumnarValue::Scalar(scalar_arg), + ]; + group.bench_with_input( + BenchmarkId::new("string_no_match", scalar_size), + &scalar_size, + |b, _| { + let udf = ArrayHasAny::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields_str.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone(), + config_options: config_options.clone(), + }) + .unwrap(), + ) + }) + }, + ); } group.finish(); @@ -548,5 +745,37 @@ fn create_string_list_array_with_prefix( ) } +/// Create a `ScalarValue::List` containing a single list of `size` i64 elements, +/// with values starting at `offset`. +fn create_int64_scalar_list(size: usize, offset: i64) -> ScalarValue { + let values = (0..size as i64) + .map(|i| Some(i + offset)) + .collect::(); + let list = ListArray::try_new( + Arc::new(Field::new("item", DataType::Int64, true)), + OffsetBuffer::new(vec![0, size as i32].into()), + Arc::new(values), + None, + ) + .unwrap(); + ScalarValue::List(Arc::new(list)) +} + +/// Create a `ScalarValue::List` containing a single list of `size` string elements, +/// with values like "{prefix}0", "{prefix}1", etc. +fn create_string_scalar_list(size: usize, prefix: &str) -> ScalarValue { + let values = (0..size) + .map(|i| Some(format!("{prefix}{i}"))) + .collect::(); + let list = ListArray::try_new( + Arc::new(Field::new("item", DataType::Utf8, true)), + OffsetBuffer::new(vec![0, size as i32].into()), + Arc::new(values), + None, + ) + .unwrap(); + ScalarValue::List(Arc::new(list)) +} + criterion_group!(benches, criterion_benchmark); criterion_main!(benches); diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index e34239ed4919..8411a32202e6 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -17,7 +17,10 @@ //! [`ScalarUDFImpl`] definitions for array_has, array_has_all and array_has_any functions. -use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Datum, Scalar}; +use arrow::array::{ + Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar, + StringArrayType, +}; use arrow::buffer::BooleanBuffer; use arrow::datatypes::DataType; use arrow::row::{RowConverter, Rows, SortField}; @@ -38,6 +41,7 @@ use crate::make_array::make_array_udf; use crate::utils::make_scalar_function; use std::any::Any; +use std::collections::HashSet; use std::sync::Arc; // Create static instances of ScalarUDFs for each function @@ -55,7 +59,7 @@ make_udf_expr_and_func!(ArrayHasAll, ); make_udf_expr_and_func!(ArrayHasAny, array_has_any, - haystack_array needle_array, // arg names + first_array second_array, // arg names "returns true if at least one element of the second array appears in the first array; otherwise, it returns false.", // doc array_has_any_udf // internal function name ); @@ -303,10 +307,8 @@ impl<'a> ArrayWrapper<'a> { fn offsets(&self) -> Box + 'a> { match self { ArrayWrapper::FixedSizeList(arr) => { - let offsets = (0..=arr.len()) - .step_by(arr.value_length() as usize) - .collect::>(); - Box::new(offsets.into_iter()) + let value_length = arr.value_length() as usize; + Box::new((0..=arr.len()).map(move |i| i * value_length)) } ArrayWrapper::List(arr) => { Box::new(arr.offsets().iter().map(|o| (*o) as usize)) @@ -316,6 +318,14 @@ impl<'a> ArrayWrapper<'a> { } } } + + fn nulls(&self) -> Option<&arrow::buffer::NullBuffer> { + match self { + ArrayWrapper::FixedSizeList(arr) => arr.nulls(), + ArrayWrapper::List(arr) => arr.nulls(), + ArrayWrapper::LargeList(arr) => arr.nulls(), + } + } } fn array_has_dispatch_for_array<'a>( @@ -487,6 +497,218 @@ fn array_has_any_inner(args: &[ArrayRef]) -> Result { array_has_all_and_any_inner(args, ComparisonType::Any) } +/// Fast path for `array_has_any` when exactly one argument is a scalar. +fn array_has_any_with_scalar( + columnar_arg: &ColumnarValue, + scalar_arg: &ScalarValue, +) -> Result { + if scalar_arg.is_null() { + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None))); + } + + // Convert the scalar to a 1-element ListArray, then extract the inner values + let scalar_array = scalar_arg.to_array_of_size(1)?; + let scalar_list: ArrayWrapper = scalar_array.as_ref().try_into()?; + let offsets: Vec = scalar_list.offsets().collect(); + let scalar_values = scalar_list + .values() + .slice(offsets[0], offsets[1] - offsets[0]); + + // If scalar list is empty, result is always false + if scalar_values.is_empty() { + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)))); + } + + match scalar_values.data_type() { + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + array_has_any_with_scalar_string(columnar_arg, &scalar_values) + } + _ => array_has_any_with_scalar_general(columnar_arg, &scalar_values), + } +} + +/// When the scalar argument has more elements than this, the scalar fast path +/// builds a HashSet for O(1) lookups. At or below this threshold, it falls +/// back to a linear scan, since hashing every columnar element is more +/// expensive than a linear scan over a short array. +const SCALAR_SMALL_THRESHOLD: usize = 8; + +/// String-specialized scalar fast path for `array_has_any`. +fn array_has_any_with_scalar_string( + columnar_arg: &ColumnarValue, + scalar_values: &ArrayRef, +) -> Result { + let (col_arr, is_scalar_output) = match columnar_arg { + ColumnarValue::Array(arr) => (Arc::clone(arr), false), + ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true), + }; + + let col_list: ArrayWrapper = col_arr.as_ref().try_into()?; + let col_values = col_list.values(); + let col_offsets: Vec = col_list.offsets().collect(); + let col_nulls = col_list.nulls(); + + let scalar_lookup = ScalarStringLookup::new(scalar_values); + let has_null_scalar = scalar_values.null_count() > 0; + + let result = match col_values.data_type() { + DataType::Utf8 => array_has_any_string_inner( + col_values.as_string::(), + &col_offsets, + col_nulls, + has_null_scalar, + &scalar_lookup, + ), + DataType::LargeUtf8 => array_has_any_string_inner( + col_values.as_string::(), + &col_offsets, + col_nulls, + has_null_scalar, + &scalar_lookup, + ), + DataType::Utf8View => array_has_any_string_inner( + col_values.as_string_view(), + &col_offsets, + col_nulls, + has_null_scalar, + &scalar_lookup, + ), + _ => unreachable!("array_has_any_with_scalar_string called with non-string type"), + }; + + if is_scalar_output { + Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( + &result, 0, + )?)) + } else { + Ok(ColumnarValue::Array(result)) + } +} + +/// Pre-computed lookup structure for the scalar string fastpath. +enum ScalarStringLookup<'a> { + /// Large scalar: HashSet for O(1) lookups. + Set(HashSet<&'a str>), + /// Small scalar: Vec for linear scan. + List(Vec>), +} + +impl<'a> ScalarStringLookup<'a> { + fn new(scalar_values: &'a ArrayRef) -> Self { + let strings = string_array_to_vec(scalar_values.as_ref()); + if strings.len() > SCALAR_SMALL_THRESHOLD { + ScalarStringLookup::Set(strings.into_iter().flatten().collect()) + } else { + ScalarStringLookup::List(strings) + } + } + + fn contains(&self, value: &str) -> bool { + match self { + ScalarStringLookup::Set(set) => set.contains(value), + ScalarStringLookup::List(list) => list.contains(&Some(value)), + } + } +} + +/// Inner implementation of the string scalar fast path, generic over string +/// array type to allow direct element access by index. +fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>( + col_strings: C, + col_offsets: &[usize], + col_nulls: Option<&arrow::buffer::NullBuffer>, + has_null_scalar: bool, + scalar_lookup: &ScalarStringLookup<'_>, +) -> ArrayRef { + let num_rows = col_offsets.len() - 1; + let mut builder = BooleanArray::builder(num_rows); + + for i in 0..num_rows { + if col_nulls.is_some_and(|v| v.is_null(i)) { + builder.append_null(); + continue; + } + let start = col_offsets[i]; + let end = col_offsets[i + 1]; + let found = (start..end).any(|j| { + if col_strings.is_null(j) { + has_null_scalar + } else { + scalar_lookup.contains(col_strings.value(j)) + } + }); + builder.append_value(found); + } + + Arc::new(builder.finish()) +} + +/// General scalar fast path for `array_has_any`, using RowConverter for +/// type-erased comparison. +fn array_has_any_with_scalar_general( + columnar_arg: &ColumnarValue, + scalar_values: &ArrayRef, +) -> Result { + let converter = + RowConverter::new(vec![SortField::new(scalar_values.data_type().clone())])?; + let scalar_rows = converter.convert_columns(&[Arc::clone(scalar_values)])?; + + let (col_arr, is_scalar_output) = match columnar_arg { + ColumnarValue::Array(arr) => (Arc::clone(arr), false), + ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true), + }; + + let col_list: ArrayWrapper = col_arr.as_ref().try_into()?; + let col_rows = converter.convert_columns(&[Arc::clone(col_list.values())])?; + let col_offsets: Vec = col_list.offsets().collect(); + let col_nulls = col_list.nulls(); + + let mut builder = BooleanArray::builder(col_list.len()); + let num_scalar = scalar_rows.num_rows(); + + if num_scalar > SCALAR_SMALL_THRESHOLD { + // Large scalar: build HashSet for O(1) lookups + let scalar_set: HashSet> = (0..num_scalar) + .map(|i| Box::from(scalar_rows.row(i).as_ref())) + .collect(); + + for i in 0..col_list.len() { + if col_nulls.is_some_and(|v| v.is_null(i)) { + builder.append_null(); + continue; + } + let start = col_offsets[i]; + let end = col_offsets[i + 1]; + let found = + (start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref())); + builder.append_value(found); + } + } else { + // Small scalar: linear scan avoids HashSet hashing overhead + for i in 0..col_list.len() { + if col_nulls.is_some_and(|v| v.is_null(i)) { + builder.append_null(); + continue; + } + let start = col_offsets[i]; + let end = col_offsets[i + 1]; + let found = (start..end) + .any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k))); + builder.append_value(found); + } + } + + let result: ArrayRef = Arc::new(builder.finish()); + + if is_scalar_output { + Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( + &result, 0, + )?)) + } else { + Ok(ColumnarValue::Array(result)) + } +} + #[user_doc( doc_section(label = "Array Functions"), description = "Returns true if all elements of sub-array exist in array.", @@ -563,8 +785,8 @@ impl ScalarUDFImpl for ArrayHasAll { #[user_doc( doc_section(label = "Array Functions"), - description = "Returns true if any elements exist in both arrays.", - syntax_example = "array_has_any(array, sub-array)", + description = "Returns true if the arrays have any elements in common.", + syntax_example = "array_has_any(array1, array2)", sql_example = r#"```sql > select array_has_any([1, 2, 3], [3, 4]); +------------------------------------------+ @@ -574,11 +796,11 @@ impl ScalarUDFImpl for ArrayHasAll { +------------------------------------------+ ```"#, argument( - name = "array", + name = "array1", description = "Array expression. Can be a constant, column, or function, and any combination of array operators." ), argument( - name = "sub-array", + name = "array2", description = "Array expression. Can be a constant, column, or function, and any combination of array operators." ) )] @@ -623,7 +845,16 @@ impl ScalarUDFImpl for ArrayHasAny { &self, args: datafusion_expr::ScalarFunctionArgs, ) -> Result { - make_scalar_function(array_has_any_inner)(&args.args) + let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?; + + // array_has_any is symmetric: if either argument is scalar, build a + // HashSet from it and probe with the rows of the other argument. + match (&first_arg, &second_arg) { + (cv, ColumnarValue::Scalar(scalar)) | (ColumnarValue::Scalar(scalar), cv) => { + array_has_any_with_scalar(cv, scalar) + } + _ => make_scalar_function(array_has_any_inner)(&args.args), + } } fn aliases(&self) -> &[String] { diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 78d13066d9ec..a9da8c4d8c7f 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -3543,16 +3543,16 @@ array_has_all(array, sub-array) ### `array_has_any` -Returns true if any elements exist in both arrays. +Returns true if the arrays have any elements in common. ```sql -array_has_any(array, sub-array) +array_has_any(array1, array2) ``` #### Arguments -- **array**: Array expression. Can be a constant, column, or function, and any combination of array operators. -- **sub-array**: Array expression. Can be a constant, column, or function, and any combination of array operators. +- **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators. +- **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators. #### Example