Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = {
if (kps.isEmpty) return LazyList.empty
val numPositions = kps.head.expressions.length
// The function assumes all input KPs share the same `partitionKeys`, which implies matching
// expression arity. This invariant is asserted by [[GroupPartitionsExec]] and is established
// by the constructors of [[PartitioningCollection]] feeding this method (a join's
// `PartitioningCollection(left.outputPartitioning, right.outputPartitioning)` combines KPs
// that have been aligned by [[EnsureRequirements]] to the same join keys). If the invariant
// is ever violated upstream, fail early with a clear message instead of throwing an opaque
// `IndexOutOfBoundsException` from `kp.expressions(i)` below.
assert(kps.tail.forall(_.expressions.length == numPositions),
s"All input KeyedPartitionings must share the same expression arity, " +
s"but got: ${kps.map(_.expressions.length).mkString(", ")}.")

val alternativesPerPosition: IndexedSeq[LazyList[Expression]] =
if (hasAlias) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,29 @@ class ProjectedOrderingAndPartitioningSuite
case other => fail(s"Expected KeyedPartitioning, got $other")
}
}

test("SPARK-46367: mixed-arity KeyedPartitionings in input fail with a clear assertion") {
  // Scenario: the child reports a PartitioningCollection whose two KeyedPartitionings carry a
  // different number of expressions (two vs. one). Asking the projection for its output
  // partitioning is expected to raise an AssertionError whose message names the arity mismatch,
  // so the violated invariant is visible instead of an opaque `IndexOutOfBoundsException`
  // raised while indexing into the shorter expression list.
  val colX = AttributeReference("x", IntegerType)()
  val colY = AttributeReference("y", IntegerType)()

  // Partition keys matching each arity: two-field rows and one-field rows.
  val twoFieldKeys = Seq(InternalRow(1, 1), InternalRow(2, 2))
  val oneFieldKeys = Seq(InternalRow(1), InternalRow(2))

  // Deliberately combine partitionings of different arity over the same child output.
  val twoColumnKp = KeyedPartitioning(Seq(colX, colY), twoFieldKeys)
  val oneColumnKp = KeyedPartitioning(Seq(colX), oneFieldKeys)
  val mixedArityChild = DummyLeafExecWithPartitioning(
    output = Seq(colX, colY),
    partitioning = PartitioningCollection(Seq(twoColumnKp, oneColumnKp)))

  // The plan node is built outside `intercept` so only `outputPartitioning` is under test.
  val projection = ProjectExec(Seq(colX), mixedArityChild)
  val expectedFragment = "All input KeyedPartitionings must share the same expression arity"

  val thrown = intercept[AssertionError] {
    projection.outputPartitioning
  }
  assert(thrown.getMessage.contains(expectedFragment))
}
}

private case class DummyLeafExecWithPartitioning(
Expand Down