Arrow Flight Shuffle for Comet #3596

@Shekharrajak

What is the problem the feature request solves?

Replace disk-based shuffle with Arrow Flight for direct memory-to-memory data exchange between executors, eliminating intermediate disk I/O and leveraging Arrow's native IPC format for efficient shuffle.

This issue proposes adding native Arrow Flight support to DataFusion Comet's shuffle implementation. Currently, Comet writes shuffle data in Arrow IPC format but relies on Spark's Netty-based BlockManager for network transfer. By implementing Arrow Flight in the Rust native layer, shuffle data could stay in Arrow columnar format end to end, avoiding extra copies and the JVM boundary crossing for network I/O.

Motivation

Comet already uses Arrow RecordBatches internally, but shuffle still goes through disk:

Current Flow:
Arrow RecordBatch → Arrow IPC → Compress → DISK → Network → DISK → Decompress → Arrow RecordBatch

Proposed Flow:
Arrow RecordBatch → Arrow Flight (gRPC) → Arrow RecordBatch

In other words, at the component level:

Current Flow:

  1. Rust ShuffleWriter -> Arrow IPC files -> Disk
  2. Spark BlockManager -> Netty -> Remote Executor
  3. JNI -> Rust ShuffleReader -> RecordBatch

Proposed Flow:

  1. Rust ShuffleWriter -> In-memory Arrow buffers
  2. Rust FlightServer -> gRPC/HTTP2 -> Remote Rust FlightClient
  3. Rust ShuffleReader -> RecordBatch (zero-copy)
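Step 1 of the proposed flow needs somewhere to hold partitioned buffers until a remote reader asks for them. A minimal sketch of such a store is below, using only the standard library. `BlockId` and `InMemoryShuffleStore` are hypothetical names that do not exist in Comet, and each batch is represented as an opaque byte buffer standing in for an encoded Arrow RecordBatch. A Flight server handler would presumably look blocks up here when serving a fetch request, but that wiring is not shown.

```rust
use std::collections::HashMap;

/// Hypothetical block address, mirroring Spark's
/// (shuffle id, map id, reduce partition id) scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct BlockId {
    pub shuffle_id: u32,
    pub map_id: u64,
    pub reduce_id: u32,
}

/// Hypothetical in-memory store for shuffle output.
/// Each block is a list of encoded batches (opaque bytes in this sketch).
#[derive(Default)]
pub struct InMemoryShuffleStore {
    blocks: HashMap<BlockId, Vec<Vec<u8>>>,
    bytes_used: usize,
}

impl InMemoryShuffleStore {
    pub fn new() -> Self {
        Self::default()
    }

    /// Writer side: append one encoded batch to a block.
    pub fn put(&mut self, id: BlockId, batch: Vec<u8>) {
        self.bytes_used += batch.len();
        self.blocks.entry(id).or_default().push(batch);
    }

    /// Server side: borrow the batches of a block without removing them,
    /// so a retried reduce task can fetch the same block again.
    pub fn get(&self, id: &BlockId) -> Option<&[Vec<u8>]> {
        self.blocks.get(id).map(|batches| batches.as_slice())
    }

    /// Drop every block of a finished shuffle; returns the bytes freed.
    pub fn remove_shuffle(&mut self, shuffle_id: u32) -> usize {
        let mut freed = 0usize;
        self.blocks.retain(|id, batches| {
            if id.shuffle_id == shuffle_id {
                freed += batches.iter().map(Vec::len).sum::<usize>();
                false
            } else {
                true
            }
        });
        self.bytes_used -= freed;
        freed
    }

    /// Total bytes currently held, useful for a spill decision.
    pub fn bytes_used(&self) -> usize {
        self.bytes_used
    }
}
```

Keeping blocks until the whole shuffle is removed (rather than freeing on first read) is a deliberate choice in this sketch: it trades memory for the ability to re-serve a block after a reader failure.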

Describe the potential solution

No response

Additional context

Configuration


  # Enable Arrow Flight shuffle
  spark.shuffle.manager=org.apache.comet.shuffle.CometFlightShuffleManager

  # Flight server configuration
  spark.comet.shuffle.flight.enabled=true
  spark.comet.shuffle.flight.port=50051

  # Memory management
  spark.comet.shuffle.flight.memoryFraction=0.3
  spark.comet.shuffle.flight.spillThreshold=0.8

  # Network configuration (maxMessageSize is in bytes; 67108864 = 64 MiB)
  spark.comet.shuffle.flight.maxMessageSize=67108864
  spark.comet.shuffle.flight.compression=zstd

  # Fault tolerance
  spark.comet.shuffle.flight.retryAttempts=3
  spark.comet.shuffle.flight.retryDelayMs=1000
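To make the intended meaning of these keys concrete, here is a rough sketch of how the native layer might read them. Everything in it is an assumption for illustration: `FlightShuffleConf`, `parse_or`, `spill_limit_bytes`, and `with_retry` are hypothetical names, the defaults simply repeat the values listed above, `spillThreshold` is read as a fraction of the `memoryFraction` budget, and `retryAttempts` is read as the number of retries after the first try. Only the numeric keys are covered.

```rust
use std::collections::HashMap;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;

const PREFIX: &str = "spark.comet.shuffle.flight.";

/// Hypothetical typed view of the proposed settings.
#[derive(Debug, Clone, PartialEq)]
pub struct FlightShuffleConf {
    pub port: u16,
    pub memory_fraction: f64,
    pub spill_threshold: f64,
    pub max_message_size: usize,
    pub retry_attempts: u32,
    pub retry_delay: Duration,
}

/// Look up `PREFIX + key`, falling back to `default` if missing or unparsable.
fn parse_or<T: FromStr>(conf: &HashMap<String, String>, key: &str, default: T) -> T {
    conf.get(&format!("{}{}", PREFIX, key))
        .and_then(|v| v.trim().parse().ok())
        .unwrap_or(default)
}

impl FlightShuffleConf {
    pub fn from_map(conf: &HashMap<String, String>) -> Self {
        Self {
            port: parse_or(conf, "port", 50051),
            memory_fraction: parse_or(conf, "memoryFraction", 0.3),
            spill_threshold: parse_or(conf, "spillThreshold", 0.8),
            max_message_size: parse_or(conf, "maxMessageSize", 64 * 1024 * 1024),
            retry_attempts: parse_or(conf, "retryAttempts", 3),
            retry_delay: Duration::from_millis(parse_or(conf, "retryDelayMs", 1000u64)),
        }
    }

    /// Assumed semantics: buffered bytes above this limit trigger a spill.
    pub fn spill_limit_bytes(&self, executor_memory: u64) -> u64 {
        (executor_memory as f64 * self.memory_fraction * self.spill_threshold) as u64
    }
}

/// Run `op`, retrying up to `retry_attempts` times with a fixed delay.
/// `op` receives the zero-based attempt number.
pub fn with_retry<T, E>(
    conf: &FlightShuffleConf,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= conf.retry_attempts => return Err(err),
            Err(_) => {
                attempt += 1;
                sleep(conf.retry_delay);
            }
        }
    }
}
```

Under these assumptions, the default `memoryFraction=0.3` and `spillThreshold=0.8` would start spilling once buffered shuffle data reaches roughly 24% of executor memory, and a fetch would be tried at most four times in total.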

Related Work

  • Ballista: DataFusion's distributed query engine uses Arrow Flight
  • Dask: Exploring Arrow Flight for task communication
  • Ray: Uses gRPC for object transfer (similar concept)
  • Spark 3.2 Push-Based Shuffle: Inspiration for push model
