[SPARK-56877][SQL] Enforce KeyedPartitioning invariant in PartitioningCollection#55901
Conversation
…ningCollection` ### What changes were proposed in this pull request? - Add a `require` in `PartitioningCollection` that all `KeyedPartitioning`s reachable from the collection share the same `partitionKeys` reference (`eq`) and have matching expression arity. The check walks the partitioning tree via `foreach` so nested collections are covered. - Add a smart factory `PartitioningCollection.fromPartitionings` that interns `partitionKeys` references across `KeyedPartitioning`s. Use this at sites that combine independently-computed partitionings (joins) where keys are structurally equal but not reference-equal. The factory uses manual recursion rather than `transformWithPruning` because `KeyedPartitioning.equals` compares `partitionKeys` element-wise, which would make `transformWithPruning` discard the rule's replacement as structurally-equal-to-input. - In `GroupPartitionsExec.outputPartitioning`, hoist `val partitionKeys = groupedPartitions.map(_._1)` above the `transform` so every rebuilt `KeyedPartitioning` shares the same `partitionKeys` reference. Drop the ad-hoc consistency assert (now enforced by `PartitioningCollection`). - Switch `ShuffledJoin` and `StreamingSymmetricHashJoinExec` to `PartitioningCollection.fromPartitionings` for their inner-join `outputPartitioning`. - Update affected tests to construct collections via `fromPartitionings`. Rewrite the `SPARK-46367` arity-mismatch test in `ProjectedOrderingAndPartitioningSuite` since the scenario is now rejected at `PartitioningCollection` construction rather than inside `AliasAwareOutputExpression`. ### Why are the changes needed? The "all `KeyedPartitioning`s in a collection must agree on `partitionKeys`" invariant already existed informally -- `GroupPartitionsExec.outputPartitioning` had a runtime assert checking `==`, `AliasAwareOutputExpression.projectKeyedPartitionings` asserted matching arity, and various consumers relied on the invariant being upheld. Consolidating the check into the `PartitioningCollection` constructor makes it load-bearing: any future construction site that violates it fails immediately rather than producing silently divergent state. The stronger `eq` check enables cheap reference comparisons downstream and is naturally achievable when collections are built through the smart factory. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing test suites (`EnsureRequirementsSuite`, `GroupPartitionsExecSuite`, `ProjectedOrderingAndPartitioningSuite`) updated to use `PartitioningCollection.fromPartitionings` where they previously constructed collections from independently-built `KeyedPartitioning`s. The `SPARK-46367` test was rewritten to assert that the invalid mixed-arity scenario is rejected at `PartitioningCollection` construction. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code 4.7
|
cc @cloud-fan |
| @@ -699,6 +705,24 @@ case class PartitioningCollection(partitionings: Seq[Partitioning]) | |||
| partitionings.map(_.numPartitions).distinct.length == 1, | |||
| s"PartitioningCollection requires all of its partitionings have the same numPartitions.") | |||
|
|
|||
There was a problem hiding this comment.
Style nit (no need to change): would extracting this block to a private def checkInvariant(): Unit; checkInvariant() read closer to the other Distribution/Partitioning case-class init blocks in this file (e.g. ClusteredDistribution L90-94, OrderedDistribution L168-172) which use single-line require(...)? Just a question — current form works.
| case _: InnerLike => | ||
| PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning)) | ||
| PartitioningCollection.fromPartitionings( | ||
| Seq(left.outputPartitioning, right.outputPartitioning)) |
There was a problem hiding this comment.
Should BroadcastHashJoinExec.scala:87 (case ps => PartitioningCollection(ps) in expandOutputPartitioning) and AliasAwareOutputExpression.scala:43 (case ps => PartitioningCollection(ps) in outputPartitioning) also use this new fromPartitionings factory for consistency? Both build a PartitioningCollection from independently-computed partitionings that may contain KeyedPartitionings, which looks like the same scenario fromPartitionings was introduced for. Or are these intentionally left as direct construction because the inputs already satisfy the partitionKeys eq invariant by other means?
There was a problem hiding this comment.
Techincally, .fromPartitionings() is not necessary in BroadcastHashJoinExec and in AliasAwareOutputExpression, but it is kind of a no-op when the input already satisfies the invariant, so we can make them consistent. Changed in a4329cc.
yaooqinn
left a comment
There was a problem hiding this comment.
LGTM, thank you @peter-toth!
What changes were proposed in this pull request?
requireinPartitioningCollectionthat allKeyedPartitionings reachable from the collection share the samepartitionKeysreference (eq) and have matching expression arity. The check walks the partitioning tree viaforeachso nested collections are covered.PartitioningCollection.fromPartitioningsthat internspartitionKeysreferences acrossKeyedPartitionings. Use this at sites that combine independently-computed partitionings (joins) where keys are structurally equal but not reference-equal. The factory uses manual recursion rather thantransformWithPruningbecauseKeyedPartitioning.equalscomparespartitionKeyselement-wise, which would maketransformWithPruningdiscard the rule's replacement as structurally-equal-to-input.GroupPartitionsExec.outputPartitioning, hoistval partitionKeys = groupedPartitions.map(_._1)above thetransformso every rebuiltKeyedPartitioningshares the samepartitionKeysreference. Drop the ad-hoc consistency assert (now enforced byPartitioningCollection).ShuffledJoinandStreamingSymmetricHashJoinExectoPartitioningCollection.fromPartitioningsfor their inner-joinoutputPartitioning.fromPartitionings. Rewrite theSPARK-46367arity-mismatch test inProjectedOrderingAndPartitioningSuitesince the scenario is now rejected atPartitioningCollectionconstruction rather than insideAliasAwareOutputExpression.Why are the changes needed?
The "all
KeyedPartitionings in a collection must agree onpartitionKeys" invariant already existed informally --GroupPartitionsExec.outputPartitioninghad a runtime assert checking==,AliasAwareOutputExpression.projectKeyedPartitioningsasserted matching arity, and various consumers relied on the invariant being upheld. Consolidating the check into thePartitioningCollectionconstructor makes it load-bearing: any future construction site that violates it fails immediately rather than producing silently divergent state. The strongereqcheck enables cheap reference comparisons downstream and is naturally achievable when collections are built through the smart factory.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing test suites (
EnsureRequirementsSuite,GroupPartitionsExecSuite,ProjectedOrderingAndPartitioningSuite) updated to usePartitioningCollection.fromPartitioningswhere they previously constructed collections from independently-builtKeyedPartitionings. TheSPARK-46367test was rewritten to assert that the invalid mixed-arity scenario is rejected atPartitioningCollectionconstruction.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code 4.7