Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 13 additions & 26 deletions datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, StructArray};
use arrow::compute::cast;
use arrow::datatypes::{Field, FieldRef, Fields};
use arrow_schema::DataType;
use datafusion_common::Result;
Expand All @@ -33,19 +32,6 @@ pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result<Fields> {
.collect()
}

/// Casts dictionary-encoded arrays to their underlying value type, preserving row count.
/// Non-dictionary arrays are returned as-is.
fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
match array.data_type() {
DataType::Dictionary(_, value_type) => {
let casted = cast(array, value_type)?;
// Recursively flatten in case of nested dictionaries
flatten_dictionary_array(&casted)
}
_ => Ok(Arc::clone(array)),
}
}

/// Builds InList values from join key column arrays.
///
/// If `join_key_arrays` is:
Expand All @@ -65,20 +51,14 @@ fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
pub(super) fn build_struct_inlist_values(
join_key_arrays: &[ArrayRef],
) -> Result<Option<ArrayRef>> {
// Flatten any dictionary-encoded arrays
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am quite pleased that the "fix" is to delete code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mark of a great engineer 😉

let flattened_arrays: Vec<ArrayRef> = join_key_arrays
.iter()
.map(flatten_dictionary_array)
.collect::<Result<Vec<_>>>()?;

// Build the source array/struct
let source_array: ArrayRef = if flattened_arrays.len() == 1 {
let source_array: ArrayRef = if join_key_arrays.len() == 1 {
// Single column: use directly
Arc::clone(&flattened_arrays[0])
Arc::clone(&join_key_arrays[0])
} else {
// Multi-column: build StructArray once from all columns
let fields = build_struct_fields(
&flattened_arrays
&join_key_arrays
.iter()
.map(|arr| arr.data_type().clone())
.collect::<Vec<_>>(),
Expand All @@ -88,7 +68,7 @@ pub(super) fn build_struct_inlist_values(
let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields
.iter()
.cloned()
.zip(flattened_arrays.iter().cloned())
.zip(join_key_arrays.iter().cloned())
.collect();

Arc::new(StructArray::from(arrays_with_fields))
Expand Down Expand Up @@ -152,7 +132,14 @@ mod tests {
assert_eq!(
*result.data_type(),
DataType::Struct(
build_struct_fields(&[DataType::Utf8, DataType::Int32]).unwrap()
build_struct_fields(&[
DataType::Dictionary(
Box::new(DataType::Int8),
Box::new(DataType::Utf8)
),
DataType::Int32
])
.unwrap()
)
);
}
Expand All @@ -168,6 +155,6 @@ mod tests {
.unwrap();

assert_eq!(result.len(), 3);
assert_eq!(*result.data_type(), DataType::Utf8);
assert_eq!(result.data_type(), dict_array.data_type());
}
}
105 changes: 105 additions & 0 deletions datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,108 @@ DROP TABLE t_union_mem;

statement ok
DROP TABLE t_union_parquet;

# Cleanup settings
statement ok
set datafusion.optimizer.max_passes = 3;

statement ok
set datafusion.execution.parquet.pushdown_filters = false;


# Regression test for https://github.com/apache/datafusion/issues/20696
# Multi-column INNER JOIN with dictionary fails
# when parquet pushdown filters are enabled.


statement ok
COPY (
SELECT
to_timestamp_nanos(time_ns) AS time,
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
temp
FROM (
VALUES
(200, 'CA', 'LA', 90.0),
(250, 'MA', 'Boston', 72.4),
(100, 'MA', 'Boston', 70.4),
(350, 'CA', 'LA', 90.0)
) AS t(time_ns, state, city, temp)
)
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/data.parquet';

statement ok
COPY (
SELECT
to_timestamp_nanos(time_ns) AS time,
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
temp,
reading
FROM (
VALUES
(250, 'MA', 'Boston', 53.4, 51.0),
(100, 'MA', 'Boston', 50.4, 50.0)
) AS t(time_ns, state, city, temp, reading)
)
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/data.parquet';

statement ok
CREATE EXTERNAL TABLE h2o_parquet_20696 STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/';

statement ok
CREATE EXTERNAL TABLE o2_parquet_20696 STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/';

# Query should work both with and without filters
statement ok
set datafusion.execution.parquet.pushdown_filters = false;

query RRR
SELECT
h2o_parquet_20696.temp AS h2o_temp,
o2_parquet_20696.temp AS o2_temp,
o2_parquet_20696.reading
FROM h2o_parquet_20696
INNER JOIN o2_parquet_20696
ON h2o_parquet_20696.time = o2_parquet_20696.time
AND h2o_parquet_20696.state = o2_parquet_20696.state
AND h2o_parquet_20696.city = o2_parquet_20696.city
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
----
72.4 53.4 51
70.4 50.4 50


statement ok
set datafusion.execution.parquet.pushdown_filters = true;

query RRR
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test fails with

Parquet error: External: Compute error: Error evaluating filter predicate: ArrowError(InvalidArgumentError("Can't compare arrays of different types"), Some(""))

Prior to this fix

SELECT
h2o_parquet_20696.temp AS h2o_temp,
o2_parquet_20696.temp AS o2_temp,
o2_parquet_20696.reading
FROM h2o_parquet_20696
INNER JOIN o2_parquet_20696
ON h2o_parquet_20696.time = o2_parquet_20696.time
AND h2o_parquet_20696.state = o2_parquet_20696.state
AND h2o_parquet_20696.city = o2_parquet_20696.city
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
----
72.4 53.4 51
70.4 50.4 50

# Cleanup
statement ok
DROP TABLE h2o_parquet_20696;

statement ok
DROP TABLE o2_parquet_20696;

# Cleanup settings
statement ok
set datafusion.execution.parquet.pushdown_filters = false;
Loading