[runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries#667
[runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries#667weiqingy wants to merge 1 commit into
Conversation
|
Hi @wenjin272, the PR has been updated to resolve the conflicts. |
|
The one CI failure ( The test expects This same failure (same exact numbers, Will re-run CI. |
I believe we need to polish the stability and observability of CI in version 0.4. If you encounter any unstable cases, please contact me to rerun them. I now have the permission to rerun failed CI jobs. |
…ntries Issue apache#665. When Flink aborts a checkpoint, it calls notifyCheckpointAborted instead of notifyCheckpointComplete. The DurableExecutionManager only handled the complete path, so the per-checkpoint sequence-number entry recorded by snapshotLastCompletedSequenceNumbers was never released for aborted checkpoints. Under sustained abort pressure (timeouts, alignment failures, backend pressure), checkpointIdToSeqNums grew unboundedly. Changes: - Add DurableExecutionManager.notifyCheckpointAborted(long): removes the entry from checkpointIdToSeqNums, guarded by the same actionStateStore != null check as notifyCheckpointComplete. Does NOT prune durable action state — the aborted checkpoint's writes were never committed, so the prior committed checkpoint's recovery state is still load-bearing and must not be pruned. - Add ActionExecutionOperator.notifyCheckpointAborted(long): thin override that delegates to the manager and then calls super, mirroring the existing notifyCheckpointComplete override. - Extend the symmetric-guard invariant javadoc on snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete to name both release paths (complete OR abort). The actionStateStore != null guard now lives on three methods; the cross-linked javadoc makes that explicit and cites issues apache#645 and apache#665. - Three new DurableExecutionManagerTest cases (using the existing getCheckpointIdToSeqNums() @VisibleForTesting accessor introduced in apache#659): * notifyAbortedRemovesEntryWithoutPruning — entry released, durable state untouched (verified against a real InMemoryActionStateStore so wrongful pruning would be observable). * completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight checkpoints, one completes (state pruned), one aborts (state preserved), one remains. * noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op coverage matching the existing notifyCheckpointComplete null-store case.
| } | ||
| } | ||
|
|
||
| void maybePruneState(Object key, long sequenceNum) throws Exception { |
There was a problem hiding this comment.
It appears that only tests are calling this method. We may need to verify whether this is an unnecessary interface or if the call was accidentally removed during a previous conflict resolution.
Closes #665.
What this PR does
DurableExecutionManager.checkpointIdToSeqNumsleaked entries on aborted checkpoints. Flink callsnotifyCheckpointAborted(...)when a checkpoint is aborted (timeout, alignment failure, backend pressure); the existing code only handled the complete path. Under sustained abort pressure the map grew unboundedly.Production changes
DurableExecutionManager.notifyCheckpointAborted(long)(new, package-private) — removes the entry fromcheckpointIdToSeqNums. NopruneStatecall — durable writes for an aborted checkpoint were never committed, so the prior committed checkpoint's recovery state is still load-bearing and must not be pruned. Guarded byactionStateStore != nullto mirror the symmetric guard from [Bug][runtime] Fix memory leak in DurableExecutionManager.checkpointIdToSeqNums #645.ActionExecutionOperator.notifyCheckpointAborted(long)(new@Override) — thin delegate to the manager, thensuper.notifyCheckpointAborted(...). Mirrors the existingnotifyCheckpointCompleteoverride exactly.snapshotLastCompletedSequenceNumbersandnotifyCheckpointCompletestrengthened to name BOTH release paths (complete OR abort). TheactionStateStore != nullguard now lives on three methods; the javadoc makes the three-way symmetry explicit and cross-links [Bug][runtime] Fix memory leak in DurableExecutionManager.checkpointIdToSeqNums #645 + [Bug][runtime] DurableExecutionManager leaks checkpointIdToSeqNums entries on aborted checkpoints #665.@VisibleForTesting getCheckpointIdToSeqNums()accessor — mirrorsgetActionStateStore()precedent. (Same addition as in [runtime] Lock null-store symmetry invariant in DurableExecutionManager #666; the second PR to land drops the duplicate on rebase.)Tests (DEM-level, three new)
notifyAbortedRemovesEntryWithoutPruning— entry released, durable state untouched. Uses a realInMemoryActionStateStore(not a mock) so wrongful pruning would be observable.completedAndAbortedInterleavedKeepsInFlightEntries— three in-flight checkpoints; one completes (state pruned), one aborts (state preserved), one remains.noStoreModeNotifyCheckpointAbortedIsNoOp— symmetric null-store no-op coverage.Sanity-mutation verified locally:
actionStateStore.pruneState(...)call → 2 of 3 new tests fail (state was incorrectly pruned).Operator-level harness test deferred to #646 — the new operator override is a one-line delegate; the logic is in the manager.
Test plan
mvn test -Dtest=DurableExecutionManagerTest -pl runtime— 5/5 pass (2 existing + 3 new)mvn test -Dtest=ActionExecutionOperatorTest -pl runtime— 28/28 passmvn test -pl runtime— 307/307 pass (no regressions)./tools/lint.sh -c— 0 violations./tools/check-license.sh— clean (no new tracked files)pruneStatecall on abort → expected tests failDocumentation
doc-neededdoc-not-neededdoc-included