3 changes: 3 additions & 0 deletions .gitignore
@@ -75,3 +75,6 @@ rat.txt

# data generated by examples
datafusion-examples/examples/datafusion-examples/

# Profiling artifacts (flamegraphs, comparison tables)
profiling-artifacts/
300 changes: 276 additions & 24 deletions datafusion/datasource-parquet/src/opener.rs
@@ -27,9 +27,11 @@ use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::DataType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::utils::{
collect_columns, reassign_expr_columns, split_conjunction,
};
use datafusion_physical_expr_adapter::replace_columns_with_literals;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -46,6 +48,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::{
PhysicalExpr, is_dynamic_physical_expr,
};
use datafusion_physical_plan::filter::batch_filter;
use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
};
@@ -460,26 +463,80 @@ impl FileOpener for ParquetOpener {
// ---------------------------------------------------------------------

// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);
//
// When predicate columns exactly match the projection columns
// and there is at most one static conjunct (excluding dynamic
// filters from TopK / joins), the RowFilter (late
// materialization) path provides no benefit: every projected
// column is a predicate column, so there are no extra columns
// whose decode could be skipped for non-matching rows, and
// with a single static conjunct there is no incremental
// evaluation advantage. Apply the predicate as a batch-level
// filter after decoding instead, avoiding the overhead of
// CachedArrayReader / ReadPlanBuilder / try_next_batch.
//
// When there are non-predicate projection columns (e.g.
// SELECT * WHERE col = X), RowFilter is valuable because it
// skips decoding those extra columns for non-matching rows.
//
// Multi-conjunct static predicates are left on the RowFilter
// path because the RowFilter evaluates conjuncts incrementally
// — a selective first conjunct can avoid decoding expensive
// later columns for non-matching rows.
let batch_filter_predicate = if let Some(predicate) =
pushdown_filters.then_some(predicate).flatten()
{
let predicate_col_indices: HashSet<usize> =
collect_columns(&predicate)
.iter()
.map(|c| c.index())
.collect();
let projection_col_indices: HashSet<usize> =
projection.column_indices().into_iter().collect();

// Count only static conjuncts — dynamic filters (e.g.
// from TopK or join pushdown) are runtime-generated and
// reference the same projected columns, so they don't
// benefit from RowFilter's incremental evaluation.
let static_conjunct_count = split_conjunction(&predicate)
.iter()
.filter(|c| !is_dynamic_physical_expr(c))
.count();
let skip_row_filter = !predicate_col_indices.is_empty()
&& predicate_col_indices == projection_col_indices
&& static_conjunct_count <= 1;

if skip_row_filter {
Contributor:
I don't think this is correct - we shouldn't skip all the row filters altogether; we should only skip an individual row filter once it can no longer save on decoding the columns that follow.

E.g.

select a, b from t where a = 1 and b = 2

We still benefit from one of them (e.g. a = 1), but not from both (b = 2)

This change seems to remove them both?

Author:
Hmm, I believe that example is handled correctly.

For select a, b from t where a = 1 and b = 2:

  • predicate_col_indices = {a, b}, projection_col_indices = {a, b} → exact match
  • static_conjunct_count = 2, so the <= 1 check fails and the RowFilter path is used

The batch filter path only triggers when these conditions hold:

  1. predicate columns exactly equal projection columns (strict ==, not subset)
  2. at most one static conjunct
  3. predicate columns are non-empty

I agree that the ideal approach is per-conjunct: evaluate each filter individually and only promote to RowFilter the ones that provide column-skip savings. That's closer to what adriangb is exploring in #20363 with adaptive selectivity tracking. This PR is a narrower fix for the simplest degenerate case.

Contributor (@Dandandan, Feb 19, 2026):
> I agree that the ideal approach is per-conjunct: evaluate each filter individually and only promote to RowFilter the ones that provide column-skip savings.

But this shouldn't require the adaptive selectivity tracking per se, right? Instead of removing all predicates at once, we can do the calculation at the individual RowFilter level as they are created, so we can filter out the RowFilters that aren't able to save on decoding the columns that follow.

Looking at my local benchmarks, the results still seem a bit mixed, I think likely because some RowFilters still benefit from page filtering even if they don't benefit from skipping IO on the columns that follow. WDYT?

Author:

Good point.
can_use_index in FilterCandidate already tracks page-index eligibility. I can move this logic into build_row_filter() to work per-conjunct: keep candidates that save on column decoding or can use the page index, and demote the rest to the batch filter. Will update.
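A minimal standalone sketch of that per-conjunct demotion idea, for illustration only: CandidateSketch and partition_candidates are hypothetical names, not DataFusion APIs, and the real logic would live inside build_row_filter() using FilterCandidate's can_use_index. It also simplifies by judging each conjunct in isolation, ignoring columns read only by later conjuncts and evaluation order.

use std::collections::HashSet;

/// Hypothetical per-conjunct candidate, loosely modeled on FilterCandidate.
struct CandidateSketch {
    /// Column indices this conjunct reads.
    column_indices: Vec<usize>,
    /// Whether the page index can prune pages for this conjunct.
    can_use_index: bool,
}

/// Split candidates into (kept as RowFilter, demoted to post-decode batch filter).
/// A conjunct is kept if filtering early can skip decoding of at least one projected
/// column it does not itself read, or if it can still drive page pruning.
fn partition_candidates(
    candidates: Vec<CandidateSketch>,
    projection_cols: &HashSet<usize>,
) -> (Vec<CandidateSketch>, Vec<CandidateSketch>) {
    candidates.into_iter().partition(|c| {
        let own_cols: HashSet<usize> = c.column_indices.iter().copied().collect();
        // Some projected column is not needed to evaluate this conjunct, so
        // late materialization can skip decoding it for non-matching rows.
        let saves_decoding = projection_cols.iter().any(|idx| !own_cols.contains(idx));
        saves_decoding || c.can_use_index
    })
}

fn main() {
    // Projection {0, 1}, e.g. SELECT a, b FROM t WHERE a = 1 AND b = 2.
    let projection: HashSet<usize> = [0, 1].into_iter().collect();
    let candidates = vec![
        // a = 1: reads col 0; col 1 can be skipped for non-matching rows -> kept
        CandidateSketch { column_indices: vec![0], can_use_index: false },
        // a conjunct that reads every projected column and cannot use the
        // page index -> demoted to the batch filter
        CandidateSketch { column_indices: vec![0, 1], can_use_index: false },
    ];
    let (kept, demoted) = partition_candidates(candidates, &projection);
    println!("kept as RowFilter: {}, demoted to batch filter: {}", kept.len(), demoted.len());
}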

debug!(
"Skipping RowFilter pushdown: all predicate columns {:?} \
are in the output projection {:?}; will apply as batch filter",
predicate_col_indices, projection_col_indices,
);
Some(predicate)
} else {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{predicate:?}': {e}"
);
}
};
match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{predicate:?}': {e}"
);
}
};
None
}
} else {
None
};
if force_filter_selections {
builder =
@@ -627,6 +684,12 @@ impl FileOpener for ParquetOpener {
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;

// Also remap the batch filter predicate to the stream schema
let batch_filter_predicate = batch_filter_predicate
.map(|pred| reassign_expr_columns(pred, &stream_schema))
.transpose()?;
let has_batch_filter = batch_filter_predicate.is_some();

let projector = projection.make_projector(&stream_schema)?;

let stream = stream.map_err(DataFusionError::from).map(move |b| {
@@ -636,6 +699,10 @@ impl FileOpener for ParquetOpener {
&predicate_cache_inner_records,
&predicate_cache_records,
);
// Apply batch-level filter when RowFilter pushdown was skipped
if let Some(ref filter_pred) = batch_filter_predicate {
b = batch_filter(&b, filter_pred)?;
}
b = projector.project_batch(&b)?;
if replace_schema {
// Ensure the output batch has the expected schema.
@@ -664,6 +731,18 @@ impl FileOpener for ParquetOpener {
// ----------------------------------------------------------------------
// Step: wrap the stream so a dynamic filter can stop the file scan early
// ----------------------------------------------------------------------

// When batch-level filtering is active (RowFilter pushdown was
// skipped), filter out empty batches that result from the predicate
// removing all rows in a decoded batch.
let stream = if has_batch_filter {
stream
.try_filter(|batch| std::future::ready(batch.num_rows() > 0))
.boxed()
} else {
stream.boxed()
};

if let Some(file_pruner) = file_pruner {
Ok(EarlyStoppingStream::new(
stream,
@@ -672,7 +751,7 @@
)
.boxed())
} else {
Ok(stream.boxed())
Ok(stream)
}
}))
}
@@ -1025,10 +1104,10 @@ mod test {
stats::Precision,
};
use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener};
use datafusion_expr::{col, lit};
use datafusion_expr::{Operator, col, lit};
use datafusion_physical_expr::{
PhysicalExpr,
expressions::{Column, DynamicFilterPhysicalExpr, Literal},
expressions::{BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal},
planner::logical2physical,
projection::ProjectionExprs,
};
@@ -2004,4 +2083,177 @@ mod test {
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
);
}

/// When the predicate columns exactly match the output projection columns,
/// there is at most one static conjunct, and pushdown_filters is enabled,
/// the opener should skip the RowFilter (late materialization) path and
/// apply filtering as a post-decode batch filter instead. This avoids the
/// overhead of CachedArrayReader / ReadPlanBuilder / try_next_batch when
/// late materialization provides no I/O benefit.
#[tokio::test]
async fn test_skip_row_filter_when_filter_cols_subset_of_projection() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

// 4 rows: a=[1,2,2,4], b=[10,20,30,40]
let batch = record_batch!(
("a", Int32, vec![Some(1), Some(2), Some(2), Some(4)]),
("b", Int32, vec![Some(10), Some(20), Some(30), Some(40)])
)
.unwrap();
let data_size =
write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
let schema = batch.schema();
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);

// Case 1: filter_cols == projection_cols → batch filter path
// Filter: a = 2, Projection: [a]
// predicate_col_indices = {0} == projection_col_indices = {0}
// Single conjunct + exact match → skip RowFilter, use batch filter.
let expr = col("a").eq(lit(2));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 2, "batch filter should return 2 matching rows");
assert_eq!(num_batches, 1);

// Case 1b: filter_cols ⊂ projection_cols → RowFilter path
// Filter: a = 2, Projection: [a, b]
// predicate_col_indices = {0} ≠ projection_col_indices = {0, 1}
// RowFilter skips decoding non-predicate column b for non-matching rows.
let expr = col("a").eq(lit(2));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0, 1])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 2, "RowFilter should return 2 matching rows");
assert_eq!(num_batches, 1);

// Case 2: filter_cols ⊄ projection_cols → RowFilter path
// Filter: b = 20, Projection: [a] (only column a)
// predicate_col_indices = {1}, projection_col_indices = {0} → NOT subset
let expr = col("b").eq(lit(20));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let values = collect_int32_values(stream).await;
assert_eq!(
values,
vec![2],
"RowFilter should return correct filtered values"
);

// Case 3: filter_cols == projection_cols, no matches → 0 rows
// Filter: a = 99, Projection: [a]
// predicate_col_indices = {0} == projection_col_indices = {0} → batch filter path
let expr = col("a").eq(lit(99));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 0, "no rows should match");
assert_eq!(num_batches, 0, "empty batches should be filtered out");

// Case 4: verify correct values in batch filter path
// Filter: a = 2, Projection: [a]
let expr = col("a").eq(lit(2));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let values = collect_int32_values(stream).await;
assert_eq!(
values,
vec![2, 2],
"batch filter should return correct values"
);

// Case 5: multi-conjunct predicate → RowFilter path (even when
// filter_cols == projection_cols). RowFilter evaluates conjuncts
// incrementally, so a selective first conjunct can avoid decoding
// expensive later columns for non-matching rows.
// Filter: a = 2 AND b = 20, Projection: [a, b]
// predicate_col_indices = {0, 1}, projection_col_indices = {0, 1}
// Exact match BUT multi-conjunct → keep RowFilter for incremental eval.
let expr = col("a").eq(lit(2)).and(col("b").eq(lit(20)));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0, 1])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 1, "multi-conjunct RowFilter should return 1 row");
assert_eq!(num_batches, 1);

// Case 6: single static conjunct + dynamic filter → batch filter path
// Simulates TopK: `WHERE a = 2 ORDER BY a LIMIT N`
// Predicate: `a = 2 AND <dynamic>(a < 3)`, Projection: [a]
// predicate_col_indices = {0} == projection_col_indices = {0}
// Only 1 static conjunct (dynamic excluded) → batch filter path.
let static_expr = logical2physical(&col("a").eq(lit(2)), &schema);
let dynamic_expr =
make_dynamic_expr(logical2physical(&col("a").lt(lit(3)), &schema));
let combined: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
static_expr,
Operator::And,
dynamic_expr,
));
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(combined)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(
num_rows, 2,
"single static conjunct + dynamic filter should use batch filter"
);
assert_eq!(num_batches, 1);
}
}