diff --git a/.gitignore b/.gitignore
index 8466a72adaec8..89151ce5cd404 100644
--- a/.gitignore
+++ b/.gitignore
@@ -75,3 +75,6 @@ rat.txt
 
 # data generated by examples
 datafusion-examples/examples/datafusion-examples/
+
+# Profiling artifacts (flamegraphs, comparison tables)
+profiling-artifacts/
diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 1489f688c3e78..d789e846ccb5a 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -23,6 +23,7 @@ use arrow::array::*;
 use arrow::datatypes::*;
 #[cfg(not(feature = "force_hash_collisions"))]
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
+#[cfg(not(feature = "force_hash_collisions"))]
 use itertools::Itertools;
 
 #[cfg(not(feature = "force_hash_collisions"))]
@@ -278,6 +279,7 @@ fn hash_array(
 /// HAS_NULLS: do we have to check null in the inner loop
 /// HAS_BUFFERS: if true, array has external buffers; if false, all strings are inlined/ less then 12 bytes
 /// REHASH: if true, combining with existing hash, otherwise initializing
+#[cfg(not(feature = "force_hash_collisions"))]
 #[inline(never)]
 fn hash_string_view_array_inner<
     T: ByteViewType,
@@ -398,6 +400,7 @@ fn hash_generic_byte_view_array(
 /// - `HAS_NULL_KEYS`: Whether to check for null dictionary keys
 /// - `HAS_NULL_VALUES`: Whether to check for null dictionary values
 /// - `MULTI_COL`: Whether to combine with existing hash (true) or initialize (false)
+#[cfg(not(feature = "force_hash_collisions"))]
 #[inline(never)]
 fn hash_dictionary_inner<
     K: ArrowDictionaryKeyType,
diff --git a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs
index ed92031f86c6b..58c2ded558e77 100644
--- a/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs
+++ b/datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashSet;
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, LazyLock};
 
@@ -115,9 +116,16 @@ fn scan_with_predicate(
     let file_metrics =
         ParquetFileMetrics::new(0, &path.display().to_string(), &metrics);
     let builder = if pushdown {
-        if let Some(row_filter) =
-            build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?
-        {
+        let all_cols: HashSet<usize> = (0..file_schema.fields().len()).collect();
+        let (row_filter, _demoted) = build_row_filter(
+            predicate,
+            file_schema,
+            &metadata,
+            false,
+            &file_metrics,
+            &all_cols,
+        )?;
+        if let Some(row_filter) = row_filter {
             builder.with_row_filter(row_filter)
         } else {
             builder
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index f87a30265a17b..818b0b7666ad1 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -27,9 +27,9 @@ use arrow::array::{RecordBatch, RecordBatchOptions};
 use arrow::datatypes::DataType;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_physical_expr::projection::ProjectionExprs;
-use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr::utils::{conjunction_opt, reassign_expr_columns};
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -46,6 +46,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
     PhysicalExpr, is_dynamic_physical_expr,
 };
+use datafusion_physical_plan::filter::batch_filter;
 use datafusion_physical_plan::metrics::{
     Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
 };
@@ -459,27 +460,37 @@ impl FileOpener for ParquetOpener {
             // `row_filter` for details.
             // ---------------------------------------------------------------------
 
-            // Filter pushdown: evaluate predicates during scan
-            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
-                let row_filter = row_filter::build_row_filter(
+            // Filter pushdown: evaluate predicates during scan.
+            //
+            // Each conjunct is evaluated individually inside
+            // `build_row_filter`: conjuncts whose required columns leave
+            // extra projected columns unread benefit from late
+            // materialization and stay in the RowFilter; conjuncts that
+            // reference all projected columns are demoted to batch-level
+            // filtering to avoid the overhead of the RowFilter machinery.
+            let batch_filter_predicate = if let Some(predicate) =
+                pushdown_filters.then_some(predicate).flatten()
+            {
+                let projection_col_indices: HashSet<usize> =
+                    projection.column_indices().into_iter().collect();
+
+                let (row_filter, demoted) = row_filter::build_row_filter(
                     &predicate,
                     &physical_file_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
-                );
+                    &projection_col_indices,
+                )?;
 
-                match row_filter {
-                    Ok(Some(filter)) => {
-                        builder = builder.with_row_filter(filter);
-                    }
-                    Ok(None) => {}
-                    Err(e) => {
-                        debug!(
-                            "Ignoring error building row filter for '{predicate:?}': {e}"
-                        );
-                    }
-                };
+                if let Some(filter) = row_filter {
+                    builder = builder.with_row_filter(filter);
+                }
+
+                // Combine demoted conjuncts into a single batch filter
+                conjunction_opt(demoted)
+            } else {
+                None
             };
             if force_filter_selections {
                 builder =
@@ -627,6 +638,12 @@
             let projection = projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
 
+            // Also remap the batch filter predicate to the stream schema
+            let batch_filter_predicate = batch_filter_predicate
+                .map(|pred| reassign_expr_columns(pred, &stream_schema))
+                .transpose()?;
+            let has_batch_filter = batch_filter_predicate.is_some();
+
             let projector = projection.make_projector(&stream_schema)?;
 
             let stream = stream.map_err(DataFusionError::from).map(move |b| {
@@ -636,6 +653,10 @@
                     &predicate_cache_inner_records,
                     &predicate_cache_records,
                 );
+                // Apply batch-level filter when RowFilter pushdown was skipped
+                if let Some(ref filter_pred) = batch_filter_predicate {
+                    b = batch_filter(&b, filter_pred)?;
+                }
                 b = projector.project_batch(&b)?;
                 if replace_schema {
                     // Ensure the output batch has the expected schema.
@@ -664,6 +685,18 @@
             // ----------------------------------------------------------------------
             // Step: wrap the stream so a dynamic filter can stop the file scan early
             // ----------------------------------------------------------------------
+
+            // When batch-level filtering is active (RowFilter pushdown was
+            // skipped), filter out empty batches that result from the predicate
+            // removing all rows in a decoded batch.
+            let stream = if has_batch_filter {
+                stream
+                    .try_filter(|batch| std::future::ready(batch.num_rows() > 0))
+                    .boxed()
+            } else {
+                stream.boxed()
+            };
+
             if let Some(file_pruner) = file_pruner {
                 Ok(EarlyStoppingStream::new(
                     stream,
@@ -672,7 +705,7 @@
                 )
                 .boxed())
             } else {
-                Ok(stream.boxed())
+                Ok(stream)
             }
         }))
     }
@@ -1025,10 +1058,10 @@ mod test {
         stats::Precision,
     };
     use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener};
-    use datafusion_expr::{col, lit};
+    use datafusion_expr::{Operator, col, lit};
     use datafusion_physical_expr::{
         PhysicalExpr,
-        expressions::{Column, DynamicFilterPhysicalExpr, Literal},
+        expressions::{BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal},
         planner::logical2physical,
         projection::ProjectionExprs,
     };
@@ -2004,4 +2037,169 @@ mod test {
             "Reverse scan with non-contiguous row groups should correctly map RowSelection"
         );
     }
+
+    /// Per-conjunct RowFilter demotion: when a conjunct's required columns
+    /// cover all projected columns, it provides no column-decode savings
+    /// and is demoted to batch-level filtering. Conjuncts with extra
+    /// projected columns stay in the RowFilter for late materialization.
+    #[tokio::test]
+    async fn test_skip_row_filter_when_filter_cols_subset_of_projection() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // 4 rows: a=[1,2,2,4], b=[10,20,30,40]
+        let batch = record_batch!(
+            ("a", Int32, vec![Some(1), Some(2), Some(2), Some(4)]),
+            ("b", Int32, vec![Some(10), Some(20), Some(30), Some(40)])
+        )
+        .unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+        let schema = batch.schema();
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+
+        // Case 1: filter_cols == projection_cols → batch filter path
+        // Filter: a = 2, Projection: [a]
+        // Conjunct cols = {0}, projection = {0} → no extra cols to skip
+        // decoding → demoted to batch filter.
+        let expr = col("a").eq(lit(2));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_predicate(predicate)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_rows, 2, "batch filter should return 2 matching rows");
+        assert_eq!(num_batches, 1);
+
+        // Case 1b: filter_cols ⊂ projection_cols → RowFilter path
+        // Filter: a = 2, Projection: [a, b]
+        // Conjunct cols = {0}, projection = {0, 1} → extra col b → RowFilter
+        // skips decoding column b for non-matching rows.
+        let expr = col("a").eq(lit(2));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0, 1])
+            .with_predicate(predicate)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_rows, 2, "RowFilter should return 2 matching rows");
+        assert_eq!(num_batches, 1);
+
+        // Case 2: filter_cols ⊄ projection_cols → RowFilter path
+        // Filter: b = 20, Projection: [a] (only column a)
+        // Conjunct cols = {1}, projection = {0} → extra col a → RowFilter
+        let expr = col("b").eq(lit(20));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_predicate(predicate)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let values = collect_int32_values(stream).await;
+        assert_eq!(
+            values,
+            vec![2],
+            "RowFilter should return correct filtered values"
+        );
+
+        // Case 3: no matches → 0 rows via batch filter
+        // Filter: a = 99, Projection: [a]
+        // Conjunct cols = {0}, projection = {0} → no extra cols → batch filter
+        let expr = col("a").eq(lit(99));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_predicate(predicate)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_rows, 0, "no rows should match");
+        assert_eq!(num_batches, 0, "empty batches should be filtered out");
+
+        // Case 4: verify correct values in batch filter path
+        // Filter: a = 2, Projection: [a]
+        let expr = col("a").eq(lit(2));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_predicate(predicate)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let values = collect_int32_values(stream).await;
+        assert_eq!(
+            values,
+            vec![2, 2],
+            "batch filter should return correct values"
+        );
+
+        // Case 5: multi-conjunct predicate → RowFilter path
+        // Filter: a = 2 AND b = 20, Projection: [a, b]
+        // Per-conjunct: `a = 2` has extra col b, `b = 20` has extra col a
+        // → both kept in RowFilter for incremental evaluation.
+        let expr = col("a").eq(lit(2)).and(col("b").eq(lit(20)));
+        let predicate = logical2physical(&expr, &schema);
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0, 1])
+            .with_predicate(predicate)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_rows, 1, "multi-conjunct RowFilter should return 1 row");
+        assert_eq!(num_batches, 1);
+
+        // Case 6: single static conjunct + dynamic filter → batch filter path
+        // Simulates TopK: `WHERE a = 2 ORDER BY a LIMIT N`
+        // Predicate: `a = 2 AND (a < 3)`, Projection: [a]
+        // Per-conjunct: both conjuncts reference only col a = projection
+        // → no extra cols → all demoted to batch filter.
+        let static_expr = logical2physical(&col("a").eq(lit(2)), &schema);
+        let dynamic_expr =
+            make_dynamic_expr(logical2physical(&col("a").lt(lit(3)), &schema));
+        let combined: Arc<dyn PhysicalExpr> =
+            Arc::new(BinaryExpr::new(static_expr, Operator::And, dynamic_expr));
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_predicate(combined)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+        let stream = opener.open(file).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(
+            num_rows, 2,
+            "single static conjunct + dynamic filter should use batch filter"
+        );
+        assert_eq!(num_batches, 1);
+    }
 }
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 2924208c5bd99..389acad4a6d26 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -65,7 +65,7 @@
 //! continue to be evaluated after the batches are materialized.
 
 use std::cmp::Ordering;
-use std::collections::BTreeSet;
+use std::collections::{BTreeSet, HashSet};
 use std::sync::Arc;
 
 use arrow::array::BooleanArray;
@@ -81,6 +81,7 @@ use datafusion_common::Result;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
 
@@ -567,24 +568,29 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result
 pub fn build_row_filter(
     expr: &Arc<dyn PhysicalExpr>,
     file_schema: &SchemaRef,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
     file_metrics: &ParquetFileMetrics,
-) -> Result<Option<RowFilter>> {
+    projection_col_indices: &HashSet<usize>,
+) -> Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)> {
     let rows_pruned = &file_metrics.pushdown_rows_pruned;
     let rows_matched = &file_metrics.pushdown_rows_matched;
     let time = &file_metrics.row_pushdown_eval_time;
@@ -593,21 +599,46 @@ pub fn build_row_filter(
     // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
     let predicates = split_conjunction(expr);
 
-    // Determine which conjuncts can be evaluated as ArrowPredicates, if any
-    let mut candidates: Vec<FilterCandidate> = predicates
-        .into_iter()
-        .map(|expr| {
-            FilterCandidateBuilder::new(Arc::clone(expr), Arc::clone(file_schema))
-                .build(metadata)
-        })
-        .collect::<Result<Vec<_>, _>>()?
-        .into_iter()
-        .flatten()
-        .collect();
+    // Evaluate each conjunct: keep those that benefit from late
+    // materialization (column-decode savings or page-index pruning),
+    // demote the rest for batch-level filtering.
+    let mut candidates: Vec<FilterCandidate> = Vec::new();
+    let mut demoted: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
+
+    for pred_expr in predicates {
+        let candidate =
+            FilterCandidateBuilder::new(Arc::clone(pred_expr), Arc::clone(file_schema))
+                .build(metadata)?;
+
+        match candidate {
+            Some(c) => {
+                // Check if RowFilter saves column decoding for this conjunct:
+                // if there are projected columns NOT referenced by this
+                // conjunct, RowFilter can skip decoding them for non-matching
+                // rows.
+                let conjunct_cols: HashSet<usize> = collect_columns(&c.expr)
+                    .iter()
+                    .map(|col| col.index())
+                    .collect();
+                let has_extra_cols = projection_col_indices
+                    .iter()
+                    .any(|idx| !conjunct_cols.contains(idx));
+
+                if has_extra_cols || c.can_use_index {
+                    candidates.push(c);
+                } else {
+                    demoted.push(c.expr);
+                }
+            }
+            None => {
+                // Cannot be used as a RowFilter (e.g. unsupported nested
+                // column types); keep existing behaviour and drop silently.
+            }
+        }
+    }
 
-    // no candidates
     if candidates.is_empty() {
-        return Ok(None);
+        return Ok((None, demoted));
     }
 
     if reorder_predicates {
@@ -625,7 +656,7 @@ pub fn build_row_filter(
     // This ensures: rows_matched + rows_pruned = total rows processed
     let total_candidates = candidates.len();
 
-    candidates
+    let filters = candidates
        .into_iter()
        .enumerate()
        .map(|(idx, candidate)| {
@@ -650,8 +681,9 @@ pub fn build_row_filter(
             )
             .map(|pred| Box::new(pred) as _)
         })
-        .collect::<Result<Vec<_>, _>>()
-        .map(|filters| Some(RowFilter::new(filters)))
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok((Some(RowFilter::new(filters)), demoted))
 }
 
 #[cfg(test)]
@@ -937,10 +969,20 @@ mod test {
         let file_metrics =
             ParquetFileMetrics::new(0, &format!("{func_name}.parquet"), &metrics);
 
-        let row_filter =
-            build_row_filter(&expr, &file_schema, &metadata, false, &file_metrics)
-                .expect("building row filter")
-                .expect("row filter should exist");
+        // Projection with an extra column index so every conjunct has at
+        // least one "extra" projected column and stays in the RowFilter.
+        // These tests exercise RowFilter mechanics, not per-conjunct demotion.
+        let projection: HashSet<usize> = (0..file_schema.fields().len() + 1).collect();
+        let (row_filter, _demoted) = build_row_filter(
+            &expr,
+            &file_schema,
+            &metadata,
+            false,
+            &file_metrics,
+            &projection,
+        )
+        .expect("building row filter");
+        let row_filter = row_filter.expect("row filter should exist");
 
         let reader = parquet_reader_builder
             .with_row_filter(row_filter)
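
Note on the change above: the per-conjunct demotion decision in `build_row_filter` reduces to a small set comparison. The standalone sketch below is illustrative only — `Conjunct` and `split_row_filter_candidates` are invented stand-ins, not DataFusion types — and shows the heuristic the patch applies: a conjunct stays in the Parquet RowFilter only if at least one projected column is not referenced by it (so late materialization can skip decoding that column for non-matching rows) or if it can use the page index; otherwise it is demoted to batch-level filtering.

// Illustrative sketch only: `Conjunct` and `split_row_filter_candidates` are
// hypothetical stand-ins for the real FilterCandidate machinery.
use std::collections::HashSet;

/// A filter conjunct, reduced to the column indices it references and
/// whether the Parquet page index can prune with it.
struct Conjunct {
    referenced_cols: HashSet<usize>,
    can_use_index: bool,
}

/// Split conjuncts into (kept in the RowFilter, demoted to batch filtering).
fn split_row_filter_candidates(
    conjuncts: Vec<Conjunct>,
    projected_cols: &HashSet<usize>,
) -> (Vec<Conjunct>, Vec<Conjunct>) {
    let mut kept = Vec::new();
    let mut demoted = Vec::new();
    for c in conjuncts {
        // An "extra" projected column is one the conjunct does not read; for
        // rows the conjunct rejects, the RowFilter can skip decoding it.
        let has_extra_cols = projected_cols
            .iter()
            .any(|idx| !c.referenced_cols.contains(idx));
        if has_extra_cols || c.can_use_index {
            kept.push(c);
        } else {
            demoted.push(c);
        }
    }
    (kept, demoted)
}

fn main() {
    // Projection reads columns {0, 1}. A conjunct touching only column 0
    // (e.g. `a = 2`) leaves column 1 as an extra projected column -> kept.
    // A conjunct touching both projected columns (e.g. `a < b`) offers no
    // decode savings -> demoted.
    let projection: HashSet<usize> = [0, 1].into_iter().collect();
    let keep_me = Conjunct {
        referenced_cols: [0].into_iter().collect(),
        can_use_index: false,
    };
    let demote_me = Conjunct {
        referenced_cols: [0, 1].into_iter().collect(),
        can_use_index: false,
    };
    let (kept, demoted) = split_row_filter_candidates(vec![keep_me, demote_me], &projection);
    assert_eq!(kept.len(), 1);
    assert_eq!(demoted.len(), 1);
}

With this split, `build_row_filter` returns the kept candidates as the RowFilter plus the demoted expressions; the opener recombines the demoted conjuncts with `conjunction_opt` and applies them with `batch_filter` after the batches are decoded.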