diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index f87a30265a17b..8ca51155173dd 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -24,10 +24,14 @@ use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::DataType;
+use arrow::compute::filter_record_batch;
+use arrow::datatypes::{DataType, Schema};
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::projection::ProjectionExprs;
-use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr::utils::{
+    conjunction_opt, reassign_expr_columns, split_conjunction,
+};
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
 use std::collections::HashMap;
 use std::pin::Pin;
@@ -35,8 +39,10 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use arrow::datatypes::{SchemaRef, TimeUnit};
+use datafusion_common::cast::as_boolean_array;
 use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::stats::Precision;
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{
     ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
 };
@@ -459,27 +465,55 @@ impl FileOpener for ParquetOpener {
             // `row_filter` for details.
             // ---------------------------------------------------------------------
 
-            // Filter pushdown: evaluate predicates during scan
-            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
-                let row_filter = row_filter::build_row_filter(
-                    &predicate,
-                    &physical_file_schema,
-                    builder.metadata(),
-                    reorder_predicates,
-                    &file_metrics,
-                );
+            // When a dynamic filter is present, defer expensive string-column
+            // predicates to post-decode batch filtering to avoid blocking
+            // dynamic filter convergence via predicate cache overhead.
+            //
+            // Stats pruning still uses the full predicate (unaffected).
+            let mut leftover_filter: Option<Arc<dyn PhysicalExpr>> = None;
 
-                match row_filter {
-                    Ok(Some(filter)) => {
-                        builder = builder.with_row_filter(filter);
-                    }
-                    Ok(None) => {}
-                    Err(e) => {
-                        debug!(
-                            "Ignoring error building row filter for '{predicate:?}': {e}"
-                        );
-                    }
+            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
+                let conjuncts = split_conjunction(&predicate);
+                let has_dynamic = conjuncts.iter().any(|c| is_dynamic_physical_expr(c));
+
+                let (row_filter_pred, leftover_pred) = if has_dynamic {
+                    let (deferred, rf_conjuncts): (Vec<_>, Vec<_>) =
+                        conjuncts.into_iter().partition(|c| {
+                            !is_dynamic_physical_expr(c)
+                                && is_expensive_string_predicate(c, &physical_file_schema)
+                        });
+                    (
+                        conjunction_opt(rf_conjuncts.into_iter().cloned()),
+                        conjunction_opt(deferred.into_iter().cloned()),
+                    )
+                } else {
+                    (Some(predicate), None)
                 };
+
+                if let Some(row_filter_pred) = row_filter_pred {
+                    let row_filter = row_filter::build_row_filter(
+                        &row_filter_pred,
+                        &physical_file_schema,
+                        builder.metadata(),
+                        reorder_predicates,
+                        &file_metrics,
+                    );
+
+                    match row_filter {
+                        Ok(Some(filter)) => {
+                            builder = builder.with_row_filter(filter);
+                        }
+                        Ok(None) => {}
+                        Err(e) => {
+                            debug!(
+                                "Ignoring error building row filter for \
+                                 '{row_filter_pred:?}': {e}"
+                            );
+                        }
+                    };
+                }
+
+                leftover_filter = leftover_pred;
             };
             if force_filter_selections {
                 builder =
@@ -627,6 +661,11 @@
             let projection = projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
 
+            // Rebase leftover filter to stream schema (same as projection)
+            let leftover_filter = leftover_filter
+                .map(|f| reassign_expr_columns(f, &stream_schema))
+                .transpose()?;
+
             let projector = projection.make_projector(&stream_schema)?;
 
             let stream = stream.map_err(DataFusionError::from).map(move |b| {
@@ -636,6 +675,15 @@
                     &predicate_cache_inner_records,
                     &predicate_cache_records,
                 );
+                // Apply leftover predicates that were excluded from the
+                // RowFilter to avoid expensive string column decode in the
+                // predicate cache path. Applied before projection so the
+                // filter columns are still available.
+                if let Some(ref filter_expr) = leftover_filter {
+                    let array = filter_expr.evaluate(&b)?.into_array(b.num_rows())?;
+                    let selection = as_boolean_array(&array)?;
+                    b = filter_record_batch(&b, selection)?;
+                }
                 b = projector.project_batch(&b)?;
                 if replace_schema {
                     // Ensure the output batch has the expected schema.
@@ -1012,6 +1060,51 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
+/// Returns true if the expression is a `!=` comparison on a string/binary column.
+///
+/// These are deferred from the RowFilter when a dynamic filter is present to
+/// avoid predicate cache decode overhead.
+fn is_expensive_string_predicate(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
+    use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::BinaryExpr;
+
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>()
+        && *binary.op() == Operator::NotEq
+    {
+        return references_string_or_binary_column(binary.left(), schema)
+            || references_string_or_binary_column(binary.right(), schema);
+    }
+    false
+}
+
+/// Returns true if the expression tree contains any reference to a string
+/// or binary column. Also handles columns nested inside casts or other wrappers.
+fn references_string_or_binary_column(
+    expr: &Arc<dyn PhysicalExpr>,
+    schema: &Schema,
+) -> bool {
+    let mut found = false;
+    let _ = expr.apply(|e| {
+        if let Some(col) = e.as_any().downcast_ref::<Column>()
+            && let Ok(field) = schema.field_with_name(col.name())
+            && matches!(
+                field.data_type(),
+                DataType::Utf8
+                    | DataType::LargeUtf8
+                    | DataType::Utf8View
+                    | DataType::Binary
+                    | DataType::LargeBinary
+                    | DataType::BinaryView
+            )
+        {
+            found = true;
+            return Ok(TreeNodeRecursion::Stop);
+        }
+        Ok(TreeNodeRecursion::Continue)
+    });
+    found
+}
+
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
@@ -2004,4 +2097,50 @@
             "Reverse scan with non-contiguous row groups should correctly map RowSelection"
         );
     }
+
+    /// When a dynamic filter is present alongside a `!=` predicate on a
+    /// string column, the string predicate should be deferred from the
+    /// RowFilter to post-decode batch filtering. This test verifies:
+    /// 1. The deferred predicate is still applied (correct result count).
+    /// 2. With pushdown enabled, the dynamic filter goes into the RowFilter
+    ///    while the expensive string predicate is evaluated post-decode.
+    #[tokio::test]
+    async fn test_dynamic_filter_defers_string_predicate() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // 3 rows: two non-empty strings and one empty string
+        let batch = record_batch!(
+            ("id", Int32, vec![Some(1), Some(2), Some(3)]),
+            ("s", Utf8, vec![Some("hello"), Some(""), Some("world")])
+        )
+        .unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+
+        let schema = batch.schema();
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+
+        // Predicate: s <> '' (should match 2 of 3 rows)
+        // Wrap in a dynamic filter so the split logic triggers.
+        let neq_expr = col("s").not_eq(lit(""));
+        let predicate = make_dynamic_expr(logical2physical(&neq_expr, &schema));
+
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0, 1])
+            .with_predicate(predicate)
+            .with_pushdown_filters(true)
+            .with_reorder_filters(true)
+            .build();
+
+        let stream = opener.open(file).unwrap().await.unwrap();
+        let (_, num_rows) = count_batches_and_rows(stream).await;
+
+        // The deferred string predicate must still filter out the empty row.
+        assert_eq!(num_rows, 2, "s <> '' should return 2 of 3 rows");
+    }
 }