
[FLINK-39704][runtime/jobmaster] Preserve globally terminal job result across leadership revocation#28201

Open
prashantbh wants to merge 1 commit into
apache:masterfrom
prashantbh:prashant.bh/k8s-ha-repro-tests

@prashantbh prashantbh commented May 19, 2026

What is the purpose of the change

Fixes FLINK-39704: a race between a JobMaster reaching a globally terminal state and a leadership revocation that arrives in the same window. The terminal result (FAILED/CANCELED/FINISHED) was silently masked by a synthetic SUSPENDED, leaving the JobGraph and HA metadata behind. On the next leadership grant, the same JobID was recovered into a new JobMasterServiceProcess even though the job had already reached a globally terminal state. The bug was observed with Application Mode on Kubernetes HA, but the race sits in the runner / process lifecycle (above the LeaderElection interface), so it can affect session clusters and ZooKeeper HA as well.

The fix has two parts that close two distinct orderings of the same underlying race.

Brief change log

  • DefaultJobMasterServiceProcess.closeAsync() — defer resultFuture.completeExceptionally(JobNotFinishedException) until after jobMasterService.closeAsync() settles. Previously it ran synchronously inside closeAsync(), so an in-flight terminal completion from the scheduler (jobReachedGloballyTerminalState) would race the eager failure and lose; the scheduler's later resultFuture.complete(...) then became a no-op. After the change, the scheduler has its full drain window to publish, and JobNotFinishedException is only declared if nothing else completes the future first.
  • JobMasterServiceLeadershipRunner — cache a globally terminal result the moment it is observed on the result-forwarding path (rememberGloballyTerminalResultIfCurrentProcess), and flush it from either completeResultFutureAfterClose (on runner close) or grantLeadership (on re-grant). Drain is chained inside sequentialOperation so it runs only after the prior process's stop and forwarding have fully settled. Revoke intentionally does not clear currentJobMasterServiceProcessLeaderId, so late terminal callbacks from the closing process can still satisfy the cache's leader-equality check.
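The first change-log item can be illustrated with a minimal, standalone sketch built on plain CompletableFuture. This is not the Flink code: DeferredCloseSketch, NotFinishedException, closeEagerly, closeDeferred, and the String result type are hypothetical stand-ins chosen for illustration. The sketch only assumes what the description above states, namely that the scheduler publishes its terminal result while the service is still shutting down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class DeferredCloseSketch {

    /** Hypothetical stand-in for JobNotFinishedException. */
    static class NotFinishedException extends RuntimeException {
        NotFinishedException(String message) {
            super(message);
        }
    }

    /** Old ordering: the result is failed synchronously, before the service has stopped. */
    static void closeEagerly(CompletableFuture<String> result) {
        result.completeExceptionally(new NotFinishedException("job not finished"));
    }

    /** New ordering: the result is failed only once service termination has settled. */
    static CompletableFuture<Void> closeDeferred(
            CompletableFuture<String> result, CompletableFuture<Void> serviceTermination) {
        return serviceTermination.whenComplete(
                (ignored, error) ->
                        // No-op if a terminal result was already published.
                        result.completeExceptionally(new NotFinishedException("job not finished")));
    }

    /** Reports the outcome of the result future without blocking. */
    static String describe(CompletableFuture<String> result) {
        try {
            return result.getNow("PENDING");
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    /** Simulates a terminal result arriving mid-close, under either ordering. */
    static String run(boolean deferred) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<Void> serviceTermination = new CompletableFuture<>();

        if (deferred) {
            closeDeferred(result, serviceTermination);
        } else {
            closeEagerly(result);
        }

        // The scheduler publishes the terminal state while the service is still closing.
        result.complete("FAILED");
        // The service finishes shutting down afterwards.
        serviceTermination.complete(null);

        return describe(result);
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + run(false));
        System.out.println("deferred: " + run(true));
    }
}
```

In the eager variant the late complete("FAILED") is a no-op because the future is already failed; in the deferred variant the terminal state wins and the later completeExceptionally is the no-op.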

Verifying this change

This change added tests and can be verified as follows:

End-to-end reproduction on Kubernetes HA (before and after the fix)

Setup: minikube cluster with the Flink Kubernetes Operator running a FlinkDeployment (apache/flink:2.2.0 base, Kubernetes HA, StateMachineExample, restart-strategy: none). Two builds were tested: the unmodified release and the same release with this PR applied. Both were instrumented with two test-only Thread.sleeps — one inside forwardResultFuture.whenComplete (Ordering A) and one inside jobReachedGloballyTerminalState (Ordering B) — gated behind JVM system properties so they are no-ops in production paths. The sleeps only widen the race window deterministically.

Run sequence per repro:

  1. Apply the CR with the corresponding sleep flag enabled (3000 ms).
  2. Wait for the job to be RUNNING, then delete the TaskManager pod to push the job toward a globally terminal state.
  3. When the instrumentation log line fires, force a leadership revocation by kubectl patch-ing the HA cluster ConfigMap to a foreign holderIdentity with a short fresh lease, then letting the lease expire so the original JM re-acquires.

Results — 3 runs per cell:

| Build | Ordering | reached terminal state | Flushing globally terminal result (fix fires) | was recovered successfully (reanimated terminal job) |
|---|---|---|---|---|
| unpatched | A | SUSPENDED (bug) | 0/3 | 3/3 |
| patched | A | FAILED | 3/3 | 0/3 |
| unpatched | B | SUSPENDED (bug) | 0/3 | 3/3 |
| patched | B | FAILED | 3/3 | 0/3 |

Unpatched runs show the bug: the terminal FAILED is masked as SUSPENDED, the JobGraph is left behind, and on the next leadership grant the log shows Successfully recovered 1 persisted job graphs → Job ... was recovered successfully → Creating new JobMasterServiceProcess, all for a JobID that already reached a globally terminal state. Patched runs show Caching globally terminal job result → Flushing globally terminal result for job ... Job state: FAILED → dispatcher sees reached terminal state FAILED, in the same JVM with no restart.

Unit / integration tests

  • DefaultJobMasterServiceProcessTest#testTerminalResultPublishedDuringCloseWinsOverJobNotFinished — holds the JobMasterService termination future open, calls serviceProcess.closeAsync(), asserts the result is not yet completed, calls jobReachedGloballyTerminalState(FAILED) mid-close, then releases the service termination. Asserts the runner result is FAILED, not JobNotFinishedException. Fails on the old code, passes on the new code.
  • JobMasterServiceLeadershipRunnerTest#testCloseAsyncFlushesCachedGloballyTerminalResultAfterRevoke and #testGrantLeadershipFlushesCachedTerminalResultObservedAfterRevoke — cover the runner-level cache + flush behavior for the close path and the re-grant path respectively, with the terminal result completing after notLeader(). Both fail on a runner that fences currentJobMasterServiceProcessLeaderId on revoke.
  • JobMasterServiceLeadershipRunnerTest#testCloseAsyncCompletesWithGloballyTerminalResultObservedBeforeLeadershipRevoke — parameterized over FAILED/FINISHED/CANCELED, covers the symmetric ordering (terminal observed before revoke).
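The runner-level behavior these tests exercise (cache on observation, flush on close or re-grant, no fencing of the leader ID on revoke) can be modeled roughly as follows. This is a single-threaded sketch under stated assumptions, not the runner itself: TerminalResultCacheSketch and all of its members are hypothetical names, the result is a plain String, and the locking and sequentialOperation chaining of the real class are omitted.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class TerminalResultCacheSketch {

    private final CompletableFuture<String> resultFuture = new CompletableFuture<>();
    private UUID currentProcessLeaderId; // deliberately NOT cleared on revoke
    private String cachedTerminalResult;

    /** On (re-)grant: flush a cached terminal result instead of starting a new process. */
    void grantLeadership(UUID leaderId) {
        if (flushCachedResult()) {
            return; // the job is already globally terminal
        }
        currentProcessLeaderId = leaderId;
    }

    /** On revoke: keep the leader ID so late callbacks from the closing process still match. */
    void revokeLeadership() {
        // Intentionally empty in this sketch.
    }

    /** Caches a terminal result only if it comes from the current process. */
    void onGloballyTerminalResult(UUID processLeaderId, String state) {
        if (cachedTerminalResult == null
                && Objects.equals(processLeaderId, currentProcessLeaderId)) {
            cachedTerminalResult = state;
        }
    }

    /** On close: prefer the cached terminal result over a "not finished" failure. */
    void close() {
        if (!flushCachedResult()) {
            resultFuture.completeExceptionally(new IllegalStateException("job not finished"));
        }
    }

    private boolean flushCachedResult() {
        if (cachedTerminalResult == null) {
            return false;
        }
        resultFuture.complete(cachedTerminalResult);
        return true;
    }

    CompletableFuture<String> result() {
        return resultFuture;
    }

    /** Terminal result observed after revoke, flushed on the next grant. */
    static String demoRegrant() {
        TerminalResultCacheSketch runner = new TerminalResultCacheSketch();
        UUID first = UUID.randomUUID();
        runner.grantLeadership(first);
        runner.revokeLeadership();
        runner.onGloballyTerminalResult(UUID.randomUUID(), "FINISHED"); // stale ID: ignored
        runner.onGloballyTerminalResult(first, "FAILED"); // late callback: cached
        runner.grantLeadership(UUID.randomUUID());
        return runner.result().getNow("PENDING");
    }

    public static void main(String[] args) {
        System.out.println(demoRegrant());
    }
}
```

If revokeLeadership() reset currentProcessLeaderId to null, the late callback would fail the equality check and the terminal result would be lost, which is the failure mode the two after-revoke tests are described as catching.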

Run locally with:

mvn -pl flink-runtime -am -Dtest='JobMasterServiceLeadershipRunnerTest,DefaultJobMasterServiceProcessTest' test

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes — touches the JobManager leadership runner and the JobMaster process lifecycle; relevant for both Kubernetes HA and ZooKeeper HA since the race is in the runner / process lifecycle (above the LeaderElection interface)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no (bug fix)
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes — Claude and OpenAI Codex were used as pair-programming assistants for analysis and test drafting. The author reviewed and validated the final code and tests.

Generated-by: Claude (Opus 4.7); Codex (GPT-5.5)

@flinkbot
Collaborator

flinkbot commented May 19, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@prashantbh
Author

@flinkbot run azure
