[runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries #667

Open
weiqingy wants to merge 1 commit into apache:main from weiqingy:665-impl
Conversation

@weiqingy
Collaborator

@weiqingy weiqingy commented May 13, 2026

Closes #665.

What this PR does

DurableExecutionManager.checkpointIdToSeqNums leaked entries on aborted checkpoints. Flink calls notifyCheckpointAborted(...) when a checkpoint is aborted (timeout, alignment failure, backend pressure); the existing code only handled the complete path. Under sustained abort pressure the map grew unboundedly.

Production changes

  1. DurableExecutionManager.notifyCheckpointAborted(long) (new, package-private) — removes the entry from checkpointIdToSeqNums. No pruneState call — durable writes for an aborted checkpoint were never committed, so the prior committed checkpoint's recovery state is still load-bearing and must not be pruned. Guarded by actionStateStore != null to mirror the symmetric guard from [Bug][runtime] Fix memory leak in DurableExecutionManager.checkpointIdToSeqNums #645.
  2. ActionExecutionOperator.notifyCheckpointAborted(long) (new @Override) — thin delegate to the manager, then super.notifyCheckpointAborted(...). Mirrors the existing notifyCheckpointComplete override exactly.
  3. Javadoc invariant statement on snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete strengthened to name BOTH release paths (complete OR abort). The actionStateStore != null guard now lives on three methods; the javadoc makes the three-way symmetry explicit and cross-links [Bug][runtime] Fix memory leak in DurableExecutionManager.checkpointIdToSeqNums #645 + [Bug][runtime] DurableExecutionManager leaks checkpointIdToSeqNums entries on aborted checkpoints #665.
  4. @VisibleForTesting getCheckpointIdToSeqNums() accessor — mirrors getActionStateStore() precedent. (Same addition as in [runtime] Lock null-store symmetry invariant in DurableExecutionManager #666; the second PR to land drops the duplicate on rebase.)
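
The three-way guard described in the list above can be sketched roughly as follows. This is a minimal illustration, not the actual patch: `StateStore`, `ManagerSketch`, `trackedCheckpoints()`, the `Map<Long, Map<Object, Long>>` shape of `checkpointIdToSeqNums`, and the `pruneState(Object, long)` signature are all assumptions. Only the three manager method names and the `actionStateStore != null` guard come from the PR text.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the action state store; not the real interface. */
interface StateStore {
    void pruneState(Object key, long sequenceNum);
}

/** Simplified model of the manager's per-checkpoint bookkeeping (assumed shape). */
class ManagerSketch {
    // A null store models the "no durable execution" mode.
    private final StateStore actionStateStore;
    private final Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();

    ManagerSketch(StateStore actionStateStore) {
        this.actionStateStore = actionStateStore;
    }

    /** Records an entry for the checkpoint; guarded so no-store mode never accumulates. */
    void snapshotLastCompletedSequenceNumbers(long checkpointId, Map<Object, Long> seqNums) {
        if (actionStateStore != null) {
            checkpointIdToSeqNums.put(checkpointId, new HashMap<>(seqNums));
        }
    }

    /** Release path 1: the checkpoint committed, so the entry is removed and state is pruned. */
    void notifyCheckpointComplete(long checkpointId) {
        if (actionStateStore != null) {
            Map<Object, Long> seqNums = checkpointIdToSeqNums.remove(checkpointId);
            if (seqNums != null) {
                seqNums.forEach(actionStateStore::pruneState);
            }
        }
    }

    /** Release path 2: the checkpoint aborted, so the entry is removed but nothing is pruned. */
    void notifyCheckpointAborted(long checkpointId) {
        if (actionStateStore != null) {
            checkpointIdToSeqNums.remove(checkpointId);
        }
    }

    /** Hypothetical helper for inspecting how many checkpoints are still tracked. */
    int trackedCheckpoints() {
        return checkpointIdToSeqNums.size();
    }
}
```

In this sketch, aborting a checkpoint only shrinks the map, while completing one also calls `pruneState` for every recorded key. That is the asymmetry the new tests are meant to pin down.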

Tests (DEM-level, three new)

  • notifyAbortedRemovesEntryWithoutPruning — entry released, durable state untouched. Uses a real InMemoryActionStateStore (not a mock) so wrongful pruning would be observable.
  • completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight checkpoints; one completes (state pruned), one aborts (state preserved), one remains.
  • noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op coverage.

Sanity-mutation verified locally:

  • Emptying the new method's body → 2 of 3 new tests fail.
  • Adding a wrongful actionStateStore.pruneState(...) call → 2 of 3 new tests fail (state was incorrectly pruned).

Operator-level harness test deferred to #646 — the new operator override is a one-line delegate; the logic is in the manager.
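
For readers unfamiliar with the delegate pattern mentioned here, the override could look roughly like the sketch below. `BaseOperatorSketch`, `AbortAwareManagerSketch`, and `OperatorSketch` are hypothetical stand-ins for the Flink operator base class, the manager, and ActionExecutionOperator. The real Flink method signature and class hierarchy are not reproduced; only the call order (manager first, then super) is taken from the PR description.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the operator base class; records calls for inspection. */
class BaseOperatorSketch {
    final List<String> calls = new ArrayList<>();

    public void notifyCheckpointAborted(long checkpointId) {
        calls.add("super:" + checkpointId);
    }
}

/** Hypothetical stand-in for the manager; only records that it was notified. */
class AbortAwareManagerSketch {
    private final List<String> calls;

    AbortAwareManagerSketch(List<String> calls) {
        this.calls = calls;
    }

    void notifyCheckpointAborted(long checkpointId) {
        calls.add("manager:" + checkpointId);
    }
}

/** Thin delegate: forwards the abort to the manager, then to the base class. */
class OperatorSketch extends BaseOperatorSketch {
    private final AbortAwareManagerSketch manager = new AbortAwareManagerSketch(calls);

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        manager.notifyCheckpointAborted(checkpointId);
        super.notifyCheckpointAborted(checkpointId);
    }
}
```

Because the override holds no logic of its own, a manager-level test exercises the same behavior that an operator-level harness test would.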

Test plan

  • mvn test -Dtest=DurableExecutionManagerTest -pl runtime — 5/5 pass (2 existing + 3 new)
  • mvn test -Dtest=ActionExecutionOperatorTest -pl runtime — 28/28 pass
  • mvn test -pl runtime — 307/307 pass (no regressions)
  • ./tools/lint.sh -c — 0 violations
  • ./tools/check-license.sh — clean (no new tracked files)
  • Sanity mutation: empty new method body → expected tests fail
  • Sanity mutation: wrongful pruneState call on abort → expected tests fail

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

@github-actions github-actions Bot added the doc-label-missing, fixVersion/0.3.0, priority/major, and doc-not-needed labels and removed the doc-label-missing label May 13, 2026
@wenjin272
Collaborator

Hi, @weiqingy. It appears that after the merge of #659, both #666 and #667 have some conflicts.

@weiqingy
Collaborator Author

Hi @wenjin272, the PR has been updated to resolve the conflicts.

@weiqingy
Collaborator Author

The one CI failure (it-python [java-17] [python-3.12] [flink-2.1]) is the known test_react_agent_on_local_runner LLM flake against Ollama qwen3:1.7b, not caused by this PR:

FAILED flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py::test_react_agent_on_local_runner
  - assert 432596736 == 1386528

The test expects 4444 × 312 = 1386528, but the LLM made an extra unnecessary multiply(1386528, 312) call and returned 432596736. The test source has a comment right next to the assertion: "This may be caused by the LLM response does not match the output schema, you can rerun this case."

This same failure (same exact numbers, 432596736 == 1386528) is currently occurring on main at b38ae21, the commit this PR is rebased onto, and on several other recent main-branch runs. The failing test runs through the Python local_runner, which logs "Local runner does not support durable execution; recovery is not available.", so the Java DurableExecutionManager / ActionExecutionOperator paths changed by this PR are never exercised.

Will re-run CI.

@wenjin272
Collaborator

I believe we need to polish the stability and observability of CI in version 0.4. If you encounter any unstable cases, please contact me to rerun them. I now have the permission to rerun failed CI jobs.

…ntries

Issue apache#665. When Flink aborts a checkpoint, it calls notifyCheckpointAborted
instead of notifyCheckpointComplete. The DurableExecutionManager only handled
the complete path, so the per-checkpoint sequence-number entry recorded by
snapshotLastCompletedSequenceNumbers was never released for aborted
checkpoints. Under sustained abort pressure (timeouts, alignment failures,
backend pressure), checkpointIdToSeqNums grew unboundedly.

Changes:

- Add DurableExecutionManager.notifyCheckpointAborted(long): removes the
  entry from checkpointIdToSeqNums, guarded by the same actionStateStore
  != null check as notifyCheckpointComplete. Does NOT prune durable action
  state — the aborted checkpoint's writes were never committed, so the
  prior committed checkpoint's recovery state is still load-bearing and
  must not be pruned.

- Add ActionExecutionOperator.notifyCheckpointAborted(long): thin override
  that delegates to the manager and then calls super, mirroring the
  existing notifyCheckpointComplete override.

- Extend the symmetric-guard invariant javadoc on
  snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete to
  name both release paths (complete OR abort). The actionStateStore !=
  null guard now lives on three methods; the cross-linked javadoc makes
  that explicit and cites issues apache#645 and apache#665.

- Three new DurableExecutionManagerTest cases (using the existing
  getCheckpointIdToSeqNums() @VisibleForTesting accessor introduced in
  apache#659):
  * notifyAbortedRemovesEntryWithoutPruning — entry released, durable
    state untouched (verified against a real InMemoryActionStateStore so
    wrongful pruning would be observable).
  * completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight
    checkpoints, one completes (state pruned), one aborts (state preserved),
    one remains.
  * noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op
    coverage matching the existing notifyCheckpointComplete null-store case.
Collaborator

@wenjin272 wenjin272 left a comment


Hi, @weiqingy, LGTM. I left a comment that may need to be confirmed.

}
}

void maybePruneState(Object key, long sequenceNum) throws Exception {
Collaborator


It appears that only tests are calling this method. We may need to verify whether this is an unnecessary interface or if the call was accidentally removed during a previous conflict resolution.

Labels

doc-not-needed Your PR changes do not impact docs fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug][runtime] DurableExecutionManager leaks checkpointIdToSeqNums entries on aborted checkpoints

2 participants