diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 910cbcf2210a0..1f2b1d0a585d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -94,6 +94,16 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = { if (kps.isEmpty) return LazyList.empty val numPositions = kps.head.expressions.length + // The function assumes all input KPs share the same `partitionKeys`, which implies matching + // expression arity. This invariant is asserted by [[GroupPartitionsExec]] and is established + // by the constructors of [[PartitioningCollection]] feeding this method (a join's + // `PartitioningCollection(left.outputPartitioning, right.outputPartitioning)` combines KPs + // that have been aligned by [[EnsureRequirements]] to the same join keys). If the invariant + // is ever violated upstream, fail early with a clear message instead of throwing an opaque + // `IndexOutOfBoundsException` from `kp.expressions(i)` below. 
+ assert(kps.tail.forall(_.expressions.length == numPositions), + s"All input KeyedPartitionings must share the same expression arity, " + + s"but got: ${kps.map(_.expressions.length).mkString(", ")}.") val alternativesPerPosition: IndexedSeq[LazyList[Expression]] = if (hasAlias) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index db664b04ef08b..a38570924620a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -586,6 +586,29 @@ class ProjectedOrderingAndPartitioningSuite case other => fail(s"Expected KeyedPartitioning, got $other") } } + + test("SPARK-46367: mixed-arity KeyedPartitionings in input fail with a clear assertion") { + // The code under test assumes all input KPs share the same arity (the invariant asserted by + // `GroupPartitionsExec`). Without the assert in `PartitioningPreservingUnaryExecNode`, + // indexing `kp.expressions(i)` for `i >= kp.expressions.length` would throw an opaque + // `IndexOutOfBoundsException`. That assert surfaces the real cause -- an upstream node + // violated the invariant -- so the bug can be fixed at the producer.
+ val x = AttributeReference("x", IntegerType)() + val y = AttributeReference("y", IntegerType)() + val keys2d = Seq(InternalRow(1, 1), InternalRow(2, 2)) + val keys1d = Seq(InternalRow(1), InternalRow(2)) + val child = DummyLeafExecWithPartitioning( + output = Seq(x, y), + partitioning = PartitioningCollection(Seq( + KeyedPartitioning(Seq(x, y), keys2d), + KeyedPartitioning(Seq(x), keys1d)))) + val project = ProjectExec(Seq(x), child) + val e = intercept[AssertionError] { + project.outputPartitioning + } + assert(e.getMessage.contains("All input KeyedPartitionings must share the same expression " + + "arity")) + } } private case class DummyLeafExecWithPartitioning(