[SPARK-57851][SQL] Shuffle-free single-task execution for small queries#56928
Open
viirya wants to merge 2 commits into
Open
[SPARK-57851][SQL] Shuffle-free single-task execution for small queries#56928viirya wants to merge 2 commits into
viirya wants to merge 2 commits into
Conversation
### What changes were proposed in this pull request? This adds a conservative optimizer rule `MarkSingleTaskExecution` that marks small single-partition scans, optionally with a shuffle-inducing operator on top (sort, aggregate, distinct, window, limit/offset, expand) or an in-memory `LocalRelation`, as candidates for single-task execution. Such a scan reports a `SinglePartition` output partitioning, allowing `EnsureRequirements` to elide the shuffle that would otherwise be inserted before the operator on top. The rule runs as the last optimizer batch and marks eligible `LogicalRelation`/`LocalRelation` nodes with a `TreeNodeTag`. The planning strategies propagate the mark to `FileSourceScanExec`/`LocalTableScanExec`. `FileSourceScanExec` additionally gates on file count and size thresholds using the generic `ScanFileListing`, reports `SinglePartition`, and coalesces its input RDD to a single partition as a correctness backstop. `ExpandExec` forwards `SinglePartition` from its child, since Expand never moves rows across partitions. The feature is controlled by new internal configs under `spark.sql.optimizer.singleTaskExecution.*` and is disabled by default. Join is intentionally left out for now; union is already covered by the existing `spark.sql.unionOutputPartitioning`. This is part of the SPIP umbrella SPARK-56978 (Faster queries in local laptop mode), covering the shuffle-free local execution category. ### Why are the changes needed? For small, low-latency queries the fixed cost of a shuffle (scheduling, serialization, network) dominates. When the input is already a single small partition, the shuffle before a sort/aggregate/window is unnecessary and can be removed to reduce latency. ### Does this PR introduce _any_ user-facing change? No. The optimization is behind internal configs and is disabled by default. ### How was this patch tested? New `MarkSingleTaskExecutionSuite` (14 tests) covering the marking decision, `SinglePartition` output with no shuffle, empty-scan correctness, disabled-flag negatives, join/subquery ineligibility, and the leaf-parallelism override. `SQLConfSuite` passes as a config-wiring regression check. Co-authored-by: Isaac
e2fff1a to
7e18780
Compare
Co-authored-by: Claude Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This adds a conservative optimizer rule
MarkSingleTaskExecutionthat marks small single-partition scans, optionally with a shuffle-inducing operator on top (sort, aggregate, distinct, window, limit/offset, expand) or an in-memoryLocalRelation, as candidates for single-task execution. Such a scan reports aSinglePartitionoutput partitioning, allowingEnsureRequirementsto elide the shuffle that would otherwise be inserted before the operator on top.Details:
LogicalRelation/LocalRelationnodes with aTreeNodeTag.FileSourceStrategy/SparkStrategiespropagate the mark toFileSourceScanExec/LocalTableScanExec.FileSourceScanExecadditionally gates on file count and size thresholds using the genericScanFileListing, reportsSinglePartition, and coalesces its input RDD to a single partition as a correctness backstop when the estimate does not match the runtime partition count.LocalTableScanExecreads its data in a single partition and reportsSinglePartition.ExpandExecforwardsSinglePartitionfrom its child, since Expand only replicates rows within a partition and never moves rows across partitions.The feature is controlled by new internal configs under
spark.sql.optimizer.singleTaskExecution.*and is disabled by default. Join is intentionally left out for now and can be added as a follow-up; union is already covered by the existingspark.sql.unionOutputPartitioning.This is part of the SPIP umbrella SPARK-56978 (Faster queries in local laptop mode), covering the shuffle-free local execution for small queries category.
Why are the changes needed?
For small, low-latency queries the fixed cost of a shuffle (scheduling, serialization, network) dominates the total runtime. When the input is already a single small partition, the shuffle inserted before a sort/aggregate/window is unnecessary and can be removed to reduce latency, without affecting correctness.
Does this PR introduce any user-facing change?
No. The optimization is behind internal configs (
spark.sql.optimizer.singleTaskExecution.*) and is disabled by default.How was this patch tested?
New
MarkSingleTaskExecutionSuite(14 tests) covering:SinglePartitionoutput with no shuffle in the final physical plan;SQLConfSuitepasses as a config-wiring regression check.Was this patch authored or co-authored using generative AI tooling?
Yes, using Claude Code.