From b5fd4be158932f5f4ee0f95876d04f121d509f3f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 14 May 2026 14:34:56 +0000 Subject: [PATCH 1/2] [SPARK-46367][SQL][FOLLOWUP] Assert same arity in projectKeyedPartitionings ### What changes were proposed in this pull request? Follow-up to https://github.com/apache/spark/pull/55519. `PartitioningPreservingUnaryExecNode.projectKeyedPartitionings` assumes all input KPs share the same `partitionKeys`, which implies the same expression arity. This invariant is asserted by `GroupPartitionsExec` and is established by every upstream constructor of `PartitioningCollection` that feeds this method (a join's `PartitioningCollection(left.outputPartitioning, right.outputPartitioning)` combines KPs that `EnsureRequirements` has aligned to the same join keys). If the invariant is ever violated upstream, indexing `kp.expressions(i)` for `i >= kp.expressions.length` throws an opaque `IndexOutOfBoundsException` that points at this method rather than at the producer. Add an `assert` that surfaces the real cause with a clear message. The invariant is unchanged; this just turns silent misuse into a debuggable failure, so a producer-side bug can be fixed at its source. ### Why are the changes needed? Improve error message when an upstream node violates the same-arity invariant. No behavior change on valid plans. ### Does this PR introduce _any_ user-facing change? No (assertion-only; planner internal). ### How was this patch tested? New unit test `SPARK-46367: mixed-arity KeyedPartitionings in input fail with a clear assertion` in `ProjectedOrderingAndPartitioningSuite` that constructs a mixed-arity `PartitioningCollection` child and verifies the assert fires with the expected message instead of throwing `IndexOutOfBoundsException`. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.7 --- .../AliasAwareOutputExpression.scala | 10 ++++++++ ...rojectedOrderingAndPartitioningSuite.scala | 23 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 910cbcf2210a0..3960a271cb906 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -94,6 +94,16 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = { if (kps.isEmpty) return LazyList.empty val numPositions = kps.head.expressions.length + // The function assumes all input KPs share the same `partitionKeys`, which implies matching + // expression arity. This invariant is asserted by [[GroupPartitionsExec]] and is established + // by the constructors of [[PartitioningCollection]] feeding this method (a join's + // `PartitioningCollection(left.outputPartitioning, right.outputPartitioning)` combines KPs + // that have been aligned by [[EnsureRequirements]] to the same join keys). If the invariant + // is ever violated upstream, fail early with a clear message instead of throwing an opaque + // `IndexOutOfBoundsException` from `kp.expressions(i)` below. + assert(kps.forall(_.expressions.length == numPositions), + s"All input KeyedPartitionings must share the same expression arity, " + + s"but got: ${kps.map(_.expressions.length).mkString(", ")}.") val alternativesPerPosition: IndexedSeq[LazyList[Expression]] = if (hasAlias) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index db664b04ef08b..a38570924620a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -586,6 +586,29 @@ class ProjectedOrderingAndPartitioningSuite case other => fail(s"Expected KeyedPartitioning, got $other") } } + + test("SPARK-46367: mixed-arity KeyedPartitionings in input fail with a clear assertion") { + // The function assumes all input KPs share the same arity (the invariant asserted by + // `GroupPartitionsExec`). Without the assert below, indexing `kp.expressions(i)` for + // `i >= kp.expressions.length` would throw an opaque `IndexOutOfBoundsException`. The assert + // surfaces the real cause -- an upstream node violated the invariant -- so the bug can be + // fixed at the producer. + val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + val keys2d = Seq(InternalRow(1, 1), InternalRow(2, 2)) + val keys1d = Seq(InternalRow(1), InternalRow(2)) + val child = DummyLeafExecWithPartitioning( + output = Seq(x, y), + partitioning = PartitioningCollection(Seq( + KeyedPartitioning(Seq(x, y), keys2d), + KeyedPartitioning(Seq(x), keys1d)))) + val project = ProjectExec(Seq(x), child) + val e = intercept[AssertionError] { + project.outputPartitioning + } + assert(e.getMessage.contains("All input KeyedPartitionings must share the same expression " + + "arity")) + } } private case class DummyLeafExecWithPartitioning( From 0647789026214c8cdcf19a82de8beaeed36ce8a7 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 14 May 2026 17:12:50 +0000 Subject: [PATCH 2/2] [SPARK-46367][SQL][FOLLOWUP] Address review feedback Use `kps.tail.forall(...)` since `numPositions` is taken from `kps.head`. --- .../apache/spark/sql/execution/AliasAwareOutputExpression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 3960a271cb906..1f2b1d0a585d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -101,7 +101,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode // that have been aligned by [[EnsureRequirements]] to the same join keys). If the invariant // is ever violated upstream, fail early with a clear message instead of throwing an opaque // `IndexOutOfBoundsException` from `kp.expressions(i)` below. - assert(kps.forall(_.expressions.length == numPositions), + assert(kps.tail.forall(_.expressions.length == numPositions), s"All input KeyedPartitionings must share the same expression arity, " + s"but got: ${kps.map(_.expressions.length).mkString(", ")}.")