-
Notifications
You must be signed in to change notification settings - Fork 2.1k
fix: clear handled OFFSET before child recursion in LimitPushdown #22525
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -375,6 +375,14 @@ pub(crate) fn pushdown_limits( | |
| (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?; | ||
| } | ||
|
|
||
| // Once a limit has been materialized above the current node, child | ||
| // subtrees should not inherit its `skip`. Keep `fetch`, but clear | ||
| // `skip` before recursing so child-local limits are not merged with | ||
| // an `OFFSET` that has already been applied. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am surprised this doesn't need to check for the actual sort keys too 🤔 |
||
| if global_state.satisfied { | ||
| global_state.skip = 0; | ||
| } | ||
|
|
||
| // Apply pushdown limits in children | ||
| let children = new_node.data.children(); | ||
| let mut changed = false; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -989,3 +989,58 @@ c-4 | |
|
|
||
| statement ok | ||
| DROP TABLE t21176; | ||
|
|
||
| # Regression test for https://github.com/apache/datafusion/issues/22489 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think this guards against the issue: I reverted the code change locally:
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 63c4f21bd9..6164d86e53 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -375,14 +375,6 @@ pub(crate) fn pushdown_limits(
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
}
- // Once a limit has been materialized above the current node, child
- // subtrees should not inherit its `skip`. Keep `fetch`, but clear
- // `skip` before recursing so child-local limits are not merged with
- // an `OFFSET` that has already been applied.
- if global_state.satisfied {
- global_state.skip = 0;
- }
-
// Apply pushdown limits in children
let children = new_node.data.children();
let mut changed = false;
And then I ran the tests: cargo test --profile=ci --test sqllogictests
...
Running with 16 test threads (available parallelism: 16)
Completed 472 test files in 9 seconds
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is a proposed fix:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thank you for this |
||
| # An outer ORDER BY / OFFSET must not reduce an inner LIMIT when the two are | ||
| # separated by a sort on a *different* key. | ||
|
|
||
| statement ok | ||
| set datafusion.execution.target_partitions = 4; | ||
|
|
||
| statement ok | ||
| CREATE TABLE t22489 (g INT, x INT, y INT) AS VALUES (1, 10, 4), (2, 20, 3), (3, 30, 2), (4, 40, 1); | ||
|
|
||
| # Inner ORDER BY sx DESC LIMIT 4 keeps all four groups; the outer ORDER BY | ||
| # sy DESC OFFSET 1 then drops only the sy-max group (g=1), so g=2,3,4 remain. | ||
| query III | ||
| SELECT * FROM ( | ||
| SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g | ||
| ORDER BY sx DESC LIMIT 4 | ||
| ) q | ||
| ORDER BY sy DESC | ||
| OFFSET 1; | ||
| ---- | ||
| 2 20 3 | ||
| 3 30 2 | ||
| 4 40 1 | ||
|
|
||
| query TT | ||
| EXPLAIN | ||
| SELECT * FROM ( | ||
| SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g | ||
| ORDER BY sx DESC LIMIT 4 | ||
| ) q | ||
| ORDER BY sy DESC | ||
| OFFSET 1; | ||
| ---- | ||
| logical_plan | ||
| 01)Limit: skip=1, fetch=None | ||
| 02)--Sort: q.sy DESC NULLS FIRST | ||
| 03)----SubqueryAlias: q | ||
| 04)------Sort: sx DESC NULLS FIRST, fetch=4 | ||
| 05)--------Projection: t22489.g, sum(t22489.x) AS sx, sum(t22489.y) AS sy | ||
| 06)----------Aggregate: groupBy=[[t22489.g]], aggr=[[sum(CAST(t22489.x AS Int64)), sum(CAST(t22489.y AS Int64))]] | ||
| 07)------------TableScan: t22489 projection=[g, x, y] | ||
| physical_plan | ||
| 01)GlobalLimitExec: skip=1, fetch=None | ||
| 02)--SortExec: expr=[sy@2 DESC], preserve_partitioning=[false] | ||
| 03)----SortPreservingMergeExec: [sx@1 DESC], fetch=4 | ||
| 04)------SortExec: TopK(fetch=4), expr=[sx@1 DESC], preserve_partitioning=[true] | ||
| 05)--------ProjectionExec: expr=[g@0 as g, sum(t22489.x)@1 as sx, sum(t22489.y)@2 as sy] | ||
| 06)----------AggregateExec: mode=FinalPartitioned, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)] | ||
| 07)------------RepartitionExec: partitioning=Hash([g@0], 4), input_partitions=1 | ||
| 08)--------------AggregateExec: mode=Partial, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)] | ||
| 09)----------------DataSourceExec: partitions=1, partition_sizes=[1] | ||
|
|
||
| statement ok | ||
| DROP TABLE t22489; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe worth calling out that this is a different sort order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also add a test (or confirm one already exists) where the two sorts DO have the same sort key, and ensure the limit is still pushed down