From a7af6119c0895d731f776e6becd5b26596b0c634 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 26 Feb 2026 09:03:17 +0100 Subject: [PATCH 1/5] Clarify that each pending monitor update ID must be marked complete The previous wording implied that persisting a full ChannelMonitor would automatically resolve all pending updates. Reword to make clear that each update ID still needs to be individually marked complete via channel_monitor_updated, even after a full monitor persistence. Co-Authored-By: Claude Opus 4.6 --- lightning/src/chain/chainmonitor.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7db1b697c2b..74e5e03d07f 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -83,8 +83,10 @@ use core::sync::atomic::{AtomicUsize, Ordering}; /// the background with [`ChainMonitor::list_pending_monitor_updates`] and /// [`ChainMonitor::get_monitor`]. /// -/// Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can -/// be marked as complete via [`ChainMonitor::channel_monitor_updated`]. +/// Each pending update must be individually marked as complete by calling +/// [`ChainMonitor::channel_monitor_updated`] with the corresponding update ID. Note that +/// persisting a full [`ChannelMonitor`] covers all prior updates, but each update ID still +/// needs to be marked complete separately. /// /// If at some point no further progress can be made towards persisting the pending updates, the /// node should simply shut down. From e03d0105667ba9267d99ba3b9a54da966adf8ee9 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 25 Feb 2026 09:01:13 +0100 Subject: [PATCH 2/5] Extract shared dummy_monitor helper in channelmonitor.rs Extract the ChannelMonitor construction boilerplate that was duplicated across channelmonitor test functions into a reusable #[cfg(test)] pub(super) dummy_monitor helper, generic over the signer type. AI tools were used in preparing this commit. --- lightning/src/chain/channelmonitor.rs | 162 +++++++++++--------------- 1 file changed, 70 insertions(+), 92 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 37351460634..1fbc197a057 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -6740,6 +6740,71 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } +#[cfg(test)] +pub(super) fn dummy_monitor( + channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S, +) -> ChannelMonitor { + use crate::ln::chan_utils::{ChannelPublicKeys, CounterpartyChannelTransactionParameters}; + use crate::sign::{ChannelSigner, InMemorySigner}; + use bitcoin::network::Network; + + let secp_ctx = Secp256k1::new(); + let dummy_key = + PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let keys = InMemorySigner::new( + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + true, + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + [41; 32], + [0; 32], + [0; 32], + ); + let counterparty_pubkeys = ChannelPublicKeys { + funding_pubkey: dummy_key, + revocation_basepoint: RevocationBasepoint::from(dummy_key), + payment_point: dummy_key, + delayed_payment_basepoint: DelayedPaymentBasepoint::from(dummy_key), + htlc_basepoint: HtlcBasepoint::from(dummy_key), + }; + let funding_outpoint = + crate::chain::transaction::OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; + let channel_parameters = ChannelTransactionParameters { + holder_pubkeys: keys.pubkeys(&secp_ctx), + holder_selected_contest_delay: 66, + is_outbound_from_holder: true, + counterparty_parameters: Some(CounterpartyChannelTransactionParameters { + pubkeys: counterparty_pubkeys, + selected_contest_delay: 67, + }), + funding_outpoint: Some(funding_outpoint), + splice_parent_funding_txid: None, + channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_value_satoshis: 0, + }; + let shutdown_script = crate::ln::script::ShutdownScript::new_p2wpkh_from_pubkey(dummy_key); + let best_block = BestBlock::from_network(Network::Testnet); + let signer = wrap_signer(keys); + ChannelMonitor::new( + secp_ctx, + signer, + Some(shutdown_script.into_inner()), + 0, + &ScriptBuf::new(), + &channel_parameters, + true, + 0, + HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), + best_block, + dummy_key, + channel_id, + false, + ) +} + #[cfg(test)] mod tests { use bitcoin::amount::Amount; @@ -6769,23 +6834,16 @@ mod tests { weight_revoked_received_htlc, WEIGHT_REVOKED_OUTPUT, }; use crate::chain::transaction::OutPoint; - use crate::chain::{BestBlock, Confirm}; + use crate::chain::Confirm; use crate::io; - use crate::ln::chan_utils::{ - self, ChannelPublicKeys, ChannelTransactionParameters, - CounterpartyChannelTransactionParameters, HTLCOutputInCommitment, - HolderCommitmentTransaction, - }; + use crate::ln::chan_utils::{self, HTLCOutputInCommitment, HolderCommitmentTransaction}; use crate::ln::channel_keys::{ - DelayedPaymentBasepoint, DelayedPaymentKey, HtlcBasepoint, RevocationBasepoint, - RevocationKey, + DelayedPaymentBasepoint, DelayedPaymentKey, RevocationBasepoint, RevocationKey, }; use crate::ln::channelmanager::{HTLCSource, PaymentId}; use crate::ln::functional_test_utils::*; use crate::ln::outbound_payment::RecipientOnionFields; - use crate::ln::script::ShutdownScript; use crate::ln::types::ChannelId; - use crate::sign::{ChannelSigner, InMemorySigner}; use crate::sync::Arc; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::{PaymentHash, PaymentPreimage}; @@ -6955,51 +7013,11 @@ mod tests { } } - let keys = InMemorySigner::new( - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - true, - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - [41; 32], - [0; 32], - [0; 32], - ); - - let counterparty_pubkeys = ChannelPublicKeys { - funding_pubkey: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[44; 32]).unwrap()), - revocation_basepoint: RevocationBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[45; 32]).unwrap())), - payment_point: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[46; 32]).unwrap()), - delayed_payment_basepoint: DelayedPaymentBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[47; 32]).unwrap())), - htlc_basepoint: HtlcBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[48; 32]).unwrap())) - }; let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; let channel_id = ChannelId::v1_from_funding_outpoint(funding_outpoint); - let channel_parameters = ChannelTransactionParameters { - holder_pubkeys: keys.pubkeys(&secp_ctx), - holder_selected_contest_delay: 66, - is_outbound_from_holder: true, - counterparty_parameters: Some(CounterpartyChannelTransactionParameters { - pubkeys: counterparty_pubkeys, - selected_contest_delay: 67, - }), - funding_outpoint: Some(funding_outpoint), - splice_parent_funding_txid: None, - channel_type_features: ChannelTypeFeatures::only_static_remote_key(), - channel_value_satoshis: 0, - }; // Prune with one old state and a holder commitment tx holding a few overlaps with the // old state. - let shutdown_pubkey = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(shutdown_pubkey); - let best_block = BestBlock::from_network(Network::Testnet); - let monitor = ChannelMonitor::new( - Secp256k1::new(), keys, Some(shutdown_script.into_inner()), 0, &ScriptBuf::new(), - &channel_parameters, true, 0, HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), - best_block, dummy_key, channel_id, false, - ); + let monitor = super::dummy_monitor(channel_id, |keys| keys); let nondust_htlcs = preimages_slice_to_htlcs!(preimages[0..10]); let dummy_commitment_tx = HolderCommitmentTransaction::dummy(0, funding_outpoint, nondust_htlcs); @@ -7218,49 +7236,9 @@ mod tests { let dummy_key = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let keys = InMemorySigner::new( - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - true, - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - [41; 32], - [0; 32], - [0; 32], - ); - - let counterparty_pubkeys = ChannelPublicKeys { - funding_pubkey: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[44; 32]).unwrap()), - revocation_basepoint: RevocationBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[45; 32]).unwrap())), - payment_point: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[46; 32]).unwrap()), - delayed_payment_basepoint: DelayedPaymentBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[47; 32]).unwrap())), - htlc_basepoint: HtlcBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[48; 32]).unwrap())), - }; let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; let channel_id = ChannelId::v1_from_funding_outpoint(funding_outpoint); - let channel_parameters = ChannelTransactionParameters { - holder_pubkeys: keys.pubkeys(&secp_ctx), - holder_selected_contest_delay: 66, - is_outbound_from_holder: true, - counterparty_parameters: Some(CounterpartyChannelTransactionParameters { - pubkeys: counterparty_pubkeys, - selected_contest_delay: 67, - }), - funding_outpoint: Some(funding_outpoint), - splice_parent_funding_txid: None, - channel_type_features: ChannelTypeFeatures::only_static_remote_key(), - channel_value_satoshis: 0, - }; - let shutdown_pubkey = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(shutdown_pubkey); - let best_block = BestBlock::from_network(Network::Testnet); - let monitor = ChannelMonitor::new( - Secp256k1::new(), keys, Some(shutdown_script.into_inner()), 0, &ScriptBuf::new(), - &channel_parameters, true, 0, HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), - best_block, dummy_key, channel_id, false, - ); + let monitor = super::dummy_monitor(channel_id, |keys| keys); let chan_id = monitor.inner.lock().unwrap().channel_id(); let payment_hash = PaymentHash([1; 32]); From 939f7981994a0bc58e1c4d71eec3b9d4be399ae2 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 13:55:24 +0100 Subject: [PATCH 3/5] Extract watch_channel_internal/update_channel_internal from Watch impl Pure refactor: move the bodies of Watch::watch_channel and Watch::update_channel into methods on ChainMonitor, and have the Watch trait methods delegate to them. This prepares for adding deferred mode where the Watch methods will conditionally queue operations instead of executing them immediately. Co-Authored-By: Claude Opus 4.6 --- lightning/src/chain/chainmonitor.rs | 300 +++++++++++++++------------- 1 file changed, 156 insertions(+), 144 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 74e5e03d07f..17f79528b07 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1060,6 +1060,160 @@ where Ok(ChannelMonitorUpdateStatus::Completed) } + + fn watch_channel_internal( + &self, channel_id: ChannelId, monitor: ChannelMonitor, + ) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + let mut monitors = self.monitors.write().unwrap(); + let entry = match monitors.entry(channel_id) { + hash_map::Entry::Occupied(_) => { + log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); + return Err(()); + }, + hash_map::Entry::Vacant(e) => e, + }; + log_trace!(logger, "Got new ChannelMonitor"); + let update_id = monitor.get_latest_update_id(); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!(logger, "Persistence of new ChannelMonitor in progress",); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!(logger, "Persistence of new ChannelMonitor completed",); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + if let Some(ref chain_source) = self.chain_source { + monitor.load_outputs_to_watch(chain_source, &self.logger); + } + entry.insert(MonitorHolder { + monitor, + pending_monitor_updates: Mutex::new(pending_monitor_updates), + }); + Ok(persist_res) + } + + fn update_channel_internal( + &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + debug_assert_eq!(update.channel_id.unwrap(), channel_id); + // Update the monitor that watches the channel referred to by the given outpoint. + let monitors = self.monitors.read().unwrap(); + match monitors.get(&channel_id) { + None => { + let logger = WithContext::from(&self.logger, None, Some(channel_id), None); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); + + // We should never ever trigger this from within ChannelManager. Technically a + // user could use this object with some proxying in between which makes this + // possible, but in tests and fuzzing, this should be a panic. + #[cfg(debug_assertions)] + panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress + }, + Some(monitor_state) => { + let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); + + // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we + // have well-ordered updates from the users' point of view. See the + // `pending_monitor_updates` docs for more. + let mut pending_monitor_updates = + monitor_state.pending_monitor_updates.lock().unwrap(); + let update_res = monitor.update_monitor( + update, + &self.broadcaster, + &self.fee_estimator, + &self.logger, + ); + + let update_id = update.update_id; + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); + self.persister.update_persisted_channel( + monitor.persistence_key(), + None, + monitor, + ) + } else { + self.persister.update_persisted_channel( + monitor.persistence_key(), + Some(update), + monitor, + ) + }; + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + pending_monitor_updates.push(update_id); + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} in progress", + update_id, + ); + }, + ChannelMonitorUpdateStatus::Completed => { + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} completed", + update_id, + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + + // We may need to start monitoring for any alternative funding transactions. + if let Some(ref chain_source) = self.chain_source { + for (funding_outpoint, funding_script) in + update.internal_renegotiated_funding_data() + { + log_trace!( + logger, + "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", + funding_outpoint + ); + chain_source.register_tx(&funding_outpoint.txid, &funding_script); + chain_source.register_output(WatchedOutput { + block_hash: None, + outpoint: funding_outpoint, + script_pubkey: funding_script, + }); + } + } + + if update_res.is_err() { + ChannelMonitorUpdateStatus::InProgress + } else { + persist_res + } + }, + } + } } impl< @@ -1274,155 +1428,13 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - let mut monitors = self.monitors.write().unwrap(); - let entry = match monitors.entry(channel_id) { - hash_map::Entry::Occupied(_) => { - log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); - return Err(()); - }, - hash_map::Entry::Vacant(e) => e, - }; - log_trace!(logger, "Got new ChannelMonitor"); - let update_id = monitor.get_latest_update_id(); - let mut pending_monitor_updates = Vec::new(); - let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - log_info!(logger, "Persistence of new ChannelMonitor in progress",); - pending_monitor_updates.push(update_id); - }, - ChannelMonitorUpdateStatus::Completed => { - log_info!(logger, "Persistence of new ChannelMonitor completed",); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source, &self.logger); - } - entry.insert(MonitorHolder { - monitor, - pending_monitor_updates: Mutex::new(pending_monitor_updates), - }); - Ok(persist_res) + self.watch_channel_internal(channel_id, monitor) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those - // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. - debug_assert_eq!(update.channel_id.unwrap(), channel_id); - // Update the monitor that watches the channel referred to by the given outpoint. - let monitors = self.monitors.read().unwrap(); - match monitors.get(&channel_id) { - None => { - let logger = WithContext::from(&self.logger, None, Some(channel_id), None); - log_error!(logger, "Failed to update channel monitor: no such monitor registered"); - - // We should never ever trigger this from within ChannelManager. Technically a - // user could use this object with some proxying in between which makes this - // possible, but in tests and fuzzing, this should be a panic. - #[cfg(debug_assertions)] - panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress - }, - Some(monitor_state) => { - let monitor = &monitor_state.monitor; - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); - - // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we - // have well-ordered updates from the users' point of view. See the - // `pending_monitor_updates` docs for more. - let mut pending_monitor_updates = - monitor_state.pending_monitor_updates.lock().unwrap(); - let update_res = monitor.update_monitor( - update, - &self.broadcaster, - &self.fee_estimator, - &self.logger, - ); - - let update_id = update.update_id; - let persist_res = if update_res.is_err() { - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. Therefore, we should persist the updated monitor despite the error. - // We don't want to persist a `monitor_update` which results in a failure to apply later - // while reading `channel_monitor` with updates from storage. Instead, we should persist - // the entire `channel_monitor` here. - log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); - self.persister.update_persisted_channel( - monitor.persistence_key(), - None, - monitor, - ) - } else { - self.persister.update_persisted_channel( - monitor.persistence_key(), - Some(update), - monitor, - ) - }; - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - pending_monitor_updates.push(update_id); - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} in progress", - update_id, - ); - }, - ChannelMonitorUpdateStatus::Completed => { - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} completed", - update_id, - ); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(pending_monitor_updates); - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - - // We may need to start monitoring for any alternative funding transactions. - if let Some(ref chain_source) = self.chain_source { - for (funding_outpoint, funding_script) in - update.internal_renegotiated_funding_data() - { - log_trace!( - logger, - "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", - funding_outpoint - ); - chain_source.register_tx(&funding_outpoint.txid, &funding_script); - chain_source.register_output(WatchedOutput { - block_hash: None, - outpoint: funding_outpoint, - script_pubkey: funding_script, - }); - } - } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } - }, - } + self.update_channel_internal(channel_id, update) } fn release_pending_monitor_events( From dcfb304bef1e4073c6d9d1ceb7ede16cbb792b17 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:46:35 +0100 Subject: [PATCH 4/5] Add deferred bool to ChainMonitor Add a `deferred` parameter to `ChainMonitor::new` and `ChainMonitor::new_async_beta`. When set to true, the Watch trait methods (watch_channel and update_channel) will unimplemented!() for now. All existing callers pass false to preserve current behavior. Co-Authored-By: Claude Opus 4.6 --- fuzz/src/chanmon_consistency.rs | 1 + fuzz/src/full_stack.rs | 1 + fuzz/src/lsps_message.rs | 1 + lightning/src/chain/chainmonitor.rs | 21 +++++++++++++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 1 + lightning/src/ln/channelmanager.rs | 4 ++-- lightning/src/util/test_utils.rs | 1 + 7 files changed, 24 insertions(+), 6 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 21623fdba1e..56ed26f8906 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -282,6 +282,7 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), + false, )), logger, keys, diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 085165e9e02..9959966c156 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -603,6 +603,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let network = Network::Bitcoin; diff --git a/fuzz/src/lsps_message.rs b/fuzz/src/lsps_message.rs index 8371d1c5fc7..a4c4108a6cc 100644 --- a/fuzz/src/lsps_message.rs +++ b/fuzz/src/lsps_message.rs @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 17f79528b07..99f792fc531 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -373,6 +373,9 @@ pub struct ChainMonitor< #[cfg(peer_storage)] our_peerstorage_encryption_key: PeerStorageKey, + + /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. + deferred: bool, } impl< @@ -399,7 +402,7 @@ where pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, - _our_peerstorage_encryption_key: PeerStorageKey, + _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { let event_notifier = Arc::new(Notifier::new()); Self { @@ -416,6 +419,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } } @@ -605,7 +609,7 @@ where /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub fn new( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P, - _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, + _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { Self { monitors: RwLock::new(new_hash_map()), @@ -621,6 +625,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } @@ -1428,13 +1433,21 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - self.watch_channel_internal(channel_id, monitor) + if !self.deferred { + return self.watch_channel_internal(channel_id, monitor); + } + + unimplemented!(); } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - self.update_channel_internal(channel_id, update) + if !self.deferred { + return self.update_channel_internal(channel_id, update); + } + + unimplemented!(); } fn release_pending_monitor_events( diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 5e544c7502d..d4db30a0952 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4924,6 +4924,7 @@ fn native_async_persist() { native_async_persister, Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, ); // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed` diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 08cbb6f6bf7..72f278fa2e9 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -21093,7 +21093,7 @@ pub mod bench { let seed_a = [1u8; 32]; let keys_manager_a = KeysManager::new(&seed_a, 42, 42, true); - let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key()); + let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key(), false); let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &message_router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), @@ -21103,7 +21103,7 @@ pub mod bench { let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); let seed_b = [2u8; 32]; let keys_manager_b = KeysManager::new(&seed_b, 42, 42, true); - let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key()); + let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key(), false); let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &message_router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 22be4367c7a..9d645749b20 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -536,6 +536,7 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), + false, ), keys_manager, expect_channel_force_closed: Mutex::new(None), From 710c9fe5ff14dbb7a449b4204feda499ddcd3656 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:48:31 +0100 Subject: [PATCH 5/5] Implement deferred monitor write queueing and flushing Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward - ensuring monitor writes happen in the correct order relative to manager persistence. Key changes: - Add PendingMonitorOp enum and pending_ops queue to ChainMonitor - Implement flush() and pending_operation_count() public methods - Integrate flush calls in BackgroundProcessor (both sync and async) - Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility - Add create_node_cfgs_deferred for deferred-mode test networks - Add unit tests for queue/flush mechanics and full payment flow Co-Authored-By: Claude Opus 4.6 --- lightning-background-processor/src/lib.rs | 94 ++++++- lightning/src/chain/chainmonitor.rs | 327 +++++++++++++++++++++- lightning/src/ln/functional_test_utils.rs | 44 ++- lightning/src/util/test_utils.rs | 59 +++- 4 files changed, 508 insertions(+), 16 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f052f3d8d4c..5feb805a828 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -775,6 +775,17 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp /// The `fetch_time` parameter should return the current wall clock time, if one is available. If /// no time is available, some features may be disabled, however the node will still operate fine. /// +/// Note that when deferred monitor writes are enabled on [`ChainMonitor`], this function flushes +/// pending writes after persisting the [`ChannelManager`]. If the [`Persist`] implementation +/// performs blocking I/O and returns [`Completed`] synchronously rather than returning +/// [`InProgress`], this will block the async executor. +/// +/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor +/// [`Persist`]: lightning::chain::chainmonitor::Persist +/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager +/// [`Completed`]: lightning::chain::ChannelMonitorUpdateStatus::Completed +/// [`InProgress`]: lightning::chain::ChannelMonitorUpdateStatus::InProgress +/// /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you /// could setup `process_events_async` like this: /// ``` @@ -1118,9 +1129,18 @@ where None => {}, } + // We capture pending_operation_count inside the persistence branch to + // avoid a race: ChannelManager handlers queue deferred monitor ops + // before the persistence flag is set. Capturing outside would let us + // observe pending ops while the flag is still unset, causing us to + // flush monitor writes without persisting the ChannelManager. + // Declared before futures so it outlives the Joiner (drop order). + let pending_monitor_writes; + let mut futures = Joiner::new(); if channel_manager.get_cm().get_and_clear_needs_persistence() { + pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); log_trace!(logger, "Persisting ChannelManager..."); let fut = async { @@ -1131,7 +1151,12 @@ where CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), ) - .await + .await?; + + // Flush monitor operations that were pending before we persisted. New updates + // that arrived after are left for the next iteration. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + Ok(()) }; // TODO: Once our MSRV is 1.68 we should be able to drop the Box let mut fut = Box::pin(fut); @@ -1373,6 +1398,7 @@ where // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1381,6 +1407,10 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush monitor operations that were pending before final persistence. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store .write( @@ -1684,7 +1714,15 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + if channel_manager.get_cm().get_and_clear_needs_persistence() { + // We capture pending_operation_count inside the persistence + // branch to avoid a race: ChannelManager handlers queue + // deferred monitor ops before the persistence flag is set. + // Capturing outside would let us observe pending ops while + // the flag is still unset, causing us to flush monitor + // writes without persisting the ChannelManager. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1693,6 +1731,10 @@ impl BackgroundProcessor { channel_manager.get_cm().encode(), ))?; log_trace!(logger, "Done persisting ChannelManager."); + + // Flush monitor operations that were pending before we persisted. + // New updates that arrived after are left for the next iteration. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); } if let Some(liquidity_manager) = liquidity_manager.as_ref() { @@ -1809,12 +1851,17 @@ impl BackgroundProcessor { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush monitor operations that were pending before final persistence. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1896,9 +1943,10 @@ mod tests { use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::{Amount, ScriptBuf, Txid}; use core::sync::atomic::{AtomicBool, Ordering}; + use lightning::chain::chainmonitor; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::OutPoint; - use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; + use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::{Event, PathFailure, ReplayEvent}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ @@ -2444,6 +2492,7 @@ mod tests { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + true, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -2567,6 +2616,8 @@ mod tests { (persist_dir, nodes) } + /// Opens a channel between two nodes without a running `BackgroundProcessor`, + /// so deferred monitor operations are flushed manually at each step. macro_rules! open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ begin_open_channel!($node_a, $node_b, $channel_value); @@ -2582,12 +2633,19 @@ mod tests { tx.clone(), ) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no + // deferred op is queued and FundingCreated is available immediately. let msg_a = get_event_msg!( $node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id() ); $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a); + // Flush node_b's new monitor (watch_channel) so it releases the + // FundingSigned message. + $node_b + .chain_monitor + .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger); get_event!($node_b, Event::ChannelPending); let msg_b = get_event_msg!( $node_b, @@ -2595,6 +2653,11 @@ mod tests { $node_a.node.get_our_node_id() ); $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b); + // Flush node_a's new monitor (watch_channel) queued by + // handle_funding_signed. + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger); get_event!($node_a, Event::ChannelPending); tx }}; @@ -2720,6 +2783,20 @@ mod tests { confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); } + /// Waits until the background processor has flushed all pending deferred monitor + /// operations for the given node. Panics if the pending count does not reach zero + /// within `EVENT_DEADLINE`. + fn wait_for_flushed(chain_monitor: &ChainMonitor) { + let start = std::time::Instant::now(); + while chain_monitor.pending_operation_count() > 0 { + assert!( + start.elapsed() < EVENT_DEADLINE, + "Pending monitor operations were not flushed within deadline" + ); + std::thread::sleep(Duration::from_millis(10)); + } + } + #[test] fn test_background_processor() { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with @@ -3060,11 +3137,21 @@ mod tests { .node .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone()) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no deferred op is + // queued and the FundingCreated message is available immediately. let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id); nodes[1].node.handle_funding_created(node_0_id, &msg_0); + // Node 1 has no bg processor, flush its new monitor (watch_channel) manually so + // events and FundingSigned are released. + nodes[1] + .chain_monitor + .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger); get_event!(nodes[1], Event::ChannelPending); let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id); nodes[0].node.handle_funding_signed(node_1_id, &msg_1); + // Wait for the bg processor to flush the new monitor (watch_channel) queued by + // handle_funding_signed. + wait_for_flushed(&nodes[0].chain_monitor); channel_pending_recv .recv_timeout(EVENT_DEADLINE) .expect("ChannelPending not handled within deadline"); @@ -3125,6 +3212,9 @@ mod tests { error_message.to_string(), ) .unwrap(); + // Wait for the bg processor to flush the monitor update triggered by force close + // so the commitment tx is broadcast. + wait_for_flushed(&nodes[0].chain_monitor); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 99f792fc531..29045cd410e 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -60,12 +60,21 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; +use alloc::collections::VecDeque; use alloc::sync::Arc; #[cfg(peer_storage)] use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. +enum PendingMonitorOp { + /// A new monitor to insert and persist. + NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor }, + /// An update to apply and persist. + Update { channel_id: ChannelId, update: ChannelMonitorUpdate }, +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// @@ -376,6 +385,8 @@ pub struct ChainMonitor< /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. deferred: bool, + /// Queued monitor operations awaiting flush. Unused when `deferred` is `false`. + pending_ops: Mutex>>, } impl< @@ -398,6 +409,18 @@ where /// /// Note that async monitor updating is considered beta, and bugs may be triggered by its use. /// + /// When `deferred` is `true`, [`chain::Watch::watch_channel`] and + /// [`chain::Watch::update_channel`] calls are not executed immediately. Instead, they are + /// queued internally and must be flushed by the caller via [`Self::flush`]. Use + /// [`Self::pending_operation_count`] to check how many operations are queued, then call + /// [`Self::flush`] to process them. This allows the caller to ensure that the + /// [`ChannelManager`] is persisted before its associated monitors, avoiding the risk of + /// force closures from a crash between monitor and channel manager persistence. + /// + /// When `deferred` is `false`, monitor operations are executed inline as usual. + /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + /// /// This is not exported to bindings users as async is not supported outside of Rust. pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, @@ -420,6 +443,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } } @@ -604,6 +628,16 @@ where /// is obtained by the [`ChannelManager`] through [`NodeSigner`] to decrypt peer backups. /// Using an inconsistent or incorrect key will result in the inability to decrypt previously encrypted backups. /// + /// When `deferred` is `true`, [`chain::Watch::watch_channel`] and + /// [`chain::Watch::update_channel`] calls are not executed immediately. Instead, they are + /// queued internally and must be flushed by the caller via [`Self::flush`]. Use + /// [`Self::pending_operation_count`] to check how many operations are queued, then call + /// [`Self::flush`] to process them. This allows the caller to ensure that the + /// [`ChannelManager`] is persisted before its associated monitors, avoiding the risk of + /// force closures from a crash between monitor and channel manager persistence. + /// + /// When `deferred` is `false`, monitor operations are executed inline as usual. + /// /// [`NodeSigner`]: crate::sign::NodeSigner /// [`NodeSigner::get_peer_storage_key`]: crate::sign::NodeSigner::get_peer_storage_key /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager @@ -626,6 +660,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } @@ -1219,6 +1254,86 @@ where }, } } + + /// Returns the number of pending monitor operations queued for later execution. + /// + /// When the `ChainMonitor` is constructed with `deferred` set to `true`, + /// [`chain::Watch::watch_channel`] and [`chain::Watch::update_channel`] calls are queued + /// instead of being executed immediately. Call this method to determine how many operations + /// are waiting, then pass the result to [`Self::flush`] to process them. + pub fn pending_operation_count(&self) -> usize { + self.pending_ops.lock().unwrap().len() + } + + /// Flushes the first `count` pending monitor operations that were queued while the + /// `ChainMonitor` operates in deferred mode. `count` must not exceed the number of + /// pending operations returned by [`Self::pending_operation_count`]. + /// + /// A typical usage pattern is to call [`Self::pending_operation_count`], persist the + /// [`ChannelManager`], then pass the count to this method to flush the queued operations. + /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + pub fn flush(&self, count: usize, logger: &L) { + if count > 0 { + log_info!(logger, "Flushing up to {} monitor operations", count); + } + for _ in 0..count { + let mut queue = self.pending_ops.lock().unwrap(); + let op = match queue.pop_front() { + Some(op) => op, + None => { + debug_assert!(false, "flush count exceeded queue length"); + return; + }, + }; + + let (channel_id, update_id, status) = match op { + PendingMonitorOp::NewMonitor { channel_id, monitor } => { + let logger = WithChannelMonitor::from(logger, &monitor, None); + let update_id = monitor.get_latest_update_id(); + log_trace!(logger, "Flushing new monitor"); + // Hold `pending_ops` across the internal call so that + // `watch_channel` (which checks `monitors` + `pending_ops` + // atomically) cannot race with this insertion. + match self.watch_channel_internal(channel_id, monitor) { + Ok(status) => (channel_id, update_id, status), + Err(()) => { + // `watch_channel` checks both `pending_ops` and `monitors` + // for duplicates before queueing, so this is unreachable. + unreachable!(); + }, + } + }, + PendingMonitorOp::Update { channel_id, update } => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + log_trace!(logger, "Flushing monitor update {}", update.update_id); + // Hold `pending_ops` across the internal call so that + // concurrent `flush` calls cannot reorder sequential + // updates for the same channel. + let update_id = update.update_id; + let status = self.update_channel_internal(channel_id, &update); + (channel_id, update_id, status) + }, + }; + + match status { + ChannelMonitorUpdateStatus::Completed => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + if let Err(e) = self.channel_monitor_updated(channel_id, update_id) { + debug_assert!(false, "channel_monitor_updated failed: {:?}", e); + log_error!(logger, "channel_monitor_updated failed: {:?}", e); + } + }, + ChannelMonitorUpdateStatus::InProgress => {}, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Neither watch_channel_internal nor update_channel_internal + // return UnrecoverableError; they panic on that variant + // before it can be returned. + unreachable!(); + }, + } + } + } } impl< @@ -1437,7 +1552,22 @@ where return self.watch_channel_internal(channel_id, monitor); } - unimplemented!(); + // Atomically check for duplicates in both the pending queue and the + // flushed monitor set. + let mut pending_ops = self.pending_ops.lock().unwrap(); + let monitors = self.monitors.read().unwrap(); + if monitors.contains_key(&channel_id) { + return Err(()); + } + let already_pending = pending_ops.iter().any(|op| match op { + PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id, + _ => false, + }); + if already_pending { + return Err(()); + } + pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor }); + Ok(ChannelMonitorUpdateStatus::InProgress) } fn update_channel( @@ -1447,7 +1577,21 @@ where return self.update_channel_internal(channel_id, update); } - unimplemented!(); + let mut pending_ops = self.pending_ops.lock().unwrap(); + debug_assert!( + { + let monitors = self.monitors.read().unwrap(); + let in_monitors = monitors.contains_key(&channel_id); + let in_pending = pending_ops.iter().any(|op| match op { + PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id, + _ => false, + }); + in_monitors || in_pending + }, + "ChannelManager generated a channel update for a channel that was not yet registered!" + ); + pending_ops.push_back(PendingMonitorOp::Update { channel_id, update: update.clone() }); + ChannelMonitorUpdateStatus::InProgress } fn release_pending_monitor_events( @@ -1577,12 +1721,22 @@ where #[cfg(test)] mod tests { - use crate::chain::channelmonitor::ANTI_REORG_DELAY; + use super::ChainMonitor; + use crate::chain::channelmonitor::{ChannelMonitorUpdate, ANTI_REORG_DELAY}; use crate::chain::{ChannelMonitorUpdateStatus, Watch}; use crate::events::{ClosureReason, Event}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent}; + use crate::ln::types::ChannelId; + use crate::sign::NodeSigner; + use crate::util::dyn_signer::DynSigner; + use crate::util::test_channel_signer::TestChannelSigner; + use crate::util::test_utils::{ + TestBroadcaster, TestChainSource, TestFeeEstimator, TestKeysInterface, TestLogger, + TestPersister, + }; use crate::{expect_payment_path_successful, get_event_msg}; + use bitcoin::Network; const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5; @@ -1840,4 +1994,171 @@ mod tests { }) .is_err()); } + + /// Concrete `ChainMonitor` type wired to the standard test utilities in deferred mode. + type TestDeferredChainMonitor<'a> = ChainMonitor< + TestChannelSigner, + &'a TestChainSource, + &'a TestBroadcaster, + &'a TestFeeEstimator, + &'a TestLogger, + &'a TestPersister, + &'a TestKeysInterface, + >; + + /// Creates a minimal `ChannelMonitorUpdate` with no actual update steps. + fn dummy_update(update_id: u64, channel_id: ChannelId) -> ChannelMonitorUpdate { + ChannelMonitorUpdate { updates: vec![], update_id, channel_id: Some(channel_id) } + } + + fn create_deferred_chain_monitor<'a>( + chain_source: &'a TestChainSource, broadcaster: &'a TestBroadcaster, + logger: &'a TestLogger, fee_est: &'a TestFeeEstimator, persister: &'a TestPersister, + keys: &'a TestKeysInterface, + ) -> TestDeferredChainMonitor<'a> { + ChainMonitor::new( + Some(chain_source), + broadcaster, + logger, + fee_est, + persister, + keys, + keys.get_peer_storage_key(), + true, + ) + } + + /// Tests queueing and flushing of both `watch_channel` and `update_channel` operations + /// when `ChainMonitor` is in deferred mode, verifying that operations flow through to + /// `Persist` and that `channel_monitor_updated` is called on `Completed` status. + #[test] + fn test_queue_and_flush() { + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_est = TestFeeEstimator::new(253); + let logger = TestLogger::new(); + let persister = TestPersister::new(); + let chain_source = TestChainSource::new(Network::Testnet); + let keys = TestKeysInterface::new(&[0; 32], Network::Testnet); + let deferred = create_deferred_chain_monitor( + &chain_source, + &broadcaster, + &logger, + &fee_est, + &persister, + &keys, + ); + + // Queue starts empty. + assert_eq!(deferred.pending_operation_count(), 0); + + // Queue a watch_channel, verifying InProgress status. + let chan = ChannelId::from_bytes([1u8; 32]); + let monitor = crate::chain::channelmonitor::dummy_monitor(chan, |keys| { + TestChannelSigner::new(DynSigner::new(keys)) + }); + let status = Watch::watch_channel(&deferred, chan, monitor); + assert_eq!(status, Ok(ChannelMonitorUpdateStatus::InProgress)); + assert_eq!(deferred.pending_operation_count(), 1); + + // Nothing persisted yet — operations are only queued. + assert!(persister.new_channel_persistences.lock().unwrap().is_empty()); + + // Queue two updates after the watch. Update IDs must be sequential (starting + // from 1 since the initial monitor has update_id 0). + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(1, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(2, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!(deferred.pending_operation_count(), 3); + + // Flush 2 of 3: persist_new_channel returns Completed (triggers + // channel_monitor_updated), update_persisted_channel returns InProgress (does not). + persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + deferred.flush(2, &&logger); + + assert_eq!(deferred.pending_operation_count(), 1); + + // persist_new_channel was called for the watch. + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), 1); + + // Because persist_new_channel returned Completed, channel_monitor_updated was called, + // so update_id 0 should no longer be pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(!pending_for_chan.contains(&0)); + + // update_persisted_channel was called for update_id 1, and because it returned + // InProgress, update_id 1 remains pending. + let monitor_name = deferred.get_monitor(chan).unwrap().persistence_key(); + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&1)); + assert!(pending_for_chan.contains(&1)); + + // Flush remaining: update_persisted_channel returns Completed (default), triggers + // channel_monitor_updated. + deferred.flush(1, &&logger); + assert_eq!(deferred.pending_operation_count(), 0); + + // update_persisted_channel was called for update_id 2. + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&2)); + + // update_id 1 is still pending from the InProgress earlier, but update_id 2 was + // completed in this flush so it is no longer pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(pending_for_chan.contains(&1)); + assert!(!pending_for_chan.contains(&2)); + + // Flushing an empty queue is a no-op. + let persist_count_before = persister.new_channel_persistences.lock().unwrap().len(); + deferred.flush(0, &&logger); + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), persist_count_before); + } + + /// Tests that `ChainMonitor` in deferred mode properly defers `watch_channel` and + /// `update_channel` operations, verifying correctness through a complete channel open + /// and payment flow. Operations are auto-flushed via the `TestChainMonitor` + /// `release_pending_monitor_events` helper. + #[test] + fn test_deferred_monitor_payment() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chain_monitor_a = &nodes[0].chain_monitor.chain_monitor; + let chain_monitor_b = &nodes[1].chain_monitor.chain_monitor; + + create_announced_chan_between_nodes(&nodes, 0, 1); + + let (preimage, _hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 10_000); + claim_payment(&nodes[0], &[&nodes[1]], preimage); + + assert_eq!(chain_monitor_a.list_monitors().len(), 1); + assert_eq!(chain_monitor_b.list_monitors().len(), 1); + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + assert_eq!(chain_monitor_b.pending_operation_count(), 0); + } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index aa7eaa509ce..3429013f659 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -4499,6 +4499,7 @@ pub fn create_chanmon_cfgs_internal( fn create_node_cfgs_internal<'a, F>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, + deferred: bool, ) -> Vec> where F: Fn( @@ -4511,14 +4512,25 @@ where for i in 0..node_count { let cfg = &chanmon_cfgs[i]; let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &cfg.logger)); - let chain_monitor = test_utils::TestChainMonitor::new( - Some(&cfg.chain_source), - &cfg.tx_broadcaster, - &cfg.logger, - &cfg.fee_estimator, - persisters[i], - &cfg.keys_manager, - ); + let chain_monitor = if deferred { + test_utils::TestChainMonitor::new_deferred( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + } else { + test_utils::TestChainMonitor::new( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + }; let seed = [i as u8; 32]; nodes.push(NodeCfg { @@ -4555,6 +4567,20 @@ pub fn create_node_cfgs<'a>( chanmon_cfgs, persisters, test_utils::TestMessageRouter::new_default, + false, + ) +} + +pub fn create_node_cfgs_deferred<'a>( + node_count: usize, chanmon_cfgs: &'a Vec, +) -> Vec> { + let persisters = chanmon_cfgs.iter().map(|c| &c.persister).collect(); + create_node_cfgs_internal( + node_count, + chanmon_cfgs, + persisters, + test_utils::TestMessageRouter::new_default, + true, ) } @@ -4567,6 +4593,7 @@ pub fn create_node_cfgs_with_persisters<'a>( chanmon_cfgs, persisters, test_utils::TestMessageRouter::new_default, + false, ) } @@ -4579,6 +4606,7 @@ pub fn create_node_cfgs_with_node_id_message_router<'a>( chanmon_cfgs, persisters, test_utils::TestMessageRouter::new_node_id_router, + false, ) } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 9d645749b20..55daab4c58a 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -508,6 +508,7 @@ pub struct TestChainMonitor<'a> { &'a TestKeysInterface, >, pub keys_manager: &'a TestKeysInterface, + pub logger: &'a TestLogger, /// If this is set to Some(), the next update_channel call (not watch_channel) must be a /// ChannelForceClosed event for the given channel_id with should_broadcast set to the given /// boolean. @@ -523,6 +524,38 @@ impl<'a> TestChainMonitor<'a> { chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + false, + ) + } + + pub fn new_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + true, + ) + } + + fn with_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, deferred: bool, ) -> Self { Self { added_monitors: Mutex::new(Vec::new()), @@ -536,9 +569,10 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), - false, + deferred, ), keys_manager, + logger, expect_channel_force_closed: Mutex::new(None), expect_monitor_round_trip_fail: Mutex::new(None), #[cfg(feature = "std")] @@ -546,6 +580,10 @@ impl<'a> TestChainMonitor<'a> { } } + pub fn pending_operation_count(&self) -> usize { + self.chain_monitor.pending_operation_count() + } + pub fn complete_sole_pending_chan_update(&self, channel_id: &ChannelId) { let (_, latest_update) = self.latest_monitor_update_id.lock().unwrap().get(channel_id).unwrap().clone(); @@ -676,6 +714,12 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + // Auto-flush pending operations so that the ChannelManager can pick up monitor + // completion events. When not in deferred mode the queue is empty so this only + // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) + // work with deferred chain monitors. + let count = self.chain_monitor.pending_operation_count(); + self.chain_monitor.flush(count, &self.logger); return self.chain_monitor.release_pending_monitor_events(); } } @@ -835,6 +879,8 @@ pub struct TestPersister { /// The queue of update statuses we'll return. If none are queued, ::Completed will always be /// returned. pub update_rets: Mutex>, + /// When we get a persist_new_channel call, we push the monitor name here. + pub new_channel_persistences: Mutex>, /// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the /// [`ChannelMonitor::get_latest_update_id`] here. pub offchain_monitor_updates: Mutex>>, @@ -845,9 +891,15 @@ pub struct TestPersister { impl TestPersister { pub fn new() -> Self { let update_rets = Mutex::new(VecDeque::new()); + let new_channel_persistences = Mutex::new(Vec::new()); let offchain_monitor_updates = Mutex::new(new_hash_map()); let chain_sync_monitor_persistences = Mutex::new(VecDeque::new()); - Self { update_rets, offchain_monitor_updates, chain_sync_monitor_persistences } + Self { + update_rets, + new_channel_persistences, + offchain_monitor_updates, + chain_sync_monitor_persistences, + } } /// Queue an update status to return. @@ -857,8 +909,9 @@ impl TestPersister { } impl Persist for TestPersister { fn persist_new_channel( - &self, _monitor_name: MonitorName, _data: &ChannelMonitor, + &self, monitor_name: MonitorName, _data: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { + self.new_channel_persistences.lock().unwrap().push(monitor_name); if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() { return update_ret; }