
[VL] Add KeyGroupedPartitioning support to columnar shuffle#12084

Open
minni31 wants to merge 1 commit into
apache:mainfrom
minni31:oss/velox-keygrouped-partitioning

Conversation


@minni31 minni31 commented May 13, 2026

CONTEXT

KeyGroupedPartitioning is a Spark partitioning scheme used by V2 data source connectors (e.g., Iceberg, Paimon) where data is partitioned by specific key expressions with known unique partition values. Currently, Gluten's columnar shuffle exchange does not handle this partitioning type, causing a fallback to vanilla Spark for any query involving V2 sources with key-grouped partitioning.

WHAT

Adds KeyGroupedPartitioning support to the columnar shuffle exchange path in the Velox backend. The implementation reuses the existing JVM-side partition ID computation pattern (same mechanism as RangePartitioning):

  • Adds KeyGroupedPartitioning to the validation whitelist in ColumnarShuffleExchangeExecBase, allowing the columnar shuffle to accept this partitioning type.
  • Constructs a KeyGroupedPartitioner from the partitioning's uniquePartitionValues, mapping each partition key to its index.
  • Computes partition IDs on the JVM side by evaluating partition key expressions against each row (via BindReferences) and looking up the result in the KeyGroupedPartitioner. The pid column is prepended to each batch so the native shuffle writer can read it directly.
  • Reuses RangePartitioningShortName for the native partitioning descriptor since both Range and KeyGrouped use the same JVM-side pid prepend pattern — the native shuffle writer reads the prepended column rather than computing partition IDs natively.
  • Each key extraction allocates a fresh array per row and converts to an immutable ArraySeq to avoid aliasing issues with mutable array reuse.

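The pid-computation pattern described above can be sketched in plain Scala. This is an illustration, not the actual Gluten code: `KeyGroupedPartitioner` here is a hypothetical stand-in, and rows and partition values are modeled as `Seq[Any]` rather than Spark's `InternalRow`.

```scala
// Sketch of the JVM-side partition ID computation pattern, assuming a
// map-backed partitioner keyed by extracted partition values.
final class KeyGroupedPartitioner(valueMap: Map[Seq[Any], Int], numPartitions: Int) {
  // Known keys map to their assigned pid; unknown keys fall back to a hash.
  def getPartition(key: Seq[Any]): Int =
    valueMap.getOrElse(key, math.floorMod(key.hashCode, numPartitions))
}

object PidDemo {
  def main(args: Array[String]): Unit = {
    // uniquePartitionValues -> index, mirroring the valueMap construction
    val uniquePartitionValues = Seq(Seq[Any]("2024", 1), Seq[Any]("2024", 2))
    val valueMap = uniquePartitionValues.zipWithIndex.toMap
    val partitioner = new KeyGroupedPartitioner(valueMap, uniquePartitionValues.size)

    // Per row: extract the key columns (BindReferences-evaluated in the real
    // code) into a fresh array, wrap as an immutable ArraySeq to avoid
    // aliasing with a reused buffer, then look up the pid.
    val row = Array[Any]("2024", 2, "payload")
    val key = scala.collection.immutable.ArraySeq.unsafeWrapArray(row.take(2))
    println(partitioner.getPartition(key)) // pid is then prepended to the batch
  }
}
```

Because Scala `Seq` equality is element-based across implementations, the immutable `ArraySeq` key matches the `Seq[Any]` entries in the map.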
Tests

Suite: VeloxShufflePartitioningSuite
Tests: 22 tests covering hash, range, round-robin, and single partitioning; null semantics; data types; boundary cases; and KeyGrouped unit tests (key extraction, partitioner construction, end-to-end partition lookup)
Status: local pass

Note: End-to-end KeyGroupedPartitioning tests require V2 data source connectors (Iceberg/Paimon) which are not available in this test module. The KeyGrouped unit tests validate key extraction, KeyGroupedPartitioner construction, and the full key-extraction-to-partition-lookup flow.

@github-actions

Run Gluten Clickhouse CI on x86

@github-actions github-actions Bot added CORE works for Gluten Core VELOX labels May 13, 2026
@minni31 minni31 force-pushed the oss/velox-keygrouped-partitioning branch from 02ddcb4 to 92ac0bc Compare May 13, 2026 08:39
@github-actions

Run Gluten Clickhouse CI on x86

    val valueMap = scala.collection.mutable.Map.empty[Seq[Any], Int]
    k.uniquePartitionValues.zipWithIndex.foreach {
      case (partition, index) =>
        valueMap.update(partition.toSeq(dataTypes), index)
    }

[BLOCKING] numPartitions / valueMap.size mismatch when isPartiallyClustered=true or partitionValues contain duplicates

KeyGroupedPartitioner is constructed with the original n = KeyGroupedPartitioning.numPartitions (i.e. partitionValues.size, including replicas), while valueMap is built from the deduped uniquePartitionValues. When isPartiallyClustered=true (the SPJ skew-handling path), partitionValues deliberately replicates entries → numPartitions > uniquePartitionValues.size. Result:

  • pids [uniqueValues.size, n) are unreachable via the map lookup.
  • The getOrElseUpdate hash fallback (nonNegativeMod(key.hashCode, n)) can land any unknown key into [0, uniqueValues.size), colliding with an already-occupied pid.
  • Two distinct partition keys then end up in the same output partition, breaking the KGP per-partition-uniqueness invariant that downstream SPJ-aware operators rely on.
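The mismatch above can be reproduced with a standalone model. The names below (`nonNegativeMod`, the map-with-hash-fallback lookup) follow the comment's description; this is plain Scala, not the actual Spark/Gluten classes.

```scala
object CollisionDemo {
  // Modulo that always lands in [0, mod), matching the fallback described above
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def main(args: Array[String]): Unit = {
    val n = 4 // numPartitions including replicas (isPartiallyClustered=true)
    val valueMap = Map(Seq[Any](1) -> 0, Seq[Any](2) -> 1) // deduped uniques only
    def pid(key: Seq[Any]): Int =
      valueMap.getOrElse(key, nonNegativeMod(key.hashCode, n))

    // pids 2 and 3 are unreachable via the map; an unknown key's hash
    // fallback ranges over all of [0, 4) and can land on occupied pid 0 or 1.
    println((100 to 120).map(i => pid(Seq[Any](i))).distinct.sorted)
  }
}
```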

For reference, vanilla Spark master's ShuffleExchangeExec KGP branch (ShuffleExchangeExec.scala:378-383) calls k.toGrouped first — which dedups partitionKeys and redefines numPartitions := partitionKeys.length before constructing the KeyGroupedPartitioner. That canonicalization is what guarantees valueMap.size == numPartitions. (Spark 3.5's ShuffleExchangeExec has no KGP branch at all — KGP is consumed only by BatchScanExec and never re-shuffled, so the problem doesn't arise there either.)

Suggested fixes (any one):

  1. Use k.uniquePartitionValues.size as the partitioner's numPartitions instead of n (matches the dedup invariant).
  2. In ColumnarShuffleExchangeExecBase.scala:107 reject isPartiallyClustered=true and any case where partitionValues.size != uniquePartitionValues.size via ValidationResult.failed, restricting the columnar path to the strictly-1:1 case.
  3. Port the .toGrouped-equivalent canonicalization (dedup + renumber) before building valueMap and the partitioner.
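A minimal sketch of fix (3), the toGrouped-equivalent canonicalization: dedup partition values and renumber before building the partitioner, so valueMap.size == numPartitions by construction. `Canonicalize.groupedPartitioner` is a hypothetical helper, and partition values are modeled as `Seq[Any]` rather than `InternalRow`.

```scala
object Canonicalize {
  // Returns (valueMap, numPartitions) with numPartitions redefined to the
  // deduped size, mirroring what toGrouped guarantees in Spark master.
  def groupedPartitioner(partitionValues: Seq[Seq[Any]]): (Map[Seq[Any], Int], Int) = {
    // .distinct keeps first-occurrence order while dropping replicas
    val unique = partitionValues.distinct
    val valueMap = unique.zipWithIndex.toMap
    (valueMap, unique.size) // valueMap.size == numPartitions holds by construction
  }
}
```

With this shape, every pid in [0, numPartitions) is reachable through the map, so the hash-fallback collision described above cannot merge two known partition keys.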


Labels

CORE works for Gluten Core VELOX
