Skip to content

[SPARK-56962][SS][RTM][StreamingShuffle][Part2] Add StreamingShuffleOutputTracker for streaming shuffle coordination#56008

Open
jerrypeng wants to merge 1 commit into
apache:masterfrom
jerrypeng:stack/streaming-shuffle-pr2-tracker-solo
Open

[SPARK-56962][SS][RTM][StreamingShuffle][Part2] Add StreamingShuffleOutputTracker for streaming shuffle coordination#56008
jerrypeng wants to merge 1 commit into
apache:masterfrom
jerrypeng:stack/streaming-shuffle-pr2-tracker-solo

Conversation

@jerrypeng
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Context

This is the second PR in a stack of nine that contributes a new streaming shuffle implementation to Apache Spark. Streaming shuffle is a push-based shuffle designed for low-latency,
continuously-running queries (e.g., Real-Time mode in Structured Streaming) where the map and reduce stages must run concurrently rather than sequentially. Each map task hosts a Netty
server that pushes records to reduce tasks as they are produced; reduce tasks open clients to those servers and consume records as a stream — no on-disk materialization, no map-stage
barrier.

Because the full implementation spans the network protocol, driver-side coordination, plugin layer, executor-side writer/reader, engine integration, and tests, it is split into nine
independently reviewable PRs:

  1. Wire protocol ([SPARK-56674][SS][RTM][StreamingShuffle][Part1] Add streaming shuffle wire protocol  #55620) — binary message types in network-common, pure Java.
  2. Output tracker (this PR) — driver-side coordination service mapping shuffle IDs to writer task locations.
  3. Shuffle manager + Netty handlers + logging mixin — the ShuffleManager plugin entry point, the bidirectional Netty handlers, and a TaskContext-aware logging trait.
  4. SparkEnv + DAGScheduler integration — wires the tracker into SparkEnv and registers shuffles from DAGScheduler.
  5. WriterStreamingShuffleWriter (server-side push).
  6. ReaderStreamingShuffleReader (client-side pull).
  7. MultiShuffleManager — routes per-shuffle to streaming or sort shuffle based on a task-local property.
  8. Tests — end-to-end suite for the streaming shuffle plugin.
  9. Documentation — design and configuration reference.

This PR has no compile dependency on PR1 (the tracker lives in org.apache.spark.* and does not reference any of the protocol classes), so the two can be reviewed in parallel. Each
PR compiles standalone. The plugin only becomes usable end-to-end after PRs 1–7.

Changes in this PR

This PR introduces the driver-side coordination service that lets streaming shuffle writers and readers discover each other.

StreamingShuffleOutputTracker (abstract base) exposes four public methods:

API Caller Behavior
registerShuffle(shuffleId, numMaps, numReduces, jobId) DAGScheduler (PR 4) Records the shuffle's metadata.
registerShuffleWriterTask(shuffleId, mapId, location) Writer task (PR 5) Publishes a writer's (executorId, host, port).
getAllShuffleWriterTaskLocations(shuffleId) Reader (PR 6), barrier API Returns None until all writers have registered; then returns the full map.
getAvailableShuffleWriterTaskLocations(shuffleId) Reader (PR 6), progressive API Returns whatever writers have registered so far, together with the total expected count.

The base is specialized by:

  • StreamingShuffleOutputTrackerMaster (driver) — backed by a ConcurrentHashMap, with a message-loop thread pool that dispatches RPC replies off the Netty threads so a slow
    downstream cannot stall the I/O loop. Reuses SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS for sizing.
  • StreamingShuffleOutputTrackerWorker (executor) — thin RPC proxy that forwards every call to the master.
  • StreamingShuffleOutputTrackerMasterEndpoint — Netty RPC endpoint that feeds incoming requests into the master's blocking queue.

Supporting types:

  • StreamingShuffleTaskLocation(executorId, host, port) — the location record published by writers and consumed by readers.
  • ShuffleLocationResponse(locations, numShuffleWriterTasks) — the response type for the progressive (getAvailable…) API; carries the total expected count so a reader can tell how many
    writers it still has to discover.
  • StreamingShuffleInfo(numMaps, numReduces, jobId) — per-shuffle metadata held on the driver.

This PR also adds the three log keys used by the tracker — NUM_MAPPERS, NUM_REDUCERS, and TASK_LOCATION — to LogKeys.java.

Why are the changes needed?

The default SortShuffleManager requires every map task to finish writing its output to disk before any reduce task can start. That model is not workable for the streaming shuffle
introduced over the rest of this stack, where map and reduce tasks must coexist for the lifetime of the query and reduce tasks need to start consuming records the moment they are
produced.

For that to work, a reduce task has to be able to ask the driver, "where is writer N's TransportServer running?" and the driver has to track the answer as writers come online. That
coordination is what StreamingShuffleOutputTracker provides:

  • It exposes two lookup modes. The barrier API (getAllShuffleWriterTaskLocations) returns None until every writer has registered — useful when a reader needs the full set up
    front. The progressive API (getAvailableShuffleWriterTaskLocations) returns whatever subset is known so far together with the total expected count — what the streaming-shuffle reader
    (PR 6) actually uses, so it can begin consuming from the first writer while later writers are still launching.
  • It uses an off-Netty dispatcher thread pool on the master so that slow paths (e.g., a reader doing a lookup at task-launch time) never block the RPC I/O loop. This is the same
    pattern MapOutputTrackerMaster uses for the same reason.
  • It is a separate service from MapOutputTracker. The streaming shuffle does not produce MapStatus outputs, so it cannot reuse the existing tracker, but it does need an analogous
    coordination point.

This PR's coordination layer is the natural foundation for the rest of the stack: every subsequent PR depends on it.

Does this PR introduce any user-facing change?

No.

StreamingShuffleOutputTracker is only instantiated when the configured spark.shuffle.manager is StreamingShuffleManager or MultiShuffleManager. Those manager classes are
introduced in later PRs in the stack (PR 3 and PR 7), and the SparkEnv wiring that creates the tracker is introduced in PR 4. Until those land, there is no caller for any of the code in
this PR, and there is no observable behavior change for any user.

How was this patch tested?

StreamingShuffleOutputTrackerSuite runs 11 unit tests covering:

Group Tests
RPC lifecycle (through RpcEnv) master start and stop, test tracker workflow
Negative cases (through RpcEnv) register task for shuffle that doesn't exist, get task location for shuffle that doesn't exist
Master register/lookup (in-process) register shuffle, register and get writer task locations, unregister shuffle, multiple shuffles
Master progressive lookup (in-process) get available writer task locations (returns partial results before all writers register)
Master negative case (in-process) register writer before shuffle fails
Master concurrency concurrent registration (100 concurrent register calls, asserts all 100 land)

To run locally:

build/sbt 'core/testOnly *StreamingShuffleOutputTrackerSuite'

Output:

[info] StreamingShuffleOutputTrackerSuite:
[info] - master start and stop (279 milliseconds)
[info] - test tracker workflow (26 milliseconds)
[info] - register task for shuffle that doesn't exist (19 milliseconds)
[info] - get task location for shuffle that doesn't exist (13 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - register shuffle (3 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - register and get writer task locations (2 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - get available writer task locations (2 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - unregister shuffle (1 millisecond)
[info] - StreamingShuffleOutputTrackerMaster - register writer before shuffle fails (2 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - concurrent registration (11 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - multiple shuffles (1 millisecond)
[info] Run completed in 1 second, 589 milliseconds.
[info] Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Was this patch authored or co-authored using generative AI tooling?

co-authored with claude

Introduces the driver-side coordination service for streaming shuffle.

StreamingShuffleOutputTracker:
- Abstract base class with three key APIs:
    registerShuffleWriterTask(shuffleId, mapId, location) -- called by writers
    getAllShuffleWriterTaskLocations(shuffleId) -- barrier: returns None until all writers up
    getAvailableShuffleWriterTaskLocations(shuffleId) -- progressive: partial results OK
- StreamingShuffleOutputTrackerMaster (driver): ConcurrentHashMap-backed store with a
  message-loop thread pool (reuses SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS) that
  dispatches RPC replies off the Netty threads
- StreamingShuffleOutputTrackerWorker (executor): thin RPC proxy to the master
- StreamingShuffleOutputTrackerMasterEndpoint: Netty RPC endpoint that feeds
  incoming requests into the master's blocking queue
- StreamingShuffleTaskLocation(executorId, host, port): location record
- ShuffleLocationResponse(locations, total): partial-availability response type
- StreamingShuffleInfo(numMaps, numReduces, jobId): per-shuffle metadata on the driver

Adds the NUM_MAPPERS, NUM_REDUCERS, and TASK_LOCATION log keys used by
the tracker.

Configs and the writer/reader-specific error conditions are introduced
in the follow-up commits that need them, so each commit's API surface
matches its callers.

StreamingShuffleOutputTrackerSuite covers register/lookup, barrier
behavior, partial-availability behavior, unregister, and concurrent
registration scenarios.

Co-authored-by: Isaac
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant