From c63a6872a2844f4acb9da0a82c885f33e531745d Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 10 Apr 2026 10:45:16 +0800 Subject: [PATCH 1/2] fix: impl handle_child_pushdown_result for SortExec --- .../physical_optimizer/filter_pushdown.rs | 289 ++++++++++++++++++ datafusion/physical-plan/src/sorts/sort.rs | 48 ++- 2 files changed, 336 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index a255c07545829..3afdfb865963f 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -5146,3 +5146,292 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() { " ); } + +// ==== Filter pushdown through SortExec tests ==== + +/// FilterExec above a plain SortExec (no fetch) should be pushed below it. +/// The scan supports pushdown, so the filter lands in the DataSourceExec. +#[test] +fn test_filter_pushdown_through_sort_into_scan() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let sort = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + )); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + " + ); +} + +/// FilterExec above a 
plain SortExec (no fetch) when the scan does NOT +/// support pushdown. The filter should still move below the sort, landing +/// as a new FilterExec between SortExec and DataSourceExec. +#[test] +fn test_filter_pushdown_through_sort_no_scan_support() { + let scan = TestScanBuilder::new(schema()).with_support(false).build(); + let sort = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + )); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + +/// Multiple conjunctive filters above a plain SortExec should all be +/// pushed below the sort as a single FilterExec. 
+#[test] +fn test_multiple_filters_pushdown_through_sort() { + let scan = TestScanBuilder::new(schema()).with_support(false).build(); + let sort = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + )); + // WHERE a = 'foo' AND b = 'bar' + let predicate = Arc::new(BinaryExpr::new( + col_lit_predicate("a", "foo", &schema()), + Operator::And, + col_lit_predicate("b", "bar", &schema()), + )) as Arc<dyn PhysicalExpr>; + let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo AND b@1 = bar + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - FilterExec: a@0 = foo AND b@1 = bar + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + +/// FilterExec above a SortExec with fetch (TopK) must NOT be pushed below, +/// because the limit is applied before the filter — swapping the order would +/// alter the result set.
+#[test] +fn test_filter_not_pushed_through_sort_with_fetch() { + let scan = TestScanBuilder::new(schema()).with_support(false).build(); + let sort = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + ) + .with_fetch(Some(10)), + ); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - FilterExec: a@0 = foo + - SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + +/// Even when the scan supports filter pushdown, a FilterExec above a SortExec +/// with fetch (TopK) must NOT be pushed below, because the limit is applied +/// before the filter — swapping the order would alter the result set.
+#[test] +fn test_filter_pushed_through_sort_with_fetch() { + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let sort = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + ) + .with_fetch(Some(10)), + ); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = foo + - SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + " + ); +} + +/// FilterExec with a projection above SortExec. The filter should be pushed +/// below the sort, and the projection should be preserved as a +/// ProjectionExec on top. 
+#[test] +fn test_filter_with_projection_pushdown_through_sort() { + let scan = TestScanBuilder::new(schema()).with_support(false).build(); + let sort = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + )); + // FilterExec: b = 'bar', projection=[a] (only output column a) + let predicate = col_lit_predicate("b", "bar", &schema()); + let plan = Arc::new( + FilterExecBuilder::new(predicate, sort) + .apply_projection(Some(vec![0])) + .unwrap() + .build() + .unwrap(), + ); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar, projection=[a@0] + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - ProjectionExec: expr=[a@0 as a] + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - FilterExec: b@1 = bar + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + +/// SortExec with preserve_partitioning=true should keep that setting after +/// filters are pushed below it. 
+#[test] +fn test_filter_pushdown_through_sort_preserves_partitioning() { + let scan = TestScanBuilder::new(schema()).with_support(false).build(); + let sort = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + ) + .with_preserve_partitioning(true), + ); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} + +/// FilterExec **with a fetch limit** above a plain SortExec. When the filter +/// is pushed below the sort the fetch should be propagated to the SortExec +/// (turning it into a TopK). 
+#[test] +fn test_filter_with_fetch_pushdown_through_sort() { + let scan = TestScanBuilder::new(schema()).with_support(false).build(); + let sort = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new_default( + col("a", &schema()).unwrap(), + )]) + .unwrap(), + scan, + )); + let predicate = col_lit_predicate("a", "foo", &schema()); + let plan = Arc::new( + FilterExecBuilder::new(predicate, sort) + .with_fetch(Some(10)) + .build() + .unwrap(), + ); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo, fetch=10 + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + output: + Ok: + - SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false] + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false + " + ); +} diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 583bfa29b04ad..cdaafd5407f7a 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -30,8 +30,10 @@ use crate::execution_plan::{ Boundedness, CardinalityEffect, EmissionType, has_same_children_properties, }; use crate::expressions::PhysicalSortExpr; +use crate::filter::FilterExec; use crate::filter_pushdown::{ - ChildFilterDescription, FilterDescription, FilterPushdownPhase, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, PushedDown, }; use crate::limit::LimitStream; use crate::metrics::{ @@ -1424,6 +1426,50 @@ impl ExecutionPlan for SortExec { Ok(FilterDescription::new().with_child(child)) } + + fn handle_child_pushdown_result( + &self, + phase: FilterPushdownPhase, + child_pushdown_result: 
ChildPushdownResult, + _config: &datafusion_common::config::ConfigOptions, + ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> { + // Only absorb filters in Pre phase for a plain sort (no fetch). + // A sort with fetch (TopK) must not accept filters: reordering + // filter vs. limit would change semantics. + if phase != FilterPushdownPhase::Pre || self.fetch.is_some() { + return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); + } + + // Collect parent filters that were NOT successfully pushed to our child. + let unsupported_filters: Vec<Arc<dyn PhysicalExpr>> = child_pushdown_result + .parent_filters + .iter() + .filter(|&f| matches!(f.all(), PushedDown::No)) + .map(|f| Arc::clone(&f.filter)) + .collect(); + + if unsupported_filters.is_empty() { + // All filters were pushed — nothing extra to do. + return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); + } + + // Build a single conjunctive predicate from the unsupported filters + // and insert a FilterExec between this SortExec and its child. + let predicate = datafusion_physical_expr::conjunction(unsupported_filters); + let new_child = + Arc::new(FilterExec::try_new(predicate, Arc::clone(self.input()))?)
+ as Arc<dyn ExecutionPlan>; + let new_sort = Arc::new( + SortExec::new(self.expr.clone(), new_child) + .with_fetch(self.fetch()) + .with_preserve_partitioning(self.preserve_partitioning()), + ) as Arc<dyn ExecutionPlan>; + + Ok(FilterPushdownPropagation { + filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()], + updated_node: Some(new_sort), + }) + } } #[cfg(test)] From f8edfafe50f58cbeab6bba73ed25779bc0f7ea2d Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 10 Apr 2026 12:17:08 +0800 Subject: [PATCH 2/2] add test case --- .../physical_optimizer/filter_pushdown.rs | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 3afdfb865963f..70e369c9a6774 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -25,7 +25,11 @@ use arrow::{ use arrow_schema::SortOptions; use datafusion::{ assert_batches_eq, - logical_expr::Operator, + datasource::{MemTable, provider_as_source}, + execution::session_state::SessionStateBuilder, + logical_expr::{ + LogicalPlanBuilder, Operator, col as logical_col, lit as logical_lit, + }, physical_plan::{ PhysicalExpr, expressions::{BinaryExpr, Column, Literal}, @@ -5435,3 +5439,61 @@ fn test_filter_with_fetch_pushdown_through_sort() { " ); } + +#[tokio::test] +async fn test_filter_pushdown_through_sort_with_projection() { + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Utf8, false), + Field::new("body", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(StringArray::from(vec!["2026-04-09T02:00:00Z"])), + Arc::new(StringArray::from(vec!["starting datafusion query engine"])), + ], + ) + .unwrap(); + + let session_state = SessionStateBuilder::new() + .with_config(SessionConfig::new().with_target_partitions(1)) + .with_default_features() +
.with_optimizer_rules(vec![]) + .build(); + let ctx = SessionContext::new_with_state(session_state); + + let table = Arc::new(MemTable::try_new(schema, vec![vec![batch]]).unwrap()); + let logical_plan = LogicalPlanBuilder::scan("logs", provider_as_source(table), None) + .unwrap() + .sort(vec![logical_col("time").sort(false, false)]) + .unwrap() + .filter(logical_col("body").like(logical_lit("%datafusion%"))) + .unwrap() + .project(vec![logical_col("time")]) + .unwrap() + .build() + .unwrap(); + + let plan = ctx + .state() + .create_physical_plan(&logical_plan) + .await + .unwrap(); + + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), false), + @r" + OptimizationTest: + input: + - FilterExec: body@1 LIKE %datafusion%, projection=[time@0] + - SortExec: expr=[time@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: partitions=1, partition_sizes=[1] + output: + Ok: + - ProjectionExec: expr=[time@0 as time] + - SortExec: expr=[time@0 DESC NULLS LAST], preserve_partitioning=[false] + - FilterExec: body@1 LIKE %datafusion% + - DataSourceExec: partitions=1, partition_sizes=[1] + " + ); +}