[SPARK-56962][SS][RTM][StreamingShuffle][Part2] Add StreamingShuffleOutputTracker for streaming shuffle coordination#56008
Open
jerrypeng wants to merge 1 commit into
Conversation
Introduces the driver-side coordination service for streaming shuffle.
StreamingShuffleOutputTracker:
- Abstract base class with three key APIs:
registerShuffleWriterTask(shuffleId, mapId, location) -- called by writers
getAllShuffleWriterTaskLocations(shuffleId) -- barrier: returns None until all writers up
getAvailableShuffleWriterTaskLocations(shuffleId) -- progressive: partial results OK
- StreamingShuffleOutputTrackerMaster (driver): ConcurrentHashMap-backed store with a
message-loop thread pool (reuses SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS) that
dispatches RPC replies off the Netty threads
- StreamingShuffleOutputTrackerWorker (executor): thin RPC proxy to the master
- StreamingShuffleOutputTrackerMasterEndpoint: Netty RPC endpoint that feeds
incoming requests into the master's blocking queue
- StreamingShuffleTaskLocation(executorId, host, port): location record
- ShuffleLocationResponse(locations, total): partial-availability response type
- StreamingShuffleInfo(numMaps, numReduces, jobId): per-shuffle metadata on the driver
Adds the NUM_MAPPERS, NUM_REDUCERS, and TASK_LOCATION log keys used by
the tracker.
Configs and the writer/reader-specific error conditions are introduced
in the follow-up commits that need them, so each commit's API surface
matches its callers.
StreamingShuffleOutputTrackerSuite covers register/lookup, barrier
behavior, partial-availability behavior, unregister, and concurrent
registration scenarios.
Co-authored-by: Isaac
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Context
This is the second PR in a stack of nine that contributes a new streaming shuffle implementation to Apache Spark. Streaming shuffle is a push-based shuffle designed for low-latency,
continuously-running queries (e.g., Real-Time mode in Structured Streaming) where the map and reduce stages must run concurrently rather than sequentially. Each map task hosts a Netty
server that pushes records to reduce tasks as they are produced; reduce tasks open clients to those servers and consume records as a stream — no on-disk materialization, no map-stage
barrier.
Because the full implementation spans the network protocol, driver-side coordination, plugin layer, executor-side writer/reader, engine integration, and tests, it is split into nine
independently reviewable PRs:
network-common, pure Java.ShuffleManagerplugin entry point, the bidirectional Netty handlers, and aTaskContext-aware logging trait.SparkEnvand registers shuffles fromDAGScheduler.StreamingShuffleWriter(server-side push).StreamingShuffleReader(client-side pull).This PR has no compile dependency on PR1 (the tracker lives in
org.apache.spark.*and does not reference any of the protocol classes), so the two can be reviewed in parallel. EachPR compiles standalone. The plugin only becomes usable end-to-end after PRs 1–7.
Changes in this PR
This PR introduces the driver-side coordination service that lets streaming shuffle writers and readers discover each other.
StreamingShuffleOutputTracker(abstract base) exposes four public methods:registerShuffle(shuffleId, numMaps, numReduces, jobId)DAGScheduler(PR 4)registerShuffleWriterTask(shuffleId, mapId, location)(executorId, host, port).getAllShuffleWriterTaskLocations(shuffleId)Noneuntil all writers have registered; then returns the full map.getAvailableShuffleWriterTaskLocations(shuffleId)The base is specialized by:
StreamingShuffleOutputTrackerMaster(driver) — backed by aConcurrentHashMap, with a message-loop thread pool that dispatches RPC replies off the Netty threads so a slowdownstream cannot stall the I/O loop. Reuses
SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADSfor sizing.StreamingShuffleOutputTrackerWorker(executor) — thin RPC proxy that forwards every call to the master.StreamingShuffleOutputTrackerMasterEndpoint— Netty RPC endpoint that feeds incoming requests into the master's blocking queue.Supporting types:
StreamingShuffleTaskLocation(executorId, host, port)— the location record published by writers and consumed by readers.ShuffleLocationResponse(locations, numShuffleWriterTasks)— the response type for the progressive (getAvailable…) API; carries the total expected count so a reader can tell how manywriters it still has to discover.
StreamingShuffleInfo(numMaps, numReduces, jobId)— per-shuffle metadata held on the driver.This PR also adds the three log keys used by the tracker —
NUM_MAPPERS,NUM_REDUCERS, andTASK_LOCATION— toLogKeys.java.Why are the changes needed?
The default
SortShuffleManagerrequires every map task to finish writing its output to disk before any reduce task can start. That model is not workable for the streaming shuffleintroduced over the rest of this stack, where map and reduce tasks must coexist for the lifetime of the query and reduce tasks need to start consuming records the moment they are
produced.
For that to work, a reduce task has to be able to ask the driver, "where is writer N's TransportServer running?" and the driver has to track the answer as writers come online. That
coordination is what
StreamingShuffleOutputTrackerprovides:getAllShuffleWriterTaskLocations) returnsNoneuntil every writer has registered — useful when a reader needs the full set upfront. The progressive API (
getAvailableShuffleWriterTaskLocations) returns whatever subset is known so far together with the total expected count — what the streaming-shuffle reader(PR 6) actually uses, so it can begin consuming from the first writer while later writers are still launching.
pattern
MapOutputTrackerMasteruses for the same reason.MapOutputTracker. The streaming shuffle does not produceMapStatusoutputs, so it cannot reuse the existing tracker, but it does need an analogouscoordination point.
This PR's coordination layer is the natural foundation for the rest of the stack: every subsequent PR depends on it.
Does this PR introduce any user-facing change?
No.
StreamingShuffleOutputTrackeris only instantiated when the configuredspark.shuffle.managerisStreamingShuffleManagerorMultiShuffleManager. Those manager classes areintroduced in later PRs in the stack (PR 3 and PR 7), and the
SparkEnvwiring that creates the tracker is introduced in PR 4. Until those land, there is no caller for any of the code inthis PR, and there is no observable behavior change for any user.
How was this patch tested?
StreamingShuffleOutputTrackerSuiteruns 11 unit tests covering:RpcEnv)master start and stop,test tracker workflowRpcEnv)register task for shuffle that doesn't exist,get task location for shuffle that doesn't existregister shuffle,register and get writer task locations,unregister shuffle,multiple shufflesget available writer task locations(returns partial results before all writers register)register writer before shuffle failsconcurrent registration(100 concurrent register calls, asserts all 100 land)To run locally:
build/sbt 'core/testOnly *StreamingShuffleOutputTrackerSuite'
Output:
[info] StreamingShuffleOutputTrackerSuite:
[info] - master start and stop (279 milliseconds)
[info] - test tracker workflow (26 milliseconds)
[info] - register task for shuffle that doesn't exist (19 milliseconds)
[info] - get task location for shuffle that doesn't exist (13 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - register shuffle (3 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - register and get writer task locations (2 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - get available writer task locations (2 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - unregister shuffle (1 millisecond)
[info] - StreamingShuffleOutputTrackerMaster - register writer before shuffle fails (2 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - concurrent registration (11 milliseconds)
[info] - StreamingShuffleOutputTrackerMaster - multiple shuffles (1 millisecond)
[info] Run completed in 1 second, 589 milliseconds.
[info] Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
Was this patch authored or co-authored using generative AI tooling?
co-authored with claude