From 764d0ab724aeb01dd71bd1a76c935a312d5ed707 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Wed, 4 Mar 2026 10:46:11 -0500 Subject: [PATCH 1/2] correct parquet leaf index mapping when schema contains struct cols --- .../datasource-parquet/src/row_filter.rs | 125 +++++++++++++----- 1 file changed, 91 insertions(+), 34 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 2924208c5bd99..0a00aea15aada 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -242,10 +242,10 @@ impl FilterCandidateBuilder { let root_indices: Vec<_> = required_columns.required_columns.into_iter().collect(); + let leaf_indices = leaf_indices_for_roots( &root_indices, metadata.file_metadata().schema_descr(), - required_columns.nested, ); let projected_schema = Arc::new(self.file_schema.project(&root_indices)?); @@ -277,8 +277,6 @@ struct PushdownChecker<'schema> { projected_columns: bool, /// Indices into the file schema of columns required to evaluate the expression. required_columns: Vec<usize>, - /// Tracks the nested column behavior found during traversal. - nested_behavior: NestedColumnSupport, /// Whether nested list columns are supported by the predicate semantics. allow_list_columns: bool, /// The Arrow schema of the parquet file. @@ -291,7 +289,6 @@ impl<'schema> PushdownChecker<'schema> { non_primitive_columns: false, projected_columns: false, required_columns: Vec::new(), - nested_behavior: NestedColumnSupport::PrimitiveOnly, allow_list_columns, file_schema, } @@ -324,16 +321,11 @@ impl<'schema> PushdownChecker<'schema> { /// `None` if the type is supported and pushdown can continue. 
fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> { if self.is_nested_type_supported(data_type) { - // Update to ListsSupported if we haven't encountered unsupported types yet - if self.nested_behavior == NestedColumnSupport::PrimitiveOnly { - self.nested_behavior = NestedColumnSupport::ListsSupported; - } None } else { // Block pushdown for unsupported nested types: // - Structs (regardless of predicate support) // - Lists without supported predicates - self.nested_behavior = NestedColumnSupport::Unsupported; self.non_primitive_columns = true; Some(TreeNodeRecursion::Jump) } @@ -368,7 +360,6 @@ impl<'schema> PushdownChecker<'schema> { self.required_columns.dedup(); PushdownColumns { required_columns: self.required_columns, - nested: self.nested_behavior, } } } @@ -391,21 +382,6 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> { /// /// This enum makes explicit the different states a predicate can be in /// with respect to nested column handling during Parquet decoding. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum NestedColumnSupport { - /// Expression references only primitive (non-nested) columns. - /// These can always be pushed down to the Parquet decoder. - PrimitiveOnly, - /// Expression references list columns with supported predicates - /// (e.g., array_has, array_has_all, IS NULL). - /// These can be pushed down to the Parquet decoder. - ListsSupported, - /// Expression references unsupported nested types (e.g., structs) - /// or list columns without supported predicates. - /// These cannot be pushed down and must be evaluated after decoding. - Unsupported, -} - /// Result of checking which columns are required for filter pushdown. #[derive(Debug)] struct PushdownColumns { /// Indices (into the parquet file schema) of columns required to evaluate /// the filter expression. Must be in ascending order for correct schema /// projection matching. 
required_columns: Vec<usize>, - nested: NestedColumnSupport, } /// Checks if a given expression can be pushed down to the parquet decoder. @@ -437,15 +412,12 @@ fn pushdown_columns( fn leaf_indices_for_roots( root_indices: &[usize], schema_descr: &SchemaDescriptor, - nested: NestedColumnSupport, ) -> Vec<usize> { - // For primitive-only columns, root indices ARE the leaf indices - if nested == NestedColumnSupport::PrimitiveOnly { - return root_indices.to_vec(); - } - - // For List columns, expand to the single leaf column (item field) - // For Struct columns (unsupported), this would expand to multiple leaves + // Always map root (Arrow) indices to Parquet leaf indices via the schema + // descriptor. Arrow root indices only equal Parquet leaf indices when the + // schema has no group columns (Struct, Map, etc.); when group columns + // exist, their children become separate leaves and shift all subsequent + // leaf indices. let root_set: BTreeSet<_> = root_indices.iter().copied().collect(); (0..schema_descr.num_columns()) @@ -1088,6 +1060,91 @@ mod test { .expect("parsing schema") } + /// Regression test: when a schema has Struct columns, Arrow field indices diverge + /// from Parquet leaf indices (Struct children become separate leaves). The + /// `PrimitiveOnly` fast-path in `leaf_indices_for_roots` assumes they are equal, + /// so a filter on a primitive column *after* a Struct gets the wrong leaf index. + /// + /// Schema: + /// Arrow indices: col_a=0 struct_col=1 col_b=2 + /// Parquet leaves: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3 + /// + /// A filter on col_b should project Parquet leaf 3, but the bug causes it to + /// project leaf 2 (struct_col.y). 
+ #[test] + fn test_filter_pushdown_leaf_index_with_struct_in_schema() { + use arrow::array::{Int32Array, StringArray, StructArray}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("col_a", DataType::Int32, false), + Field::new( + "struct_col", + DataType::Struct( + vec![ + Arc::new(Field::new("x", DataType::Int32, true)), + Arc::new(Field::new("y", DataType::Int32, true)), + ] + .into(), + ), + true, + ), + Field::new("col_b", DataType::Utf8, false), + ])); + + let col_a = Arc::new(Int32Array::from(vec![1, 2, 3])); + let struct_col = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("x", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![10, 20, 30])) as _, + ), + ( + Arc::new(Field::new("y", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![100, 200, 300])) as _, + ), + ])); + let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"])); + + let batch = + RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, col_b]) + .unwrap(); + + let file = NamedTempFile::new().expect("temp file"); + let mut writer = + ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None) + .expect("writer"); + writer.write(&batch).expect("write batch"); + writer.close().expect("close writer"); + + let reader_file = file.reopen().expect("reopen file"); + let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file) + .expect("reader builder"); + let metadata = builder.metadata().clone(); + let file_schema = builder.schema().clone(); + + // sanity check: 4 Parquet leaves, 3 Arrow fields + assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4); + assert_eq!(file_schema.fields().len(), 3); + + // build a filter candidate for `col_b = 'target'` through the public API + let expr = col("col_b").eq(Expr::Literal( + ScalarValue::Utf8(Some("target".to_string())), + None, + )); + let expr = logical2physical(&expr, &file_schema); + + let candidate = FilterCandidateBuilder::new(expr, file_schema) + 
.build(&metadata) + .expect("building candidate") + .expect("filter on primitive col_b should be pushable"); + + // col_b is Parquet leaf 3 (shifted by struct_col's two children). + assert_eq!( + candidate.projection.leaf_indices, + vec![3], + "leaf_indices should be [3] for col_b" + ); + } + /// Sanity check that the given expression could be evaluated against the given schema without any errors. /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc. fn check_expression_can_evaluate_against_schema( From 29f68913683a507029071fb0aee345d791b4d1a2 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Wed, 4 Mar 2026 13:32:04 -0500 Subject: [PATCH 2/2] feedback --- .../datasource-parquet/src/row_filter.rs | 1 + .../test_files/parquet_filter_pushdown.slt | 49 ++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 0a00aea15aada..62ba53bb871ef 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -418,6 +418,7 @@ fn leaf_indices_for_roots( // schema has no group columns (Struct, Map, etc.); when group columns // exist, their children become separate leaves and shift all subsequent // leaf indices. + // Struct columns are unsupported. 
let root_set: BTreeSet<_> = root_indices.iter().copied().collect(); (0..schema_descr.num_columns()) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index ef82bd1391759..6c4383f997f81 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -812,7 +812,6 @@ WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z' 72.4 53.4 51 70.4 50.4 50 - statement ok set datafusion.execution.parquet.pushdown_filters = true; @@ -842,3 +841,51 @@ DROP TABLE o2_parquet_20696; # Cleanup settings statement ok set datafusion.execution.parquet.pushdown_filters = false; + +########## +# Regression test: filter pushdown with Struct columns in schema +# +# When a schema has Struct columns, Arrow field indices diverge from Parquet +# leaf indices (Struct children become separate leaves). A filter on a +# primitive column *after* a Struct must use the correct Parquet leaf index. +# +# Schema: +# Arrow: col_a=0 struct_col=1 col_b=2 +# Parquet: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3 +########## + +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +COPY ( + SELECT + column1 as col_a, + column2 as struct_col, + column3 as col_b + FROM VALUES + (1, {x: 10, y: 100}, 'aaa'), + (2, {x: 20, y: 200}, 'target'), + (3, {x: 30, y: 300}, 'zzz') +) TO 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet' +STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE t_struct_filter +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet'; + +# Filter on col_b (the primitive column after the struct). +# Before the fix, this returned 0 rows because the filter read struct_col.y +# (Parquet leaf 2) instead of col_b (Parquet leaf 3). 
+query IT +SELECT col_a, col_b FROM t_struct_filter WHERE col_b = 'target'; +---- +2 target + +# Clean up +statement ok +set datafusion.execution.parquet.pushdown_filters = false; + +statement ok +DROP TABLE t_struct_filter;