[FLINK-39704][runtime/jobmaster] Preserve globally terminal job result across leadership revocation#28201
Open
prashantbh wants to merge 1 commit into
Open
Conversation
…t across leadership revocation
Collaborator
Author
|
@flinkbot run azure |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
Fixes FLINK-39704: a race between a JobMaster's globally terminal job completion and a leadership revocation that arrives in the same window. The terminal result (
FAILED/CANCELED/FINISHED) was silently masked by a syntheticSUSPENDED, leaving the JobGraph and HA metadata behind. On the next leadership grant, the sameJobIDwas recovered into a newJobMasterServiceProcesseven though the job had already reached a globally terminal state — observed with Application Mode on Kubernetes HA; the same race is in the runner / process lifecycle (above theLeaderElectioninterface), so it can affect session clusters and ZooKeeper HA as well.The fix has two parts that close two distinct orderings of the same underlying race.
Brief change log
DefaultJobMasterServiceProcess.closeAsync()— deferresultFuture.completeExceptionally(JobNotFinishedException)until afterjobMasterService.closeAsync()settles. Previously it ran synchronously insidecloseAsync(), so an in-flight terminal completion from the scheduler (jobReachedGloballyTerminalState) would race the eager failure and lose; the scheduler's laterresultFuture.complete(...)then became a no-op. After the change, the scheduler has its full drain window to publish, andJobNotFinishedExceptionis only declared if nothing else completes the future first.JobMasterServiceLeadershipRunner— cache a globally terminal result the moment it is observed on the result-forwarding path (rememberGloballyTerminalResultIfCurrentProcess), and flush it from eithercompleteResultFutureAfterClose(on runner close) orgrantLeadership(on re-grant). Drain is chained insidesequentialOperationso it runs only after the prior process's stop and forwarding have fully settled. Revoke intentionally does not clearcurrentJobMasterServiceProcessLeaderId, so late terminal callbacks from the closing process can still satisfy the cache's leader-equality check.Verifying this change
This change added tests and can be verified as follows:
End-to-end reproduction on Kubernetes HA (before and after the fix)
Setup: minikube cluster with the Flink Kubernetes Operator running a
FlinkDeployment(apache/flink:2.2.0base, Kubernetes HA,StateMachineExample,restart-strategy: none). Two builds were tested: the unmodified release and the same release with this PR applied. Both were instrumented with two test-onlyThread.sleeps — one insideforwardResultFuture.whenComplete(Ordering A) and one insidejobReachedGloballyTerminalState(Ordering B) — gated behind JVM system properties so they are no-ops in production paths. The sleeps only widen the race window deterministically.Run sequence per repro:
RUNNING, then delete the TaskManager pod to push the job toward a globally terminal state.kubectl patch-ing the HA cluster ConfigMap to a foreignholderIdentitywith a short fresh lease, then letting the lease expire so the original JM re-acquires.Results — 3 runs per cell:
reached terminal stateFlushing globally terminal result(fix fires)was recovered successfully(reanimated terminal job)SUSPENDED(bug)FAILEDSUSPENDED(bug)FAILEDUnpatched runs show the bug: the terminal
FAILEDis masked asSUSPENDED, the JobGraph is left behind, and on the next leadership grantSuccessfully recovered 1 persisted job graphs→Job ... was recovered successfully→Creating new JobMasterServiceProcessfor aJobIDthat already reached a globally terminal state. Patched runs showCaching globally terminal job result→Flushing globally terminal result for job ... Job state: FAILED→ dispatcher seesreached terminal state FAILED, in the same JVM with no restart.Unit / integration tests
DefaultJobMasterServiceProcessTest#testTerminalResultPublishedDuringCloseWinsOverJobNotFinished— holds theJobMasterServicetermination future open, callsserviceProcess.closeAsync(), asserts the result is not yet completed, callsjobReachedGloballyTerminalState(FAILED)mid-close, then releases the service termination. Asserts the runner result isFAILED, notJobNotFinishedException. Fails on the old code, passes on the new code.JobMasterServiceLeadershipRunnerTest#testCloseAsyncFlushesCachedGloballyTerminalResultAfterRevokeand#testGrantLeadershipFlushesCachedTerminalResultObservedAfterRevoke— cover the runner-level cache + flush behavior for the close path and the re-grant path respectively, with the terminal result completing afternotLeader(). Both fail on a runner that fencescurrentJobMasterServiceProcessLeaderIdon revoke.JobMasterServiceLeadershipRunnerTest#testCloseAsyncCompletesWithGloballyTerminalResultObservedBeforeLeadershipRevoke— parameterized overFAILED/FINISHED/CANCELED, covers the symmetric ordering (terminal observed before revoke).Run locally with:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noLeaderElectioninterface)Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude (Opus 4.7); Codex (GPT-5.5)