diff --git a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs index 9bf59d9e333d6..0ca338265ecc6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs +++ b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use arrow::array::{ArrayRef, StructArray}; -use arrow::compute::cast; use arrow::datatypes::{Field, FieldRef, Fields}; use arrow_schema::DataType; use datafusion_common::Result; @@ -33,19 +32,6 @@ pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result { .collect() } -/// Casts dictionary-encoded arrays to their underlying value type, preserving row count. -/// Non-dictionary arrays are returned as-is. -fn flatten_dictionary_array(array: &ArrayRef) -> Result { - match array.data_type() { - DataType::Dictionary(_, value_type) => { - let casted = cast(array, value_type)?; - // Recursively flatten in case of nested dictionaries - flatten_dictionary_array(&casted) - } - _ => Ok(Arc::clone(array)), - } -} - /// Builds InList values from join key column arrays. /// /// If `join_key_arrays` is: @@ -65,20 +51,14 @@ fn flatten_dictionary_array(array: &ArrayRef) -> Result { pub(super) fn build_struct_inlist_values( join_key_arrays: &[ArrayRef], ) -> Result> { - // Flatten any dictionary-encoded arrays - let flattened_arrays: Vec = join_key_arrays - .iter() - .map(flatten_dictionary_array) - .collect::>>()?; - // Build the source array/struct - let source_array: ArrayRef = if flattened_arrays.len() == 1 { + let source_array: ArrayRef = if join_key_arrays.len() == 1 { // Single column: use directly - Arc::clone(&flattened_arrays[0]) + Arc::clone(&join_key_arrays[0]) } else { // Multi-column: build StructArray once from all columns let fields = build_struct_fields( - &flattened_arrays + &join_key_arrays .iter() .map(|arr| arr.data_type().clone()) .collect::>(), @@ -88,7 +68,7 @@ pub(super) fn build_struct_inlist_values( let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields .iter() .cloned() - .zip(flattened_arrays.iter().cloned()) + .zip(join_key_arrays.iter().cloned()) .collect(); Arc::new(StructArray::from(arrays_with_fields)) @@ -152,7 +132,14 @@ mod tests { assert_eq!( *result.data_type(), DataType::Struct( - build_struct_fields(&[DataType::Utf8, DataType::Int32]).unwrap() + build_struct_fields(&[ + DataType::Dictionary( + Box::new(DataType::Int8), + Box::new(DataType::Utf8) + ), + DataType::Int32 + ]) + .unwrap() ) ); } @@ -168,6 +155,6 @@ mod tests { .unwrap(); assert_eq!(result.len(), 3); - assert_eq!(*result.data_type(), DataType::Utf8); + assert_eq!(result.data_type(), dict_array.data_type()); } } diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 8bb79d576990e..5e643273baed4 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -563,3 +563,103 @@ ORDER BY start_timestamp, trace_id LIMIT 1; ---- 2024-10-01T00:00:00 + + +statement ok +set datafusion.execution.parquet.pushdown_filters = false; + +# Regression test for https://github.com/apache/datafusion/issues/20696 +# Multi-column INNER JOIN with dictionary fails +# when parquet pushdown filters are enabled. + +statement ok +COPY ( + SELECT + to_timestamp_nanos(time_ns) AS time, + arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state, + arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city, + temp + FROM ( + VALUES + (200, 'CA', 'LA', 90.0), + (250, 'MA', 'Boston', 72.4), + (100, 'MA', 'Boston', 70.4), + (350, 'CA', 'LA', 90.0) + ) AS t(time_ns, state, city, temp) +) +TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/data.parquet'; + +statement ok +COPY ( + SELECT + to_timestamp_nanos(time_ns) AS time, + arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state, + arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city, + temp, + reading + FROM ( + VALUES + (250, 'MA', 'Boston', 53.4, 51.0), + (100, 'MA', 'Boston', 50.4, 50.0) + ) AS t(time_ns, state, city, temp, reading) +) +TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/data.parquet'; + +statement ok +CREATE EXTERNAL TABLE h2o_parquet_20696 STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/'; + +statement ok +CREATE EXTERNAL TABLE o2_parquet_20696 STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/'; + +# Query should work both with and without filters +statement ok +set datafusion.execution.parquet.pushdown_filters = false; + +query RRR +SELECT + h2o_parquet_20696.temp AS h2o_temp, + o2_parquet_20696.temp AS o2_temp, + o2_parquet_20696.reading +FROM h2o_parquet_20696 +INNER JOIN o2_parquet_20696 + ON h2o_parquet_20696.time = o2_parquet_20696.time + AND h2o_parquet_20696.state = o2_parquet_20696.state + AND h2o_parquet_20696.city = o2_parquet_20696.city +WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z' + AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z'; +---- +72.4 53.4 51 +70.4 50.4 50 + + +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +query RRR +SELECT + h2o_parquet_20696.temp AS h2o_temp, + o2_parquet_20696.temp AS o2_temp, + o2_parquet_20696.reading +FROM h2o_parquet_20696 +INNER JOIN o2_parquet_20696 + ON h2o_parquet_20696.time = o2_parquet_20696.time + AND h2o_parquet_20696.state = o2_parquet_20696.state + AND h2o_parquet_20696.city = o2_parquet_20696.city +WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z' + AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z'; +---- +72.4 53.4 51 +70.4 50.4 50 + +# Cleanup +statement ok +DROP TABLE h2o_parquet_20696; + +statement ok +DROP TABLE o2_parquet_20696; + +# Cleanup settings +statement ok +set datafusion.execution.parquet.pushdown_filters = false;