datafusion/datasource-parquet/src/opener.rs (160 additions, 21 deletions)
@@ -24,19 +24,25 @@ use crate::{
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::DataType;
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Schema};
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::utils::{
conjunction_opt, reassign_expr_columns, split_conjunction,
};
use datafusion_physical_expr_adapter::replace_columns_with_literals;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::{SchemaRef, TimeUnit};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
};
@@ -459,27 +465,55 @@ impl FileOpener for ParquetOpener {
// `row_filter` for details.
// ---------------------------------------------------------------------

// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);
// When a dynamic filter is present, defer expensive string-column
// predicates to post-decode batch filtering, so that predicate cache
// decode overhead does not delay dynamic filter convergence.
//
// Stats pruning still uses the full predicate (unaffected).
let mut leftover_filter: Option<Arc<dyn PhysicalExpr>> = None;

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{predicate:?}': {e}"
);
}
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let conjuncts = split_conjunction(&predicate);
let has_dynamic = conjuncts.iter().any(|c| is_dynamic_physical_expr(c));

let (row_filter_pred, leftover_pred) = if has_dynamic {
let (deferred, rf_conjuncts): (Vec<_>, Vec<_>) =
conjuncts.into_iter().partition(|c| {
!is_dynamic_physical_expr(c)
&& is_expensive_string_predicate(c, &physical_file_schema)
});
(
conjunction_opt(rf_conjuncts.into_iter().cloned()),
conjunction_opt(deferred.into_iter().cloned()),
)
} else {
(Some(predicate), None)
};
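// For example, `id > 5 AND s <> '' AND <dynamic filter>` splits into a
// RowFilter predicate `id > 5 AND <dynamic filter>` and a deferred
// predicate `s <> ''`, which is applied to decoded batches further below.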

if let Some(row_filter_pred) = row_filter_pred {
let row_filter = row_filter::build_row_filter(
&row_filter_pred,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for \
'{row_filter_pred:?}': {e}"
);
}
};
}

leftover_filter = leftover_pred;
};
if force_filter_selections {
builder =
@@ -627,6 +661,11 @@ impl FileOpener for ParquetOpener {
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;

// Rebase leftover filter to stream schema (same as projection)
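// (batches coming off the stream use the reader's projected schema, so
// column indices in the leftover filter are remapped to match it)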
let leftover_filter = leftover_filter
.map(|f| reassign_expr_columns(f, &stream_schema))
.transpose()?;

let projector = projection.make_projector(&stream_schema)?;

let stream = stream.map_err(DataFusionError::from).map(move |b| {
@@ -636,6 +675,15 @@
&predicate_cache_inner_records,
&predicate_cache_records,
);
// Apply leftover predicates that were excluded from the
// RowFilter to avoid expensive string column decode in the
// predicate cache path. Applied before projection so the
// filter columns are still available.
if let Some(ref filter_expr) = leftover_filter {
let array = filter_expr.evaluate(&b)?.into_array(b.num_rows())?;
let selection = as_boolean_array(&array)?;
b = filter_record_batch(&b, selection)?;
}
b = projector.project_batch(&b)?;
if replace_schema {
// Ensure the output batch has the expected schema.
@@ -1012,6 +1060,51 @@ fn should_enable_page_index(
.unwrap_or(false)
}

/// Returns true if the expression is a `!=` comparison on a string/binary column.
///
/// These are deferred from the RowFilter when a dynamic filter is present to
/// avoid predicate cache decode overhead.
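/// (e.g. `s <> 'x'` on a Utf8 column qualifies, while `id <> 5` on an
/// Int32 column or `s = 'x'` does not).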
fn is_expensive_string_predicate(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;

if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>()
&& *binary.op() == Operator::NotEq
{
return references_string_or_binary_column(binary.left(), schema)
|| references_string_or_binary_column(binary.right(), schema);
}
false
}

/// Returns true if the expression tree contains any reference to a string
/// or binary column. Also handles columns nested inside casts or other wrappers.
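/// For example, a `Column` wrapped in a cast is still found, because
/// `apply` walks the entire expression tree.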
fn references_string_or_binary_column(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
) -> bool {
let mut found = false;
let _ = expr.apply(|e| {
if let Some(col) = e.as_any().downcast_ref::<Column>()
&& let Ok(field) = schema.field_with_name(col.name())
&& matches!(
field.data_type(),
DataType::Utf8
| DataType::LargeUtf8
| DataType::Utf8View
| DataType::Binary
| DataType::LargeBinary
| DataType::BinaryView
)
{
found = true;
return Ok(TreeNodeRecursion::Stop);
}
Ok(TreeNodeRecursion::Continue)
});
found
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down Expand Up @@ -2004,4 +2097,50 @@ mod test {
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
);
}

/// When a dynamic filter is present alongside a `!=` predicate on a
/// string column, the string predicate should be deferred from the
/// RowFilter to post-decode batch filtering. This test verifies:
/// 1. The deferred predicate is still applied (correct result count).
/// 2. With pushdown enabled, the dynamic filter goes into the RowFilter
/// while the expensive string predicate is evaluated post-decode.
#[tokio::test]
async fn test_dynamic_filter_defers_string_predicate() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

// 3 rows: two non-empty strings and one empty string
let batch = record_batch!(
("id", Int32, vec![Some(1), Some(2), Some(3)]),
("s", Utf8, vec![Some("hello"), Some(""), Some("world")])
)
.unwrap();
let data_size =
write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;

let schema = batch.schema();
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);

// Predicate: s <> '' (should match 2 of 3 rows)
// Wrap in a dynamic filter so the split logic triggers.
let neq_expr = col("s").not_eq(lit(""));
let predicate = make_dynamic_expr(logical2physical(&neq_expr, &schema));

let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0, 1])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();

let stream = opener.open(file).unwrap().await.unwrap();
let (_, num_rows) = count_batches_and_rows(stream).await;

// The deferred string predicate must still filter out the empty row.
assert_eq!(num_rows, 2, "s <> '' should return 2 of 3 rows");
}
}