[CELEBORN-2226][CIP-14] Support RetryFetchChunk functionality for Cel…#3605
afterincomparableyum wants to merge 1 commit into apache:main
I will rebase off of main and then open this PR once #3583 gets merged.
bf7acef to d3b5389
@afterincomparableyum thanks for these changes. Do you have a list of unsupported functionality in the cpp client, by any chance?
@jaystarshot these are the ones I am working on. I already have local changes for most of them and am opening PRs sequentially, since each one depends on the previous PRs.
0195d54 to 09b8709
Ping @HolyLow @SteNicholas @RexXiong for review. |
Pull request overview
Adds chunk-fetch retry behavior to the C++ Celeborn client input stream (mirroring Java behavior), including worker exclusion checks and peer-replica switching, and exposes a reader’s PartitionLocation to support that logic.
Changes:
- Added getLocation() to the PartitionReader interface and implemented it in WorkerPartitionReader.
- Replaced the CelebornInputStream::getNextChunk() stub with retry/exclusion/peer-switching logic and updated reader/chunk advancement to handle nullptr chunks.
- Added a unit test validating WorkerPartitionReader::getLocation().
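As a rough illustration of the first change, the sketch below shows one way a reader interface could expose the location it is bound to, so that calling code can decide whether to exclude a worker or switch to a replica. Everything here is an assumption made for illustration: the simplified `PartitionLocation` fields, `hostAndFetchPort()`, `hasPeer()`, and the `SketchWorkerPartitionReader` class are hypothetical stand-ins, not the signatures used in this PR.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for the client's PartitionLocation.
struct PartitionLocation {
  std::string host;
  int fetchPort = 0;
  std::shared_ptr<const PartitionLocation> peer;  // replica, may be null

  bool hasPeer() const { return peer != nullptr; }
  std::string hostAndFetchPort() const {
    return host + ":" + std::to_string(fetchPort);
  }
};

// Abstract reader: callers can ask which location a reader is bound to.
class PartitionReader {
 public:
  virtual ~PartitionReader() = default;
  virtual bool hasNext() = 0;
  virtual const PartitionLocation& getLocation() const = 0;
};

// Illustrative implementation that only tracks a chunk count.
class SketchWorkerPartitionReader : public PartitionReader {
 public:
  SketchWorkerPartitionReader(PartitionLocation location, int numChunks)
      : location_(std::move(location)), numChunks_(numChunks) {}

  bool hasNext() override { return next_ < numChunks_; }
  const PartitionLocation& getLocation() const override { return location_; }

 private:
  PartitionLocation location_;
  int numChunks_;
  int next_ = 0;
};
```

Returning a const reference keeps the accessor cheap and read-only; the actual return type in the PR may differ.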
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| cpp/celeborn/client/tests/WorkerPartitionReaderTest.cpp | Adds a unit test for WorkerPartitionReader::getLocation(). |
| cpp/celeborn/client/reader/WorkerPartitionReader.h | Extends PartitionReader with getLocation() and declares override in WorkerPartitionReader. |
| cpp/celeborn/client/reader/WorkerPartitionReader.cpp | Implements WorkerPartitionReader::getLocation(). |
| cpp/celeborn/client/reader/CelebornInputStream.cpp | Implements chunk-fetch retry logic and updates chunk/reader progression to tolerate nullptr from getNextChunk(). |
@afterincomparableyum, thanks for the contribution. I left a minor comment on the changes. In addition, please take a look at Copilot's review comments.
09b8709 to 7164346
…ebornInputStream in CppClient
Implement chunk-fetch retry logic in CelebornInputStream::getNextChunk(), matching the Java CelebornInputStream behavior. When a chunk fetch fails, the retry loop excludes the failed worker, switches to the peer replica (if available), and sleeps between retry rounds before creating a new reader.
- Add getLocation() to PartitionReader interface and WorkerPartitionReader
- Replace the stub getNextChunk() with full retry logic: excluded worker
checks, peer switching, configurable retry count, sleep between retries
- Update moveToNextChunk() and moveToNextReader() to handle nullable
returns from getNextChunk()
- Add unit test for WorkerPartitionReader::getLocation()
- Add unit tests for getNextChunk() retry logic
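The retry flow described in this commit message can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `Location`, `FetchFn`, `RetryConfig`, and `fetchChunkWithRetry` are hypothetical names, the exclusion check is applied only when a peer exists, and the retry budget is doubled when a replica is available.

```cpp
#include <chrono>
#include <functional>
#include <set>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical location; primary and replica point at each other (non-owning).
struct Location {
  std::string hostAndPort;
  const Location* peer;
};

using Chunk = std::string;
// Hypothetical fetch hook: returns the chunk or throws on failure.
using FetchFn = std::function<Chunk(const Location&)>;

struct RetryConfig {
  int maxRetriesPerReplica = 3;            // assumed configurable retry count
  std::chrono::milliseconds retryWait{0};  // assumed wait between rounds
};

Chunk fetchChunkWithRetry(const Location* location, const FetchFn& fetch,
                          std::set<std::string>& excludedWorkers,
                          const RetryConfig& conf) {
  // Assumption: with a replica, the budget covers both copies.
  const int maxAttempts = location->peer != nullptr
      ? conf.maxRetriesPerReplica * 2
      : conf.maxRetriesPerReplica;
  int attempts = 0;
  while (true) {
    try {
      // Skip a known-bad worker only when there is a peer to fall back to.
      if (location->peer != nullptr &&
          excludedWorkers.count(location->hostAndPort) > 0) {
        throw std::runtime_error("worker excluded: " + location->hostAndPort);
      }
      return fetch(*location);
    } catch (const std::exception& e) {
      excludedWorkers.insert(location->hostAndPort);  // exclude failed worker
      if (++attempts >= maxAttempts) {
        throw std::runtime_error(
            std::string("fetch chunk failed after retries: ") + e.what());
      }
      if (location->peer != nullptr) {
        location = location->peer;  // switch to the peer replica
      }
      std::this_thread::sleep_for(conf.retryWait);  // wait before retrying
    }
  }
}
```

In the real client the retry would presumably create a new reader for the chosen location instead of calling a fetch hook directly; the hook simply keeps the sketch self-contained.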
7164346 to 7022c89
Comments resolved, @SteNicholas, and CI should pass as well.
Implement chunk-fetch retry logic in CelebornInputStream::getNextChunk(), matching the Java CelebornInputStream behavior. When a chunk fetch fails, the retry loop excludes the failed worker, switches to the peer replica (if available), and sleeps between retry rounds before creating a new reader.
- Added getLocation() to PartitionReader interface and WorkerPartitionReader
- Replaced the stub getNextChunk() with full retry logic: excluded worker checks, peer switching, configurable retry count, sleep between retries
- Updated moveToNextChunk() and moveToNextReader() to handle nullable returns from getNextChunk()
- Added unit test for WorkerPartitionReader::getLocation()
- Added unit tests for getNextChunk() retry logic
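To illustrate the nullable-return handling mentioned above, here is a minimal sketch of chunk advancement that tolerates a null chunk. It assumes, purely for illustration, that a null return means the current reader has nothing more to offer and the stream should move to the next reader; `SketchReader` and `SketchInputStream` are hypothetical classes, not the ones changed in this PR.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Chunk = std::string;

// Hypothetical reader: hands out queued chunks, then nullptr when drained.
class SketchReader {
 public:
  explicit SketchReader(std::deque<Chunk> chunks)
      : chunks_(std::move(chunks)) {}

  std::unique_ptr<Chunk> next() {
    if (chunks_.empty()) return nullptr;
    std::unique_ptr<Chunk> chunk(new Chunk(std::move(chunks_.front())));
    chunks_.pop_front();
    return chunk;
  }

 private:
  std::deque<Chunk> chunks_;
};

class SketchInputStream {
 public:
  explicit SketchInputStream(std::vector<SketchReader> readers)
      : readers_(std::move(readers)) {}

  // Returns true when currentChunk() is valid, false at end of stream.
  bool moveToNextChunk() {
    currentChunk_.reset();
    while (readerIndex_ < readers_.size()) {
      currentChunk_ = getNextChunk();
      if (currentChunk_ != nullptr) return true;
      ++readerIndex_;  // nullptr: nothing more from this reader, move on
    }
    return false;
  }

  // Only valid after moveToNextChunk() returned true.
  const Chunk& currentChunk() const { return *currentChunk_; }

 private:
  std::unique_ptr<Chunk> getNextChunk() {
    return readers_[readerIndex_].next();
  }

  std::vector<SketchReader> readers_;
  std::size_t readerIndex_ = 0;
  std::unique_ptr<Chunk> currentChunk_;
};
```

Checking for null in one place keeps callers from dereferencing an empty chunk, including when a reader in the middle of the list yields nothing.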
CI and the build pass.