Defer ChainMonitor updates and persistence to flush()#4351
Defer ChainMonitor updates and persistence to flush()#4351joostjager wants to merge 5 commits intolightningdevkit:mainfrom
Conversation
|
👋 Thanks for assigning @TheBlueMatt as a reviewer! |
|
Closing this PR as #4345 seems to be the easiest way to go |
1f5cef4 to
30d05ca
Compare
|
The single commit was split into three: extracting internal methods, adding a deferred toggle, and implementing the deferral and flushing logic. flush() now delegates to the extracted internal methods rather than reimplementing persist/insert logic inline. Deferred mode is opt-in via a deferred bool rather than always-on. Test infrastructure was expanded with deferred-mode helpers and dedicated unit tests. |
2815bf9 to
3eb5644
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4351 +/- ##
==========================================
+ Coverage 85.87% 86.10% +0.23%
==========================================
Files 157 159 +2
Lines 103769 105410 +1641
Branches 103769 105410 +1641
==========================================
+ Hits 89115 90767 +1652
+ Misses 12158 12127 -31
- Partials 2496 2516 +20
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
f964466 to
b140bf9
Compare
|
This PR is now ready for review. LDK-node counterpart: lightningdevkit/ldk-node#782 |
08d53fa to
096f9d7
Compare
096f9d7 to
52081b6
Compare
|
Review comments addressed, diff @TheBlueMatt |
|
🔔 1st Reminder Hey @TheBlueMatt! This PR has been waiting for your review. |
|
🔔 2nd Reminder Hey @TheBlueMatt! This PR has been waiting for your review. |
|
🔔 3rd Reminder Hey @TheBlueMatt! This PR has been waiting for your review. |
52081b6 to
1f8f3a9
Compare
| PendingMonitorOp::NewMonitor { channel_id, monitor } => { | ||
| let logger = WithChannelMonitor::from(logger, &monitor, None); | ||
| let update_id = monitor.get_latest_update_id(); | ||
| log_trace!(logger, "Flushing new monitor"); | ||
| // Hold `pending_ops` across the internal call so that | ||
| // `watch_channel` (which checks `monitors` + `pending_ops` | ||
| // atomically) cannot race with this insertion. | ||
| match self.watch_channel_internal(channel_id, monitor) { | ||
| Ok(status) => { | ||
| drop(queue); | ||
| (channel_id, update_id, status) | ||
| }, | ||
| Err(()) => { | ||
| // `watch_channel` checks both `pending_ops` and `monitors` | ||
| // for duplicates before queueing, so this is unreachable. | ||
| unreachable!(); | ||
| }, | ||
| } |
There was a problem hiding this comment.
Bug/performance: pending_ops mutex is held throughout the entire watch_channel_internal call, which includes persister.persist_new_channel() (potentially slow I/O) and monitor.load_outputs_to_watch(). This blocks ALL concurrent deferred watch_channel, update_channel, and pending_operation_count calls for the duration of the persist — which could be significant with high-latency storage backends (the exact use case motivating deferred mode).
The comment explains this prevents a duplicate-detection race, but the blast radius is too wide: it also blocks update_channel for unrelated channels. Consider an alternative: instead of popping the entry from the queue, mark it as "being flushed" (e.g., with an enum variant or a flag), then release the lock before calling watch_channel_internal. A concurrent watch_channel would still see the entry in pending_ops and correctly detect the duplicate.
There was a problem hiding this comment.
I don't think this optimization is needed, as this is only called for new channels
Automated ReviewThe core deferred queueing design is sound for crash safety (InProgress return ensures CM retains pending state for replay). Two notable concerns:
See inline comments for additional details. 🤖 Generated with Claude Code |
I think this is safe because the funding isn't broadcast until the new monitor persist completes? |
| // Hold `pending_ops` across the internal call so that | ||
| // `watch_channel` (which checks `monitors` + `pending_ops` | ||
| // atomically) cannot race with this insertion. | ||
| match self.watch_channel_internal(channel_id, monitor) { | ||
| Ok(status) => { | ||
| drop(queue); | ||
| (channel_id, update_id, status) | ||
| }, | ||
| Err(()) => { | ||
| // `watch_channel` checks both `pending_ops` and `monitors` | ||
| // for duplicates before queueing, so this is unreachable. | ||
| unreachable!(); | ||
| }, | ||
| } |
There was a problem hiding this comment.
The pending_ops lock is held across the entire watch_channel_internal call, which includes self.persister.persist_new_channel() — a potentially blocking I/O operation (for the sync Persist trait). This blocks all concurrent deferred watch_channel and update_channel calls for the duration of the persistence.
By contrast, the Update path at line 1315 correctly drops the lock before calling update_channel_internal.
The comment explains the intent (preventing a race where a duplicate watch_channel slips through the gap between popping from pending_ops and inserting into monitors). However, the current approach is overly broad — it holds the lock through the entire persistence I/O, not just the in-memory insertion. Consider a more targeted approach, e.g.:
- Insert a sentinel/placeholder into
monitors(or a separate "in-flight" set) before dropping the lock, then replace it after persistence completes. - Or: insert the monitor into
monitorsfirst (viaentry.insert()), drop thepending_opslock, then persist. The duplicate check inwatch_channelwould see the entry inmonitorsand returnErr(())without needingpending_opsto be held.
For high-latency storage backends (remote databases), this lock holding could cause significant contention.
| // We capture pending_operation_count inside the persistence branch to | ||
| // avoid a race: ChannelManager handlers queue deferred monitor ops | ||
| // before the persistence flag is set. Capturing outside would let us | ||
| // observe pending ops while the flag is still unset, causing us to | ||
| // flush monitor writes without persisting the ChannelManager. | ||
| // Declared before futures so it outlives the Joiner (drop order). | ||
| let pending_monitor_writes; |
There was a problem hiding this comment.
The comment says "Declared before futures so it outlives the Joiner (drop order)" but pending_monitor_writes is usize (a Copy type). It's captured by value in the async block, so the original variable's drop order relative to the Joiner is irrelevant. The declaration could just as easily be inside the if block. Consider correcting the comment to avoid confusion.
| let mut pending_ops = self.pending_ops.lock().unwrap(); | ||
| debug_assert!( | ||
| { | ||
| let monitors = self.monitors.read().unwrap(); | ||
| let in_monitors = monitors.contains_key(&channel_id); | ||
| let in_pending = pending_ops.iter().any(|op| match op { | ||
| PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id, | ||
| _ => false, | ||
| }); | ||
| in_monitors || in_pending | ||
| }, | ||
| } | ||
| "ChannelManager generated a channel update for a channel that was not yet registered!" | ||
| ); | ||
| pending_ops.push_back(PendingMonitorOp::Update { channel_id, update: update.clone() }); | ||
| ChannelMonitorUpdateStatus::InProgress |
There was a problem hiding this comment.
The channel-existence check is only a debug_assert — in release builds it's compiled out, so updates for channels that exist in neither monitors nor pending_ops are silently queued. During flush, update_channel_internal would either hit the #[cfg(not(debug_assertions))] branch (returning InProgress and silently losing the update) or in debug builds it would have already panicked at this assert.
This matches the pre-existing non-deferred behavior, but the deferred mode introduces a new subtlety: if a watch_channel is still pending and an update_channel arrives for the same channel, the update is queued after the watch. During flush, the watch is processed first (inserting the monitor), then the update finds it. This ordering is correct due to FIFO queue discipline. But if the watch somehow fails during flush (e.g., persistence error causing the monitor not to be fully set up), the subsequent update flush will still proceed — update_channel_internal would find the monitor (it was inserted despite the error) but the state could be inconsistent. Worth a comment documenting this assumed invariant.
| // Atomically check for duplicates in both the pending queue and the | ||
| // flushed monitor set. | ||
| let mut pending_ops = self.pending_ops.lock().unwrap(); | ||
| let monitors = self.monitors.read().unwrap(); | ||
| if monitors.contains_key(&channel_id) { | ||
| return Err(()); | ||
| } | ||
| entry.insert(MonitorHolder { | ||
| monitor, | ||
| pending_monitor_updates: Mutex::new(pending_monitor_updates), | ||
| let already_pending = pending_ops.iter().any(|op| match op { | ||
| PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id, | ||
| _ => false, | ||
| }); | ||
| Ok(persist_res) | ||
| if already_pending { | ||
| return Err(()); | ||
| } | ||
| pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor }); | ||
| Ok(ChannelMonitorUpdateStatus::InProgress) |
There was a problem hiding this comment.
Deferred watch_channel takes locks in order pending_ops → monitors.read(). The flush NewMonitor path holds pending_ops then acquires monitors.write() (inside watch_channel_internal). This lock ordering is consistent, so no deadlock.
However, note that Confirm::transactions_confirmed, best_block_updated, and process_chain_data only iterate self.monitors — they do not process monitors sitting in the pending_ops queue. If a block containing a transaction relevant to a pending (not-yet-flushed) monitor arrives between watch_channel and flush, that monitor will miss the chain data. In practice the flush window (~100ms) makes this extremely unlikely for mainnet (10-min blocks), but it's possible in regtest/signet testing or if the flush is delayed.
Consider documenting this gap, or having flush → watch_channel_internal → load_outputs_to_watch trigger a rescan for the relevant block range.
Automated ReviewThe deferred queueing mechanism is well-designed for crash safety. Three new issues found (see inline comments):
Prior review concerns about 🤖 Generated with Claude Code |
8fdf138 to
3b4ef60
Compare
Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward - ensuring monitor writes happen in the correct order relative to manager persistence. Key changes: - Add PendingMonitorOp enum and pending_ops queue to ChainMonitor - Implement flush() and pending_operation_count() public methods - Integrate flush calls in BackgroundProcessor (both sync and async) - Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility - Add create_node_cfgs_deferred for deferred-mode test networks - Add unit tests for queue/flush mechanics and full payment flow Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
3b4ef60 to
2205fb5
Compare
| PendingMonitorOp::Update { channel_id, update } => { | ||
| let logger = WithContext::from(logger, None, Some(channel_id), None); | ||
| log_trace!(logger, "Flushing monitor update {}", update.update_id); | ||
| // Release `pending_ops` before the internal call so that | ||
| // concurrent `update_channel` queuing is not blocked. | ||
| drop(queue); | ||
| let update_id = update.update_id; | ||
| let status = self.update_channel_internal(channel_id, &update); | ||
| (channel_id, update_id, status) | ||
| }, |
There was a problem hiding this comment.
Bug (concurrent flush calls): Dropping pending_ops before update_channel_internal means two concurrent flush() calls can pop sequential updates for the same channel and apply them out of FIFO order. ChannelMonitorInner::update_monitor panics if latest_update_id + 1 != updates.update_id (channelmonitor.rs:4165-4166), so this is a crash.
Example: queue is [Update(X, id=1), Update(X, id=2)]. Thread A pops id=1, drops lock. Thread B pops id=2, drops lock. If the OS schedules Thread B's update_channel_internal first, it hits 0 + 1 != 2 → panic.
The background processor currently serializes flush calls, so this isn't hit today, but flush is a pub method taking &self. Options:
- Document that
flushmust not be called concurrently, or - Hold
pending_opsacross theupdate_channel_internalcall as well (matching the NewMonitor case), or - Add a separate
Mutex<()>flush guard to serialize the entire method.
| @@ -1131,7 +1140,12 @@ where | |||
| CHANNEL_MANAGER_PERSISTENCE_KEY, | |||
| channel_manager.get_cm().encode(), | |||
| ) | |||
| .await | |||
| .await?; | |||
|
|
|||
| // Flush monitor operations that were pending before we persisted. New updates | |||
| // that arrived after are left for the next iteration. | |||
| chain_monitor.get_cm().flush(pending_monitor_writes, &logger); | |||
| Ok(()) | |||
| }; | |||
There was a problem hiding this comment.
Blocking sync I/O in async context: flush() calls persist_new_channel() / update_persisted_channel() which are synchronous Persist trait methods. For high-latency storage backends (e.g., remote databases), this blocks the async executor's thread for the duration of each persistence call, potentially starving other async tasks sharing the same runtime.
Today's impact is limited since the Persist trait is inherently sync and the background processor already does sync work, but it's worth noting that deferred mode moves N persistence operations into this await-interleaved path where previously they happened inline (in the CM's thread).
There was a problem hiding this comment.
No they aren't. Persist is async, if the user implements it that way. That said, we should actually document this - those using sync persist in an async BP might get hung up on this and see tokio issues...
Summary
Modify
ChainMonitorinternally to queuewatch_channelandupdate_channeloperations, returningInProgressuntilflush()is called. This enables persistence of monitor updates afterChannelManagerpersistence, ensuring correct ordering where theChannelManagerstate is never ahead of the monitor state on restart. The new behavior is opt-in via adeferredswitch.Key changes:
ChainMonitorgains adeferredswitch to enable the new queuing behaviorInProgressflush()applies pending operations and persists monitorsChannelManagerpersistence, then flush after persistence completesPerformance Impact
Multi-channel, multi-node load testing (using ldk-server chaos branch) shows no measurable throughput difference between deferred and direct persistence modes.
This is likely because forwarding and payment processing are already effectively single-threaded: the background processor batches all forwards for the entire node in a single pass, so the deferral overhead doesn't add any meaningful bottleneck to an already serialized path.
For high-latency storage (e.g., remote databases), there is also currently no significant impact because channel manager persistence already blocks event handling in the background processor loop (test). If the loop were parallelized to process events concurrently with persistence, deferred writing would become comparatively slower since it moves the channel manager round trip into the critical path. However, deferred writing would also benefit from loop parallelization, and could be further optimized by batching the monitor and manager writes into a single round trip.
Alternative Designs Considered
Several approaches were explored to solve the monitor/manager persistence ordering problem:
1. Queue at KVStore level (#4310)
Introduces a
QueuedKVStoreSyncwrapper that queues all writes in memory, committing them in a single batch at chokepoints where data leaves the system (get_and_clear_pending_msg_events,get_and_clear_pending_events). This approach aims for true atomic multi-key writes but requires KVStore backends that support transactions (e.g., SQLite); filesystem backends cannot achieve full atomicity.Trade-offs: Most general solution but requires changes to persistence boundaries and cannot fully close the desync gap with filesystem storage.
2. Queue at Persister level (#4317)
Updates
MonitorUpdatingPersisterto queue persist operations in memory, with actual writes happening onflush(). Addsflush()to thePersisttrait andChainMonitor.Trade-offs: Only fixes the issue for
MonitorUpdatingPersister; customPersistimplementations remain vulnerable to the race condition.3. Queue at ChainMonitor wrapper level (#4345)
Introduces
DeferredChainMonitor, a wrapper aroundChainMonitorthat implements the queue in a separate wrapper layer. AllChainMonitortraits (Listen,Confirm,EventsProvider, etc.) are passed through, allowing drop-in replacement.Trade-offs: Requires re-implementing all trait pass-throughs on the wrapper. Keeps the core
ChainMonitorunchanged but adds an external layer of indirection.