Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,3 +714,123 @@ fn no_limit_preserves_plan_identity() -> Result<()> {

Ok(())
}

#[test]
fn outer_offset_does_not_leak_through_sort_into_inner_limit() -> Result<()> {
// Regression test for https://github.com/apache/datafusion/issues/22489
//
// When an outer OFFSET is separated from an inner LIMIT by a SortExec
// with different sort keys, the outer skip must not reduce the inner
// fetch. Before the fix, combine_limit merged them, producing
// GlobalLimitExec(skip=1, fetch=7) instead of preserving the inner
// LIMIT 8.
//
// Plan structure:
// GlobalLimitExec: skip=1, fetch=None (outer OFFSET 1)
// SortExec: [c1 DESC] (outer sort — different key)
// GlobalLimitExec: skip=0, fetch=8 (inner LIMIT 8)
// SortExec: [c2 ASC] (inner sort — different key)
// EmptyExec
let schema = create_schema();
let empty = empty_exec(Arc::clone(&schema));

// Inner sort: column c2 with the default sort options.
let inner_ordering: LexOrdering = [PhysicalSortExpr {
expr: col("c2", &schema)?,
options: SortOptions::default(),
}]
.into();
let inner_sort = sort_exec(inner_ordering, empty);
let inner_limit = global_limit_exec(inner_sort, 0, Some(8));

// Outer sort: deliberately a *different* sort order than the inner one.
// It uses another column (c1 instead of c2) and non-default options
// (descending, nulls last), so the two sorts are not interchangeable.
let outer_ordering: LexOrdering = [PhysicalSortExpr {
expr: col("c1", &schema)?,
options: SortOptions {
descending: true,
nulls_first: false,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe worth calling out that this is a different sort order

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also add a test (or confirm one already exists) where the two sorts DO have the same sort and ensure the limit is still pushed

},
}]
.into();
let outer_sort = sort_exec(outer_ordering, inner_limit);
let outer_limit = global_limit_exec(outer_sort, 1, None);

// Pin the hand-built input plan so the "before" shape is explicit.
let initial = format_plan(&outer_limit);
insta::assert_snapshot!(
initial,
@r"
GlobalLimitExec: skip=1, fetch=None
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
GlobalLimitExec: skip=0, fetch=8
SortExec: expr=[c2@1 ASC], preserve_partitioning=[false]
EmptyExec
"
);

// Expected result: the inner LIMIT 8 becomes TopK(fetch=8) on the inner
// SortExec, while the outer limit keeps skip=1 with fetch=None (the
// skip is not merged into the inner fetch).
let after_optimize =
LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
let optimized = format_plan(&after_optimize);
insta::assert_snapshot!(
optimized,
@r"
GlobalLimitExec: skip=1, fetch=None
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
SortExec: TopK(fetch=8), expr=[c2@1 ASC], preserve_partitioning=[false]
EmptyExec
"
);

Ok(())
}

#[test]
fn outer_offset_with_same_sort_key_still_pushes_limit() -> Result<()> {
    // Counterpart of outer_offset_does_not_leak_through_sort_into_inner_limit.
    // Here the inner and outer SortExec share a single ordering (c1 with
    // default options); the inner LIMIT 8 is still expected to be absorbed
    // by the inner SortExec as TopK(fetch=8).
    //
    // Input plan, outermost first:
    // GlobalLimitExec: skip=1, fetch=None (outer OFFSET 1)
    // SortExec: [c1 ASC] (outer sort, same key)
    // GlobalLimitExec: skip=0, fetch=8 (inner LIMIT 8)
    // SortExec: [c1 ASC] (inner sort, same key)
    // EmptyExec
    let schema = create_schema();
    let leaf = empty_exec(Arc::clone(&schema));

    // One ordering, reused by both sorts.
    let shared_key: LexOrdering = [PhysicalSortExpr {
        expr: col("c1", &schema)?,
        options: SortOptions::default(),
    }]
    .into();

    // Build the plan bottom-up: inner sort + LIMIT 8, then outer sort + OFFSET 1.
    let limited_inner =
        global_limit_exec(sort_exec(shared_key.clone(), leaf), 0, Some(8));
    let plan = global_limit_exec(sort_exec(shared_key, limited_inner), 1, None);

    // Shape of the plan before the optimizer rule runs.
    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
GlobalLimitExec: skip=1, fetch=None
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
GlobalLimitExec: skip=0, fetch=8
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
EmptyExec
"
    );

    // Shape after LimitPushdown: the inner limit is now a TopK on the inner sort.
    let rewritten = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;
    insta::assert_snapshot!(
        format_plan(&rewritten),
        @r"
GlobalLimitExec: skip=1, fetch=None
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
SortExec: TopK(fetch=8), expr=[c1@0 ASC], preserve_partitioning=[false]
EmptyExec
"
    );

    Ok(())
}
8 changes: 8 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ pub(crate) fn pushdown_limits(
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
}

// Once a limit has been materialized above the current node, child
// subtrees should not inherit its `skip`. Keep `fetch`, but clear
// `skip` before recursing so child-local limits are not merged with
// an `OFFSET` that has already been applied.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised this doesn't need to check for the actual sort keys too 🤔

if global_state.satisfied {
global_state.skip = 0;
}

// Apply pushdown limits in children
let children = new_node.data.children();
let mut changed = false;
Expand Down
55 changes: 55 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -989,3 +989,58 @@ c-4

statement ok
DROP TABLE t21176;

# Regression test for https://github.com/apache/datafusion/issues/22489
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this guards against the issue:

I reverted the code change locally

diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 63c4f21bd9..6164d86e53 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -375,14 +375,6 @@ pub(crate) fn pushdown_limits(
         (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
     }

-    // Once a limit has been materialized above the current node, child
-    // subtrees should not inherit its `skip`. Keep `fetch`, but clear
-    // `skip` before recursing so child-local limits are not merged with
-    // an `OFFSET` that has already been applied.
-    if global_state.satisfied {
-        global_state.skip = 0;
-    }
-
     // Apply pushdown limits in children
     let children = new_node.data.children();
     let mut changed = false;

And then I ran the tests:

cargo test --profile=ci --test sqllogictests
...
Running with 16 test threads (available parallelism: 16)
Completed 472 test files in 9 seconds

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this

# An outer ORDER BY / OFFSET must not reduce an inner LIMIT when the two are
# separated by a sort on a *different* key.

# Pin the partition count: the expected physical plan below contains a
# Hash([g@0], 4) repartition.
# NOTE(review): this setting is not restored afterwards — confirm that is
# acceptable for any tests appended after this one.
statement ok
set datafusion.execution.target_partitions = 4;

statement ok
CREATE TABLE t22489 (g INT, x INT, y INT) AS VALUES (1, 10, 4), (2, 20, 3), (3, 30, 2), (4, 40, 1);

# Inner ORDER BY sx DESC LIMIT 4 keeps all four groups; the outer ORDER BY
# sy DESC OFFSET 1 then drops only the sy-max group (g=1), so g=2,3,4 remain.
query III
SELECT * FROM (
SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g
ORDER BY sx DESC LIMIT 4
) q
ORDER BY sy DESC
OFFSET 1;
----
2 20 3
3 30 2
4 40 1

# Plan check for the same query: the inner fetch=4 must survive on the
# sx sort (TopK plus SortPreservingMergeExec fetch=4), and the outer
# GlobalLimitExec must keep skip=1 with fetch=None.
query TT
EXPLAIN
SELECT * FROM (
SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g
ORDER BY sx DESC LIMIT 4
) q
ORDER BY sy DESC
OFFSET 1;
----
logical_plan
01)Limit: skip=1, fetch=None
02)--Sort: q.sy DESC NULLS FIRST
03)----SubqueryAlias: q
04)------Sort: sx DESC NULLS FIRST, fetch=4
05)--------Projection: t22489.g, sum(t22489.x) AS sx, sum(t22489.y) AS sy
06)----------Aggregate: groupBy=[[t22489.g]], aggr=[[sum(CAST(t22489.x AS Int64)), sum(CAST(t22489.y AS Int64))]]
07)------------TableScan: t22489 projection=[g, x, y]
physical_plan
01)GlobalLimitExec: skip=1, fetch=None
02)--SortExec: expr=[sy@2 DESC], preserve_partitioning=[false]
03)----SortPreservingMergeExec: [sx@1 DESC], fetch=4
04)------SortExec: TopK(fetch=4), expr=[sx@1 DESC], preserve_partitioning=[true]
05)--------ProjectionExec: expr=[g@0 as g, sum(t22489.x)@1 as sx, sum(t22489.y)@2 as sy]
06)----------AggregateExec: mode=FinalPartitioned, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)]
07)------------RepartitionExec: partitioning=Hash([g@0], 4), input_partitions=1
08)--------------AggregateExec: mode=Partial, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)]
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
DROP TABLE t22489;
Loading