Description
What is the problem the feature request solves?
Replace disk-based shuffle with Arrow Flight for direct memory-to-memory data exchange between executors, eliminating intermediate disk I/O and leveraging Arrow's native IPC format for efficient shuffle.
This discussion proposes adding native Arrow Flight support to DataFusion Comet's shuffle implementation. Currently, Comet writes shuffle data in Arrow IPC format but relies on Spark's Netty-based BlockManager for network transfer. By implementing Arrow Flight in the Rust native layer, we can keep shuffle data columnar end to end: batches stay in Arrow format from writer to reader, with no intermediate files or format conversion, and network I/O no longer crosses the JVM boundary.
Motivation
Comet already uses Arrow RecordBatches internally, but shuffle still goes through disk:
Current Flow:
Arrow RecordBatch → Arrow IPC → Compress → DISK → Network → DISK → Decompress → Arrow RecordBatch
Proposed Flow:
Arrow RecordBatch → Arrow Flight (gRPC) → Arrow RecordBatch
In other words:
Current Flow:
- Rust ShuffleWriter -> Arrow IPC files -> Disk
- Spark BlockManager -> Netty -> Remote Executor
- JNI -> Rust ShuffleReader -> RecordBatch
Proposed Flow:
- Rust ShuffleWriter -> In-memory Arrow buffers
- Rust FlightServer -> gRPC/HTTP2 -> Remote Rust FlightClient
- Rust ShuffleReader -> RecordBatch (zero-copy)
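The proposed flow leaves open how a reducer names the data it wants. Arrow Flight's `DoGet` call takes an opaque `Ticket`, so one option is to encode the shuffle block coordinates (mirroring the fields of Spark's `ShuffleBlockId`) into the ticket and keep each map output's Arrow IPC bytes in an in-memory store that the Flight server reads from. This is a std-only Rust sketch under those assumptions: `BlockId`, the 16-byte big-endian ticket layout, and `ShuffleBlockStore` are hypothetical names. `Vec<u8>` stands in for real Arrow IPC buffers, and the gRPC service itself (for example an `arrow-flight` `FlightService` implementation) is omitted.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical identifier for one shuffle block (one map output, one reduce partition).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BlockId {
    pub shuffle_id: u32,
    pub map_id: u64,
    pub reduce_id: u32,
}

// Copy fixed-width big-endian fields out of a ticket (caller guarantees the length).
fn be_u32(b: &[u8]) -> u32 {
    let mut a = [0u8; 4];
    a.copy_from_slice(b);
    u32::from_be_bytes(a)
}
fn be_u64(b: &[u8]) -> u64 {
    let mut a = [0u8; 8];
    a.copy_from_slice(b);
    u64::from_be_bytes(a)
}

impl BlockId {
    /// Encode as opaque Flight ticket bytes (hypothetical layout: 4 + 8 + 4 bytes).
    pub fn to_ticket(&self) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(16);
        bytes.extend_from_slice(&self.shuffle_id.to_be_bytes());
        bytes.extend_from_slice(&self.map_id.to_be_bytes());
        bytes.extend_from_slice(&self.reduce_id.to_be_bytes());
        bytes
    }

    /// Decode ticket bytes; returns None for a ticket of the wrong length.
    pub fn from_ticket(ticket: &[u8]) -> Option<BlockId> {
        if ticket.len() != 16 {
            return None;
        }
        Some(BlockId {
            shuffle_id: be_u32(&ticket[0..4]),
            map_id: be_u64(&ticket[4..12]),
            reduce_id: be_u32(&ticket[12..16]),
        })
    }
}

/// Hypothetical in-memory store: the writer fills it, a Flight DoGet handler reads it.
#[derive(Default, Clone)]
pub struct ShuffleBlockStore {
    blocks: Arc<RwLock<HashMap<BlockId, Arc<Vec<u8>>>>>,
}

impl ShuffleBlockStore {
    /// Map side: register the IPC-encoded bytes for one reduce partition.
    pub fn put(&self, id: BlockId, ipc_bytes: Vec<u8>) {
        self.blocks.write().unwrap().insert(id, Arc::new(ipc_bytes));
    }

    /// Server side: resolve a ticket to a shared buffer (an Arc clone, not a data copy).
    pub fn get_by_ticket(&self, ticket: &[u8]) -> Option<Arc<Vec<u8>>> {
        let id = BlockId::from_ticket(ticket)?;
        self.blocks.read().unwrap().get(&id).cloned()
    }

    /// Free every block of a shuffle, e.g. when Spark unregisters it.
    pub fn remove_shuffle(&self, shuffle_id: u32) {
        self.blocks.write().unwrap().retain(|id, _| id.shuffle_id != shuffle_id);
    }
}

fn main() {
    let store = ShuffleBlockStore::default();
    // Map task 42 of shuffle 7 writes its output for reduce partition 3.
    let id = BlockId { shuffle_id: 7, map_id: 42, reduce_id: 3 };
    store.put(id, vec![0xAB; 1024]); // placeholder for Arrow IPC bytes
    // A reducer would send this ticket in a DoGet request; the server resolves it here.
    let ticket = id.to_ticket();
    if let Some(block) = store.get_by_ticket(&ticket) {
        println!("serving {} bytes for {:?}", block.len(), id);
    }
    store.remove_shuffle(7);
}
```

Handing out `Arc` clones means serving a block does not copy its buffer inside the executor. The bytes are still written once onto the wire by gRPC, so "zero-copy" in this sketch applies to the in-process path, not to the network hop.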
Describe the potential solution
No response
Additional context
Configuration
# Enable Arrow Flight shuffle
spark.shuffle.manager=org.apache.comet.shuffle.CometFlightShuffleManager
# Flight server configuration
spark.comet.shuffle.flight.enabled=true
spark.comet.shuffle.flight.port=50051
# Memory management
spark.comet.shuffle.flight.memoryFraction=0.3
spark.comet.shuffle.flight.spillThreshold=0.8
# Network configuration
# Maximum Flight message size in bytes (64 MiB)
spark.comet.shuffle.flight.maxMessageSize=67108864
spark.comet.shuffle.flight.compression=zstd
# Fault tolerance
spark.comet.shuffle.flight.retryAttempts=3
spark.comet.shuffle.flight.retryDelayMs=1000
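The configuration implies some arithmetic the issue leaves implicit: `memoryFraction` sizes a buffer budget and `spillThreshold` is a fraction of that budget. With a 4 GiB pool, for instance, the Flight buffers would get about 1.2 GiB and spilling would begin near 0.96 GiB. Here is a std-only Rust sketch of a typed config with those semantics. It assumes (none of this is specified in the proposal) that the keys arrive as a string map, that `memoryFraction` applies to a caller-supplied executor memory size, that `enabled` defaults to off, and that retries use a fixed delay; `FlightShuffleConfig` and its methods are hypothetical names.

```rust
use std::collections::HashMap;
use std::fmt::Display;
use std::str::FromStr;
use std::time::Duration;

const PREFIX: &str = "spark.comet.shuffle.flight.";

/// Hypothetical typed view of the proposed spark.comet.shuffle.flight.* keys.
#[derive(Debug, Clone, PartialEq)]
pub struct FlightShuffleConfig {
    pub enabled: bool,
    pub port: u16,
    pub memory_fraction: f64,
    pub spill_threshold: f64,
    pub max_message_size: usize,
    pub compression: String,
    pub retry_attempts: u32,
    pub retry_delay: Duration,
}

impl Default for FlightShuffleConfig {
    // Values from the example configuration; `enabled` is assumed off unless set.
    fn default() -> Self {
        FlightShuffleConfig {
            enabled: false,
            port: 50051,
            memory_fraction: 0.3,
            spill_threshold: 0.8,
            max_message_size: 64 * 1024 * 1024,
            compression: "zstd".to_string(),
            retry_attempts: 3,
            retry_delay: Duration::from_millis(1000),
        }
    }
}

fn parse_value<T: FromStr>(key: &str, value: &str) -> Result<T, String>
where
    T::Err: Display,
{
    value.trim().parse::<T>().map_err(|e| format!("invalid value for {}: {}", key, e))
}

impl FlightShuffleConfig {
    /// Overlay any spark.comet.shuffle.flight.* entries on the defaults, then validate.
    pub fn from_spark_conf(conf: &HashMap<String, String>) -> Result<Self, String> {
        let mut cfg = FlightShuffleConfig::default();
        for (key, value) in conf {
            let name = match key.strip_prefix(PREFIX) {
                Some(name) => name,
                None => continue, // not a Flight shuffle key
            };
            match name {
                "enabled" => cfg.enabled = parse_value(key, value)?,
                "port" => cfg.port = parse_value(key, value)?,
                "memoryFraction" => cfg.memory_fraction = parse_value(key, value)?,
                "spillThreshold" => cfg.spill_threshold = parse_value(key, value)?,
                "maxMessageSize" => cfg.max_message_size = parse_value(key, value)?,
                "compression" => cfg.compression = value.trim().to_string(),
                "retryAttempts" => cfg.retry_attempts = parse_value(key, value)?,
                "retryDelayMs" => cfg.retry_delay = Duration::from_millis(parse_value(key, value)?),
                _ => {} // unknown keys are ignored in this sketch
            }
        }
        let in_unit_range = |x: f64| x > 0.0 && x <= 1.0;
        if !in_unit_range(cfg.memory_fraction) || !in_unit_range(cfg.spill_threshold) {
            return Err("memoryFraction and spillThreshold must be in (0, 1]".to_string());
        }
        if cfg.max_message_size == 0 {
            return Err("maxMessageSize must be positive".to_string());
        }
        Ok(cfg)
    }

    /// Bytes reserved for buffered shuffle data, given some executor memory size.
    pub fn buffer_budget(&self, executor_memory_bytes: u64) -> u64 {
        (executor_memory_bytes as f64 * self.memory_fraction) as u64
    }

    /// True once buffered bytes reach spillThreshold of the budget.
    pub fn should_spill(&self, buffered_bytes: u64, executor_memory_bytes: u64) -> bool {
        let limit = self.buffer_budget(executor_memory_bytes) as f64 * self.spill_threshold;
        buffered_bytes as f64 >= limit
    }

    /// One wait per retry; this sketch assumes a fixed (not exponential) delay.
    pub fn retry_delays(&self) -> Vec<Duration> {
        vec![self.retry_delay; self.retry_attempts as usize]
    }
}

fn main() {
    let mut conf = HashMap::new();
    conf.insert(format!("{}enabled", PREFIX), "true".to_string());
    conf.insert(format!("{}retryDelayMs", PREFIX), "500".to_string());
    let cfg = FlightShuffleConfig::from_spark_conf(&conf).expect("valid config");
    let executor_memory: u64 = 4 * 1024 * 1024 * 1024; // illustrative 4 GiB pool
    println!("budget: {} bytes", cfg.buffer_budget(executor_memory));
    println!("spill with 1 GiB buffered: {}", cfg.should_spill(1 << 30, executor_memory));
    println!("retry delays: {:?}", cfg.retry_delays());
}
```

Validating the fractions at startup turns a typo such as `memoryFraction=3` into an immediate error instead of an executor whose buffers can never reach the spill point.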
Related Work
- Ballista: DataFusion's distributed query engine uses Arrow Flight
- Dask: Exploring Arrow Flight for task communication
- Ray: Uses gRPC for object transfer (similar concept)
- Spark 3.2 Push-Based Shuffle: Inspiration for push model
References
- Arrow Flight RPC: https://arrow.apache.org/docs/format/Flight.html
- Arrow Flight SQL: https://arrow.apache.org/docs/format/FlightSql.html
- Comet native shuffle source: https://github.com/apache/datafusion-comet/tree/main/native/core/src/execution/shuffle
- Spark ShuffleManager API: https://spark.apache.org/docs/latest/api/java/org/apache/spark/shuffle/ShuffleManager.html