feat: stop using FFI in native shuffle read path #3731

Open

andygrove wants to merge 17 commits into apache:main from andygrove:shuffle-direct-read

Conversation


@andygrove andygrove commented Mar 18, 2026

Which issue does this PR close?

Performance improvements for native shuffle read. Shows a 13% improvement on TPC-H @ 1TB.

Rationale for this change

Simplifies the shuffle direct read code path, removing unnecessary FFI transfers.

What changes are included in this PR?

- Adds a design document for bypassing Arrow FFI in the shuffle read path when both the shuffle writer and the downstream operator are native.
- Adds a new ShuffleScanExec operator that pulls compressed shuffle blocks from the JVM via CometShuffleBlockIterator and decodes them natively using read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch is called externally before poll_next) to avoid JNI calls on tokio threads.
- Fixes two bugs discovered during testing:
  - ClassCastException: the factory closure incorrectly cast Partition to CometExecPartition before extracting ShuffledRowRDDPartition; the partition passed to the factory is already the unwrapped partition from the input RDD.
  - NoSuchElementException in SQLShuffleReadMetricsReporter: the metrics field in CometShuffledBatchRDD was not exposed as a val, causing Map.empty to be used instead of the real shuffle metrics map.
- Removes a redundant getCurrentBlockLength() JNI call (reuses the hasNext() return value).
- Makes readAsRawStream() lazy instead of materializing all streams into a List.
- Removes a pointless DirectByteBuffer re-allocation in close().
- Removes the dead sparkPlanToInputIdx map.

How are these changes tested?

- A Scala integration test runs a repartition+aggregate query with direct read enabled and disabled to verify result parity.
- A Rust unit test covers the read_ipc_compressed codec round-trip.
- test_read_compressed_ipc_block is skipped under Miri, since it calls foreign zstd functions that Miri cannot execute.
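The pre-pull pattern described above can be sketched roughly as follows. This is a hypothetical illustration, not code from this PR; the class and method names (PrePullIterator, prePull) are invented. The idea is that the JNI-backed iterator is advanced on the JVM task thread before control passes to the async consumer, so the consumer thread (tokio, in the real implementation) never makes a JNI call itself.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the "pre-pull" pattern: the element is fetched
// ahead of time on the calling (task) thread, so the async consumer that
// later calls next() does not have to touch the underlying source.
class PrePullIterator<T> implements Iterator<T> {
  private final Iterator<T> underlying;
  private T pending; // element fetched ahead of time on the task thread

  PrePullIterator(Iterator<T> underlying) {
    this.underlying = underlying;
  }

  // Called externally, on the task thread, before the consumer polls.
  void prePull() {
    if (pending == null && underlying.hasNext()) {
      pending = underlying.next();
    }
  }

  @Override
  public boolean hasNext() {
    return pending != null || underlying.hasNext();
  }

  @Override
  public T next() {
    if (pending != null) {
      T v = pending;
      pending = null;
      return v;
    }
    return underlying.next(); // real code would treat this path as a bug
  }
}
```

The key property is that a well-behaved consumer only ever observes elements that were already pulled on the task thread.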
@andygrove andygrove marked this pull request as ready for review March 19, 2026 13:59
@andygrove andygrove changed the title feat: bypass Arrow FFI for native shuffle read path feat: replace Arrow IPC with raw buffer format in shuffle Mar 19, 2026
@andygrove andygrove changed the title feat: replace Arrow IPC with raw buffer format in shuffle feat: shuffle direct read and raw buffer shuffle format Mar 19, 2026
@andygrove andygrove marked this pull request as draft March 19, 2026 16:33
@andygrove andygrove force-pushed the shuffle-direct-read branch from a7e9659 to 19cb04b March 19, 2026
@andygrove andygrove changed the title feat: shuffle direct read and raw buffer shuffle format feat: stop using FFI in native shuffle read path Mar 19, 2026
@andygrove andygrove marked this pull request as ready for review March 19, 2026 16:38
@andygrove andygrove requested review from wForget March 19, 2026 17:02
@parthchandra parthchandra left a comment

I'm not an expert on shuffle, but this looks pretty comprehensive to me. And a great optimization!


val COMET_SHUFFLE_DIRECT_READ_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.shuffle.directRead.enabled")
    .category(CATEGORY_EXEC)
Should this be CATEGORY_SHUFFLE?

}
}

test("shuffle direct read produces same results as FFI path") {
Should we also test a case with multiple shuffles in the plan?


wForget commented Mar 20, 2026

Shows 13% improvement in TPC-H @ 1TB.

Nice work! I didn't expect removing FFI to bring such great benefits. Could you share where these benefits mainly come from? Is it due to fewer JNI calls, or was the overhead from ArrowImporter relatively high?


wForget commented Mar 20, 2026

I read the relevant implementation in Gluten, which defines a lightweight ColumnarBatch that only holds a nativeHandle and does not perform Arrow imports.

https://github.com/apache/gluten/blob/main/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java
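The Gluten-style approach mentioned above could be sketched as follows. This is a hypothetical illustration of the general idea, not Gluten's or Comet's actual code; the names (NativeHandleBatch, releaseNative) are invented. The wrapper carries only an opaque native handle and a row count, so no Arrow import happens unless the JVM actually needs the data.

```java
import java.util.function.LongConsumer;

// Hypothetical sketch: a batch wrapper that only carries an opaque native
// handle, deferring any Arrow import; closing it releases the native-side
// batch exactly once.
final class NativeHandleBatch implements AutoCloseable {
  private final long nativeHandle; // opaque pointer owned by native code
  private final int numRows;
  private final LongConsumer releaseNative; // frees the native-side batch
  private boolean closed = false;

  NativeHandleBatch(long nativeHandle, int numRows, LongConsumer releaseNative) {
    this.nativeHandle = nativeHandle;
    this.numRows = numRows;
    this.releaseNative = releaseNative;
  }

  long handle() { return nativeHandle; }

  int numRows() { return numRows; }

  @Override
  public void close() {
    if (!closed) {
      releaseNative.accept(nativeHandle);
      closed = true;
    }
  }
}
```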

int bytesRead = channel.read(headerBuf);
if (bytesRead < 0) {
  if (headerBuf.position() == 0) {
    return -1;
We can call close() earlier here.

// Field count discarded - schema determined by ShuffleScan protobuf fields
headerBuf.getLong();

long bytesToRead = compressedLength - 8;
nit: add a comment explaining why -8
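For illustration, my reading of the framing in this diff is that compressedLength covers the 8-byte field-count header plus the compressed payload, so after consuming the header the remaining payload is compressedLength - 8. A minimal sketch of that arithmetic (hypothetical helper, not code from this PR):

```java
import java.nio.ByteBuffer;

final class ShuffleBlockFraming {
  // Hypothetical sketch: compressedLength includes the 8-byte field-count
  // header, so the compressed payload that follows is compressedLength - 8.
  static long payloadLength(ByteBuffer buf, long compressedLength) {
    buf.getLong(); // field count discarded; schema comes from the ShuffleScan plan
    return compressedLength - 8; // 8 bytes were consumed by the Long header
  }
}
```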

case rdd: CometShuffledBatchRDD =>
  val dep = rdd.dependency
  val rddMetrics = rdd.metrics
  factories(scanIdx) = (context, part) => {
Much of the logic here duplicates CometShuffledBatchRDD#compute. Perhaps we could add a computeAsShuffleBlockIterator method to CometShuffledBatchRDD and reuse createReader logic. Like:

class CometShuffledBatchRDD {
  def computeAsShuffleBlockIterator(context: TaskContext, split: Partition): CometShuffleBlockIterator = {
     ...
  }
}

factories(scanIdx) = rdd.computeAsShuffleBlockIterator


In that case, we no longer need to buildShuffleBlockIteratorFactories; we can compute it in CometExecRDD like this:

class CometExecRDD {
  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
    val partition = split.asInstanceOf[CometExecPartition]

    val inputs = inputRDDs.zip(partition.inputPartitions).zipWithIndex.map {
      case ((rdd: CometShuffledBatchRDD, part), idx) if shuffleScanIndices.contains(idx) =>
        rdd.computeAsShuffleBlockIterator(part, context)
      case ((rdd, part), _) => rdd.iterator(part, context)
    }
...
}
