[VL] Add KeyGroupedPartitioning support to columnar shuffle #12084
minni31 wants to merge 1 commit into
Conversation

Run Gluten Clickhouse CI on x86

02ddcb4 to 92ac0bc

Run Gluten Clickhouse CI on x86
```scala
val valueMap = scala.collection.mutable.Map.empty[Seq[Any], Int]
k.uniquePartitionValues.zipWithIndex.foreach {
  case (partition, index) =>
    valueMap.update(partition.toSeq(dataTypes), index)
}
```
[BLOCKING] `numPartitions` / `valueMap.size` mismatch when `isPartiallyClustered=true` or `partitionValues` contain duplicates

`KeyGroupedPartitioner` is constructed with the original `n = KeyGroupedPartitioning.numPartitions` (i.e. `partitionValues.size`, including replicas), while `valueMap` is built from the deduped `uniquePartitionValues`. When `isPartiallyClustered=true` (the SPJ skew-handling path), `partitionValues` deliberately replicates entries, so `numPartitions > uniquePartitionValues.size`. Result:

- Pids in `[uniqueValues.size, n)` are unreachable via the map lookup.
- The `getOrElseUpdate` hash fallback (`nonNegativeMod(key.hashCode, n)`) can land any unknown key into `[0, uniqueValues.size)`, colliding with an already-occupied pid.
- Two distinct partition keys then end up in the same output partition, breaking the KGP per-partition-uniqueness invariant that downstream SPJ-aware operators rely on.

For reference, vanilla Spark master's `ShuffleExchangeExec` KGP branch (`ShuffleExchangeExec.scala:378-383`) calls `k.toGrouped` first, which dedups `partitionKeys` and redefines `numPartitions := partitionKeys.length`, before constructing the `KeyGroupedPartitioner`. That canonicalization is what guarantees `valueMap.size == numPartitions`. (Spark 3.5's `ShuffleExchangeExec` has no KGP branch at all: KGP is consumed only by `BatchScanExec` and never re-shuffled, so the problem doesn't arise there either.)
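To make the failure mode concrete, here is a standalone sketch (illustrative names, not Gluten code; `nonNegativeMod` mirrors the behavior of Spark's helper of the same name) showing how replicated partition values leave the top pids unreachable while the hash fallback targets the already-occupied range:

```scala
object KgpCollisionSketch {
  // Mirrors Spark's Utils.nonNegativeMod: a non-negative modulus for hash codes.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Builds the valueMap the way the PR does: from deduped partition values.
  def buildValueMap(partitionValues: Seq[Seq[Any]]): Map[Seq[Any], Int] =
    partitionValues.distinct.zipWithIndex.toMap

  def main(args: Array[String]): Unit = {
    // isPartiallyClustered=true replicates entries, so n > unique count.
    val partitionValues = Seq(Seq[Any]("a"), Seq[Any]("a"), Seq[Any]("b"))
    val n = partitionValues.size                  // partitioner's numPartitions = 3
    val valueMap = buildValueMap(partitionValues) // produces only pids 0 and 1

    assert(valueMap.size < n)                     // the mismatch
    assert(!valueMap.values.exists(_ == n - 1))   // pid 2 unreachable via the map

    // An unknown key falls back to hashing modulo n and can land on pid 0 or 1,
    // colliding with a key the map already routes there.
    val fallbackPid = nonNegativeMod(Seq[Any]("zzz").hashCode, n)
    println(s"fallback pid for unknown key: $fallbackPid")
  }
}
```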
Suggested fixes (any one):

- Use `k.uniquePartitionValues.size` as the partitioner's `numPartitions` instead of `n` (matches the dedup invariant).
- In `ColumnarShuffleExchangeExecBase.scala:107`, reject `isPartiallyClustered=true` and any case where `partitionValues.size != uniquePartitionValues.size` via `ValidationResult.failed`, restricting the columnar path to the strictly-1:1 case.
- Port the `.toGrouped`-equivalent canonicalization (dedup + renumber) before building `valueMap` and the partitioner.
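A minimal sketch of the first option, with illustrative names: size the partitioner by the deduped values so the map covers every pid, mirroring the invariant `k.toGrouped` establishes on Spark master:

```scala
object KgpDedupSketch {
  // Dedup + renumber: returns (numPartitions, valueMap) with the invariant
  // valueMap.size == numPartitions, so every pid in [0, numPartitions) is reachable.
  def canonicalize(partitionValues: Seq[Seq[Any]]): (Int, Map[Seq[Any], Int]) = {
    val unique = partitionValues.distinct
    (unique.size, unique.zipWithIndex.toMap)
  }

  def main(args: Array[String]): Unit = {
    // Replicas present, as under isPartiallyClustered=true.
    val partitionValues = Seq(Seq[Any](1), Seq[Any](1), Seq[Any](2), Seq[Any](3))
    val (numPartitions, valueMap) = canonicalize(partitionValues)
    assert(valueMap.size == numPartitions)
    assert(valueMap.values.toSet == (0 until numPartitions).toSet)
  }
}
```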
CONTEXT
`KeyGroupedPartitioning` is a Spark partitioning scheme used by V2 data source connectors (e.g., Iceberg, Paimon) where data is partitioned by specific key expressions with known unique partition values. Currently, Gluten's columnar shuffle exchange does not handle this partitioning type, causing a fallback to vanilla Spark for any query involving V2 sources with key-grouped partitioning.

WHAT
Adds `KeyGroupedPartitioning` support to the columnar shuffle exchange path in the Velox backend. The implementation reuses the existing JVM-side partition ID computation pattern (same mechanism as `RangePartitioning`):

- Adds `KeyGroupedPartitioning` to the validation whitelist in `ColumnarShuffleExchangeExecBase`, allowing the columnar shuffle to accept this partitioning type.
- Builds a `KeyGroupedPartitioner` from the partitioning's `uniquePartitionValues`, mapping each partition key to its index.
- Computes partition IDs on the JVM side by evaluating the partition key expressions against each row (via `BindReferences`) and looking up the result in the `KeyGroupedPartitioner`. The pid column is prepended to each batch so the native shuffle writer can read it directly.
- Reuses `RangePartitioningShortName` for the native partitioning descriptor, since both Range and KeyGrouped use the same JVM-side pid prepend pattern: the native shuffle writer reads the prepended column rather than computing partition IDs natively.
- Uses `ArraySeq` to avoid aliasing issues with mutable array reuse.
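The JVM-side pattern described above can be sketched as follows (standalone and illustrative; the real code evaluates bound key expressions against `InternalRow`s rather than plain sequences):

```scala
object PidPrependSketch {
  // Looks up each row's partition key in valueMap and prepends the pid as a
  // leading column, the shape the native shuffle writer reads directly.
  def prependPid(
      rows: Seq[Seq[Any]],
      extractKey: Seq[Any] => Seq[Any],
      valueMap: Map[Seq[Any], Int]): Seq[Seq[Any]] =
    rows.map { row =>
      val pid = valueMap(extractKey(row))
      pid +: row
    }

  def main(args: Array[String]): Unit = {
    val valueMap = Map(Seq[Any]("a") -> 0, Seq[Any]("b") -> 1)
    // Rows modeled as simple sequences; the first element is the partition key.
    val rows = Seq(Seq[Any]("a", 10), Seq[Any]("b", 20), Seq[Any]("a", 30))
    val withPid = prependPid(rows, r => Seq(r.head), valueMap)
    assert(withPid.map(_.head) == Seq(0, 1, 0))
  }
}
```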
Tests

Unit tests added in `VeloxShufflePartitioningSuite`. Note: end-to-end KeyGroupedPartitioning tests require V2 data source connectors (Iceberg/Paimon), which are not available in this test module. The KeyGrouped unit tests validate key extraction, `KeyGroupedPartitioner` construction, and the full key-extraction-to-partition-lookup flow.