[FLINK-39645][connector-base] Preserve restored HybridSource splits during snapshot #28191

Open
YuanHanzhong wants to merge 2 commits into apache:master from YuanHanzhong:codex/FLINK-39645-hybrid-source-restored-splits

Conversation

@YuanHanzhong
Contributor

What is the purpose of the change

HybridSourceReader can be checkpointed during recovery after the restored HybridSourceSplit instances have been added back but before the SwitchSourceEvent creates the underlying reader. In that window currentReader is still null, and snapshotState() previously returned an empty list. If the job then failed again and restored from that checkpoint, the recovered splits would be lost.

This change keeps the already-restored hybrid splits in the checkpoint state until the source switch hands them to the underlying reader.

Brief change log

  • Add a regression test for snapshotting recovered HybridSourceSplit state before the recovery SwitchSourceEvent.
  • Return a defensive copy of restoredSplits when snapshotState() runs before a current reader exists.
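
For readers unfamiliar with the reader internals, the defensive-copy idea can be illustrated with a simplified model. This is only a sketch under stated assumptions, not the actual patch: the class names `HybridReaderSketch`, `UnderlyingReaderSketch`, and `ListBackedReaderSketch` and the method `switchReader` are hypothetical, splits are modeled as plain strings, and no Flink classes are used. Only the field names `restoredSplits` and `currentReader` and the method `snapshotState()` come from the description above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the underlying reader; NOT the Flink SourceReader API. */
interface UnderlyingReaderSketch {
    void addSplits(List<String> splits);

    List<String> snapshotState();
}

/** Trivial underlying reader that just remembers the splits it was given. */
class ListBackedReaderSketch implements UnderlyingReaderSketch {
    private final List<String> splits = new ArrayList<>();

    @Override
    public void addSplits(List<String> newSplits) {
        splits.addAll(newSplits);
    }

    @Override
    public List<String> snapshotState() {
        return new ArrayList<>(splits);
    }
}

/** Simplified model of the hybrid reader (assumption: splits are plain strings). */
class HybridReaderSketch {
    private final List<String> restoredSplits = new ArrayList<>();
    private UnderlyingReaderSketch currentReader; // stays null until the switch happens

    /** Recovery path: splits can arrive before any underlying reader exists. */
    void addSplits(List<String> splits) {
        if (currentReader == null) {
            restoredSplits.addAll(splits);
        } else {
            currentReader.addSplits(splits);
        }
    }

    /** Models the source switch: install the reader and hand over the restored splits. */
    void switchReader(UnderlyingReaderSketch reader) {
        currentReader = reader;
        currentReader.addSplits(new ArrayList<>(restoredSplits));
        restoredSplits.clear();
    }

    List<String> snapshotState() {
        if (currentReader == null) {
            // Defensive copy: the caller cannot mutate the internal list, and the
            // restored splits stay in checkpoint state instead of being dropped.
            return new ArrayList<>(restoredSplits);
        }
        return currentReader.snapshotState();
    }

    public static void main(String[] args) {
        HybridReaderSketch reader = new HybridReaderSketch();
        reader.addSplits(List.of("split-0"));
        // Snapshot taken in the window before the switch still contains the split.
        System.out.println(reader.snapshotState());
    }
}
```

In this model a snapshot taken before `switchReader` still contains the restored split, and mutating the returned list does not affect the reader's internal state. The real reader deals with HybridSourceSplit instances and source indices, which this sketch leaves out.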

Verifying this change

  • ./mvnw -pl flink-connectors/flink-connector-base -DskipITs -Dtest=HybridSourceReaderTest#testReaderRecoverySnapshotBeforeSwitchSourceEvent -Dsurefire.failIfNoSpecifiedTests=false test
  • ./mvnw -pl flink-connectors/flink-connector-base -DskipITs -Dtest=HybridSourceReaderTest -Dsurefire.failIfNoSpecifiedTests=false test
  • ./mvnw -pl flink-connectors/flink-connector-base -DskipITs test
  • git diff --check

@flinkbot
Collaborator

flinkbot commented May 18, 2026

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Contributor

@spuru9 spuru9 left a comment

Minor comment

assertAndClearSourceReaderFinishedEvent(readerContext, -1);
reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));

MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647);
Contributor

nit: Use Integer.MAX_VALUE rather than the magic literal 2147483647

Contributor Author

Good catch, thanks. I switched this to Integer.MAX_VALUE and reran the focused HybridSourceReader test:

./mvnw -pl flink-connectors/flink-connector-base -DskipITs -Dtest=HybridSourceReaderTest#testReaderRecoverySnapshotBeforeSwitchSourceEvent -Dsurefire.failIfNoSpecifiedTests=false test

@github-actions (bot) added the community-reviewed label on May 19, 2026

Labels

community-reviewed PR has been reviewed by the community.
