feat: stop using FFI in native shuffle read path #3731
andygrove wants to merge 17 commits into apache:main
Conversation
Adds a design document for bypassing Arrow FFI in the shuffle read path when both the shuffle writer and downstream operator are native.
Add a new ShuffleScanExec operator that pulls compressed shuffle blocks from JVM via CometShuffleBlockIterator and decodes them natively using read_ipc_compressed(). Uses the pre-pull pattern (get_next_batch called externally before poll_next) to avoid JNI calls on tokio threads.
Fix two bugs discovered during testing:
- ClassCastException: the factory closure incorrectly cast Partition to CometExecPartition before extracting ShuffledRowRDDPartition; the partition passed to the factory is already the unwrapped partition from the input RDD.
- NoSuchElementException in SQLShuffleReadMetricsReporter: the metrics field in CometShuffledBatchRDD was not exposed as a val, causing Map.empty to be used instead of the real shuffle metrics map.

Add a Scala integration test that runs a repartition+aggregate query with direct read enabled and disabled to verify result parity. Add a Rust unit test for the read_ipc_compressed codec round-trip.
- Remove redundant getCurrentBlockLength() JNI call (reuse hasNext() return value)
- Make readAsRawStream() lazy instead of materializing all streams to a List
- Remove pointless DirectByteBuffer re-allocation in close()
- Remove dead sparkPlanToInputIdx map
Skip test_read_compressed_ipc_block under Miri since it calls foreign zstd functions that Miri cannot execute.
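The "pre-pull pattern" mentioned above (get_next_batch called externally before poll_next) can be illustrated generically. The sketch below is a hypothetical Java analogue, not Comet's actual implementation: the expensive fetch (in Comet, a JNI call into CometShuffleBlockIterator) runs eagerly on the caller's thread, so the async consumer never issues the call itself.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical sketch of the pre-pull pattern: the next element is
// fetched ahead of time on the calling thread, so the downstream
// consumer (in Comet, the tokio-driven native plan) never performs
// the fetch itself.
final class PrePullIterator<T> implements Iterator<T> {
  private final Supplier<T> fetch; // returns null when exhausted
  private T pending;               // element pulled ahead of time

  PrePullIterator(Supplier<T> fetch) {
    this.fetch = fetch;
    this.pending = fetch.get();    // pre-pull the first element
  }

  @Override
  public boolean hasNext() {
    return pending != null;
  }

  @Override
  public T next() {
    if (pending == null) throw new NoSuchElementException();
    T current = pending;
    pending = fetch.get();         // pre-pull the next element now,
    return current;                // before the consumer asks for it
  }
}
```

The consumer only ever touches `pending`, which was filled in by the previous `next()` call on the caller's thread.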
Force-pushed from a7e9659 to 19cb04b
parthchandra left a comment:

Not the expert on shuffle but this looks pretty comprehensive to me. And a great optimization!
val COMET_SHUFFLE_DIRECT_READ_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.shuffle.directRead.enabled")
    .category(CATEGORY_EXEC)
Should this be CATEGORY_SHUFFLE ?
test("shuffle direct read produces same results as FFI path")
Should we also test a case with multiple shuffles in the plan?
Nice work! I didn't expect removing FFI to bring such great benefits. Could you share where these benefits mainly come from? Is it due to fewer JNI calls, or was the overhead from ArrowImporter relatively high?

I read the relevant implementation in Gluten, which defines a lightweight ColumnarBatch that only holds a nativeHandle and does not perform Arrow imports.
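A minimal sketch of the Gluten-style approach described above (class and method names are hypothetical, not Gluten's actual API): a batch object that carries only an opaque native handle, deferring any Arrow import until the columns are actually needed on the JVM side.

```java
// Hypothetical sketch of a handle-only batch: no Arrow import is
// performed; the JVM side merely tracks ownership of a pointer that
// native code understands.
final class NativeHandleBatch implements AutoCloseable {
  private final long nativeHandle; // opaque pointer owned by native code
  private boolean closed;

  NativeHandleBatch(long nativeHandle) {
    this.nativeHandle = nativeHandle;
  }

  /** Returns the raw handle for passing back across JNI. */
  long handle() {
    if (closed) throw new IllegalStateException("batch already closed");
    return nativeHandle;
  }

  @Override
  public void close() {
    // In a real implementation this would release the native memory
    // behind the handle; here it only marks the wrapper as closed.
    closed = true;
  }
}
```

The point of the design is that the JVM never walks the columnar data, so no ArrowImporter work (and no per-column FFI traffic) happens at all.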
int bytesRead = channel.read(headerBuf);
if (bytesRead < 0) {
  if (headerBuf.position() == 0) {
    return -1;
We can call close() earlier here.
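One subtlety worth keeping in mind with the snippet above: `ReadableByteChannel.read` may return fewer bytes than requested, so a single read is not guaranteed to fill the header buffer. A hedged sketch of a loop that handles both the clean-EOF case (position 0, return -1) and a truncated header (helper name hypothetical, not Comet's actual code):

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

final class HeaderReader {
  // Reads an 8-byte length header, looping until the buffer is full.
  // Returns -1 on a clean end of stream (no bytes read at all) and
  // fails loudly if the stream ends mid-header.
  static long readHeader(ReadableByteChannel channel, ByteBuffer headerBuf)
      throws IOException {
    headerBuf.clear();
    while (headerBuf.hasRemaining()) {
      int bytesRead = channel.read(headerBuf);
      if (bytesRead < 0) {
        if (headerBuf.position() == 0) {
          return -1; // clean end of stream: no more blocks
        }
        throw new EOFException("truncated block header");
      }
    }
    headerBuf.flip();
    return headerBuf.getLong(); // the compressed block length
  }
}
```

With a loop like this, the `position() == 0` check cleanly distinguishes "no more blocks" from "stream cut off mid-header".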
// Field count discarded - schema determined by ShuffleScan protobuf fields
headerBuf.getLong();

long bytesToRead = compressedLength - 8;
nit: add a comment explaining why -8
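Reading the diff above, the apparent reason for the -8 is that `compressedLength` covers the 8-byte field-count long plus the compressed payload, and the field count has already been consumed by `headerBuf.getLong()`. A hedged round-trip sketch of that inferred framing (this layout is reconstructed from the snippet, not the authoritative Comet format; names are hypothetical):

```java
import java.nio.ByteBuffer;

final class FramingDemo {
  // Frames a payload as: [compressedLength][fieldCount][payload],
  // where compressedLength counts the field-count long plus payload.
  static byte[] frame(long fieldCount, byte[] payload) {
    ByteBuffer buf = ByteBuffer.allocate(8 + 8 + payload.length);
    buf.putLong(8 + payload.length); // field-count long + payload bytes
    buf.putLong(fieldCount);
    buf.put(payload);
    return buf.array();
  }

  // Mirrors the read side: after discarding the field count, the
  // remaining payload is compressedLength - 8 bytes.
  static byte[] readPayload(byte[] block) {
    ByteBuffer buf = ByteBuffer.wrap(block);
    long compressedLength = buf.getLong();
    buf.getLong();                         // field count, discarded
    byte[] payload = new byte[(int) (compressedLength - 8)];
    buf.get(payload);
    return payload;
  }
}
```

If this reading is right, a one-line comment like "compressedLength includes the 8-byte field-count long we just consumed" would address the nit.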
case rdd: CometShuffledBatchRDD =>
  val dep = rdd.dependency
  val rddMetrics = rdd.metrics
  factories(scanIdx) = (context, part) => {
Much of the logic here duplicates CometShuffledBatchRDD#compute. Perhaps we could add a computeAsShuffleBlockIterator method to CometShuffledBatchRDD and reuse createReader logic. Like:
class CometShuffledBatchRDD {
def computeAsShuffleBlockIterator(context: TaskContext, split: Partition): CometShuffleBlockIterator = {
...
}
}
factories(scanIdx) = rdd.computeAsShuffleBlockIterator
In that case, we no longer need to buildShuffleBlockIteratorFactories; we can compute it in CometExecRDD like this:
class CometExecRDD {
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
val partition = split.asInstanceOf[CometExecPartition]
val inputs = inputRDDs.zip(partition.inputPartitions).zipWithIndex.map {
case ((rdd: CometShuffledBatchRDD, part), idx) if shuffleScanIndices.contains(idx) =>
rdd.computeAsShuffleBlockIterator(context, part)
case ((rdd, part), _) => rdd.iterator(part, context)
}
...
}
Which issue does this PR close?
Performance improvements for native shuffle read. Shows 13% improvement in TPC-H @ 1TB.
Rationale for this change
Simplifies the shuffle direct read code path, removing unnecessary FFI transfers.
What changes are included in this PR?
How are these changes tested?