refactor(workflow-engine): optimize loop history pruning#4370
refactor(workflow-engine): optimize loop history pruning#4370NathanFlurry merged 3 commits intomainfrom
Conversation
|
🚅 Deployed to the rivet-pr-4370 environment in rivet-frontend
|
Code ReviewOverall this is a solid refactor. Collapsing three config knobs (commitInterval/historyEvery/historyKeep) into two (historyPruneInterval/historySize) is a cleaner API, the incremental compaction tracking avoids O(N) rescans on every checkpoint, and range-based deletions reduce driver round-trips. A few things to address before merge. Breaking change not labeled commitInterval, historyEvery, and historyKeep are removed with no deprecation shim. TypeScript will catch call-sites at compile time, but the PR does not check the Breaking change box and the description does not mention semver implications. Please mark it and call it out in the changelog. Deferred flush does not run in parallel with user code The PR description says defers checkpoint flushes to run in parallel with user code for performance, and the inline comment repeats this. In practice the flush is awaited at the very top of the next iteration, before config.run is called, so there is no overlap with user code. The actual benefit is that the IO begins slightly earlier (right after the prune decision rather than at the next-iteration boundary), which can reduce the perceived latency if the IO completes before the await is reached. That is a legitimate micro-optimization, but the comments should reflect what actually happens so future readers are not confused. Missing input validation for historyPruneInterval The removed code had a TODO comment noting that commitInterval must be greater than 0. That TODO was dropped without adding validation. With historyPruneInterval = 0, iteration modulo 0 evaluates to NaN in JavaScript, so the periodic prune branch never fires and history grows without bound. Add a guard at loop entry: if (historyPruneInterval <= 0) throw new Error(historyPruneInterval must be greater than 0) deleteRange is a breaking addition to EngineDriver Adding a required method to a public interface is a breaking change for external implementors. Only InMemoryDriver (testing) and ActorWorkflowDriver (rivetkit) are updated in this PR. If any other concrete implementations exist they will fail at compile time. Consider whether deleteRange should have a default implementation (e.g. a fallback loop of delete calls) so existing implementors do not break immediately. Non-null assertion in refactored deleteEntriesWithPrefix storage.ts uses deletions.prefixes[0]! after the refactor. collectDeletionsForPrefix always pushes exactly one element, so this is safe today, but the assertion is fragile. Either destructure directly or add a runtime check to make the invariant explicit. Minor: weak assertion in historyPruneInterval=1 test tests/loops.test.ts (should handle historyPruneInterval of 1) uses expect(minIteration).toBeGreaterThanOrEqual(2). With historyPruneInterval=1 and historySize=1, after the final break-path prune iteration 2 is deleted and only iteration 3 remains, so the lower bound should be >= 3. Using >= 2 makes the test pass even if one extra iteration leaks through. Minor: test comment calls break iteration 9 but variable is 8 In the should-not-re-delete-already-pruned-iterations test, the comment says At break (iteration 9). The iteration variable when the break fires is 8; the value 9 comes from passing iteration + 1 into collectLoopPruning. The comment mixes the two meanings. Clarifying (e.g. at break, currentIteration passed as iteration+1=9) would help readers trace through the logic. What looks good
|
Consolidate historyEvery and historyKeep config options into commitInterval to simplify the API. Add incremental compaction tracking via lastForgottenUpTo to avoid re-scanning already-forgotten iterations. Defer checkpoint flushes to run in parallel with the next iteration's user code. Introduce PendingDeletions interface for collecting deletions to execute alongside checkpoint writes. Expand test coverage for compaction behavior, crash recovery, and queue timeout edge cases. Update all documentation to reflect the simplified API.
…nterval and add historySize Rename checkpointInterval to historyPruneInterval to clarify that it controls how often old iterations are pruned. Add separate historySize option to control how many iterations are retained, defaulting to historyPruneInterval. Rename internal method collectLoopCompaction to collectLoopPruning and update all terminology from compact/checkpoint to prune throughout comments, tests, and docs.
Replace per-iteration prefix deletes with a single deleteRange call and track lastPrunedUpTo to avoid redundant re-scans. Add deleteRange to EngineDriver interface with implementations in both test and production drivers.
22226bf to
a44899b
Compare
Description
This change consolidates loop history management by merging
historyEvery/historyKeepconfig options intohistoryPruneIntervalandhistorySize. It adds incremental compaction tracking vialastPrunedUpToto avoid redundant deletion scans, uses range-based deletions (deleteRange) instead of per-iteration prefix deletes, and defers checkpoint flushes to run in parallel with user code for performance.Known issue: queue message replay bug (NOT fixed in this PR)
A failing test (
should not replay the previous loop message after queue.next timeout) documents a bug wherequeue.nextBatchwith a timeout inside a loop replays previously consumed messages. This test is intentionally failing to demonstrate the issue for a future fix.Type of change
How Has This Been Tested?
Added new test cases covering:
Checklist: