From b42ef358537e97db29ba9832270a45fc6ca1c273 Mon Sep 17 00:00:00 2001 From: darmie Date: Tue, 17 Feb 2026 21:52:58 +0000 Subject: [PATCH 1/6] Skip RowFilter pushdown when filter columns are already projected When all predicate columns are in the output projection, late materialization provides no I/O benefit. Replace the expensive RowFilter path with a lightweight batch-level filter to avoid CachedArrayReader/ReadPlanBuilder/try_next_batch overhead. --- .gitignore | 3 + datafusion/datasource-parquet/src/opener.rs | 117 +++++++++++++++----- 2 files changed, 93 insertions(+), 27 deletions(-) diff --git a/.gitignore b/.gitignore index 8466a72adaec8..89151ce5cd404 100644 --- a/.gitignore +++ b/.gitignore @@ -75,3 +75,6 @@ rat.txt # data generated by examples datafusion-examples/examples/datafusion-examples/ + +# Profiling artifacts (flamegraphs, comparison tables) +profiling-artifacts/ diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..4a8e56781e0ac 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -27,9 +27,9 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_physical_expr::projection::ProjectionExprs; -use datafusion_physical_expr::utils::reassign_expr_columns; +use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -46,6 +46,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; +use datafusion_physical_plan::filter::batch_filter; use datafusion_physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, }; @@ -460,26 +461,57 @@ impl FileOpener for ParquetOpener { // --------------------------------------------------------------------- // Filter pushdown: evaluate predicates during scan - if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { - let row_filter = row_filter::build_row_filter( - &predicate, - &physical_file_schema, - builder.metadata(), - reorder_predicates, - &file_metrics, - ); + // + // When all predicate columns are already in the output projection, + // skip the RowFilter (late materialization) path and instead apply + // the predicate as a batch-level filter after decoding. This avoids + // the substantial CPU overhead of CachedArrayReader, ReadPlanBuilder, + // and try_next_batch when there are no non-projected columns to skip. 
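A minimal sketch of the column check described in the comment above, using plain `usize` column indices as stand-ins for DataFusion's `collect_columns` / `ProjectionExprs` machinery (the function below is illustrative, not the opener's actual code); later commits in this series tighten the guard further, to exact column equality and a single static conjunct:

    use std::collections::HashSet;

    /// Decide whether the RowFilter (late materialization) path can be skipped:
    /// if every column the predicate reads is already in the output projection,
    /// those columns must be decoded anyway, so filtering after decode loses no I/O.
    fn should_skip_row_filter(
        predicate_cols: &HashSet<usize>,
        projection_cols: &HashSet<usize>,
    ) -> bool {
        // An empty predicate column set (e.g. a literal-only predicate)
        // keeps the existing RowFilter behaviour.
        !predicate_cols.is_empty() && predicate_cols.is_subset(projection_cols)
    }

    fn main() {
        let projection: HashSet<usize> = [0, 1].into_iter().collect();

        // Predicate on column 0 only: already projected -> batch filter path.
        let on_a: HashSet<usize> = [0].into_iter().collect();
        assert!(should_skip_row_filter(&on_a, &projection));

        // Predicate on column 2: not projected -> keep the RowFilter path.
        let on_c: HashSet<usize> = [2].into_iter().collect();
        assert!(!should_skip_row_filter(&on_c, &projection));

        println!("guard checks passed");
    }
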
+ let batch_filter_predicate = if let Some(predicate) = + pushdown_filters.then_some(predicate).flatten() + { + let predicate_col_indices: HashSet = + collect_columns(&predicate) + .iter() + .map(|c| c.index()) + .collect(); + let projection_col_indices: HashSet = + projection.column_indices().into_iter().collect(); + + let skip_row_filter = !predicate_col_indices.is_empty() + && predicate_col_indices.is_subset(&projection_col_indices); + + if skip_row_filter { + debug!( + "Skipping RowFilter pushdown: all predicate columns {:?} \ + are in the output projection {:?}; will apply as batch filter", + predicate_col_indices, projection_col_indices, + ); + Some(predicate) + } else { + let row_filter = row_filter::build_row_filter( + &predicate, + &physical_file_schema, + builder.metadata(), + reorder_predicates, + &file_metrics, + ); - match row_filter { - Ok(Some(filter)) => { - builder = builder.with_row_filter(filter); - } - Ok(None) => {} - Err(e) => { - debug!( - "Ignoring error building row filter for '{predicate:?}': {e}" - ); - } - }; + match row_filter { + Ok(Some(filter)) => { + builder = builder.with_row_filter(filter); + } + Ok(None) => {} + Err(e) => { + debug!( + "Ignoring error building row filter for '{predicate:?}': {e}" + ); + } + }; + None + } + } else { + None }; if force_filter_selections { builder = @@ -627,6 +659,12 @@ impl FileOpener for ParquetOpener { let projection = projection .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; + // Also remap the batch filter predicate to the stream schema + let batch_filter_predicate = batch_filter_predicate + .map(|pred| reassign_expr_columns(pred, &stream_schema)) + .transpose()?; + let has_batch_filter = batch_filter_predicate.is_some(); + let projector = projection.make_projector(&stream_schema)?; let stream = stream.map_err(DataFusionError::from).map(move |b| { @@ -636,6 +674,10 @@ impl FileOpener for ParquetOpener { &predicate_cache_inner_records, &predicate_cache_records, ); + // Apply batch-level filter when RowFilter pushdown was skipped + if let Some(ref filter_pred) = batch_filter_predicate { + b = batch_filter(&b, filter_pred)?; + } b = projector.project_batch(&b)?; if replace_schema { // Ensure the output batch has the expected schema. @@ -664,13 +706,34 @@ impl FileOpener for ParquetOpener { // ---------------------------------------------------------------------- // Step: wrap the stream so a dynamic filter can stop the file scan early // ---------------------------------------------------------------------- + // + // When batch-level filtering is active (RowFilter pushdown was + // skipped), also filter out empty batches that result from the + // predicate removing all rows in a decoded batch. 
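The empty-batch handling described above can be pictured with an ordinary fallible stream. This sketch assumes the `futures` and `tokio` crates and a toy `Batch` struct in place of Arrow's `RecordBatch`:

    // Assumed dependencies: futures = "0.3", tokio = { version = "1", features = ["rt", "macros"] }
    use futures::{stream, TryStreamExt};

    /// Toy stand-in for a decoded Arrow RecordBatch; only the row count matters here.
    #[derive(Debug, PartialEq)]
    struct Batch {
        rows: usize,
    }

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        // A fallible stream of decoded batches; after a batch-level filter some
        // batches can come out with zero rows.
        let decoded = stream::iter(vec![
            Ok::<_, String>(Batch { rows: 3 }),
            Ok(Batch { rows: 0 }), // predicate removed every row in this batch
            Ok(Batch { rows: 1 }),
        ]);

        // Drop the empty batches so downstream operators never see them,
        // analogous to the `try_filter` wrapping in the opener.
        let non_empty: Vec<Batch> = decoded
            .try_filter(|batch| std::future::ready(batch.rows > 0))
            .try_collect()
            .await
            .expect("stream should not error");

        assert_eq!(non_empty, vec![Batch { rows: 3 }, Batch { rows: 1 }]);
        println!("non-empty batches: {non_empty:?}");
    }
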
if let Some(file_pruner) = file_pruner { - Ok(EarlyStoppingStream::new( - stream, - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) + if has_batch_filter { + Ok(EarlyStoppingStream::new( + stream + .try_filter(|batch| { + std::future::ready(batch.num_rows() > 0) + }) + .boxed(), + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } else { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } + } else if has_batch_filter { + Ok(stream + .try_filter(|batch| std::future::ready(batch.num_rows() > 0)) + .boxed()) } else { Ok(stream.boxed()) } From ee954adc501f675859647efee257bd7927191df4 Mon Sep 17 00:00:00 2001 From: darmie Date: Tue, 17 Feb 2026 22:14:29 +0000 Subject: [PATCH 2/6] Add test for batch filter path and simplify stream wrapping MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a dedicated test verifying that when all predicate columns are in the output projection, the opener skips RowFilter and applies a batch filter instead — and that both the batch filter and RowFilter paths produce correct results. Simplify the 4-way stream branching into two independent steps: first apply the empty-batch filter, then optionally wrap with EarlyStoppingStream. --- datafusion/datasource-parquet/src/opener.rs | 147 ++++++++++++++++---- 1 file changed, 120 insertions(+), 27 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 4a8e56781e0ac..f201ad76e23a2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -706,36 +706,27 @@ impl FileOpener for ParquetOpener { // ---------------------------------------------------------------------- // Step: wrap the stream so a dynamic filter can stop the file scan early // ---------------------------------------------------------------------- - // + // When batch-level filtering is active (RowFilter pushdown was - // skipped), also filter out empty batches that result from the - // predicate removing all rows in a decoded batch. - if let Some(file_pruner) = file_pruner { - if has_batch_filter { - Ok(EarlyStoppingStream::new( - stream - .try_filter(|batch| { - std::future::ready(batch.num_rows() > 0) - }) - .boxed(), - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) - } else { - Ok(EarlyStoppingStream::new( - stream, - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) - } - } else if has_batch_filter { - Ok(stream + // skipped), filter out empty batches that result from the predicate + // removing all rows in a decoded batch. + let stream = if has_batch_filter { + stream .try_filter(|batch| std::future::ready(batch.num_rows() > 0)) - .boxed()) + .boxed() } else { - Ok(stream.boxed()) + stream.boxed() + }; + + if let Some(file_pruner) = file_pruner { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } else { + Ok(stream) } })) } @@ -2067,4 +2058,106 @@ mod test { "Reverse scan with non-contiguous row groups should correctly map RowSelection" ); } + + /// When all predicate columns are already in the output projection and + /// pushdown_filters is enabled, the opener should skip the RowFilter + /// (late materialization) path and apply filtering as a post-decode + /// batch filter instead. This avoids the overhead of + /// CachedArrayReader / ReadPlanBuilder / try_next_batch when late + /// materialization provides no I/O benefit. 
+ #[tokio::test] + async fn test_skip_row_filter_when_filter_cols_subset_of_projection() { + let store = Arc::new(InMemory::new()) as Arc; + + // 4 rows: a=[1,2,2,4], b=[10,20,30,40] + let batch = record_batch!( + ("a", Int32, vec![Some(1), Some(2), Some(2), Some(4)]), + ("b", Int32, vec![Some(10), Some(20), Some(30), Some(40)]) + ) + .unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; + let schema = batch.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + // Case 1: filter_cols ⊆ projection_cols → batch filter path + // Filter: a = 2, Projection: [a, b] + // predicate_col_indices = {0}, projection_col_indices = {0, 1} → subset + let expr = col("a").eq(lit(2)); + let predicate = logical2physical(&expr, &schema); + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0, 1]) + .with_predicate(predicate) + .with_pushdown_filters(true) + .with_reorder_filters(true) + .build(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_rows, 2, "batch filter should return 2 matching rows"); + assert_eq!(num_batches, 1); + + // Case 2: filter_cols ⊄ projection_cols → RowFilter path + // Filter: b = 20, Projection: [a] (only column a) + // predicate_col_indices = {1}, projection_col_indices = {0} → NOT subset + let expr = col("b").eq(lit(20)); + let predicate = logical2physical(&expr, &schema); + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_predicate(predicate) + .with_pushdown_filters(true) + .with_reorder_filters(true) + .build(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![2], + "RowFilter should return correct filtered values" + ); + + // Case 3: filter_cols ⊆ projection_cols, no matches → 0 rows + // Filter: a = 99, Projection: [a] + // predicate_col_indices = {0}, projection_col_indices = {0} → subset + let expr = col("a").eq(lit(99)); + let predicate = logical2physical(&expr, &schema); + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_predicate(predicate) + .with_pushdown_filters(true) + .with_reorder_filters(true) + .build(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_rows, 0, "no rows should match"); + assert_eq!(num_batches, 0, "empty batches should be filtered out"); + + // Case 4: verify correct values in batch filter path + // Filter: a = 2, Projection: [a] + let expr = col("a").eq(lit(2)); + let predicate = logical2physical(&expr, &schema); + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_predicate(predicate) + .with_pushdown_filters(true) + .with_reorder_filters(true) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![2, 2], + "batch filter should return correct values" + ); + } } From 77f631523fc4d4cec368419a56d89d2c8ec4aac5 Mon Sep 17 00:00:00 2001 From: darmie 
Date: Wed, 18 Feb 2026 00:11:36 +0000 Subject: [PATCH 3/6] Refine batch filter guard: count only static conjuncts Skip dynamic filter expressions (TopK, join pushdown) when deciding whether a predicate is single-conjunct. This preserves the batch filter optimization for queries like Q25 (WHERE col <> '' ORDER BY col LIMIT N) where TopK adds runtime conjuncts, while still routing multi-conjunct static predicates through RowFilter for incremental evaluation. --- datafusion/datasource-parquet/src/opener.rs | 95 ++++++++++++++++++--- 1 file changed, 83 insertions(+), 12 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f201ad76e23a2..f40ee3c6946de 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -27,7 +27,9 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_physical_expr::projection::ProjectionExprs; -use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; +use datafusion_physical_expr::utils::{ + collect_columns, reassign_expr_columns, split_conjunction, +}; use datafusion_physical_expr_adapter::replace_columns_with_literals; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -462,11 +464,20 @@ impl FileOpener for ParquetOpener { // Filter pushdown: evaluate predicates during scan // - // When all predicate columns are already in the output projection, - // skip the RowFilter (late materialization) path and instead apply - // the predicate as a batch-level filter after decoding. This avoids - // the substantial CPU overhead of CachedArrayReader, ReadPlanBuilder, - // and try_next_batch when there are no non-projected columns to skip. + // When the predicate's static conjuncts (excluding dynamic + // filters from TopK / joins) form a single expression whose + // columns are all in the output projection, the RowFilter + // (late materialization) path provides minimal benefit: the + // predicate columns must be decoded for the projection anyway, + // and with only one static conjunct there is no incremental + // evaluation advantage. Apply the predicate as a batch-level + // filter after decoding instead, avoiding the overhead of + // CachedArrayReader / ReadPlanBuilder / try_next_batch. + // + // Multi-conjunct static predicates are left on the RowFilter + // path because the RowFilter evaluates conjuncts incrementally + // — a selective first conjunct can avoid decoding expensive + // later columns for non-matching rows. let batch_filter_predicate = if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { @@ -478,8 +489,17 @@ impl FileOpener for ParquetOpener { let projection_col_indices: HashSet = projection.column_indices().into_iter().collect(); + // Count only static conjuncts — dynamic filters (e.g. + // from TopK or join pushdown) are runtime-generated and + // reference the same projected columns, so they don't + // benefit from RowFilter's incremental evaluation. 
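How the static-conjunct count behaves can be shown with a self-contained toy expression tree; the real code uses `split_conjunction` and `is_dynamic_physical_expr` on physical expressions, which the enum and helper below only approximate:

    /// Toy expression tree; `Dynamic` stands in for runtime-generated filters
    /// (TopK bounds, join pushdown) that the real code detects with
    /// `is_dynamic_physical_expr`.
    #[derive(Debug)]
    enum Expr {
        And(Box<Expr>, Box<Expr>),
        Static(&'static str),
        Dynamic(&'static str),
    }

    /// Flatten `a AND b AND c` into its conjuncts, in the spirit of
    /// `split_conjunction` on physical expressions.
    fn split_and_tree<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
        match expr {
            Expr::And(l, r) => {
                split_and_tree(l, out);
                split_and_tree(r, out);
            }
            other => out.push(other),
        }
    }

    fn main() {
        // `a = 2 AND (a < topk_bound)`: one static conjunct plus one dynamic one.
        let predicate = Expr::And(
            Box::new(Expr::Static("a = 2")),
            Box::new(Expr::Dynamic("a < topk_bound")),
        );

        let mut conjuncts = Vec::new();
        split_and_tree(&predicate, &mut conjuncts);

        // Only static conjuncts count towards the "at most one conjunct" guard;
        // dynamic ones are ignored so TopK filters don't defeat the optimization.
        let static_count = conjuncts
            .iter()
            .filter(|c| !matches!(c, Expr::Dynamic(_)))
            .count();

        assert_eq!(conjuncts.len(), 2);
        assert_eq!(static_count, 1);
        println!("conjuncts: {conjuncts:?}, static: {static_count}");
    }
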
+ let static_conjunct_count = split_conjunction(&predicate) + .iter() + .filter(|c| !is_dynamic_physical_expr(c)) + .count(); let skip_row_filter = !predicate_col_indices.is_empty() - && predicate_col_indices.is_subset(&projection_col_indices); + && predicate_col_indices.is_subset(&projection_col_indices) + && static_conjunct_count <= 1; if skip_row_filter { debug!( @@ -1079,10 +1099,10 @@ mod test { stats::Precision, }; use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener}; - use datafusion_expr::{col, lit}; + use datafusion_expr::{Operator, col, lit}; use datafusion_physical_expr::{ PhysicalExpr, - expressions::{Column, DynamicFilterPhysicalExpr, Literal}, + expressions::{BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal}, planner::logical2physical, projection::ProjectionExprs, }; @@ -2083,9 +2103,10 @@ mod test { u64::try_from(data_size).unwrap(), ); - // Case 1: filter_cols ⊆ projection_cols → batch filter path + // Case 1: single conjunct, filter_cols ⊂ projection_cols → batch filter // Filter: a = 2, Projection: [a, b] - // predicate_col_indices = {0}, projection_col_indices = {0, 1} → subset + // predicate_col_indices = {0} ⊆ projection_col_indices = {0, 1} + // Single conjunct + subset → skip RowFilter, use batch filter. let expr = col("a").eq(lit(2)); let predicate = logical2physical(&expr, &schema); let opener = ParquetOpenerBuilder::new() @@ -2152,12 +2173,62 @@ mod test { .with_pushdown_filters(true) .with_reorder_filters(true) .build(); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let values = collect_int32_values(stream).await; assert_eq!( values, vec![2, 2], "batch filter should return correct values" ); + + // Case 5: multi-conjunct predicate → RowFilter path (even when + // filter_cols == projection_cols). RowFilter evaluates conjuncts + // incrementally, so a selective first conjunct can avoid decoding + // expensive later columns for non-matching rows. + // Filter: a = 2 AND b = 20, Projection: [a, b] + // predicate_col_indices = {0, 1}, projection_col_indices = {0, 1} + // Exact match BUT multi-conjunct → keep RowFilter for incremental eval. + let expr = col("a").eq(lit(2)).and(col("b").eq(lit(20))); + let predicate = logical2physical(&expr, &schema); + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0, 1]) + .with_predicate(predicate) + .with_pushdown_filters(true) + .with_reorder_filters(true) + .build(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_rows, 1, "multi-conjunct RowFilter should return 1 row"); + assert_eq!(num_batches, 1); + + // Case 6: single static conjunct + dynamic filter → batch filter path + // Simulates TopK dynamic filter: `a = 2 AND (a < 3)` + // Only 1 static conjunct, dynamic conjuncts are ignored for the + // single-conjunct check → still uses batch filter path. 
+ let static_expr = logical2physical(&col("a").eq(lit(2)), &schema); + let dynamic_expr = + make_dynamic_expr(logical2physical(&col("a").lt(lit(3)), &schema)); + let combined: Arc = Arc::new(BinaryExpr::new( + static_expr, + Operator::And, + dynamic_expr, + )); + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0, 1]) + .with_predicate(combined) + .with_pushdown_filters(true) + .with_reorder_filters(true) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!( + num_rows, 2, + "single static conjunct + dynamic filter should use batch filter" + ); + assert_eq!(num_batches, 1); } } From ee47492fb95d01e113e29fd36c672594eb7d2898 Mon Sep 17 00:00:00 2001 From: darmie Date: Wed, 18 Feb 2026 09:10:40 +0000 Subject: [PATCH 4/6] Tighten batch filter guard to exact column match Change is_subset to strict equality for predicate vs projection column indices. When there are non-predicate projection columns (e.g. SELECT * WHERE col = X), RowFilter provides significant value by skipping their decode for non-matching rows. Only skip RowFilter when every projected column is a predicate column. Also exclude dynamic filter expressions (TopK, join pushdown) when counting conjuncts, so runtime-generated filters don't prevent the batch filter optimization for single static predicates. --- datafusion/datasource-parquet/src/opener.rs | 55 +++++++++++++++------ 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f40ee3c6946de..e5de632377e7d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -464,16 +464,21 @@ impl FileOpener for ParquetOpener { // Filter pushdown: evaluate predicates during scan // - // When the predicate's static conjuncts (excluding dynamic - // filters from TopK / joins) form a single expression whose - // columns are all in the output projection, the RowFilter - // (late materialization) path provides minimal benefit: the - // predicate columns must be decoded for the projection anyway, - // and with only one static conjunct there is no incremental + // When predicate columns exactly match the projection columns + // and there is at most one static conjunct (excluding dynamic + // filters from TopK / joins), the RowFilter (late + // materialization) path provides no benefit: every projected + // column is a predicate column, so there are no extra columns + // whose decode could be skipped for non-matching rows, and + // with a single static conjunct there is no incremental // evaluation advantage. Apply the predicate as a batch-level // filter after decoding instead, avoiding the overhead of // CachedArrayReader / ReadPlanBuilder / try_next_batch. // + // When there are non-predicate projection columns (e.g. + // SELECT * WHERE col = X), RowFilter is valuable because it + // skips decoding those extra columns for non-matching rows. 
+ // // Multi-conjunct static predicates are left on the RowFilter // path because the RowFilter evaluates conjuncts incrementally // — a selective first conjunct can avoid decoding expensive @@ -498,7 +503,7 @@ impl FileOpener for ParquetOpener { .filter(|c| !is_dynamic_physical_expr(c)) .count(); let skip_row_filter = !predicate_col_indices.is_empty() - && predicate_col_indices.is_subset(&projection_col_indices) + && predicate_col_indices == projection_col_indices && static_conjunct_count <= 1; if skip_row_filter { @@ -2103,10 +2108,29 @@ mod test { u64::try_from(data_size).unwrap(), ); - // Case 1: single conjunct, filter_cols ⊂ projection_cols → batch filter + // Case 1: filter_cols == projection_cols → batch filter path + // Filter: a = 2, Projection: [a] + // predicate_col_indices = {0} == projection_col_indices = {0} + // Single conjunct + exact match → skip RowFilter, use batch filter. + let expr = col("a").eq(lit(2)); + let predicate = logical2physical(&expr, &schema); + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_predicate(predicate) + .with_pushdown_filters(true) + .with_reorder_filters(true) + .build(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_rows, 2, "batch filter should return 2 matching rows"); + assert_eq!(num_batches, 1); + + // Case 1b: filter_cols ⊂ projection_cols → RowFilter path // Filter: a = 2, Projection: [a, b] - // predicate_col_indices = {0} ⊆ projection_col_indices = {0, 1} - // Single conjunct + subset → skip RowFilter, use batch filter. + // predicate_col_indices = {0} ≠ projection_col_indices = {0, 1} + // RowFilter skips decoding non-predicate column b for non-matching rows. let expr = col("a").eq(lit(2)); let predicate = logical2physical(&expr, &schema); let opener = ParquetOpenerBuilder::new() @@ -2119,7 +2143,7 @@ mod test { .build(); let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; - assert_eq!(num_rows, 2, "batch filter should return 2 matching rows"); + assert_eq!(num_rows, 2, "RowFilter should return 2 matching rows"); assert_eq!(num_batches, 1); // Case 2: filter_cols ⊄ projection_cols → RowFilter path @@ -2204,9 +2228,10 @@ mod test { assert_eq!(num_batches, 1); // Case 6: single static conjunct + dynamic filter → batch filter path - // Simulates TopK dynamic filter: `a = 2 AND (a < 3)` - // Only 1 static conjunct, dynamic conjuncts are ignored for the - // single-conjunct check → still uses batch filter path. + // Simulates TopK: `WHERE a = 2 ORDER BY a LIMIT N` + // Predicate: `a = 2 AND (a < 3)`, Projection: [a] + // predicate_col_indices = {0} == projection_col_indices = {0} + // Only 1 static conjunct (dynamic excluded) → batch filter path. 
let static_expr = logical2physical(&col("a").eq(lit(2)), &schema); let dynamic_expr = make_dynamic_expr(logical2physical(&col("a").lt(lit(3)), &schema)); @@ -2218,7 +2243,7 @@ mod test { let opener = ParquetOpenerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) - .with_projection_indices(&[0, 1]) + .with_projection_indices(&[0]) .with_predicate(combined) .with_pushdown_filters(true) .with_reorder_filters(true) From 21d16862de7e91bd656fd96efa253439eb31d489 Mon Sep 17 00:00:00 2001 From: darmie Date: Thu, 19 Feb 2026 17:45:10 +0000 Subject: [PATCH 5/6] Per-conjunct RowFilter demotion in build_row_filter() Instead of an all-or-nothing decision about RowFilter vs batch filter, evaluate each conjunct individually: keep conjuncts that save column decoding (extra projected columns not referenced by the conjunct) or can use a page index, demote the rest to batch-level filtering. This moves the decision logic from opener.rs into build_row_filter(), which now accepts projection column indices and returns demoted conjuncts alongside the RowFilter. --- .../benches/parquet_nested_filter_pushdown.rs | 14 +- datafusion/datasource-parquet/src/opener.rs | 132 ++++++------------ .../datasource-parquet/src/row_filter.rs | 104 ++++++++++---- 3 files changed, 123 insertions(+), 127 deletions(-) diff --git a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs index ed92031f86c6b..58c2ded558e77 100644 --- a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs +++ b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashSet; use std::path::{Path, PathBuf}; use std::sync::{Arc, LazyLock}; @@ -115,9 +116,16 @@ fn scan_with_predicate( let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), &metrics); let builder = if pushdown { - if let Some(row_filter) = - build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)? - { + let all_cols: HashSet = (0..file_schema.fields().len()).collect(); + let (row_filter, _demoted) = build_row_filter( + predicate, + file_schema, + &metadata, + false, + &file_metrics, + &all_cols, + )?; + if let Some(row_filter) = row_filter { builder.with_row_filter(row_filter) } else { builder diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index e5de632377e7d..818b0b7666ad1 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -27,9 +27,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_physical_expr::projection::ProjectionExprs; -use datafusion_physical_expr::utils::{ - collect_columns, reassign_expr_columns, split_conjunction, -}; +use datafusion_physical_expr::utils::{conjunction_opt, reassign_expr_columns}; use datafusion_physical_expr_adapter::replace_columns_with_literals; use std::collections::{HashMap, HashSet}; use std::pin::Pin; @@ -462,79 +460,35 @@ impl FileOpener for ParquetOpener { // `row_filter` for details. 
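A simplified sketch of the per-conjunct demotion rule described in this commit's message, with `usize` column sets standing in for the per-conjunct `FilterCandidate` / `collect_columns` data and ignoring the page-index (`can_use_index`) case that also keeps a conjunct in the RowFilter:

    use std::collections::HashSet;

    /// A predicate conjunct reduced to the column indices it references
    /// (stand-in for the real per-conjunct FilterCandidate).
    struct Conjunct {
        name: &'static str,
        columns: HashSet<usize>,
    }

    /// Keep a conjunct in the RowFilter only if evaluating it first can skip
    /// decoding at least one projected column for non-matching rows; otherwise
    /// demote it to a post-decode batch filter.
    fn partition_conjuncts(
        conjuncts: Vec<Conjunct>,
        projection: &HashSet<usize>,
    ) -> (Vec<Conjunct>, Vec<Conjunct>) {
        conjuncts.into_iter().partition(|c| {
            // "Extra" projected columns are those this conjunct does not read.
            projection.iter().any(|idx| !c.columns.contains(idx))
        })
    }

    fn main() {
        // SELECT a, b ... -> projection columns {0, 1}
        let projection: HashSet<usize> = [0, 1].into_iter().collect();

        let conjuncts = vec![
            // Reads only column a: RowFilter can skip decoding b for pruned rows.
            Conjunct { name: "a = 2", columns: [0].into_iter().collect() },
            // Reads every projected column: no decode savings, demote it.
            Conjunct { name: "a + b > 5", columns: [0, 1].into_iter().collect() },
        ];

        let (kept, demoted) = partition_conjuncts(conjuncts, &projection);
        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].name, "a = 2");
        assert_eq!(demoted[0].name, "a + b > 5");
        println!("kept in RowFilter: {}, demoted to batch filter: {}", kept.len(), demoted.len());
    }
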
// --------------------------------------------------------------------- - // Filter pushdown: evaluate predicates during scan - // - // When predicate columns exactly match the projection columns - // and there is at most one static conjunct (excluding dynamic - // filters from TopK / joins), the RowFilter (late - // materialization) path provides no benefit: every projected - // column is a predicate column, so there are no extra columns - // whose decode could be skipped for non-matching rows, and - // with a single static conjunct there is no incremental - // evaluation advantage. Apply the predicate as a batch-level - // filter after decoding instead, avoiding the overhead of - // CachedArrayReader / ReadPlanBuilder / try_next_batch. + // Filter pushdown: evaluate predicates during scan. // - // When there are non-predicate projection columns (e.g. - // SELECT * WHERE col = X), RowFilter is valuable because it - // skips decoding those extra columns for non-matching rows. - // - // Multi-conjunct static predicates are left on the RowFilter - // path because the RowFilter evaluates conjuncts incrementally - // — a selective first conjunct can avoid decoding expensive - // later columns for non-matching rows. + // Each conjunct is evaluated individually inside + // `build_row_filter`: conjuncts whose required columns leave + // extra projected columns unread benefit from late + // materialization and stay in the RowFilter; conjuncts that + // reference all projected columns are demoted to batch-level + // filtering to avoid the overhead of the RowFilter machinery. let batch_filter_predicate = if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { - let predicate_col_indices: HashSet = - collect_columns(&predicate) - .iter() - .map(|c| c.index()) - .collect(); let projection_col_indices: HashSet = projection.column_indices().into_iter().collect(); - // Count only static conjuncts — dynamic filters (e.g. - // from TopK or join pushdown) are runtime-generated and - // reference the same projected columns, so they don't - // benefit from RowFilter's incremental evaluation. 
- let static_conjunct_count = split_conjunction(&predicate) - .iter() - .filter(|c| !is_dynamic_physical_expr(c)) - .count(); - let skip_row_filter = !predicate_col_indices.is_empty() - && predicate_col_indices == projection_col_indices - && static_conjunct_count <= 1; - - if skip_row_filter { - debug!( - "Skipping RowFilter pushdown: all predicate columns {:?} \ - are in the output projection {:?}; will apply as batch filter", - predicate_col_indices, projection_col_indices, - ); - Some(predicate) - } else { - let row_filter = row_filter::build_row_filter( - &predicate, - &physical_file_schema, - builder.metadata(), - reorder_predicates, - &file_metrics, - ); + let (row_filter, demoted) = row_filter::build_row_filter( + &predicate, + &physical_file_schema, + builder.metadata(), + reorder_predicates, + &file_metrics, + &projection_col_indices, + )?; - match row_filter { - Ok(Some(filter)) => { - builder = builder.with_row_filter(filter); - } - Ok(None) => {} - Err(e) => { - debug!( - "Ignoring error building row filter for '{predicate:?}': {e}" - ); - } - }; - None + if let Some(filter) = row_filter { + builder = builder.with_row_filter(filter); } + + // Combine demoted conjuncts into a single batch filter + conjunction_opt(demoted) } else { None }; @@ -2084,12 +2038,10 @@ mod test { ); } - /// When all predicate columns are already in the output projection and - /// pushdown_filters is enabled, the opener should skip the RowFilter - /// (late materialization) path and apply filtering as a post-decode - /// batch filter instead. This avoids the overhead of - /// CachedArrayReader / ReadPlanBuilder / try_next_batch when late - /// materialization provides no I/O benefit. + /// Per-conjunct RowFilter demotion: when a conjunct's required columns + /// cover all projected columns, it provides no column-decode savings + /// and is demoted to batch-level filtering. Conjuncts with extra + /// projected columns stay in the RowFilter for late materialization. #[tokio::test] async fn test_skip_row_filter_when_filter_cols_subset_of_projection() { let store = Arc::new(InMemory::new()) as Arc; @@ -2110,8 +2062,8 @@ mod test { // Case 1: filter_cols == projection_cols → batch filter path // Filter: a = 2, Projection: [a] - // predicate_col_indices = {0} == projection_col_indices = {0} - // Single conjunct + exact match → skip RowFilter, use batch filter. + // Conjunct cols = {0}, projection = {0} → no extra cols to skip + // decoding → demoted to batch filter. let expr = col("a").eq(lit(2)); let predicate = logical2physical(&expr, &schema); let opener = ParquetOpenerBuilder::new() @@ -2129,8 +2081,8 @@ mod test { // Case 1b: filter_cols ⊂ projection_cols → RowFilter path // Filter: a = 2, Projection: [a, b] - // predicate_col_indices = {0} ≠ projection_col_indices = {0, 1} - // RowFilter skips decoding non-predicate column b for non-matching rows. + // Conjunct cols = {0}, projection = {0, 1} → extra col b → RowFilter + // skips decoding column b for non-matching rows. 
let expr = col("a").eq(lit(2)); let predicate = logical2physical(&expr, &schema); let opener = ParquetOpenerBuilder::new() @@ -2148,7 +2100,7 @@ mod test { // Case 2: filter_cols ⊄ projection_cols → RowFilter path // Filter: b = 20, Projection: [a] (only column a) - // predicate_col_indices = {1}, projection_col_indices = {0} → NOT subset + // Conjunct cols = {1}, projection = {0} → extra col a → RowFilter let expr = col("b").eq(lit(20)); let predicate = logical2physical(&expr, &schema); let opener = ParquetOpenerBuilder::new() @@ -2167,9 +2119,9 @@ mod test { "RowFilter should return correct filtered values" ); - // Case 3: filter_cols ⊆ projection_cols, no matches → 0 rows + // Case 3: no matches → 0 rows via batch filter // Filter: a = 99, Projection: [a] - // predicate_col_indices = {0}, projection_col_indices = {0} → subset + // Conjunct cols = {0}, projection = {0} → no extra cols → batch filter let expr = col("a").eq(lit(99)); let predicate = logical2physical(&expr, &schema); let opener = ParquetOpenerBuilder::new() @@ -2205,13 +2157,10 @@ mod test { "batch filter should return correct values" ); - // Case 5: multi-conjunct predicate → RowFilter path (even when - // filter_cols == projection_cols). RowFilter evaluates conjuncts - // incrementally, so a selective first conjunct can avoid decoding - // expensive later columns for non-matching rows. + // Case 5: multi-conjunct predicate → RowFilter path // Filter: a = 2 AND b = 20, Projection: [a, b] - // predicate_col_indices = {0, 1}, projection_col_indices = {0, 1} - // Exact match BUT multi-conjunct → keep RowFilter for incremental eval. + // Per-conjunct: `a = 2` has extra col b, `b = 20` has extra col a + // → both kept in RowFilter for incremental evaluation. let expr = col("a").eq(lit(2)).and(col("b").eq(lit(20))); let predicate = logical2physical(&expr, &schema); let opener = ParquetOpenerBuilder::new() @@ -2230,16 +2179,13 @@ mod test { // Case 6: single static conjunct + dynamic filter → batch filter path // Simulates TopK: `WHERE a = 2 ORDER BY a LIMIT N` // Predicate: `a = 2 AND (a < 3)`, Projection: [a] - // predicate_col_indices = {0} == projection_col_indices = {0} - // Only 1 static conjunct (dynamic excluded) → batch filter path. + // Per-conjunct: both conjuncts reference only col a = projection + // → no extra cols → all demoted to batch filter. let static_expr = logical2physical(&col("a").eq(lit(2)), &schema); let dynamic_expr = make_dynamic_expr(logical2physical(&col("a").lt(lit(3)), &schema)); - let combined: Arc = Arc::new(BinaryExpr::new( - static_expr, - Operator::And, - dynamic_expr, - )); + let combined: Arc = + Arc::new(BinaryExpr::new(static_expr, Operator::And, dynamic_expr)); let opener = ParquetOpenerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 2924208c5bd99..389acad4a6d26 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -65,7 +65,7 @@ //! continue to be evaluated after the batches are materialized. 
use std::cmp::Ordering; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashSet}; use std::sync::Arc; use arrow::array::BooleanArray; @@ -81,6 +81,7 @@ use datafusion_common::Result; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{PhysicalExpr, split_conjunction}; @@ -567,24 +568,29 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result, file_schema: &SchemaRef, metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, -) -> Result> { + projection_col_indices: &HashSet, +) -> Result<(Option, Vec>)> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; let time = &file_metrics.row_pushdown_eval_time; @@ -593,21 +599,46 @@ pub fn build_row_filter( // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`] let predicates = split_conjunction(expr); - // Determine which conjuncts can be evaluated as ArrowPredicates, if any - let mut candidates: Vec = predicates - .into_iter() - .map(|expr| { - FilterCandidateBuilder::new(Arc::clone(expr), Arc::clone(file_schema)) - .build(metadata) - }) - .collect::, _>>()? - .into_iter() - .flatten() - .collect(); + // Evaluate each conjunct: keep those that benefit from late + // materialization (column-decode savings or page-index pruning), + // demote the rest for batch-level filtering. + let mut candidates: Vec = Vec::new(); + let mut demoted: Vec> = Vec::new(); + + for pred_expr in predicates { + let candidate = + FilterCandidateBuilder::new(Arc::clone(pred_expr), Arc::clone(file_schema)) + .build(metadata)?; + + match candidate { + Some(c) => { + // Check if RowFilter saves column decoding for this conjunct: + // if there are projected columns NOT referenced by this + // conjunct, RowFilter can skip decoding them for non-matching + // rows. + let conjunct_cols: HashSet = collect_columns(&c.expr) + .iter() + .map(|col| col.index()) + .collect(); + let has_extra_cols = projection_col_indices + .iter() + .any(|idx| !conjunct_cols.contains(idx)); + + if has_extra_cols || c.can_use_index { + candidates.push(c); + } else { + demoted.push(c.expr); + } + } + None => { + // Cannot be used as a RowFilter (e.g. unsupported nested + // column types); keep existing behaviour and drop silently. 
+ } + } + } - // no candidates if candidates.is_empty() { - return Ok(None); + return Ok((None, demoted)); } if reorder_predicates { @@ -625,7 +656,7 @@ pub fn build_row_filter( // This ensures: rows_matched + rows_pruned = total rows processed let total_candidates = candidates.len(); - candidates + let filters = candidates .into_iter() .enumerate() .map(|(idx, candidate)| { @@ -650,8 +681,9 @@ pub fn build_row_filter( ) .map(|pred| Box::new(pred) as _) }) - .collect::, _>>() - .map(|filters| Some(RowFilter::new(filters))) + .collect::, _>>()?; + + Ok((Some(RowFilter::new(filters)), demoted)) } #[cfg(test)] @@ -937,10 +969,20 @@ mod test { let file_metrics = ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics); - let row_filter = - build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics) - .expect("building row filter") - .expect("row filter should exist"); + // Projection with an extra column index so every conjunct has at + // least one "extra" projected column and stays in the RowFilter. + // These tests exercise RowFilter mechanics, not per-conjunct demotion. + let projection: HashSet = (0..file_schema.fields().len() + 1).collect(); + let (row_filter, _demoted) = build_row_filter( + &expr, + &file_schema, + &metadata, + false, + &file_metrics, + &projection, + ) + .expect("building row filter"); + let row_filter = row_filter.expect("row filter should exist"); let reader = parquet_reader_builder .with_row_filter(row_filter) From 700abfb167f85423671f6755d27654341cbaae66 Mon Sep 17 00:00:00 2001 From: darmie Date: Thu, 19 Feb 2026 18:03:23 +0000 Subject: [PATCH 6/6] Add cfg guards for force_hash_collisions in hash_utils Gate hash_string_view_array_inner, hash_dictionary_inner, and the itertools import with #[cfg(not(feature = "force_hash_collisions"))] to fix dead-code and unused-import warnings under --all-features. --- datafusion/common/src/hash_utils.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 1489f688c3e78..d789e846ccb5a 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -23,6 +23,7 @@ use arrow::array::*; use arrow::datatypes::*; #[cfg(not(feature = "force_hash_collisions"))] use arrow::{downcast_dictionary_array, downcast_primitive_array}; +#[cfg(not(feature = "force_hash_collisions"))] use itertools::Itertools; #[cfg(not(feature = "force_hash_collisions"))] @@ -278,6 +279,7 @@ fn hash_array( /// HAS_NULLS: do we have to check null in the inner loop /// HAS_BUFFERS: if true, array has external buffers; if false, all strings are inlined/ less then 12 bytes /// REHASH: if true, combining with existing hash, otherwise initializing +#[cfg(not(feature = "force_hash_collisions"))] #[inline(never)] fn hash_string_view_array_inner< T: ByteViewType, @@ -398,6 +400,7 @@ fn hash_generic_byte_view_array( /// - `HAS_NULL_KEYS`: Whether to check for null dictionary keys /// - `HAS_NULL_VALUES`: Whether to check for null dictionary values /// - `MULTI_COL`: Whether to combine with existing hash (true) or initialize (false) +#[cfg(not(feature = "force_hash_collisions"))] #[inline(never)] fn hash_dictionary_inner< K: ArrowDictionaryKeyType,