Skip to content

[SPARK-56877][SQL] Enforce KeyedPartitioning invariant in PartitioningCollection#55901

Open
peter-toth wants to merge 2 commits into
apache:masterfrom
peter-toth:SPARK-56877-enforce-keyedpartitioning-invariant-in-collection
Open

[SPARK-56877][SQL] Enforce KeyedPartitioning invariant in PartitioningCollection#55901
peter-toth wants to merge 2 commits into
apache:masterfrom
peter-toth:SPARK-56877-enforce-keyedpartitioning-invariant-in-collection

Conversation

@peter-toth
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

  • Add a require in PartitioningCollection that all KeyedPartitionings reachable from the collection share the same partitionKeys reference (eq) and have matching expression arity. The check walks the partitioning tree via foreach so nested collections are covered.
  • Add a smart factory PartitioningCollection.fromPartitionings that interns partitionKeys references across KeyedPartitionings. Use this at sites that combine independently-computed partitionings (joins) where keys are structurally equal but not reference-equal. The factory uses manual recursion rather than transformWithPruning because KeyedPartitioning.equals compares partitionKeys element-wise, which would make transformWithPruning discard the rule's replacement as structurally-equal-to-input.
  • In GroupPartitionsExec.outputPartitioning, hoist val partitionKeys = groupedPartitions.map(_._1) above the transform so every rebuilt KeyedPartitioning shares the same partitionKeys reference. Drop the ad-hoc consistency assert (now enforced by PartitioningCollection).
  • Switch ShuffledJoin and StreamingSymmetricHashJoinExec to PartitioningCollection.fromPartitionings for their inner-join outputPartitioning.
  • Update affected tests to construct collections via fromPartitionings. Rewrite the SPARK-46367 arity-mismatch test in ProjectedOrderingAndPartitioningSuite since the scenario is now rejected at PartitioningCollection construction rather than inside AliasAwareOutputExpression.

Why are the changes needed?

The "all KeyedPartitionings in a collection must agree on partitionKeys" invariant already existed informally -- GroupPartitionsExec.outputPartitioning had a runtime assert checking ==, AliasAwareOutputExpression.projectKeyedPartitionings asserted matching arity, and various consumers relied on the invariant being upheld. Consolidating the check into the PartitioningCollection constructor makes it load-bearing: any future construction site that violates it fails immediately rather than producing silently divergent state. The stronger eq check enables cheap reference comparisons downstream and is naturally achievable when collections are built through the smart factory.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing test suites (EnsureRequirementsSuite, GroupPartitionsExecSuite, ProjectedOrderingAndPartitioningSuite) updated to use PartitioningCollection.fromPartitionings where they previously constructed collections from independently-built KeyedPartitionings. The SPARK-46367 test was rewritten to assert that the invalid mixed-arity scenario is rejected at PartitioningCollection construction.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 4.7

…ningCollection`

### What changes were proposed in this pull request?

- Add a `require` in `PartitioningCollection` that all `KeyedPartitioning`s reachable from the collection share the same `partitionKeys` reference (`eq`) and have matching expression arity. The check walks the partitioning tree via `foreach` so nested collections are covered.
- Add a smart factory `PartitioningCollection.fromPartitionings` that interns `partitionKeys` references across `KeyedPartitioning`s. Use this at sites that combine independently-computed partitionings (joins) where keys are structurally equal but not reference-equal. The factory uses manual recursion rather than `transformWithPruning` because `KeyedPartitioning.equals` compares `partitionKeys` element-wise, which would make `transformWithPruning` discard the rule's replacement as structurally-equal-to-input.
- In `GroupPartitionsExec.outputPartitioning`, hoist `val partitionKeys = groupedPartitions.map(_._1)` above the `transform` so every rebuilt `KeyedPartitioning` shares the same `partitionKeys` reference. Drop the ad-hoc consistency assert (now enforced by `PartitioningCollection`).
- Switch `ShuffledJoin` and `StreamingSymmetricHashJoinExec` to `PartitioningCollection.fromPartitionings` for their inner-join `outputPartitioning`.
- Update affected tests to construct collections via `fromPartitionings`. Rewrite the `SPARK-46367` arity-mismatch test in `ProjectedOrderingAndPartitioningSuite` since the scenario is now rejected at `PartitioningCollection` construction rather than inside `AliasAwareOutputExpression`.

### Why are the changes needed?

The "all `KeyedPartitioning`s in a collection must agree on `partitionKeys`" invariant already existed informally -- `GroupPartitionsExec.outputPartitioning` had a runtime assert checking `==`, `AliasAwareOutputExpression.projectKeyedPartitionings` asserted matching arity, and various consumers relied on the invariant being upheld. Consolidating the check into the `PartitioningCollection` constructor makes it load-bearing: any future construction site that violates it fails immediately rather than producing silently divergent state. The stronger `eq` check enables cheap reference comparisons downstream and is naturally achievable when collections are built through the smart factory.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test suites (`EnsureRequirementsSuite`, `GroupPartitionsExecSuite`, `ProjectedOrderingAndPartitioningSuite`) updated to use `PartitioningCollection.fromPartitionings` where they previously constructed collections from independently-built `KeyedPartitioning`s. The `SPARK-46367` test was rewritten to assert that the invalid mixed-arity scenario is rejected at `PartitioningCollection` construction.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 4.7
@peter-toth
Copy link
Copy Markdown
Contributor Author

cc @cloud-fan

Copy link
Copy Markdown
Member

@yaooqinn yaooqinn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewing this.

@@ -699,6 +705,24 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
partitionings.map(_.numPartitions).distinct.length == 1,
s"PartitioningCollection requires all of its partitionings have the same numPartitions.")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit (no need to change): would extracting this block to a private def checkInvariant(): Unit; checkInvariant() read closer to the other Distribution/Partitioning case-class init blocks in this file (e.g. ClusteredDistribution L90-94, OrderedDistribution L168-172) which use single-line require(...)? Just a question — current form works.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Extracted in a4329cc.

case _: InnerLike =>
PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
PartitioningCollection.fromPartitionings(
Seq(left.outputPartitioning, right.outputPartitioning))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should BroadcastHashJoinExec.scala:87 (case ps => PartitioningCollection(ps) in expandOutputPartitioning) and AliasAwareOutputExpression.scala:43 (case ps => PartitioningCollection(ps) in outputPartitioning) also use this new fromPartitionings factory for consistency? Both build a PartitioningCollection from independently-computed partitionings that may contain KeyedPartitionings, which looks like the same scenario fromPartitionings was introduced for. Or are these intentionally left as direct construction because the inputs already satisfy the partitionKeys eq invariant by other means?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Techincally, .fromPartitionings() is not necessary in BroadcastHashJoinExec and in AliasAwareOutputExpression, but it is kind of a no-op when the input already satisfies the invariant, so we can make them consistent. Changed in a4329cc.

Copy link
Copy Markdown
Member

@yaooqinn yaooqinn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you @peter-toth!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants