[CELEBORN-2226][CIP-14] Support RetryFetchChunk functionality for Cel…#3605

Open
afterincomparableyum wants to merge 1 commit into apache:main from afterincomparableyum:cpp-client/celeborn-2226

Conversation

afterincomparableyum commented Feb 19, 2026

Implement chunk-fetch retry logic in CelebornInputStream::getNextChunk(), matching the Java CelebornInputStream behavior. When a chunk fetch fails, the retry loop excludes the failed worker, switches to the peer replica (if available), and sleeps between retry rounds before creating a new reader.

  • Added getLocation() to the PartitionReader interface and WorkerPartitionReader.
  • Replaced the stub getNextChunk() with full retry logic: excluded worker checks, peer switching, configurable retry count, sleep between retries.
  • Updated moveToNextChunk() and moveToNextReader() to handle nullable returns from getNextChunk().
  • Added a unit test for WorkerPartitionReader::getLocation().
  • Added unit tests for the getNextChunk() retry logic.

CI and build pass.
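
As a rough illustration of the retry flow described above (not the code in this PR), the loop could be sketched as follows. `Location`, `FetchFn`, and `fetchWithRetry` are hypothetical names invented for this sketch, and the choices to skip an excluded worker only when it has a peer and to sleep once both replicas have been tried are assumptions, not confirmed behavior of either client.

```cpp
#include <chrono>
#include <functional>
#include <optional>
#include <set>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a partition location; not the real Celeborn type.
struct Location {
  std::string hostPort;
  std::optional<std::string> peerHostPort;  // replica address, if any
};

// Hypothetical fetch callback: returns chunk bytes or throws on failure.
using FetchFn = std::function<std::string(const Location&)>;

std::string fetchWithRetry(
    Location start,
    const FetchFn& fetch,
    std::set<std::string>& excludedWorkers,
    int maxRetries,
    std::chrono::milliseconds retryWait) {
  Location current = std::move(start);
  for (int attempt = 0;; ++attempt) {
    try {
      // Assumption: skip an excluded worker only when a peer can take over,
      // so a worker without a replica can still be retried.
      if (current.peerHostPort && excludedWorkers.count(current.hostPort) > 0) {
        throw std::runtime_error("worker excluded: " + current.hostPort);
      }
      return fetch(current);
    } catch (const std::exception& e) {
      excludedWorkers.insert(current.hostPort);  // exclude the failed worker
      if (attempt >= maxRetries) {
        throw std::runtime_error(
            std::string("chunk fetch failed after retries: ") + e.what());
      }
      // A round is over when there is no peer, or both replicas were tried.
      const bool roundFinished = !current.peerHostPort || (attempt % 2 == 1);
      if (roundFinished) {
        std::this_thread::sleep_for(retryWait);
      }
      if (current.peerHostPort) {
        // Switch to the peer; the old worker becomes the peer of the new one.
        Location next{*current.peerHostPort, current.hostPort};
        current = std::move(next);
      }
    }
  }
}
```

With `maxRetries = 3`, the sketch makes at most four attempts in total: the initial fetch plus three retries.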

afterincomparableyum commented Feb 19, 2026

I will rebase off of main and then open this PR once #3583 gets merged.

@jaystarshot

@afterincomparableyum thanks for these changes. Do you have a list of the functionality that the C++ client does not yet support, by any chance?

afterincomparableyum commented Feb 24, 2026

@jaystarshot these are the ones I am working on:
https://issues.apache.org/jira/browse/CELEBORN-2199

I already have local changes for most of them. I am opening the PRs sequentially because each one depends on the previous PRs.

afterincomparableyum force-pushed the cpp-client/celeborn-2226 branch 6 times, most recently from 0195d54 to 09b8709 February 26, 2026 07:42
afterincomparableyum marked this pull request as ready for review February 26, 2026 07:44
@afterincomparableyum

Ping @HolyLow @SteNicholas @RexXiong for review.

Copilot AI left a comment

Pull request overview

Adds chunk-fetch retry behavior to the C++ Celeborn client input stream (mirroring Java behavior), including worker exclusion checks and peer-replica switching, and exposes a reader’s PartitionLocation to support that logic.

Changes:

  • Added getLocation() to the PartitionReader interface and implemented it in WorkerPartitionReader.
  • Replaced the CelebornInputStream::getNextChunk() stub with retry/exclusion/peer-switching logic and updated reader/chunk advancement to handle nullptr chunks.
  • Added a unit test validating WorkerPartitionReader::getLocation().
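
To make the interface change concrete, a reader that exposes its location might look like the minimal sketch below. The class names, the trimmed-down `PartitionLocationSketch` struct, and the `std::shared_ptr<const ...>` return type are all assumptions made for illustration; the signatures actually used in the PR may differ.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical, trimmed-down location type (the real one carries more fields).
struct PartitionLocationSketch {
  std::string host;
  int fetchPort;
};

// Hypothetical reader interface exposing the location it reads from, so that
// retry logic can decide which worker to exclude and whether a peer exists.
class PartitionReaderSketch {
 public:
  virtual ~PartitionReaderSketch() = default;
  virtual std::shared_ptr<const PartitionLocationSketch> getLocation() const = 0;
};

// Hypothetical worker-backed reader that simply remembers its location.
class WorkerPartitionReaderSketch : public PartitionReaderSketch {
 public:
  explicit WorkerPartitionReaderSketch(
      std::shared_ptr<const PartitionLocationSketch> location)
      : location_(std::move(location)) {}

  std::shared_ptr<const PartitionLocationSketch> getLocation() const override {
    return location_;
  }

 private:
  std::shared_ptr<const PartitionLocationSketch> location_;
};
```

Returning a shared pointer to a const object lets the caller inspect the location without copying it or being able to modify the reader's state.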

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| cpp/celeborn/client/tests/WorkerPartitionReaderTest.cpp | Adds a unit test for WorkerPartitionReader::getLocation(). |
| cpp/celeborn/client/reader/WorkerPartitionReader.h | Extends PartitionReader with getLocation() and declares override in WorkerPartitionReader. |
| cpp/celeborn/client/reader/WorkerPartitionReader.cpp | Implements WorkerPartitionReader::getLocation(). |
| cpp/celeborn/client/reader/CelebornInputStream.cpp | Implements chunk-fetch retry logic and updates chunk/reader progression to tolerate nullptr from getNextChunk(). |
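
One way a caller can tolerate a `nullptr` chunk is to treat it as "nothing more from this reader" and advance to the next reader. The sketch below shows that pattern only; `FakeReader` and `FakeStream` are hypothetical types, and the interpretation of `nullptr` here is an assumption rather than the PR's documented semantics.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Chunk = std::string;  // stand-in for a real chunk buffer

// Hypothetical reader: hands out queued chunks, then returns nullptr.
class FakeReader {
 public:
  explicit FakeReader(std::deque<Chunk> chunks) : chunks_(std::move(chunks)) {}

  std::unique_ptr<Chunk> getNextChunk() {
    if (chunks_.empty()) {
      return nullptr;
    }
    auto chunk = std::make_unique<Chunk>(std::move(chunks_.front()));
    chunks_.pop_front();
    return chunk;
  }

 private:
  std::deque<Chunk> chunks_;
};

// Hypothetical stream that walks its readers in order.
class FakeStream {
 public:
  explicit FakeStream(std::vector<FakeReader> readers)
      : readers_(std::move(readers)) {}

  // Returns true when current() holds a chunk, false at end of stream.
  bool moveToNextChunk() {
    while (readerIndex_ < readers_.size()) {
      current_ = readers_[readerIndex_].getNextChunk();
      if (current_) {
        return true;
      }
      ++readerIndex_;  // nullptr: this reader is done, try the next one
    }
    current_.reset();
    return false;
  }

  const Chunk* current() const { return current_.get(); }

 private:
  std::vector<FakeReader> readers_;
  std::size_t readerIndex_ = 0;
  std::unique_ptr<Chunk> current_;
};
```

The loop never dereferences a null chunk, and an empty reader in the middle of the list is skipped transparently.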

SteNicholas left a comment

@afterincomparableyum, thanks for the contribution. I left a minor comment on the changes. In addition, please take a look at Copilot's review comments.

…ebornInputStream in CppClient

Implement chunk-fetch retry logic in CelebornInputStream::getNextChunk(), matching the Java CelebornInputStream behavior. When a chunk fetch fails, the retry loop excludes the failed worker, switches to the peer replica (if available), and sleeps between retry rounds before creating a new reader.

    - Add getLocation() to PartitionReader interface and WorkerPartitionReader
    - Replace the stub getNextChunk() with full retry logic: excluded worker
      checks, peer switching, configurable retry count, sleep between retries
    - Update moveToNextChunk() and moveToNextReader() to handle nullable
      returns from getNextChunk()
    - Add unit test for WorkerPartitionReader::getLocation()
    - Add unit tests for getNextChunk() retry logic
@afterincomparableyum

Comments resolved, @SteNicholas, and CI should pass as well.

5 participants