diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 45e9a68cb63..4ccc798a010 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,6 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; +use lightning::chain::chainmonitor::MonitorEventSource; use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; use lightning::chain::transaction::OutPoint; use lightning::chain::{ @@ -363,9 +364,13 @@ impl chain::Watch for TestChainMonitor { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } struct KeyProvider { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 396ee277067..9ddcf51dcdf 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -67,6 +67,21 @@ use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// Identifies the source of a [`MonitorEvent`] for acknowledgment via +/// [`chain::Watch::ack_monitor_event`] once the event has been processed. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct MonitorEventSource { + /// The event ID assigned by the [`ChannelMonitor`]. + pub event_id: u64, + /// The channel from which the [`MonitorEvent`] originated. + pub channel_id: ChannelId, +} + +impl_writeable_tlv_based!(MonitorEventSource, { + (0, event_id, required), + (2, channel_id, required), +}); + /// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. enum PendingMonitorOp { /// A new monitor to insert and persist. @@ -367,9 +382,6 @@ pub struct ChainMonitor< fee_estimator: F, persister: P, _entropy_source: ES, - /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly - /// from the user and not from a [`ChannelMonitor`]. - pending_monitor_events: Mutex, PublicKey)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, @@ -437,7 +449,6 @@ where logger, fee_estimator: feeest, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::clone(&event_notifier), persister: AsyncPersister { persister, event_notifier }, @@ -656,7 +667,6 @@ where fee_estimator: feeest, persister, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::new(Notifier::new()), pending_send_only_events: Mutex::new(Vec::new()), @@ -801,16 +811,11 @@ where return Ok(()); } let funding_txo = monitor_data.monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( + monitor_data.monitor.push_monitor_event(MonitorEvent::Completed { funding_txo, channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor_data.monitor.get_latest_update_id(), - }], - monitor_data.monitor.get_counterparty_node_id(), - )); + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }); self.event_notifier.notify(); Ok(()) @@ -823,14 +828,11 @@ where pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); let monitor = &monitors.get(&channel_id).unwrap().monitor; - let counterparty_node_id = monitor.get_counterparty_node_id(); - let funding_txo = monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), channel_id, - vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }], - counterparty_node_id, - )); + monitor_update_id, + }); self.event_notifier.notify(); } @@ -1610,11 +1612,11 @@ where fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { let _ = self.channel_monitor_updated(channel_id, update_id); } - let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); + let mut pending_monitor_events = Vec::new(); for monitor_state in self.monitors.read().unwrap().values() { let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); if monitor_events.len() > 0 { @@ -1631,6 +1633,13 @@ where } pending_monitor_events } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + let monitors = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitors.get(&source.channel_id) { + monitor_state.monitor.ack_monitor_event(source.event_id); + } + } } impl< diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 8e7b6035523..d5335a1ac94 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -68,8 +68,8 @@ use crate::util::byte_utils; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; use crate::util::ser::{ - MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, Writeable, Writer, - U48, + Iterable, MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, + Writeable, Writer, U48, }; #[allow(unused_imports)] @@ -183,6 +183,15 @@ impl Readable for ChannelMonitorUpdate { } } +fn push_monitor_event( + pending_monitor_events: &mut Vec<(u64, MonitorEvent)>, event: MonitorEvent, + next_monitor_event_id: &mut u64, +) { + let id = *next_monitor_event_id; + *next_monitor_event_id += 1; + pending_monitor_events.push((id, event)); +} + /// An event to be processed by the ChannelManager. #[derive(Clone, PartialEq, Eq)] pub enum MonitorEvent { @@ -226,8 +235,6 @@ pub enum MonitorEvent { }, } impl_writeable_tlv_based_enum_upgradable_legacy!(MonitorEvent, - // Note that Completed is currently never serialized to disk as it is generated only in - // ChainMonitor. (0, Completed) => { (0, funding_txo, required), (2, monitor_update_id, required), @@ -1280,7 +1287,20 @@ pub(crate) struct ChannelMonitorImpl { // Note that because the `event_lock` in `ChainMonitor` is only taken in // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. - pending_monitor_events: Vec, + pending_monitor_events: Vec<(u64, MonitorEvent)>, + // `MonitorEvent`s that have been provided to the `ChannelManager` via + // [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting + // [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it + // will be re-provided to the `ChannelManager` on startup. + provided_monitor_events: Vec<(u64, MonitorEvent)>, + /// When set, monitor events are retained until explicitly acked rather than cleared on read. + /// + /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on + /// startup, rather than from complexly polling and reconciling Channel{Monitor} APIs, as well as + /// move the responsibility of off-chain payment resolution from the Channel to the monitor. Will + /// be always set once support is complete. + persistent_events_enabled: bool, + next_monitor_event_id: u64, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1661,32 +1681,38 @@ pub(crate) fn write_chanmon_internal( writer.write_all(&payment_preimage.0[..])?; } - writer.write_all( - &(channel_monitor - .pending_monitor_events - .iter() - .filter(|ev| match ev { - MonitorEvent::HTLCEvent(_) => true, - MonitorEvent::HolderForceClosed(_) => true, - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) - .count() as u64) - .to_be_bytes(), - )?; - for event in channel_monitor.pending_monitor_events.iter() { - match event { - MonitorEvent::HTLCEvent(upd) => { - 0u8.write(writer)?; - upd.write(writer)?; - }, - MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, - // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep - // backwards compatibility, we write a `HolderForceClosed` event along with the - // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. - MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, - _ => {}, // Covered in the TLV writes below + if !channel_monitor.persistent_events_enabled { + writer.write_all( + &(channel_monitor + .pending_monitor_events + .iter() + .filter(|(_, ev)| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::HolderForceClosed(_) => true, + MonitorEvent::HolderForceClosedWithInfo { .. } => true, + _ => false, + }) + .count() as u64) + .to_be_bytes(), + )?; + for (_, event) in channel_monitor.pending_monitor_events.iter() { + match event { + MonitorEvent::HTLCEvent(upd) => { + 0u8.write(writer)?; + upd.write(writer)?; + }, + MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, + // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep + // backwards compatibility, we write a `HolderForceClosed` event along with the + // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. + MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below + } } + } else { + // If `persistent_events_enabled` is set, we'll write the events with their event ids in the + // TLV section below. + writer.write_all(&(0u64).to_be_bytes())?; } writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?; @@ -1719,24 +1745,47 @@ pub(crate) fn write_chanmon_internal( channel_monitor.lockdown_from_offchain.write(writer)?; channel_monitor.holder_tx_signed.write(writer)?; - // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` for backwards compatibility. - let pending_monitor_events = - match channel_monitor.pending_monitor_events.iter().find(|ev| match ev { - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) { - Some(MonitorEvent::HolderForceClosedWithInfo { outpoint, .. }) => { - let mut pending_monitor_events = channel_monitor.pending_monitor_events.clone(); - pending_monitor_events.push(MonitorEvent::HolderForceClosed(*outpoint)); - pending_monitor_events - }, - _ => channel_monitor.pending_monitor_events.clone(), - }; + // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` + // for backwards compatibility. + let holder_force_closed_compat = + channel_monitor.pending_monitor_events.iter().find_map(|(_, ev)| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events_legacy = if !channel_monitor.persistent_events_enabled { + Some(Iterable( + channel_monitor + .pending_monitor_events + .iter() + .map(|(_, ev)| ev) + .chain(holder_force_closed_compat.as_ref()), + )) + } else { + None + }; + + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(true); + let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { + Some(Iterable( + channel_monitor + .provided_monitor_events + .iter() + .chain(channel_monitor.pending_monitor_events.iter()), + )) + } else { + None + }; write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required_vec), + (4, pending_mon_evs_with_ids, option), + (5, pending_monitor_events_legacy, option), (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -1755,6 +1804,7 @@ pub(crate) fn write_chanmon_internal( (34, channel_monitor.alternative_funding_confirmed, option), (35, channel_monitor.is_manual_broadcast, required), (37, channel_monitor.funding_seen_onchain, required), + (39, channel_monitor.next_monitor_event_id, required), }); Ok(()) @@ -1938,6 +1988,9 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + provided_monitor_events: Vec::new(), + persistent_events_enabled: false, + next_monitor_event_id: 0, pending_events: Vec::new(), is_processing_pending_events: false, @@ -2151,10 +2204,45 @@ impl ChannelMonitor { /// Get the list of HTLCs who's status has been updated on chain. This should be called by /// ChannelManager via [`chain::Watch::release_pending_monitor_events`]. - pub fn get_and_clear_pending_monitor_events(&self) -> Vec { + pub fn get_and_clear_pending_monitor_events(&self) -> Vec<(u64, MonitorEvent)> { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } + pub(super) fn push_monitor_event(&self, event: MonitorEvent) { + self.inner.lock().unwrap().push_monitor_event(event); + } + + /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. + pub(super) fn ack_monitor_event(&self, event_id: u64) { + let inner = &mut *self.inner.lock().unwrap(); + inner.provided_monitor_events.retain(|(id, _)| *id != event_id); + } + + /// Enables persistent monitor events mode. When enabled, monitor events are retained until + /// explicitly acked rather than cleared on read. + pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) { + self.inner.lock().unwrap().persistent_events_enabled = enabled; + } + + /// Copies [`MonitorEvent`] state from `other` into `self`. + /// Used in tests to align transient runtime state before equality comparison after a + /// serialization round-trip. + #[cfg(any(test, feature = "_test_utils"))] + pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { + let (provided, pending, next_id) = { + let other_inner = other.inner.lock().unwrap(); + ( + other_inner.provided_monitor_events.clone(), + other_inner.pending_monitor_events.clone(), + other_inner.next_monitor_event_id, + ) + }; + let mut self_inner = self.inner.lock().unwrap(); + self_inner.provided_monitor_events = provided; + self_inner.pending_monitor_events = pending; + self_inner.next_monitor_event_id = next_id; + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3880,7 +3968,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - self.pending_monitor_events.push(event); + push_monitor_event(&mut self.pending_monitor_events, event, &mut self.next_monitor_event_id); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4471,12 +4559,16 @@ impl ChannelMonitorImpl { "Failing HTLC from late counterparty commitment update immediately \ (funding spend already confirmed)" ); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { - payment_hash, - payment_preimage: None, - source: source.clone(), - htlc_value_satoshis, - })); + push_monitor_event( + &mut self.pending_monitor_events, + MonitorEvent::HTLCEvent(HTLCUpdate { + payment_hash, + payment_preimage: None, + source: source.clone(), + htlc_value_satoshis, + }), + &mut self.next_monitor_event_id, + ); self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC { commitment_tx_output_idx: None, resolving_txid: Some(confirmed_txid), @@ -4541,10 +4633,25 @@ impl ChannelMonitorImpl { &self.outputs_to_watch } - fn get_and_clear_pending_monitor_events(&mut self) -> Vec { - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut self.pending_monitor_events); - ret + fn push_monitor_event(&mut self, event: MonitorEvent) { + push_monitor_event( + &mut self.pending_monitor_events, + event, + &mut self.next_monitor_event_id, + ); + } + + fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { + if self.persistent_events_enabled { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + self.provided_monitor_events.extend(ret.iter().cloned()); + ret + } else { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + ret + } } /// Gets the set of events that are repeated regularly (e.g. those which RBF bump @@ -5600,7 +5707,7 @@ impl ChannelMonitorImpl { ); log_info!(logger, "Channel closed by funding output spend in txid {txid}"); if !self.funding_spend_seen { - self.pending_monitor_events.push(MonitorEvent::CommitmentTxConfirmed(())); + self.push_monitor_event(MonitorEvent::CommitmentTxConfirmed(())); } self.funding_spend_seen = true; @@ -5775,7 +5882,7 @@ impl ChannelMonitorImpl { log_debug!(logger, "HTLC {} failure update in {} has got enough confirmations to be passed upstream", &payment_hash, entry.txid); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + self.push_monitor_event(MonitorEvent::HTLCEvent(HTLCUpdate { payment_hash, payment_preimage: None, source, @@ -5872,7 +5979,7 @@ impl ChannelMonitorImpl { continue; } let duplicate_event = self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == *source } else { false }); if duplicate_event { @@ -5885,12 +5992,12 @@ impl ChannelMonitorImpl { log_error!(logger, "Failing back HTLC {} upstream to preserve the \ channel as the forward HTLC hasn't resolved and our backward HTLC \ expires soon at {}", log_bytes!(htlc.payment_hash.0), inbound_htlc_expiry); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source: source.clone(), payment_preimage: None, payment_hash: htlc.payment_hash, htlc_value_satoshis: Some(htlc.amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } } @@ -6289,7 +6396,7 @@ impl ChannelMonitorImpl { if let Some((source, payment_hash, amount_msat)) = payment_data { if accepted_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid: tx.compute_txid(), height, @@ -6302,16 +6409,16 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else if offered_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { @@ -6326,12 +6433,12 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else { self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { @@ -6624,16 +6731,16 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } - let pending_monitor_events_len: u64 = Readable::read(reader)?; - let mut pending_monitor_events = Some( - Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); - for _ in 0..pending_monitor_events_len { + let pending_monitor_events_legacy_len: u64 = Readable::read(reader)?; + let mut pending_monitor_events_legacy = Some( + Vec::with_capacity(cmp::min(pending_monitor_events_legacy_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); + for _ in 0..pending_monitor_events_legacy_len { let ev = match ::read(reader)? { 0 => MonitorEvent::HTLCEvent(Readable::read(reader)?), 1 => MonitorEvent::HolderForceClosed(outpoint), _ => return Err(DecodeError::InvalidValue) }; - pending_monitor_events.as_mut().unwrap().push(ev); + pending_monitor_events_legacy.as_mut().unwrap().push(ev); } let pending_events_len: u64 = Readable::read(reader)?; @@ -6694,10 +6801,15 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut alternative_funding_confirmed = None; let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); + let mut persistent_events_enabled = false; + let mut next_monitor_event_id: Option = None; + let mut pending_mon_evs_with_ids: Option> = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), + (2, persistent_events_enabled, (default_value, false)), (3, htlcs_resolved_on_chain, optional_vec), - (5, pending_monitor_events, optional_vec), + (4, pending_mon_evs_with_ids, optional_vec), + (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), (11, confirmed_commitment_tx_counterparty_output, option), @@ -6716,7 +6828,15 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (34, alternative_funding_confirmed, option), (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), + (39, next_monitor_event_id, option), }); + + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_events_enabled { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue) + } + // Note that `payment_preimages_with_info` was added (and is always written) in LDK 0.1, so // we can use it to determine if this monitor was last written by LDK 0.1 or later. let written_by_0_1_or_later = payment_preimages_with_info.is_some(); @@ -6739,13 +6859,29 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both // events, we can remove the `HolderForceClosed` event and just keep the `HolderForceClosedWithInfo`. - if let Some(ref mut pending_monitor_events) = pending_monitor_events { - if pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && - pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) + if let Some(ref mut evs) = pending_monitor_events_legacy { + if evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && + evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) { - pending_monitor_events.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); - } - } + evs.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); + } + } + + // If persistent events are enabled, use the events with their persisted IDs from TLV 4. + // Otherwise, use the legacy events from TLV 5 and assign sequential IDs. + let (next_monitor_event_id, pending_monitor_events): (u64, Vec<(u64, MonitorEvent)>) = + if persistent_events_enabled { + let evs = pending_mon_evs_with_ids.unwrap_or_default() + .into_iter().map(|ev| (ev.0, ev.1)).collect(); + (next_monitor_event_id.unwrap_or(0), evs) + } else if let Some(events) = pending_monitor_events_legacy { + let next_id = next_monitor_event_id.unwrap_or(events.len() as u64); + let evs = events.into_iter().enumerate() + .map(|(i, ev)| (i as u64, ev)).collect(); + (next_id, evs) + } else { + (next_monitor_event_id.unwrap_or(0), Vec::new()) + }; let channel_parameters = channel_parameters.unwrap_or_else(|| { onchain_tx_handler.channel_parameters().clone() @@ -6864,7 +7000,10 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events.unwrap(), + pending_monitor_events, + provided_monitor_events: Vec::new(), + persistent_events_enabled, + next_monitor_event_id, pending_events, is_processing_pending_events: false, @@ -6913,6 +7052,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } +/// Deserialization wrapper for reading a `(u64, MonitorEvent)`. +/// Necessary because we can't deserialize a (Readable, MaybeReadable) tuple due to trait +/// conflicts. +struct ReadableIdMonitorEvent(u64, MonitorEvent); + +impl MaybeReadable for ReadableIdMonitorEvent { + fn read(reader: &mut R) -> Result, DecodeError> { + let id: u64 = Readable::read(reader)?; + let event_opt: Option = MaybeReadable::read(reader)?; + match event_opt { + Some(ev) => Ok(Some(ReadableIdMonitorEvent(id, ev))), + None => Ok(None), + } + } +} + #[cfg(test)] pub(super) fn dummy_monitor( channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S, diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index bc47f1b1db6..0ce578a31cb 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,6 +18,7 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; @@ -336,6 +337,10 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. /// + /// Each event comes with a corresponding id. Once the event is processed, call + /// [`Watch::ack_monitor_event`] with the corresponding id and channel id. Unacknowledged events + /// will be re-provided by this method after startup. + /// /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted /// to disk. @@ -344,7 +349,16 @@ pub trait Watch { /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)>; + + /// Acknowledges and removes a [`MonitorEvent`] previously returned by + /// [`Watch::release_pending_monitor_events`] by its event ID. + /// + /// Once acknowledged, the event will no longer be returned by future calls to + /// [`Watch::release_pending_monitor_events`] and will not be replayed on restart. + /// + /// Events may be acknowledged in any order. + fn ack_monitor_event(&self, source: MonitorEventSource); } impl + ?Sized, W: Deref> @@ -364,9 +378,13 @@ impl + ?Sized, W: Der fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { self.deref().release_pending_monitor_events() } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.deref().ack_monitor_event(source) + } } /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 623d028560f..a269b77836b 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4964,7 +4964,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but // separately. @@ -5012,7 +5012,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Finally, test two async `ChanelMonitorUpdate`s in flight at once, completing them // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both @@ -5058,7 +5058,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - if let MonitorEvent::Completed { monitor_update_id, .. } = &completed_persist[0].2[0] { + if let (_, MonitorEvent::Completed { monitor_update_id, .. }) = &completed_persist[0].2[0] { assert_eq!(*monitor_update_id, 4); } else { panic!(); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 4241decfba9..3fc9f003465 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -30,6 +30,7 @@ use crate::blinded_path::message::BlindedMessagePath; use crate::chain::chaininterface::{ ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, CommitmentHTLCData, LATENCY_GRACE_PERIOD_BLOCKS, @@ -552,11 +553,21 @@ enum HTLCUpdateAwaitingACK { FailHTLC { htlc_id: u64, err_packet: msgs::OnionErrorPacket, + /// Contains a pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this + /// failed forward, once the failure is durably persisted in this channel (the inbound edge). + /// + /// [`MonitorEvent`]: crate::chain::channelmonitor::MonitorEvent + monitor_event_source: Option, }, FailMalformedHTLC { htlc_id: u64, failure_code: u16, sha256_of_onion: [u8; 32], + /// Contains a pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this + /// failed forward, once the failure is durably persisted in this channel (the inbound edge). + /// + /// [`MonitorEvent`]: crate::chain::channelmonitor::MonitorEvent + monitor_event_source: Option, }, } @@ -6568,7 +6579,9 @@ trait FailHTLCContents { type Message: FailHTLCMessageName; fn to_message(self, htlc_id: u64, channel_id: ChannelId) -> Self::Message; fn to_inbound_htlc_state(self) -> InboundHTLCState; - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK; + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK; } impl FailHTLCContents for msgs::OnionErrorPacket { type Message = msgs::UpdateFailHTLC; @@ -6583,8 +6596,10 @@ impl FailHTLCContents for msgs::OnionErrorPacket { fn to_inbound_htlc_state(self) -> InboundHTLCState { InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(self)) } - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { - HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self } + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK { + HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self, monitor_event_source } } } impl FailHTLCContents for ([u8; 32], u16) { @@ -6603,11 +6618,14 @@ impl FailHTLCContents for ([u8; 32], u16) { failure_code: self.1, }) } - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK { HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, sha256_of_onion: self.0, failure_code: self.1, + monitor_event_source, } } } @@ -7348,9 +7366,10 @@ where /// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g. /// if it was already resolved). Otherwise returns `Ok`. pub fn queue_fail_htlc( - &mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L, + &mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, + monitor_event_source: Option, logger: &L, ) -> Result<(), ChannelError> { - self.fail_htlc(htlc_id_arg, err_packet, true, logger) + self.fail_htlc(htlc_id_arg, err_packet, true, monitor_event_source, logger) .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) } @@ -7359,10 +7378,17 @@ where /// /// See [`Self::queue_fail_htlc`] for more info. pub fn queue_fail_malformed_htlc( - &mut self, htlc_id_arg: u64, failure_code: u16, sha256_of_onion: [u8; 32], logger: &L, + &mut self, htlc_id_arg: u64, failure_code: u16, sha256_of_onion: [u8; 32], + monitor_event_source: Option, logger: &L, ) -> Result<(), ChannelError> { - self.fail_htlc(htlc_id_arg, (sha256_of_onion, failure_code), true, logger) - .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) + self.fail_htlc( + htlc_id_arg, + (sha256_of_onion, failure_code), + true, + monitor_event_source, + logger, + ) + .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) } /// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g. @@ -7370,7 +7396,7 @@ where #[rustfmt::skip] fn fail_htlc( &mut self, htlc_id_arg: u64, err_contents: E, mut force_holding_cell: bool, - logger: &L + monitor_event_source: Option, logger: &L ) -> Result, ChannelError> { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { panic!("Was asked to fail an HTLC when channel was not in an operational state"); @@ -7425,7 +7451,7 @@ where } } log_trace!(logger, "Placing failure for HTLC ID {} in holding cell.", htlc_id_arg); - self.context.holding_cell_htlc_updates.push(err_contents.to_htlc_update_awaiting_ack(htlc_id_arg)); + self.context.holding_cell_htlc_updates.push(err_contents.to_htlc_update_awaiting_ack(htlc_id_arg, monitor_event_source)); return Ok(None); } @@ -8319,7 +8345,7 @@ where /// returns `(None, Vec::new())`. pub fn maybe_free_holding_cell_htlcs( &mut self, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, - ) -> (Option, Vec<(HTLCSource, PaymentHash)>) { + ) -> (Option<(ChannelMonitorUpdate, Vec)>, Vec<(HTLCSource, PaymentHash)>) { if matches!(self.context.channel_state, ChannelState::ChannelReady(_)) && self.context.channel_state.can_generate_new_commitment() { @@ -8333,7 +8359,7 @@ where /// for our counterparty. fn free_holding_cell_htlcs( &mut self, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, - ) -> (Option, Vec<(HTLCSource, PaymentHash)>) { + ) -> (Option<(ChannelMonitorUpdate, Vec)>, Vec<(HTLCSource, PaymentHash)>) { assert!(matches!(self.context.channel_state, ChannelState::ChannelReady(_))); assert!(!self.context.channel_state.is_monitor_update_in_progress()); assert!(!self.context.channel_state.is_quiescent()); @@ -8363,6 +8389,7 @@ where let mut update_fulfill_count = 0; let mut update_fail_count = 0; let mut htlcs_to_fail = Vec::new(); + let mut monitor_events_to_ack = Vec::new(); for htlc_update in htlc_updates.drain(..) { // Note that this *can* fail, though it should be due to rather-rare conditions on // fee races with adding too many outputs which push our total payments just over @@ -8454,18 +8481,45 @@ where monitor_update.updates.append(&mut additional_monitor_update.updates); None }, - &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => Some( - self.fail_htlc(htlc_id, err_packet.clone(), false, logger) + &HTLCUpdateAwaitingACK::FailHTLC { + htlc_id, + ref err_packet, + monitor_event_source, + } => { + if let Some(source) = monitor_event_source { + monitor_events_to_ack.push(source); + } + Some( + self.fail_htlc( + htlc_id, + err_packet.clone(), + false, + monitor_event_source, + logger, + ) .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), - ), + ) + }, &HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion, - } => Some( - self.fail_htlc(htlc_id, (sha256_of_onion, failure_code), false, logger) + monitor_event_source, + } => { + if let Some(source) = monitor_event_source { + monitor_events_to_ack.push(source); + } + Some( + self.fail_htlc( + htlc_id, + (sha256_of_onion, failure_code), + false, + monitor_event_source, + logger, + ) .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), - ), + ) + }, }; if let Some(res) = fail_htlc_res { match res { @@ -8517,7 +8571,11 @@ where Vec::new(), logger, ); - (self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail) + ( + self.push_ret_blockable_mon_update(monitor_update) + .map(|upd| (upd, monitor_events_to_ack)), + htlcs_to_fail, + ) } else { (None, Vec::new()) } @@ -8543,7 +8601,7 @@ where ( Vec<(HTLCSource, PaymentHash)>, Vec<(StaticInvoice, BlindedMessagePath)>, - Option, + Option<(ChannelMonitorUpdate, Vec)>, ), ChannelError, > { @@ -8860,6 +8918,7 @@ where } else { "Blocked" }; + let mut monitor_events_to_ack = Vec::new(); macro_rules! return_with_htlcs_to_fail { ($htlcs_to_fail: expr) => { if !release_monitor { @@ -8868,7 +8927,12 @@ where .push(PendingChannelMonitorUpdate { update: monitor_update }); return Ok(($htlcs_to_fail, static_invoices, None)); } else { - return Ok(($htlcs_to_fail, static_invoices, Some(monitor_update))); + let events_to_ack = core::mem::take(&mut monitor_events_to_ack); + return Ok(( + $htlcs_to_fail, + static_invoices, + Some((monitor_update, events_to_ack)), + )); } }; } @@ -8876,7 +8940,8 @@ where self.context.monitor_pending_update_adds.append(&mut pending_update_adds); match self.maybe_free_holding_cell_htlcs(fee_estimator, logger) { - (Some(mut additional_update), htlcs_to_fail) => { + (Some((mut additional_update, holding_cell_monitor_events_to_ack)), htlcs_to_fail) => { + monitor_events_to_ack = holding_cell_monitor_events_to_ack; // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. self.context.latest_monitor_update_id = monitor_update.update_id; @@ -15116,7 +15181,7 @@ impl Writeable for FundedChannel { // Store the attribution data for later writing. holding_cell_attribution_data.push(attribution_data.as_ref()); }, - &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet } => { + &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet, .. } => { 2u8.write(writer)?; htlc_id.write(writer)?; err_packet.data.write(writer)?; @@ -15128,6 +15193,7 @@ impl Writeable for FundedChannel { htlc_id, failure_code, sha256_of_onion, + .. } => { // We don't want to break downgrading by adding a new variant, so write a dummy // `::FailHTLC` variant and write the real malformed error as an optional TLV. @@ -15567,6 +15633,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> data: Readable::read(reader)?, attribution_data: None, }, + monitor_event_source: None, }, _ => return Err(DecodeError::InvalidValue), }); @@ -16021,7 +16088,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> let htlc_idx = holding_cell_htlc_updates .iter() .position(|htlc| { - if let HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet } = htlc { + if let HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet, .. } = htlc { let matches = *htlc_id == malformed_htlc_id; if matches { debug_assert!(err_packet.data.is_empty()) @@ -16036,6 +16103,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> htlc_id: malformed_htlc_id, failure_code, sha256_of_onion, + monitor_event_source: None, }; let _ = core::mem::replace(&mut holding_cell_htlc_updates[htlc_idx], malformed_htlc); @@ -17062,12 +17130,14 @@ mod tests { |htlc_id, attribution_data| HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: msgs::OnionErrorPacket { data: vec![42], attribution_data }, + monitor_event_source: None, }; let dummy_holding_cell_malformed_htlc = |htlc_id| HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code: LocalHTLCFailureReason::InvalidOnionBlinding.failure_code(), sha256_of_onion: [0; 32], + monitor_event_source: None, }; let mut holding_cell_htlc_updates = Vec::with_capacity(12); for i in 0..16 { diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 70617b20894..23edaf215ff 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -42,6 +42,7 @@ use crate::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, WithChannelMonitor, ANTI_REORG_DELAY, CLTV_CLAIM_BUFFER, HTLC_FAIL_BACK_BUFFER, @@ -496,8 +497,21 @@ impl PendingAddHTLCInfo { #[cfg_attr(test, derive(Clone, Debug, PartialEq))] pub(super) enum HTLCForwardInfo { AddHTLC(PendingAddHTLCInfo), - FailHTLC { htlc_id: u64, err_packet: msgs::OnionErrorPacket }, - FailMalformedHTLC { htlc_id: u64, failure_code: u16, sha256_of_onion: [u8; 32] }, + FailHTLC { + htlc_id: u64, + err_packet: msgs::OnionErrorPacket, + /// A pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this failed + /// forward, once the failure is durably persisted on the inbound edge. + monitor_event_source: Option, + }, + FailMalformedHTLC { + htlc_id: u64, + failure_code: u16, + sha256_of_onion: [u8; 32], + /// A pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this failed + /// forward, once the failure is durably persisted on the inbound edge. + monitor_event_source: Option, + }, } /// Whether this blinded HTLC is being failed backwards by the introduction node or a blinded node, @@ -1474,6 +1488,9 @@ pub(crate) enum MonitorUpdateCompletionAction { EmitEventOptionAndFreeOtherChannel { event: Option, downstream_counterparty_and_funding_outpoint: EventUnblockedChannel, + /// If this action originated from a [`MonitorEvent`], the source to acknowledge once the + /// action is handled. + monitor_event_source: Option, }, /// Indicates we should immediately resume the operation of another channel, unless there is /// some other reason why the channel is blocked. In practice this simply means immediately @@ -1491,7 +1508,14 @@ pub(crate) enum MonitorUpdateCompletionAction { downstream_counterparty_node_id: PublicKey, blocking_action: RAAMonitorUpdateBlockingAction, downstream_channel_id: ChannelId, + /// If this action originated from a [`MonitorEvent`], the source to acknowledge once the + /// action is handled. + monitor_event_source: Option, }, + /// Indicates that one or more [`MonitorEvent`]s should be acknowledged via + /// [`chain::Watch::ack_monitor_event`] once the associated [`ChannelMonitorUpdate`] has been + /// durably persisted. + AckMonitorEvents { monitor_events_to_ack: Vec }, } impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, @@ -1505,6 +1529,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, downstream_counterparty_node_id, required), (4, blocking_action, upgradable_required), (5, downstream_channel_id, required), + (7, monitor_event_source, option), }, (2, EmitEventOptionAndFreeOtherChannel) => { // LDK prior to 0.3 required this field. It will not be present for trampoline payments @@ -1512,6 +1537,10 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, // are in the process of being resolved. (0, event, upgradable_option), (1, downstream_counterparty_and_funding_outpoint, upgradable_required), + (3, monitor_event_source, option), + }, + (3, AckMonitorEvents) => { + (0, monitor_events_to_ack, required_vec), }, ); @@ -1537,12 +1566,29 @@ enum PostMonitorUpdateChanResume { }, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Eq)] pub(crate) struct PaymentCompleteUpdate { counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, channel_id: ChannelId, htlc_id: SentHTLCId, + /// The id of the [`MonitorEvent`] that triggered this update. + /// + /// Used when the [`Event`] corresponding to this update's [`EventCompletionAction`] has been + /// processed by the user, to remove the [`MonitorEvent`] from the [`ChannelMonitor`] and prevent + /// it from being re-provided on startup. + monitor_event_id: Option, +} + +impl PartialEq for PaymentCompleteUpdate { + fn eq(&self, other: &Self) -> bool { + self.counterparty_node_id == other.counterparty_node_id + && self.channel_funding_outpoint == other.channel_funding_outpoint + && self.channel_id == other.channel_id + && self.htlc_id == other.htlc_id + // monitor_event_id is excluded: it's metadata about the event source, + // not part of the semantic identity of the payment resolution. + } } impl_writeable_tlv_based!(PaymentCompleteUpdate, { @@ -1550,6 +1596,7 @@ impl_writeable_tlv_based!(PaymentCompleteUpdate, { (3, counterparty_node_id, required), (5, channel_id, required), (7, htlc_id, required), + (9, monitor_event_id, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -2952,6 +2999,15 @@ pub struct ChannelManager< /// offer they resolve to to the given one. pub testing_dnssec_proof_offer_resolution_override: Mutex>, + /// When set, monitors will repeatedly provide an event back to the `ChannelManager` on restart + /// until the event is explicitly acknowledged as processed. + /// + /// Allows us to reconstruct pending HTLC state by replaying monitor events on startup, rather + /// than from complexly polling and reconciling Channel{Monitor} APIs, as well as move the + /// responsibility of off-chain payment resolution from the Channel to the monitor. Will be + /// always set once support is complete. + persistent_monitor_events: bool, + #[cfg(test)] pub(super) entropy_source: ES, #[cfg(not(test))] @@ -3585,6 +3641,8 @@ impl< our_network_pubkey, current_timestamp, expanded_inbound_key, node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(), ); + #[cfg(test)] + let override_persistent_monitor_events = config.override_persistent_monitor_events; ChannelManager { config: RwLock::new(config), @@ -3641,6 +3699,28 @@ impl< logger, + persistent_monitor_events: { + #[cfg(not(test))] + { false } + #[cfg(test)] + { + override_persistent_monitor_events.unwrap_or_else(|| { + use core::hash::{BuildHasher, Hasher}; + match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") { + Ok(val) => match val.as_str() { + "1" => true, + "0" => false, + _ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val), + }, + Err(_) => { + let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish(); + rand_val % 2 == 0 + }, + } + }) + } + }, + #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), } @@ -7581,12 +7661,14 @@ impl< HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC { htlc_id: fail_htlc.htlc_id, err_packet: fail_htlc.into(), + monitor_event_source: None, }, HTLCFailureMsg::Malformed(fail_malformed_htlc) => { HTLCForwardInfo::FailMalformedHTLC { htlc_id: fail_malformed_htlc.htlc_id, sha256_of_onion: fail_malformed_htlc.sha256_of_onion, failure_code: fail_malformed_htlc.failure_code.into(), + monitor_event_source: None, } }, }; @@ -7911,11 +7993,15 @@ impl< continue; } }, - HTLCForwardInfo::FailHTLC { .. } | HTLCForwardInfo::FailMalformedHTLC { .. } => { + HTLCForwardInfo::FailHTLC { monitor_event_source, .. } + | HTLCForwardInfo::FailMalformedHTLC { monitor_event_source, .. } => { // Channel went away before we could fail it. This implies // the channel is now on chain and our counterparty is // trying to broadcast the HTLC-Timeout, but that's their // problem, not ours. + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } }, } } @@ -8116,7 +8202,7 @@ impl< } None }, - HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => { + HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet, monitor_event_source } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -8124,7 +8210,16 @@ impl< { let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); - Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id)) + Some(( + chan.queue_fail_htlc( + htlc_id, + err_packet.clone(), + monitor_event_source, + &&logger, + ), + htlc_id, + monitor_event_source, + )) } else { self.forwarding_channel_not_found( core::iter::once(forward_info).chain(draining_pending_forwards), @@ -8136,7 +8231,12 @@ impl< break; } }, - HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + HTLCForwardInfo::FailMalformedHTLC { + htlc_id, + failure_code, + sha256_of_onion, + monitor_event_source, + } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -8148,9 +8248,10 @@ impl< htlc_id, failure_code, sha256_of_onion, + monitor_event_source, &&logger, ); - Some((res, htlc_id)) + Some((res, htlc_id, monitor_event_source)) } else { self.forwarding_channel_not_found( core::iter::once(forward_info).chain(draining_pending_forwards), @@ -8163,8 +8264,12 @@ impl< } }, }; - if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res { + if let Some((queue_fail_htlc_res, htlc_id, monitor_event_source)) = queue_fail_htlc_res + { if let Err(e) = queue_fail_htlc_res { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let ChannelError::Ignore(msg) = e { if let Some(chan) = peer_state .channel_by_id @@ -9174,6 +9279,10 @@ impl< onion_error ); + let monitor_event_source = from_monitor_update_completion.as_ref().and_then(|u| { + u.monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: u.channel_id }) + }); push_forward_htlcs_failure( *prev_outbound_scid_alias, get_htlc_forward_failure( @@ -9183,6 +9292,7 @@ impl< trampoline_shared_secret, phantom_shared_secret, *htlc_id, + monitor_event_source, ), ); @@ -9218,7 +9328,12 @@ impl< // necessarily want to fail all of our incoming HTLCs back yet. We may have other // outgoing HTLCs that need to resolve first. This will be tracked in our // pending_outbound_payments in a followup. - for current_hop_data in previous_hop_data { + let trampoline_monitor_event_source = + from_monitor_update_completion.as_ref().and_then(|u| { + u.monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: u.channel_id }) + }); + for (i, current_hop_data) in previous_hop_data.iter().enumerate() { let HTLCPreviousHopData { prev_outbound_scid_alias, htlc_id, @@ -9245,6 +9360,7 @@ impl< &incoming_trampoline_shared_secret, &None, *htlc_id, + if i == 0 { trampoline_monitor_event_source } else { None }, ), ); } @@ -9498,6 +9614,7 @@ impl< startup_replay: bool, next_channel_counterparty_node_id: PublicKey, next_channel_outpoint: OutPoint, next_channel_id: ChannelId, hop_data: HTLCPreviousHopData, attribution_data: Option, send_timestamp: Option, + monitor_event_source: Option, ) { let _prev_channel_id = hop_data.channel_id; let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); @@ -9587,6 +9704,7 @@ impl< downstream_counterparty_node_id: chan_to_release.counterparty_node_id, downstream_channel_id: chan_to_release.channel_id, blocking_action: chan_to_release.blocking_action, + monitor_event_source, }), None, ) @@ -9602,6 +9720,7 @@ impl< Some(MonitorUpdateCompletionAction::EmitEventOptionAndFreeOtherChannel { event, downstream_counterparty_and_funding_outpoint: chan_to_release, + monitor_event_source, }), None, ) @@ -9786,8 +9905,12 @@ impl< downstream_counterparty_node_id: node_id, blocking_action: blocker, downstream_channel_id: channel_id, + monitor_event_source, } = action { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { let mut peer_state = peer_state_mtx.lock().unwrap(); if let Some(blockers) = peer_state @@ -9946,6 +10069,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ next_channel_counterparty_node_id: PublicKey, next_channel_outpoint: OutPoint, next_channel_id: ChannelId, next_user_channel_id: Option, attribution_data: Option, send_timestamp: Option, + monitor_event_id: Option, ) { let startup_replay = !self.background_events_processed_since_startup.load(Ordering::Acquire); @@ -9964,6 +10088,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ channel_funding_outpoint: next_channel_outpoint, channel_id: next_channel_id, htlc_id, + monitor_event_id, }; Some(EventCompletionAction::ReleasePaymentCompleteChannelMonitorUpdate(release)) } else { @@ -10044,6 +10169,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ hop_data, attribution_data, send_timestamp, + monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: next_channel_id }), ); }, HTLCSource::TrampolineForward { previous_hop_data, .. } => { @@ -10087,6 +10214,19 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ current_previous_hop_data, attribution_data.clone(), send_timestamp, + // Only pass the monitor event ID for the first hop. Once one + // inbound channel durably has the preimage, the outbound + // monitor event can be acked. The preimage remains in the + // outbound monitor's storage and will be replayed to all + // remaining inbound hops on restart via pending_claims_to_replay. + if i == 0 { + monitor_event_id.map(|id| MonitorEventSource { + event_id: id, + channel_id: next_channel_id, + }) + } else { + None + }, ); } }, @@ -10313,7 +10453,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ MonitorUpdateCompletionAction::EmitEventOptionAndFreeOtherChannel { event, downstream_counterparty_and_funding_outpoint, + monitor_event_source, } => { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let Some(event) = event { self.pending_events.lock().unwrap().push_back((event, None)); } @@ -10327,13 +10471,22 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ downstream_counterparty_node_id, downstream_channel_id, blocking_action, + monitor_event_source, } => { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } self.handle_monitor_update_release( downstream_counterparty_node_id, downstream_channel_id, Some(blocking_action), ); }, + MonitorUpdateCompletionAction::AckMonitorEvents { monitor_events_to_ack } => { + for source in monitor_events_to_ack { + self.chain_monitor.ack_monitor_event(source); + } + }, } } @@ -11544,6 +11697,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ fail_chan!("Already had channel with the new channel_id"); }, hash_map::Entry::Vacant(e) => { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { // There's no problem signing a counterparty's funding transaction if our monitor @@ -11714,6 +11868,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match chan .funding_signed(&msg, best_block, &self.signer_provider, &self.logger) .and_then(|(funded_chan, monitor)| { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); self.chain_monitor .watch_channel(funded_chan.context.channel_id(), monitor) .map_err(|()| { @@ -12544,6 +12699,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(next_user_channel_id), msg.attribution_data, send_timestamp, + None, ); Ok(()) @@ -12628,6 +12784,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(chan) = chan.as_funded_mut() { if let Some(monitor) = monitor_opt { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { @@ -12812,7 +12969,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ *counterparty_node_id); let (htlcs_to_fail, static_invoices, monitor_update_opt) = try_channel_entry!(self, peer_state, chan.revoke_and_ack(&msg, &self.fee_estimator, &&logger, mon_update_blocked), chan_entry); - if let Some(monitor_update) = monitor_update_opt { + if let Some((monitor_update, monitor_events_to_ack)) = monitor_update_opt { + if !monitor_events_to_ack.is_empty() { + peer_state + .monitor_update_blocked_actions + .entry(msg.channel_id) + .or_default() + .push(MonitorUpdateCompletionAction::AckMonitorEvents { + monitor_events_to_ack, + }); + } let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); if let Some(data) = self.handle_new_monitor_update( @@ -13435,7 +13601,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for monitor_event in monitor_events.drain(..) { + for (event_id, monitor_event) in monitor_events.drain(..) { + let monitor_event_source = MonitorEventSource { event_id, channel_id }; match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( @@ -13464,6 +13631,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ None, None, None, + Some(event_id), ); } else { log_trace!(logger, "Failing HTLC from our monitor"); @@ -13476,6 +13644,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ channel_funding_outpoint: funding_outpoint, channel_id, htlc_id: SentHTLCId::from_source(&htlc_update.source), + monitor_event_id: Some(event_id), }); self.fail_htlc_backwards_internal( &htlc_update.source, @@ -13518,6 +13687,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::CommitmentTxConfirmed(_) => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -13539,6 +13711,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::Completed { channel_id, monitor_update_id, .. } => { self.channel_monitor_updated( @@ -13546,6 +13721,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(monitor_update_id), &counterparty_node_id, ); + self.chain_monitor.ack_monitor_event(monitor_event_source); }, } } @@ -13599,7 +13775,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ ); if monitor_opt.is_some() || !holding_cell_failed_htlcs.is_empty() { let update_res = monitor_opt - .map(|monitor_update| { + .map(|(monitor_update, monitor_events_to_ack)| { + if !monitor_events_to_ack.is_empty() { + peer_state + .monitor_update_blocked_actions + .entry(*chan_id) + .or_default() + .push(MonitorUpdateCompletionAction::AckMonitorEvents { + monitor_events_to_ack, + }); + } self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, &mut peer_state.monitor_update_blocked_actions, @@ -14130,6 +14315,7 @@ fn get_htlc_forward_failure( blinded_failure: &Option, onion_error: &HTLCFailReason, incoming_packet_shared_secret: &[u8; 32], trampoline_shared_secret: &Option<[u8; 32]>, phantom_shared_secret: &Option<[u8; 32]>, htlc_id: u64, + monitor_event_source: Option, ) -> HTLCForwardInfo { // TODO: Correctly wrap the error packet twice if failing back a trampoline + phantom HTLC. let secondary_shared_secret = trampoline_shared_secret.or(*phantom_shared_secret); @@ -14141,19 +14327,20 @@ fn get_htlc_forward_failure( incoming_packet_shared_secret, &secondary_shared_secret, ); - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } + HTLCForwardInfo::FailHTLC { htlc_id, err_packet, monitor_event_source } }, Some(BlindedFailure::FromBlindedNode) => HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code: LocalHTLCFailureReason::InvalidOnionBlinding.failure_code(), sha256_of_onion: [0; 32], + monitor_event_source, }, None => { let err_packet = onion_error.get_encrypted_failure_packet( incoming_packet_shared_secret, &secondary_shared_secret, ); - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } + HTLCForwardInfo::FailHTLC { htlc_id, err_packet, monitor_event_source } }, } } @@ -15246,8 +15433,13 @@ impl< channel_funding_outpoint, channel_id, htlc_id, + monitor_event_id, }, ) => { + if let Some(event_id) = monitor_event_id { + self.chain_monitor + .ack_monitor_event(MonitorEventSource { channel_id, event_id }); + } let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock = per_peer_state .get(&counterparty_node_id) @@ -17818,15 +18010,21 @@ impl Writeable for HTLCForwardInfo { 0u8.write(w)?; info.write(w)?; }, - Self::FailHTLC { htlc_id, err_packet } => { + Self::FailHTLC { htlc_id, err_packet, monitor_event_source } => { FAIL_HTLC_VARIANT_ID.write(w)?; write_tlv_fields!(w, { (0, htlc_id, required), (2, err_packet.data, required), (5, err_packet.attribution_data, option), + (7, monitor_event_source, option), }); }, - Self::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + Self::FailMalformedHTLC { + htlc_id, + failure_code, + sha256_of_onion, + monitor_event_source, + } => { // Since this variant was added in 0.0.119, write this as `::FailHTLC` with an empty error // packet so older versions have something to fail back with, but serialize the real data as // optional TLVs for the benefit of newer versions. @@ -17836,6 +18034,7 @@ impl Writeable for HTLCForwardInfo { (1, failure_code, required), (2, Vec::::new(), required), (3, sha256_of_onion, required), + (7, monitor_event_source, option), }); }, } @@ -17856,6 +18055,7 @@ impl Readable for HTLCForwardInfo { (2, err_packet, required), (3, sha256_of_onion, option), (5, attribution_data, option), + (7, monitor_event_source, option), }); if let Some(failure_code) = malformed_htlc_failure_code { if attribution_data.is_some() { @@ -17865,6 +18065,7 @@ impl Readable for HTLCForwardInfo { htlc_id: _init_tlv_based_struct_field!(htlc_id, required), failure_code, sha256_of_onion: sha256_of_onion.ok_or(DecodeError::InvalidValue)?, + monitor_event_source, } } else { Self::FailHTLC { @@ -17873,6 +18074,7 @@ impl Readable for HTLCForwardInfo { data: _init_tlv_based_struct_field!(err_packet, required), attribution_data: _init_tlv_based_struct_field!(attribution_data, option), }, + monitor_event_source, } } }, @@ -18142,6 +18344,9 @@ impl< } } + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. + let persistent_monitor_events = self.persistent_monitor_events.then_some(true); + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -18154,6 +18359,7 @@ impl< (9, htlc_purposes, required_vec), (10, legacy_in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), + (12, persistent_monitor_events, option), (13, htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_opt, option), (15, self.inbound_payment_id_secret, required), @@ -18253,6 +18459,7 @@ pub(super) struct ChannelManagerData { forward_htlcs_legacy: HashMap>, pending_intercepted_htlcs_legacy: HashMap, decode_update_add_htlcs_legacy: HashMap>, + persistent_monitor_events: bool, // The `ChannelManager` version that was written. version: u8, } @@ -18438,6 +18645,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> let mut inbound_payment_id_secret = None; let mut peer_storage_dir: Option)>> = None; let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); + let mut persistent_monitor_events = false; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs_legacy, option), @@ -18450,6 +18658,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (9, claimable_htlc_purposes, optional_vec), (10, legacy_in_flight_monitor_updates, option), (11, probing_cookie_secret, option), + (12, persistent_monitor_events, (default_value, false)), (13, amountless_claimable_htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_legacy, option), (15, inbound_payment_id_secret, option), @@ -18458,6 +18667,12 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)), }); + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_monitor_events { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue); + } + // Merge legacy pending_outbound_payments fields into a single HashMap. // Priority: pending_outbound_payments (TLV 3) > pending_outbound_payments_no_retry (TLV 1) // > pending_outbound_payments_compat (non-TLV legacy) @@ -18574,6 +18789,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> peer_storage_dir: peer_storage_dir.unwrap_or_default(), async_receive_offer_cache, version, + persistent_monitor_events, }) } } @@ -18875,6 +19091,7 @@ impl< mut in_flight_monitor_updates, peer_storage_dir, async_receive_offer_cache, + persistent_monitor_events, version: _version, } = data; @@ -19636,6 +19853,7 @@ impl< channel_funding_outpoint: monitor.get_funding_txo(), channel_id: monitor.channel_id(), htlc_id, + monitor_event_id: None, }; let mut compl_action = Some( EventCompletionAction::ReleasePaymentCompleteChannelMonitorUpdate(update) @@ -19715,6 +19933,7 @@ impl< channel_funding_outpoint: monitor.get_funding_txo(), channel_id: monitor.channel_id(), htlc_id: SentHTLCId::from_source(&htlc_source), + monitor_event_id: None, }); failed_htlcs.push(( @@ -20138,6 +20357,8 @@ impl< logger: args.logger, config: RwLock::new(args.config), + persistent_monitor_events, + #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), }; @@ -20470,6 +20691,7 @@ impl< downstream_user_channel_id, None, None, + None, ); } diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index 2368776dd3f..b0d1813c61a 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -3591,6 +3591,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b let mut cfg = test_default_channel_config(); cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; + // This test specifically tests lost monitor events, which requires the legacy + // (non-persistent) monitor event behavior. + cfg.override_persistent_monitor_events = Some(false); let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())]; let chanmon_cfgs = create_chanmon_cfgs(3); @@ -3800,27 +3803,83 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b } #[test] -fn test_lost_timeout_monitor_events() { +fn test_lost_timeout_monitor_events_a() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_b() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_c() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_d() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_e() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_f() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_g() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_h() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_i() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_j() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, false); - +} +#[test] +fn test_lost_timeout_monitor_events_k() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_l() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_m() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_n() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_o() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_p() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_q() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_r() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_s() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_t() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, true); } diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 7d198d2d70d..d405567473d 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -1423,15 +1423,39 @@ fn do_test_dup_htlc_onchain_doesnt_fail_on_reload( } #[test] -fn test_dup_htlc_onchain_doesnt_fail_on_reload() { +fn test_dup_htlc_onchain_doesnt_fail_on_reload_a() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_b() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_c() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_d() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_e() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_f() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_g() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_h() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_i() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, false, false); } diff --git a/lightning/src/util/config.rs b/lightning/src/util/config.rs index e4158910b9a..6b0f7942423 100644 --- a/lightning/src/util/config.rs +++ b/lightning/src/util/config.rs @@ -1103,6 +1103,10 @@ pub struct UserConfig { /// /// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel pub reject_inbound_splices: bool, + /// If set to `Some`, overrides the random selection of whether to use persistent monitor + /// events. Only available in tests. + #[cfg(test)] + pub override_persistent_monitor_events: Option, } impl Default for UserConfig { @@ -1119,6 +1123,8 @@ impl Default for UserConfig { enable_htlc_hold: false, hold_outbound_htlcs_at_next_hop: false, reject_inbound_splices: true, + #[cfg(test)] + override_persistent_monitor_events: None, } } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index d31c16ccbf0..59c4e06a9d9 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -15,7 +15,7 @@ use crate::chain::chaininterface; #[cfg(any(test, feature = "_externalize_tests"))] use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW; use crate::chain::chaininterface::{ConfirmationTarget, TransactionType}; -use crate::chain::chainmonitor::{ChainMonitor, Persist}; +use crate::chain::chainmonitor::{ChainMonitor, MonitorEventSource, Persist}; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, }; @@ -650,6 +650,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { ) .unwrap() .1; + new_monitor.copy_monitor_event_state(&monitor); assert!(new_monitor == monitor); self.latest_monitor_update_id .lock() @@ -711,6 +712,9 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // it so it doesn't leak into the rest of the test. let failed_back = monitor.inner.lock().unwrap().failed_back_htlc_ids.clone(); new_monitor.inner.lock().unwrap().failed_back_htlc_ids = failed_back; + // The deserialized monitor will reset the monitor event state, so copy it from the live + // monitor before comparing. + new_monitor.copy_monitor_event_state(&monitor); if let Some(chan_id) = self.expect_monitor_round_trip_fail.lock().unwrap().take() { assert_eq!(chan_id, channel_id); assert!(new_monitor != *monitor); @@ -724,7 +728,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { // Auto-flush pending operations so that the ChannelManager can pick up monitor // completion events. When not in deferred mode the queue is empty so this only // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) @@ -735,6 +739,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { } return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } #[cfg(any(test, feature = "_externalize_tests"))]