From a147ba1ea583ddf7b30bd97228048a82e66fa8f4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 13:48:04 +0000 Subject: [PATCH 01/16] Update BestBlock to store ANTI_REORG_DELAY * 2 recent block hashes On restart, LDK expects the chain to be replayed starting from where it was when objects were last serialized. This is fine in the normal case, but if there was a reorg and the node which we were syncing from either resynced or was changed, the last block that we were synced as of might no longer be available. As a result, it becomes impossible to figure out where the fork point is, and thus to replay the chain. Luckily, changing the block source during a reorg isn't exactly common, but we shouldn't end up with a bricked node. To address this, `lightning-block-sync` allows the user to pass in `Cache` which can be used to cache recent blocks and thus allow for reorg handling in this case. However, serialization for, and a reasonable default implementation of a `Cache` was never built. Instead, here, we start taking a different approach. To avoid developers having to persist yet another object, we move `BestBlock` to storing some number of recent block hashes. This allows us to find the fork point with just the serialized state. In conjunction with 403dc1a48bb71ae794f6883ae0b760aad44cda39 (which allows us to disconnect blocks without having the stored header), this should allow us to replay chain state after a reorg even if we no longer have access to the top few blocks of the old chain tip. While we only really need to store `ANTI_REORG_DELAY` blocks (as we generally assume that any deeper reorg won't happen and thus we don't guarantee we handle it correctly), its nice to store a few more to be able to handle more than a six block reorg. While other parts of the codebase may not be entirely robust against such a reorg if the transactions confirmed change out from under us, its entirely possible (and, indeed, common) for reorgs to contain nearly identical transactions. --- lightning/src/chain/mod.rs | 78 ++++++++++++++++++++++++++++++++++++-- lightning/src/util/ser.rs | 27 +++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index b4cc6a302ae..3f31c4b3789 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,9 +18,10 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent}; +use crate::chain::channelmonitor::{ + ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, ANTI_REORG_DELAY, +}; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::impl_writeable_tlv_based; use crate::ln::types::ChannelId; use crate::sign::ecdsa::EcdsaChannelSigner; use crate::sign::HTLCDescriptor; @@ -42,13 +43,20 @@ pub struct BestBlock { pub block_hash: BlockHash, /// The height at which the block was confirmed. pub height: u32, + /// Previous blocks immediately before [`Self::block_hash`], in reverse chronological order. + /// + /// These ensure we can find the fork point of a reorg if our block source no longer has the + /// previous best tip after a restart. + pub previous_blocks: [Option; ANTI_REORG_DELAY as usize * 2], } impl BestBlock { /// Constructs a `BestBlock` that represents the genesis block at height 0 of the given /// network. pub fn from_network(network: Network) -> Self { - BestBlock { block_hash: genesis_block(network).header.block_hash(), height: 0 } + let block_hash = genesis_block(network).header.block_hash(); + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height: 0, previous_blocks } } /// Returns a `BestBlock` as identified by the given block hash and height. @@ -56,13 +64,75 @@ impl BestBlock { /// This is not exported to bindings users directly as the bindings auto-generate an /// equivalent `new`. pub fn new(block_hash: BlockHash, height: u32) -> Self { - BestBlock { block_hash, height } + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height, previous_blocks } + } + + /// Advances to a new block at height [`Self::height`] + 1. + pub fn advance(&mut self, new_hash: BlockHash) { + // Shift all block hashes to the right (making room for the old tip at index 0) + for i in (1..self.previous_blocks.len()).rev() { + self.previous_blocks[i] = self.previous_blocks[i - 1]; + } + + // The old tip becomes the new index 0 (tip-1) + self.previous_blocks[0] = Some(self.block_hash); + + // Update to the new tip + self.block_hash = new_hash; + self.height += 1; + } + + /// Returns the block hash at the given height, if available in our history. + pub fn get_hash_at_height(&self, height: u32) -> Option { + if height > self.height { + return None; + } + if height == self.height { + return Some(self.block_hash); + } + + // offset = 1 means we want tip-1, which is block_hashes[0] + // offset = 2 means we want tip-2, which is block_hashes[1], etc. + let offset = self.height.saturating_sub(height) as usize; + if offset >= 1 && offset <= self.previous_blocks.len() { + self.previous_blocks[offset - 1] + } else { + None + } + } + + /// Find the most recent common ancestor between two BestBlocks by searching their block hash + /// histories. + /// + /// Returns the common block hash and height, or None if no common block is found in the + /// available histories. + pub fn find_common_ancestor(&self, other: &BestBlock) -> Option<(BlockHash, u32)> { + // First check if either tip matches + if self.block_hash == other.block_hash && self.height == other.height { + return Some((self.block_hash, self.height)); + } + + // Check all heights covered by self's history + let min_height = self.height.saturating_sub(self.previous_blocks.len() as u32); + for check_height in (min_height..=self.height).rev() { + if let Some(self_hash) = self.get_hash_at_height(check_height) { + if let Some(other_hash) = other.get_hash_at_height(check_height) { + if self_hash == other_hash { + return Some((self_hash, check_height)); + } + } + } + } + None } } impl_writeable_tlv_based!(BestBlock, { (0, block_hash, required), + (1, previous_blocks_read, (legacy, [Option; ANTI_REORG_DELAY as usize * 2], |us: &BestBlock| Some(us.previous_blocks))), (2, height, required), + (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; ANTI_REORG_DELAY as usize * 2]))), }); /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index f821aa5afc0..2bd8a9eb879 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -1448,6 +1448,33 @@ impl Readable for BlockHash { } } +impl Writeable for [Option; 12] { + fn write(&self, w: &mut W) -> Result<(), io::Error> { + for hash_opt in self { + match hash_opt { + Some(hash) => hash.write(w)?, + None => ([0u8; 32]).write(w)?, + } + } + Ok(()) + } +} + +impl Readable for [Option; 12] { + fn read(r: &mut R) -> Result { + use bitcoin::hashes::Hash; + + let mut res = [None; 12]; + for hash_opt in res.iter_mut() { + let buf: [u8; 32] = Readable::read(r)?; + if buf != [0; 32] { + *hash_opt = Some(BlockHash::from_slice(&buf[..]).unwrap()); + } + } + Ok(res) + } +} + impl Writeable for ChainHash { fn write(&self, w: &mut W) -> Result<(), io::Error> { w.write_all(self.as_bytes()) From 053250129ea74e21413b3e06e6871b41039ea8af Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 15:38:34 +0000 Subject: [PATCH 02/16] f add claude's bestblock test --- lightning/src/chain/mod.rs | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 3f31c4b3789..faed0577521 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -532,3 +532,45 @@ impl ClaimId { ClaimId(Sha256::from_engine(engine).to_byte_array()) } } + +#[cfg(test)] +mod tests { + use super::*; + use bitcoin::hashes::Hash; + + #[test] + fn test_best_block() { + let hash1 = BlockHash::from_slice(&[1; 32]).unwrap(); + let mut chain_a = BestBlock::new(hash1, 100); + let mut chain_b = BestBlock::new(hash1, 100); + + // Test get_hash_at_height on initial block + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), None); + assert_eq!(chain_a.get_hash_at_height(99), None); + + // Test find_common_ancestor with identical blocks + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + let hash2 = BlockHash::from_slice(&[2; 32]).unwrap(); + chain_a.advance(hash2); + assert_eq!(chain_a.height, 101); + assert_eq!(chain_a.block_hash, hash2); + assert_eq!(chain_a.previous_blocks[0], Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), Some(hash2)); + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + + // Test find_common_ancestor with different heights + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with diverged chains but the same height + let hash_b3 = BlockHash::from_slice(&[33; 32]).unwrap(); + chain_b.advance(hash_b3); + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with no common history + let hash_other = BlockHash::from_slice(&[99; 32]).unwrap(); + let chain_c = BestBlock::new(hash_other, 200); + assert_eq!(chain_a.find_common_ancestor(&chain_c), None); + } +} From 06189346a5fb211eb57282e88fd06715e852a881 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 15:38:19 +0000 Subject: [PATCH 03/16] f use hard-coded values to break constant changes leading to ser break --- lightning/src/chain/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index faed0577521..4ebff2ef81b 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -130,9 +130,11 @@ impl BestBlock { impl_writeable_tlv_based!(BestBlock, { (0, block_hash, required), - (1, previous_blocks_read, (legacy, [Option; ANTI_REORG_DELAY as usize * 2], |us: &BestBlock| Some(us.previous_blocks))), + // Note that any change to the previous_blocks array length will change the serialization + // format and thus it is specified without constants here. + (1, previous_blocks_read, (legacy, [Option; 6 * 2], |us: &BestBlock| Some(us.previous_blocks))), (2, height, required), - (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; ANTI_REORG_DELAY as usize * 2]))), + (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; 6 * 2]))), }); /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the From 9ce8da5a5ccf4d85a014ce0dc2fceb3e28331a92 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 14:08:16 +0000 Subject: [PATCH 04/16] Return `BestBlock` when deserializing chain-synced structs The deserialization of `ChannelMonitor`, `ChannelManager`, and `OutputSweeper` is implemented for a `(BlockHash, ...)` pair rather than on the object itself. This ensures developers are pushed to think about initial chain sync after deserialization and provides the latest chain sync state conviniently at deserialization-time. In the previous commit we started storing additional recent block hashes in `BestBlock` for use during initial sync to ensure we can handle reorgs while offline if the chain source loses the reorged-out blocks. Here, we move the deserialization routines to be on a `(BestBlock, ...)` pair instead of `(BlockHash, ...)`, providing access to those recent block hashes at deserialization-time. --- lightning-block-sync/src/init.rs | 17 ++++---- lightning/src/chain/channelmonitor.rs | 14 +++---- lightning/src/ln/chanmon_update_fail_tests.rs | 4 +- lightning/src/ln/channelmanager.rs | 20 +++++----- lightning/src/ln/functional_test_utils.rs | 8 ++-- lightning/src/ln/functional_tests.rs | 7 ++-- lightning/src/ln/reload_tests.rs | 11 +++--- lightning/src/util/persist.rs | 39 ++++++++++--------- lightning/src/util/test_utils.rs | 9 +++-- 9 files changed, 64 insertions(+), 65 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index a870f8ca88c..61f44c6139e 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -40,11 +40,10 @@ where /// switching to [`SpvClient`]. For example: /// /// ``` -/// use bitcoin::hash_types::BlockHash; /// use bitcoin::network::Network; /// /// use lightning::chain; -/// use lightning::chain::Watch; +/// use lightning::chain::{BestBlock, Watch}; /// use lightning::chain::chainmonitor; /// use lightning::chain::chainmonitor::ChainMonitor; /// use lightning::chain::channelmonitor::ChannelMonitor; @@ -89,14 +88,14 @@ where /// logger: &L, /// persister: &P, /// ) { -/// // Read a serialized channel monitor paired with the block hash when it was persisted. +/// // Read a serialized channel monitor paired with the best block when it was persisted. /// let serialized_monitor = "..."; -/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor)>::read( +/// let (monitor_best_block, mut monitor) = <(BestBlock, ChannelMonitor)>::read( /// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap(); /// -/// // Read the channel manager paired with the block hash when it was persisted. +/// // Read the channel manager paired with the best block when it was persisted. /// let serialized_manager = "..."; -/// let (manager_block_hash, mut manager) = { +/// let (manager_best_block, mut manager) = { /// let read_args = ChannelManagerReadArgs::new( /// entropy_source, /// node_signer, @@ -110,7 +109,7 @@ where /// config, /// vec![&mut monitor], /// ); -/// <(BlockHash, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( +/// <(BestBlock, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( /// &mut Cursor::new(&serialized_manager), read_args).unwrap() /// }; /// @@ -118,8 +117,8 @@ where /// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ -/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen), -/// (manager_block_hash, &manager as &dyn chain::Listen), +/// (monitor_best_block.block_hash, &monitor_listener as &dyn chain::Listen), +/// (manager_best_block.block_hash, &manager as &dyn chain::Listen), /// ]; /// let chain_tip = init::synchronize_listeners( /// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 515a3dc5f1d..1b63d9238f0 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1059,7 +1059,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. /// -/// Like the [`ChannelManager`], deserialization is implemented for `(BlockHash, ChannelMonitor)`, +/// Like the [`ChannelManager`], deserialization is implemented for `(BestBlock, ChannelMonitor)`, /// providing you with the last block hash which was connected before shutting down. You must begin /// syncing the chain from that point, disconnecting and connecting blocks as required to get to /// the best chain on startup. Note that all [`ChannelMonitor`]s passed to a [`ChainMonitor`] must @@ -1067,7 +1067,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// initialization. /// /// For those loading potentially-ancient [`ChannelMonitor`]s, deserialization is also implemented -/// for `Option<(BlockHash, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] +/// for `Option<(BestBlock, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] /// that was first created in LDK prior to 0.0.110 and last updated prior to LDK 0.0.119. In such /// cases, the `Option<(..)>` deserialization option may return `Ok(None)` rather than failing to /// deserialize, allowing you to differentiate between the two cases. @@ -6419,7 +6419,7 @@ where const MAX_ALLOC_SIZE: usize = 64 * 1024; impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for (BlockHash, ChannelMonitor) + for (BestBlock, ChannelMonitor) { fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { match >::read(reader, args) { @@ -6431,7 +6431,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for Option<(BlockHash, ChannelMonitor)> + for Option<(BestBlock, ChannelMonitor)> { #[rustfmt::skip] fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { @@ -6856,14 +6856,14 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP To continue, run a v0.1 release, send/route a payment over the channel or close it."); } } - Ok(Some((best_block.block_hash, monitor))) + Ok(Some((best_block, monitor))) } } #[cfg(test)] mod tests { use bitcoin::amount::Amount; - use bitcoin::hash_types::{BlockHash, Txid}; + use bitcoin::hash_types::Txid; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; use bitcoin::hex::FromHex; @@ -6960,7 +6960,7 @@ mod tests { nodes[1].chain_monitor.chain_monitor.transactions_confirmed(&new_header, &[(0, broadcast_tx)], conf_height); - let (_, pre_update_monitor) = <(BlockHash, ChannelMonitor<_>)>::read( + let (_, pre_update_monitor) = <(BestBlock, ChannelMonitor<_>)>::read( &mut io::Cursor::new(&get_monitor!(nodes[1], channel.2).encode()), (&nodes[1].keys_manager.backing, &nodes[1].keys_manager.backing)).unwrap(); diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index ff499d049d4..4e32ad46ebd 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -16,7 +16,7 @@ use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::chainmonitor::ChainMonitor; use crate::chain::channelmonitor::{ChannelMonitor, MonitorEvent, ANTI_REORG_DELAY}; use crate::chain::transaction::OutPoint; -use crate::chain::{ChannelMonitorUpdateStatus, Listen, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Listen, Watch}; use crate::events::{ClosureReason, Event, HTLCHandlingFailureType, PaymentPurpose}; use crate::ln::channel::AnnouncementSigsState; use crate::ln::channelmanager::{PaymentId, RAACommitmentOrder, RecipientOnionFields, Retry}; @@ -94,7 +94,7 @@ fn test_monitor_and_persister_update_fail() { let chain_mon = { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan.2).unwrap(); - let (_, new_monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, new_monitor) = <(BestBlock, ChannelMonitor)>::read( &mut &monitor.encode()[..], (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 10c77505408..1b881213de2 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1942,7 +1942,6 @@ where /// detailed in the [`ChannelManagerReadArgs`] documentation. /// /// ``` -/// use bitcoin::BlockHash; /// use bitcoin::network::Network; /// use lightning::chain::BestBlock; /// # use lightning::chain::channelmonitor::ChannelMonitor; @@ -1991,8 +1990,8 @@ where /// entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster, /// router, message_router, logger, config, channel_monitors.iter().collect(), /// ); -/// let (block_hash, channel_manager) = -/// <(BlockHash, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; +/// let (best_block, channel_manager) = +/// <(BestBlock, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; /// /// // Update the ChannelManager and ChannelMonitors with the latest chain data /// // ... @@ -2558,7 +2557,7 @@ where /// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds /// will be lost (modulo on-chain transaction fees). /// -/// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which +/// Note that the deserializer is only implemented for `(`[`BestBlock`]`, `[`ChannelManager`]`)`, which /// tells you the last block hash which was connected. You should get the best block tip before using the manager. /// See [`chain::Listen`] and [`chain::Confirm`] for more details. /// @@ -2625,7 +2624,6 @@ where /// [`peer_disconnected`]: msgs::BaseMessageHandler::peer_disconnected /// [`funding_created`]: msgs::FundingCreated /// [`funding_transaction_generated`]: Self::funding_transaction_generated -/// [`BlockHash`]: bitcoin::hash_types::BlockHash /// [`update_channel`]: chain::Watch::update_channel /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`read`]: ReadableArgs::read @@ -17535,7 +17533,7 @@ impl< MR: Deref, L: Deref + Clone, > ReadableArgs> - for (BlockHash, Arc>) + for (BestBlock, Arc>) where M::Target: chain::Watch<::EcdsaSigner>, T::Target: BroadcasterInterface, @@ -17550,9 +17548,9 @@ where fn read( reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, ) -> Result { - let (blockhash, chan_manager) = - <(BlockHash, ChannelManager)>::read(reader, args)?; - Ok((blockhash, Arc::new(chan_manager))) + let (best_block, chan_manager) = + <(BestBlock, ChannelManager)>::read(reader, args)?; + Ok((best_block, Arc::new(chan_manager))) } } @@ -17568,7 +17566,7 @@ impl< MR: Deref, L: Deref + Clone, > ReadableArgs> - for (BlockHash, ChannelManager) + for (BestBlock, ChannelManager) where M::Target: chain::Watch<::EcdsaSigner>, T::Target: BroadcasterInterface, @@ -19369,7 +19367,7 @@ where //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. - Ok((best_block_hash.clone(), channel_manager)) + Ok((best_block, channel_manager)) } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 2cf5ea96acb..c3c657f226e 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -848,7 +848,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mon = self.chain_monitor.chain_monitor.get_monitor(channel_id).unwrap(); mon.write(&mut w).unwrap(); let (_, deserialized_monitor) = - <(BlockHash, ChannelMonitor)>::read( + <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -876,7 +876,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mut w = test_utils::TestVecWriter(Vec::new()); self.node.write(&mut w).unwrap(); <( - BlockHash, + BestBlock, ChannelManager< &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, @@ -1305,7 +1305,7 @@ pub fn _reload_node<'a, 'b, 'c>( let mut monitors_read = Vec::with_capacity(monitors_encoded.len()); for encoded in monitors_encoded { let mut monitor_read = &encoded[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read( &mut monitor_read, (node.keys_manager, node.keys_manager), ) @@ -1320,7 +1320,7 @@ pub fn _reload_node<'a, 'b, 'c>( for monitor in monitors_read.iter() { assert!(channel_monitors.insert(monitor.channel_id(), monitor).is_none()); } - <(BlockHash, TestChannelManager<'b, 'c>)>::read( + <(BestBlock, TestChannelManager<'b, 'c>)>::read( &mut node_read, ChannelManagerReadArgs { config, diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index fcb348c690d..66c37217adf 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -19,6 +19,7 @@ use crate::chain::channelmonitor::{ LATENCY_GRACE_PERIOD_BLOCKS, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch}; use crate::events::{ ClosureReason, Event, HTLCHandlingFailureType, PathFailure, PaymentFailureReason, @@ -7280,7 +7281,7 @@ pub fn test_update_err_monitor_lockdown() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7386,7 +7387,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7436,7 +7437,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index a38262e6952..9f8e25b21f4 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -11,7 +11,7 @@ //! Functional tests which test for correct behavior across node restarts. -use crate::chain::{ChannelMonitorUpdateStatus, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Watch}; use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateStep}; use crate::routing::router::{PaymentParameters, RouteParameters}; @@ -29,7 +29,6 @@ use crate::util::ser::{Writeable, ReadableArgs}; use crate::util::config::UserConfig; use bitcoin::hashes::Hash; -use bitcoin::hash_types::BlockHash; use types::payment::{PaymentHash, PaymentPreimage}; use crate::prelude::*; @@ -410,7 +409,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_stale_monitors = Vec::new(); for serialized in node_0_stale_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_stale_monitors.push(monitor); } @@ -418,14 +417,14 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_monitors = Vec::new(); for serialized in node_0_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_monitors.push(monitor); } let mut nodes_0_read = &nodes_0_serialized[..]; if let Err(msgs::DecodeError::DangerousValue) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, @@ -443,7 +442,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut nodes_0_read = &nodes_0_serialized[..]; let (_, nodes_0_deserialized_tmp) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 2e1e8805d0a..46d522b2285 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -14,7 +14,7 @@ use alloc::sync::Arc; use bitcoin::hashes::hex::FromHex; -use bitcoin::{BlockHash, Txid}; +use bitcoin::Txid; use core::convert::Infallible; use core::future::Future; @@ -32,6 +32,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; @@ -447,7 +448,7 @@ impl Persist( kv_store: K, entropy_source: ES, signer_provider: SP, -) -> Result::EcdsaSigner>)>, io::Error> +) -> Result::EcdsaSigner>)>, io::Error> where K::Target: KVStoreSync, ES::Target: EntropySource + Sized, @@ -459,7 +460,7 @@ where CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, )? { - match ::EcdsaSigner>)>>::read( + match ::EcdsaSigner>)>>::read( &mut io::Cursor::new(kv_store.read( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, @@ -467,7 +468,7 @@ where )?), (&*entropy_source, &*signer_provider), ) { - Ok(Some((block_hash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { let monitor_name = MonitorName::from_str(&stored_key)?; if channel_monitor.persistence_key() != monitor_name { return Err(io::Error::new( @@ -476,7 +477,7 @@ where )); } - res.push((block_hash, channel_monitor)); + res.push((best_block, channel_monitor)); }, Ok(None) => {}, Err(_) => { @@ -652,7 +653,7 @@ where pub fn read_all_channel_monitors_with_updates( &self, ) -> Result< - Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Vec<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { poll_sync_future(self.0.read_all_channel_monitors_with_updates()) @@ -675,7 +676,7 @@ where /// function to accomplish this. Take care to limit the number of parallel readers. pub fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + ) -> Result<(BestBlock, ChannelMonitor<::EcdsaSigner>), io::Error> { poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key)) } @@ -863,7 +864,7 @@ where pub async fn read_all_channel_monitors_with_updates( &self, ) -> Result< - Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Vec<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; @@ -897,7 +898,7 @@ where pub async fn read_all_channel_monitors_with_updates_parallel( self: &Arc, ) -> Result< - Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Vec<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > where @@ -949,7 +950,7 @@ where /// function to accomplish this. Take care to limit the number of parallel readers. pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + ) -> Result<(BestBlock, ChannelMonitor<::EcdsaSigner>), io::Error> { self.0.read_channel_monitor_with_updates(monitor_key).await } @@ -1069,7 +1070,7 @@ where { pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + ) -> Result<(BestBlock, ChannelMonitor<::EcdsaSigner>), io::Error> { match self.maybe_read_channel_monitor_with_updates(monitor_key).await? { Some(res) => Ok(res), @@ -1088,7 +1089,7 @@ where async fn maybe_read_channel_monitor_with_updates( &self, monitor_key: &str, ) -> Result< - Option<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Option<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { let monitor_name = MonitorName::from_str(monitor_key)?; @@ -1097,7 +1098,7 @@ where .kv_store .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key)); let (read_res, list_res) = TwoFutureJoiner::new(read_future, list_future).await; - let (block_hash, monitor) = match read_res? { + let (best_block, monitor) = match read_res? { Some(res) => res, None => return Ok(None), }; @@ -1128,14 +1129,14 @@ where io::Error::new(io::ErrorKind::Other, "Monitor update failed") })?; } - Ok(Some((block_hash, monitor))) + Ok(Some((best_block, monitor))) } /// Read a channel monitor. async fn maybe_read_monitor( &self, monitor_name: &MonitorName, monitor_key: &str, ) -> Result< - Option<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Option<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; @@ -1146,12 +1147,12 @@ where if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) { monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64); } - match ::EcdsaSigner>)>>::read( + match ::EcdsaSigner>)>>::read( &mut monitor_cursor, (&*self.entropy_source, &*self.signer_provider), ) { Ok(None) => Ok(None), - Ok(Some((blockhash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { if channel_monitor.persistence_key() != *monitor_name { log_error!( self.logger, @@ -1163,7 +1164,7 @@ where "ChannelMonitor was stored under the wrong key", )) } else { - Ok(Some((blockhash, channel_monitor))) + Ok(Some((best_block, channel_monitor))) } }, Err(e) => { @@ -1342,7 +1343,7 @@ where async fn archive_persisted_channel(&self, monitor_name: MonitorName) { let monitor_key = monitor_name.to_string(); let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await { - Ok((_block_hash, monitor)) => monitor, + Ok((_best_block, monitor)) => monitor, Err(_) => return, }; let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..ab749910323 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -20,6 +20,7 @@ use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::chain::WatchedOutput; use crate::events::bump_transaction::sync::WalletSourceSync; use crate::events::bump_transaction::Utxo; @@ -67,7 +68,7 @@ use bitcoin::amount::Amount; use bitcoin::block::Block; use bitcoin::constants::genesis_block; use bitcoin::constants::ChainHash; -use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::hash_types::Txid; use bitcoin::hashes::{hex::FromHex, Hash}; use bitcoin::network::Network; use bitcoin::script::{Builder, Script, ScriptBuf}; @@ -564,7 +565,7 @@ impl<'a> TestChainMonitor<'a> { // underlying `ChainMonitor`. let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -596,7 +597,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // monitor to a serialized copy and get he same one back. let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -652,7 +653,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let monitor = self.chain_monitor.get_monitor(channel_id).unwrap(); w.0.clear(); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) From 513c57a43f3f7d06e1aeaf3451997b55b799ed4e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 16:01:55 +0000 Subject: [PATCH 05/16] Replace `Cache::block_disconnected` with `blocks_disconnected` In 403dc1a48bb71ae794f6883ae0b760aad44cda39 we converted the `Listen` disconnect semantics to only pass the fork point, rather than each block being disconnected. We did not, however, update the semantics of `lightning-block-sync`'s `Cache` to reduce patch size. Here we go ahead and do so, dropping `ChainDifference::disconnected_blocks` as well as its no longer needed. --- lightning-block-sync/src/init.rs | 8 +++---- lightning-block-sync/src/lib.rs | 37 +++++++++++++------------------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 61f44c6139e..07575c6f523 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -175,7 +175,9 @@ where let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; let difference = chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; - chain_notifier.disconnect_blocks(difference.disconnected_blocks); + if difference.common_ancestor != old_header { + chain_notifier.disconnect_blocks(difference.common_ancestor); + } (difference.common_ancestor, difference.connected_blocks) }; @@ -215,9 +217,7 @@ impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { unreachable!() } - fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { - None - } + fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) {} } /// Wrapper for supporting dynamically sized chain listeners. diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 02593047658..91de5beaca3 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -202,9 +202,11 @@ pub trait Cache { /// disconnected later if needed. fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); - /// Called when a block has been disconnected from the best chain. Once disconnected, a block's - /// header is no longer needed and thus can be removed. - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option; + /// Called when blocks have been disconnected from the best chain. Only the fork point + /// (best comon ancestor) is provided. + /// + /// Once disconnected, a block's header is no longer needed and thus can be removed. + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } /// Unbounded cache of block headers keyed by block hash. @@ -219,8 +221,8 @@ impl Cache for UnboundedCache { self.insert(block_hash, block_header); } - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { - self.remove(block_hash) + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.retain(|_, block_info| block_info.height < fork_point.height); } } @@ -315,9 +317,6 @@ struct ChainDifference { /// If there are any disconnected blocks, this is where the chain forked. common_ancestor: ValidatedBlockHeader, - /// Blocks that were disconnected from the chain since the last poll. - disconnected_blocks: Vec, - /// Blocks that were connected to the chain since the last poll. connected_blocks: Vec, } @@ -341,7 +340,9 @@ where .find_difference(new_header, old_header, chain_poller) .await .map_err(|e| (e, None))?; - self.disconnect_blocks(difference.disconnected_blocks); + if difference.common_ancestor != *old_header { + self.disconnect_blocks(difference.common_ancestor); + } self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller) .await } @@ -354,7 +355,6 @@ where &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { - let mut disconnected_blocks = Vec::new(); let mut connected_blocks = Vec::new(); let mut current = current_header; let mut previous = *prev_header; @@ -369,7 +369,6 @@ where let current_height = current.height; let previous_height = previous.height; if current_height <= previous_height { - disconnected_blocks.push(previous); previous = self.look_up_previous_header(chain_poller, &previous).await?; } if current_height >= previous_height { @@ -379,7 +378,7 @@ where } let common_ancestor = current; - Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) + Ok(ChainDifference { common_ancestor, connected_blocks }) } /// Returns the previous header for the given header, either by looking it up in the cache or @@ -394,16 +393,10 @@ where } /// Notifies the chain listeners of disconnected blocks. - fn disconnect_blocks(&mut self, disconnected_blocks: Vec) { - for header in disconnected_blocks.iter() { - if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { - assert_eq!(cached_header, *header); - } - } - if let Some(block) = disconnected_blocks.last() { - let fork_point = BestBlock::new(block.header.prev_blockhash, block.height - 1); - self.chain_listener.blocks_disconnected(fork_point); - } + fn disconnect_blocks(&mut self, fork_point: ValidatedBlockHeader) { + self.header_cache.blocks_disconnected(&fork_point); + let best_block = BestBlock::new(fork_point.block_hash, fork_point.height); + self.chain_listener.blocks_disconnected(best_block); } /// Notifies the chain listeners of connected blocks. From 7453fbe76f5b09f371f4f344fca33decf29ff648 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 15:49:11 +0000 Subject: [PATCH 06/16] Pass a `BestBlock` to `init::synchronize_listeners` On restart, LDK expects the chain to be replayed starting from where it was when objects were last serialized. This is fine in the normal case, but if there was a reorg and the node which we were syncing from either resynced or was changed, the last block that we were synced as of might no longer be available. As a result, it becomes impossible to figure out where the fork point is, and thus to replay the chain. Luckily, changing the block source during a reorg isn't exactly common, but we shouldn't end up with a bricked node. To address this, `lightning-block-sync` allows the user to pass in `Cache` which can be used to cache recent blocks and thus allow for reorg handling in this case. However, serialization for, and a reasonable default implementation of a `Cache` was never built. Instead, here, we start taking a different approach. To avoid developers having to persist yet another object, we move `BestBlock` to storing some number of recent block hashes. This allows us to find the fork point with just the serialized state. In a previous commit, we moved deserialization of various structs to return the `BestBlock` rather than a `BlockHash`. Here we move to actually using it, taking a `BestBlock` in place of `BlockHash` to `init::synchronize_listeners` and walking the `previous_blocks` list to find the fork point rather than relying on the `Cache`. --- lightning-block-sync/src/init.rs | 51 ++++++++++---------------- lightning-block-sync/src/lib.rs | 45 ++++++++++++++++++++++- lightning-block-sync/src/poll.rs | 13 +++++++ lightning-block-sync/src/test_utils.rs | 17 +++++++++ 4 files changed, 93 insertions(+), 33 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 07575c6f523..4fdd3efabd8 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -117,8 +117,8 @@ where /// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ -/// (monitor_best_block.block_hash, &monitor_listener as &dyn chain::Listen), -/// (manager_best_block.block_hash, &manager as &dyn chain::Listen), +/// (monitor_best_block, &monitor_listener as &dyn chain::Listen), +/// (manager_best_block, &manager as &dyn chain::Listen), /// ]; /// let chain_tip = init::synchronize_listeners( /// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); @@ -143,39 +143,28 @@ pub async fn synchronize_listeners< L: chain::Listen + ?Sized, >( block_source: B, network: Network, header_cache: &mut C, - mut chain_listeners: Vec<(BlockHash, &L)>, + mut chain_listeners: Vec<(BestBlock, &L)>, ) -> BlockSourceResult where B::Target: BlockSource, { let best_header = validate_best_block_header(&*block_source).await?; - // Fetch the header for the block hash paired with each listener. - let mut chain_listeners_with_old_headers = Vec::new(); - for (old_block_hash, chain_listener) in chain_listeners.drain(..) { - let old_header = match header_cache.look_up(&old_block_hash) { - Some(header) => *header, - None => { - block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)? - }, - }; - chain_listeners_with_old_headers.push((old_header, chain_listener)) - } - // Find differences and disconnect blocks for each listener individually. let mut chain_poller = ChainPoller::new(block_source, network); let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); - for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { + for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. let header_cache = &mut ReadOnlyCache(header_cache); let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; - let difference = - chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; - if difference.common_ancestor != old_header { + let difference = chain_notifier + .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) + .await?; + if difference.common_ancestor.block_hash != old_best_block.block_hash { chain_notifier.disconnect_blocks(difference.common_ancestor); } (difference.common_ancestor, difference.connected_blocks) @@ -281,9 +270,9 @@ mod tests { let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4)); let listeners = vec![ - (chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen), - (chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen), - (chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen), + (chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen), + (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), + (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; let mut cache = chain.header_cache(0..=4); match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { @@ -313,9 +302,9 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; let mut cache = fork_chain_1.header_cache(2..=4); cache.extend(fork_chain_2.header_cache(3..=4)); @@ -350,9 +339,9 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; let mut cache = fork_chain_1.header_cache(2..=4); cache.extend(fork_chain_2.header_cache(3..=4)); @@ -368,18 +357,18 @@ mod tests { let main_chain = Blockchain::default().with_height(2); let fork_chain = main_chain.fork_at_height(1); let new_tip = main_chain.tip(); - let old_tip = fork_chain.tip(); + let old_best_block = fork_chain.best_block(); let listener = MockChainListener::new() .expect_blocks_disconnected(*fork_chain.at_height(1)) .expect_block_connected(*new_tip); - let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)]; + let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)]; let mut cache = fork_chain.header_cache(2..=2); match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { Ok(_) => { assert!(cache.contains_key(&new_tip.block_hash)); - assert!(cache.contains_key(&old_tip.block_hash)); + assert!(cache.contains_key(&old_best_block.block_hash)); }, Err(e) => panic!("Unexpected error: {:?}", e), } diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 91de5beaca3..ad2cb4215b3 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -337,7 +337,7 @@ where chain_poller: &mut P, ) -> Result<(), (BlockSourceError, Option)> { let difference = self - .find_difference(new_header, old_header, chain_poller) + .find_difference_from_header(new_header, old_header, chain_poller) .await .map_err(|e| (e, None))?; if difference.common_ancestor != *old_header { @@ -347,11 +347,52 @@ where .await } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the + /// chain with `prev_best_block` as its tip. + /// + /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` + /// field as fallback if needed, then finds the common ancestor. + async fn find_difference_from_best_block( + &self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, + chain_poller: &mut P, + ) -> BlockSourceResult { + // Try to resolve the header for the previous best block. First try the block_hash, + // then fall back to previous_blocks if that fails. + let cur_tip = core::iter::once((0, &prev_best_block.block_hash)); + let prev_tips = + prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| { + if let Some(block_hash) = hash_opt { + Some((idx as u32 + 1, block_hash)) + } else { + None + } + }); + let mut found_header = None; + for (height_diff, block_hash) in cur_tip.chain(prev_tips) { + if let Some(header) = self.header_cache.look_up(block_hash) { + found_header = Some(*header); + break; + } + let height = prev_best_block.height.checked_sub(height_diff).ok_or( + BlockSourceError::persistent("BestBlock had more previous_blocks than its height"), + )?; + if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { + found_header = Some(header); + break; + } + } + let found_header = found_header.ok_or_else(|| { + BlockSourceError::persistent("could not resolve any block from BestBlock") + })?; + + self.find_difference_from_header(current_header, &found_header, chain_poller).await + } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the /// chain with `prev_header` as its tip. /// /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. - async fn find_difference( + async fn find_difference_from_header( &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 13e0403c3b6..fd8c546c56f 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -31,6 +31,11 @@ pub trait Poll { fn fetch_block<'a>( &'a self, header: &'a ValidatedBlockHeader, ) -> impl Future> + Send + 'a; + + /// Returns the header for a given hash and optional height hint. + fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a; } /// A chain tip relative to another chain tip in terms of block hash and chainwork. @@ -258,6 +263,14 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll ) -> impl Future> + Send + 'a { async move { self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) } } + + fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a { + Box::pin(async move { + self.block_source.get_header(block_hash, height_hint).await?.validate(*block_hash) + }) + } } #[cfg(test)] diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 40788e4d08c..3d7870afb1e 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -104,6 +104,18 @@ impl Blockchain { block_header.validate(block_hash).unwrap() } + pub fn best_block_at_height(&self, height: usize) -> BestBlock { + let mut previous_blocks = [None; 12]; + for (i, height) in (0..height).rev().take(12).enumerate() { + previous_blocks[i] = Some(self.blocks[height].block_hash()); + } + BestBlock { + height: height as u32, + block_hash: self.blocks[height].block_hash(), + previous_blocks, + } + } + fn at_height_unvalidated(&self, height: usize) -> BlockHeaderData { assert!(!self.blocks.is_empty()); assert!(height < self.blocks.len()); @@ -123,6 +135,11 @@ impl Blockchain { self.at_height(self.blocks.len() - 1) } + pub fn best_block(&self) -> BestBlock { + assert!(!self.blocks.is_empty()); + self.best_block_at_height(self.blocks.len() - 1) + } + pub fn disconnect_tip(&mut self) -> Option { self.blocks.pop() } From 2073860447e08233640ea03868443081d7588c4b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 00:37:20 +0000 Subject: [PATCH 07/16] Make the `Cache` trait priv, just use `UnboundedCache` publicly In the previous commit, we moved to relying on `BestBlock::previous_blocks` to find the fork point in `lightning-block-sync`'s `init::synchronize_listeners`. Here we now drop the `Cache` parameter as we no longer rely on it. Because we now have no reason to want a persistent `Cache`, we remove the trait from the public interface. However, to keep disconnections reliable we return the `UnboundedCache` we built up during initial sync from `init::synchronize_listeners` which we expect developers to pass to `SpvClient::new`. --- lightning-block-sync/src/init.rs | 94 +++++++------------------------- lightning-block-sync/src/lib.rs | 61 ++++++++++++--------- 2 files changed, 56 insertions(+), 99 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 4fdd3efabd8..5e1c344e3f5 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -2,7 +2,7 @@ //! from disk. use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; +use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier, UnboundedCache}; use bitcoin::block::Header; use bitcoin::hash_types::BlockHash; @@ -32,9 +32,9 @@ where /// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each /// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. /// -/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of -/// failure, each listener may be left at a different block hash than the one it was originally -/// paired with. +/// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In +/// the case of failure, each listener may be left at a different block hash than the one it was +/// originally paired with. /// /// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before /// switching to [`SpvClient`]. For example: @@ -114,14 +114,13 @@ where /// }; /// /// // Synchronize any channel monitors and the channel manager to be on the best block. -/// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ /// (monitor_best_block, &monitor_listener as &dyn chain::Listen), /// (manager_best_block, &manager as &dyn chain::Listen), /// ]; -/// let chain_tip = init::synchronize_listeners( -/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); +/// let (chain_cache, chain_tip) = init::synchronize_listeners( +/// block_source, Network::Bitcoin, listeners).await.unwrap(); /// /// // Allow the chain monitor to watch any channels. /// let monitor = monitor_listener.0; @@ -130,21 +129,16 @@ where /// // Create an SPV client to notify the chain monitor and channel manager of block events. /// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); /// let mut chain_listener = (chain_monitor, &manager); -/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); +/// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener); /// } /// ``` /// /// [`SpvClient`]: crate::SpvClient /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor -pub async fn synchronize_listeners< - B: Deref + Sized + Send + Sync, - C: Cache, - L: chain::Listen + ?Sized, ->( - block_source: B, network: Network, header_cache: &mut C, - mut chain_listeners: Vec<(BestBlock, &L)>, -) -> BlockSourceResult +pub async fn synchronize_listeners( + block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, +) -> BlockSourceResult<(UnboundedCache, ValidatedBlockHeader)> where B::Target: BlockSource, { @@ -155,12 +149,13 @@ where let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); + let mut header_cache = UnboundedCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. - let header_cache = &mut ReadOnlyCache(header_cache); let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let mut chain_notifier = + ChainNotifier { header_cache: &mut header_cache, chain_listener }; let difference = chain_notifier .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) .await?; @@ -181,32 +176,14 @@ where // Connect new blocks for all listeners at once to avoid re-fetching blocks. if let Some(common_ancestor) = most_common_ancestor { let chain_listener = &ChainListenerSet(chain_listeners_at_height); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener }; chain_notifier .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) .await .map_err(|(e, _)| e)?; } - Ok(best_header) -} - -/// A wrapper to make a cache read-only. -/// -/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one -/// listener. -struct ReadOnlyCache<'a, C: Cache>(&'a mut C); - -impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { - fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.0.look_up(block_hash) - } - - fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { - unreachable!() - } - - fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) {} + Ok((header_cache, best_header)) } /// Wrapper for supporting dynamically sized chain listeners. @@ -274,9 +251,8 @@ mod tests { (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; - let mut cache = chain.header_cache(0..=4); - match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, chain.tip()), + match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -306,11 +282,8 @@ mod tests { (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, main_chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -343,33 +316,8 @@ mod tests { (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), - Err(e) => panic!("Unexpected error: {:?}", e), - } - } - - #[tokio::test] - async fn cache_connected_and_keep_disconnected_blocks() { - let main_chain = Blockchain::default().with_height(2); - let fork_chain = main_chain.fork_at_height(1); - let new_tip = main_chain.tip(); - let old_best_block = fork_chain.best_block(); - - let listener = MockChainListener::new() - .expect_blocks_disconnected(*fork_chain.at_height(1)) - .expect_block_connected(*new_tip); - - let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)]; - let mut cache = fork_chain.header_cache(2..=2); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(_) => { - assert!(cache.contains_key(&new_tip.block_hash)); - assert!(cache.contains_key(&old_best_block.block_hash)); - }, + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, main_chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index ad2cb4215b3..4ed8a466629 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -170,18 +170,13 @@ pub enum BlockData { /// sources for the best chain tip. During this process it detects any chain forks, determines which /// constitutes the best chain, and updates the listener accordingly with any blocks that were /// connected or disconnected since the last poll. -/// -/// Block headers for the best chain are maintained in the parameterized cache, allowing for a -/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. -/// Hence, there is a trade-off between a lower memory footprint and potentially increased network -/// I/O as headers are re-fetched during fork detection. -pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> +pub struct SpvClient where L::Target: chain::Listen, { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier<'a, C, L>, + chain_notifier: ChainNotifier, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -194,7 +189,7 @@ where /// Implementations may define how long to retain headers such that it's unlikely they will ever be /// needed to disconnect a block. In cases where block sources provide access to headers on stale /// forks reliably, caches may be entirely unnecessary. -pub trait Cache { +pub(crate) trait Cache { /// Retrieves the block header keyed by the given block hash. fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; @@ -226,7 +221,21 @@ impl Cache for UnboundedCache { } } -impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> +impl Cache for &mut UnboundedCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.get(block_hash) + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.insert(block_hash, block_header); + } + + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.retain(|_, block_info| block_info.height < fork_point.height); + } +} + +impl SpvClient where L::Target: chain::Listen, { @@ -241,7 +250,7 @@ where /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: UnboundedCache, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -295,15 +304,15 @@ where /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: lightning::chain::Listen -pub struct ChainNotifier<'a, C: Cache, L: Deref> +pub(crate) struct ChainNotifier where L::Target: chain::Listen, { /// Cache for looking up headers before fetching from a block source. - header_cache: &'a mut C, + pub(crate) header_cache: C, /// Listener that will be notified of connected or disconnected blocks. - chain_listener: L, + pub(crate) chain_listener: L, } /// Changes made to the chain between subsequent polls that transformed it from having one chain tip @@ -321,7 +330,7 @@ struct ChainDifference { connected_blocks: Vec, } -impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> +impl ChainNotifier where L::Target: chain::Listen, { @@ -481,9 +490,9 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -500,9 +509,9 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(common_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -520,9 +529,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -540,9 +549,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -560,9 +569,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -581,9 +590,9 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { From 63df6814d1cc8beb66c83c0b1a1f15c8203bcc69 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 00:37:58 +0000 Subject: [PATCH 08/16] Make `UnboundedCache` bounded In the previous commit we moved to hard-coding `UnboundedCache` in the `lightning-block-sync` interface. This is great, except that its an unbounded cache that can use arbitrary amounts of memory (though never really all that much - its just headers that come in while we're running). Here we simply limit the size, and while we're at it give it a more generic `HeaderCache` name. --- lightning-block-sync/src/init.rs | 7 ++-- lightning-block-sync/src/lib.rs | 49 ++++++++++++++++---------- lightning-block-sync/src/test_utils.rs | 9 ++--- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 5e1c344e3f5..ddb90d6d97f 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -2,10 +2,9 @@ //! from disk. use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier, UnboundedCache}; +use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; use bitcoin::block::Header; -use bitcoin::hash_types::BlockHash; use bitcoin::network::Network; use lightning::chain; @@ -138,7 +137,7 @@ where /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor pub async fn synchronize_listeners( block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, -) -> BlockSourceResult<(UnboundedCache, ValidatedBlockHeader)> +) -> BlockSourceResult<(HeaderCache, ValidatedBlockHeader)> where B::Target: BlockSource, { @@ -149,7 +148,7 @@ where let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); - let mut header_cache = UnboundedCache::new(); + let mut header_cache = HeaderCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. let (common_ancestor, connected_blocks) = { diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 4ed8a466629..8f8bd84fd2a 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -176,7 +176,7 @@ where { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier, + chain_notifier: ChainNotifier, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -204,34 +204,47 @@ pub(crate) trait Cache { fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } -/// Unbounded cache of block headers keyed by block hash. -pub type UnboundedCache = std::collections::HashMap; +/// Bounded cache of block headers keyed by block hash. +/// +/// Retains only the last `ANTI_REORG_DELAY * 2` block headers based on height. +pub struct HeaderCache(std::collections::HashMap); -impl Cache for UnboundedCache { +impl HeaderCache { + /// Creates a new empty header cache. + pub fn new() -> Self { + Self(std::collections::HashMap::new()) + } +} + +impl Cache for HeaderCache { fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) + self.0.get(block_hash) } fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); + self.0.insert(block_hash, block_header); + + // Remove headers older than a week. + let cutoff_height = block_header.height.saturating_sub(6 * 24 * 7); + self.0.retain(|_, header| header.height >= cutoff_height); } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height < fork_point.height); } } -impl Cache for &mut UnboundedCache { +impl Cache for &mut HeaderCache { fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) + self.0.get(block_hash) } fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); + (*self).block_connected(block_hash, block_header); } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height < fork_point.height); } } @@ -250,7 +263,7 @@ where /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: UnboundedCache, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: HeaderCache, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -490,7 +503,7 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -509,7 +522,7 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(common_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -529,7 +542,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -549,7 +562,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -569,7 +582,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -590,7 +603,7 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 3d7870afb1e..89cb3e81d60 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,6 +1,7 @@ use crate::poll::{Validate, ValidatedBlockHeader}; use crate::{ - BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, UnboundedCache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, Cache, + HeaderCache, }; use bitcoin::block::{Block, Header, Version}; @@ -144,12 +145,12 @@ impl Blockchain { self.blocks.pop() } - pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> UnboundedCache { - let mut cache = UnboundedCache::new(); + pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> HeaderCache { + let mut cache = HeaderCache::new(); for i in heights { let value = self.at_height(i); let key = value.header.block_hash(); - assert!(cache.insert(key, value).is_none()); + cache.block_connected(key, value); } cache } From 2e7582466b473cfb3c165fc201e6342bf99b9937 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 15:38:11 +0000 Subject: [PATCH 09/16] f correct cache retention on reorg --- lightning-block-sync/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 8f8bd84fd2a..37d87339e06 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -230,7 +230,7 @@ impl Cache for HeaderCache { } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.0.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height <= fork_point.height); } } @@ -244,7 +244,7 @@ impl Cache for &mut HeaderCache { } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.0.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height <= fork_point.height); } } From de829767211c11912bec1ad0f6689d544f091086 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 16:51:38 +0000 Subject: [PATCH 10/16] f correct + constify header cache limit --- lightning-block-sync/src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 37d87339e06..5f1ae88fcc5 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -204,9 +204,12 @@ pub(crate) trait Cache { fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } +/// The maximum number of [`ValidatedBlockHeader`]s stored in a [`HeaderCache`]. +pub const HEADER_CACHE_LIMIT: u32 = 6 * 24 * 7; + /// Bounded cache of block headers keyed by block hash. /// -/// Retains only the last `ANTI_REORG_DELAY * 2` block headers based on height. +/// Retains only the latest [`HEADER_CACHE_LIMIT`] block headers based on height. pub struct HeaderCache(std::collections::HashMap); impl HeaderCache { @@ -225,7 +228,7 @@ impl Cache for HeaderCache { self.0.insert(block_hash, block_header); // Remove headers older than a week. - let cutoff_height = block_header.height.saturating_sub(6 * 24 * 7); + let cutoff_height = block_header.height.saturating_sub(HEADER_CACHE_LIMIT); self.0.retain(|_, header| header.height >= cutoff_height); } From c91cdacc3bc99f0f5bb982f67d8c2817e7b1f5c7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 01:10:30 +0000 Subject: [PATCH 11/16] Consolidate all the pub aync utils to `native_async` --- lightning-background-processor/src/lib.rs | 4 +-- lightning/src/chain/chainmonitor.rs | 3 +- lightning/src/events/bump_transaction/mod.rs | 2 +- lightning/src/events/bump_transaction/sync.rs | 3 +- lightning/src/sign/mod.rs | 2 +- lightning/src/util/async_poll.rs | 28 ----------------- lightning/src/util/mod.rs | 2 +- lightning/src/util/native_async.rs | 31 ++++++++++++++++++- lightning/src/util/persist.rs | 4 +-- lightning/src/util/test_utils.rs | 2 +- 10 files changed, 41 insertions(+), 40 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c38d6dfe080..f5434a90cb3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -53,9 +53,9 @@ use lightning::routing::utxo::UtxoLookup; use lightning::sign::{ ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender, }; -#[cfg(not(c_bindings))] -use lightning::util::async_poll::MaybeSend; use lightning::util::logger::Logger; +#[cfg(not(c_bindings))] +use lightning::util::native_async::MaybeSend; use lightning::util::persist::{ KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9fd6383cf7e..93c5a371128 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -51,10 +51,9 @@ use crate::sign::ecdsa::EcdsaChannelSigner; use crate::sign::{EntropySource, PeerStorageKey, SignerProvider}; use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::types::features::{InitFeatures, NodeFeatures}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; diff --git a/lightning/src/events/bump_transaction/mod.rs b/lightning/src/events/bump_transaction/mod.rs index e141d9b8abc..4240a27f27f 100644 --- a/lightning/src/events/bump_transaction/mod.rs +++ b/lightning/src/events/bump_transaction/mod.rs @@ -37,8 +37,8 @@ use crate::sign::{ ChannelDerivationParameters, HTLCDescriptor, SignerProvider, P2WPKH_WITNESS_WEIGHT, }; use crate::sync::Mutex; -use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::logger::Logger; +use crate::util::native_async::{MaybeSend, MaybeSync}; use bitcoin::amount::Amount; use bitcoin::consensus::Encodable; diff --git a/lightning/src/events/bump_transaction/sync.rs b/lightning/src/events/bump_transaction/sync.rs index 1328c2c1b3a..03ffa479097 100644 --- a/lightning/src/events/bump_transaction/sync.rs +++ b/lightning/src/events/bump_transaction/sync.rs @@ -18,8 +18,9 @@ use crate::chain::chaininterface::BroadcasterInterface; use crate::chain::ClaimId; use crate::prelude::*; use crate::sign::SignerProvider; -use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; +use crate::util::async_poll::dummy_waker; use crate::util::logger::Logger; +use crate::util::native_async::{MaybeSend, MaybeSync}; use bitcoin::{Psbt, ScriptBuf, Transaction, TxOut}; diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 26252c74dd2..f90f966331d 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -58,7 +58,7 @@ use crate::ln::script::ShutdownScript; use crate::offers::invoice::UnsignedBolt12Invoice; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentPreimage; -use crate::util::async_poll::MaybeSend; +use crate::util::native_async::MaybeSend; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::transaction_utils; diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 57df5b26cb0..23ca1aad603 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -164,31 +164,3 @@ const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } - -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Sync as MaybeSync; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub trait MaybeSync {} -#[cfg(not(feature = "std"))] -impl MaybeSync for T where T: ?Sized {} - -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Send as MaybeSend; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub trait MaybeSend {} -#[cfg(not(feature = "std"))] -impl MaybeSend for T where T: ?Sized {} diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index dcbea904b51..51e608d185f 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -20,7 +20,7 @@ pub mod mut_global; pub mod anchor_channel_reserves; -pub mod async_poll; +pub(crate) mod async_poll; #[cfg(fuzzing)] pub mod base32; #[cfg(not(fuzzing))] diff --git a/lightning/src/util/native_async.rs b/lightning/src/util/native_async.rs index 0c380f2b1d1..31b07c2f3b5 100644 --- a/lightning/src/util/native_async.rs +++ b/lightning/src/util/native_async.rs @@ -9,8 +9,9 @@ #[cfg(all(test, feature = "std"))] use crate::sync::{Arc, Mutex}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; +#[cfg(test)] +use alloc::boxed::Box; #[cfg(all(test, not(feature = "std")))] use alloc::rc::Rc; @@ -53,6 +54,34 @@ trait MaybeSendableFuture: Future + MaybeSend + 'static {} #[cfg(test)] impl + MaybeSend + 'static> MaybeSendableFuture for F {} +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Sync as MaybeSync; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait MaybeSync {} +#[cfg(not(feature = "std"))] +impl MaybeSync for T where T: ?Sized {} + +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Send as MaybeSend; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait MaybeSend {} +#[cfg(not(feature = "std"))] +impl MaybeSend for T where T: ?Sized {} + /// A simple [`FutureSpawner`] which holds [`Future`]s until they are manually polled via /// [`Self::poll_futures`]. #[cfg(all(test, feature = "std"))] diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 46d522b2285..5b3a88c856d 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -37,10 +37,10 @@ use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; use crate::util::async_poll::{ - dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, + dummy_waker, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, }; use crate::util::logger::Logger; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::util::wakers::Notifier; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index ab749910323..b873ba48810 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -51,7 +51,6 @@ use crate::sign::{self, ReceiveAuthKey}; use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; -use crate::util::async_poll::MaybeSend; use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -59,6 +58,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; +use crate::util::native_async::MaybeSend; use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; From f36e1ffdd0f608e2432b1a873ca5a3c1fac6fe41 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 18 Oct 2025 01:36:01 +0000 Subject: [PATCH 12/16] Add `async_poll.rs` to `lightning-block-sync` In the next commit we'll fetch blocks during initial connection in parallel, which requires a multi-future poller. Here we add a symlink to the existing `lightning` `async_poll.rs` file, making it available in `lightning-block-sync` --- lightning-block-sync/src/async_poll.rs | 1 + lightning-block-sync/src/lib.rs | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 120000 lightning-block-sync/src/async_poll.rs diff --git a/lightning-block-sync/src/async_poll.rs b/lightning-block-sync/src/async_poll.rs new file mode 120000 index 00000000000..eb85cdac697 --- /dev/null +++ b/lightning-block-sync/src/async_poll.rs @@ -0,0 +1 @@ +../../lightning/src/util/async_poll.rs \ No newline at end of file diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 5f1ae88fcc5..7788103e6a2 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -16,9 +16,11 @@ #![deny(rustdoc::broken_intra_doc_links)] #![deny(rustdoc::private_intra_doc_links)] #![deny(missing_docs)] -#![deny(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] +extern crate alloc; +extern crate core; + #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; @@ -42,6 +44,9 @@ mod test_utils; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; +#[allow(unused)] +mod async_poll; + use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; use bitcoin::block::{Block, Header}; From 1eabd3f071bc534e00ba3c9e85d17607906020af Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 7 Dec 2025 23:30:57 +0000 Subject: [PATCH 13/16] Fetch blocks from source in parallel during initial sync In `init::synchronize_listeners` we may end up spending a decent chunk of our time just fetching block data. Here we parallelize that step across up to 36 blocks at a time. On my node with bitcoind on localhost, the impact of this is somewhat muted by block deserialization being the bulk of the work, however a networked bitcoind would likely change that. Even still, fetching a batch of 36 blocks in parallel happens on my node in ~615 ms vs ~815ms in serial. --- lightning-block-sync/src/init.rs | 85 +++++++++++++++++--------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index ddb90d6d97f..0a0ee3c4bc1 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -1,8 +1,9 @@ //! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects //! from disk. -use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; +use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; +use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; +use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; use bitcoin::block::Header; use bitcoin::network::Network; @@ -146,7 +147,6 @@ where // Find differences and disconnect blocks for each listener individually. let mut chain_poller = ChainPoller::new(block_source, network); let mut chain_listeners_at_height = Vec::new(); - let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); let mut header_cache = HeaderCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { @@ -167,19 +167,53 @@ where // Keep track of the most common ancestor and all blocks connected across all listeners. chain_listeners_at_height.push((common_ancestor.height, chain_listener)); if connected_blocks.len() > most_connected_blocks.len() { - most_common_ancestor = Some(common_ancestor); most_connected_blocks = connected_blocks; } } - // Connect new blocks for all listeners at once to avoid re-fetching blocks. - if let Some(common_ancestor) = most_common_ancestor { - let chain_listener = &ChainListenerSet(chain_listeners_at_height); - let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener }; - chain_notifier - .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) - .await - .map_err(|(e, _)| e)?; + while !most_connected_blocks.is_empty() { + #[cfg(not(test))] + const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded + #[cfg(test)] + const MAX_BLOCKS_AT_ONCE: usize = 2; + + let mut fetch_block_futures = + Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len())); + for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) { + let fetch_future = chain_poller.fetch_block(header); + fetch_block_futures + .push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) }))); + } + let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter(); + + let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE]; + for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { + *result = Some((header.height, block_res?)); + } + debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); + debug_assert!(fetched_blocks + .is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX))); + + for (listener_height, listener) in chain_listeners_at_height.iter() { + // Connect blocks for this listener. + for result in fetched_blocks.iter() { + if let Some((height, block_data)) = result { + if *height > *listener_height { + match &**block_data { + BlockData::FullBlock(block) => { + listener.block_connected(&block, *height); + }, + BlockData::HeaderOnly(header_data) => { + listener.filtered_block_connected(&header_data, &[], *height); + }, + } + } + } + } + } + + most_connected_blocks + .truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE)); } Ok((header_cache, best_header)) @@ -200,33 +234,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L } } -/// A set of dynamically sized chain listeners, each paired with a starting block height. -struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>); - -impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { - fn block_connected(&self, block: &bitcoin::Block, height: u32) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.block_connected(block, height); - } - } - } - - fn filtered_block_connected( - &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, - ) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.filtered_block_connected(header, txdata, height); - } - } - } - - fn blocks_disconnected(&self, _fork_point: BestBlock) { - unreachable!() - } -} - #[cfg(test)] mod tests { use super::*; From c12f01c7709e655e7a4502b07b037504c75ccb3d Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 12:18:01 +0000 Subject: [PATCH 14/16] Silence "elided lifetime has a name" warnings in no-std locking --- lightning/src/sync/nostd_sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lightning/src/sync/nostd_sync.rs b/lightning/src/sync/nostd_sync.rs index 12070741918..18055d1ebe4 100644 --- a/lightning/src/sync/nostd_sync.rs +++ b/lightning/src/sync/nostd_sync.rs @@ -61,7 +61,7 @@ impl<'a, T: 'a> LockTestExt<'a> for Mutex { } type ExclLock = MutexGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard<'a, T> { self.lock().unwrap() } } @@ -132,7 +132,7 @@ impl<'a, T: 'a> LockTestExt<'a> for RwLock { } type ExclLock = RwLockWriteGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard<'a, T> { self.write().unwrap() } } From 458f70938e69cc8cd734bae2f8e085cd8651a5fb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 14:15:47 +0000 Subject: [PATCH 15/16] Use the header cache across listeners during initial disconnect In `lightning-blocksync::init::synchronize_listeners`, we may have many listeners we want to do a chain diff on. When doing so, we should make sure we utilize our header cache, rather than querying our chain source for every header we need for each listener. Here we do so, inserting into the cache as we do chain diffs. On my node with a bitcoind on localhost, this brings the calculate-differences step of `init::synchronize_listeners` from ~500ms to under 150ms. --- lightning-block-sync/src/lib.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 7788103e6a2..bb3fcc7f1ff 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -198,6 +198,10 @@ pub(crate) trait Cache { /// Retrieves the block header keyed by the given block hash. fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; + /// Inserts the given block header during a find_difference operation, implying it might not be + /// the best header. + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); + /// Called when a block has been connected to the best chain to ensure it is available to be /// disconnected later if needed. fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); @@ -229,6 +233,15 @@ impl Cache for HeaderCache { self.0.get(block_hash) } + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert(block_hash, block_header); + + // Remove headers older than our newest header minus a week. + let best_height = self.0.iter().map(|(_, header)| header.height).max().unwrap_or(0); + let cutoff_height = best_height.saturating_sub(HEADER_CACHE_LIMIT); + self.0.retain(|_, header| header.height >= cutoff_height); + } + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { self.0.insert(block_hash, block_header); @@ -247,6 +260,10 @@ impl Cache for &mut HeaderCache { self.0.get(block_hash) } + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + (*self).insert_during_diff(block_hash, block_header); + } + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { (*self).block_connected(block_hash, block_header); } @@ -383,7 +400,7 @@ where /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` /// field as fallback if needed, then finds the common ancestor. async fn find_difference_from_best_block( - &self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, + &mut self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, chain_poller: &mut P, ) -> BlockSourceResult { // Try to resolve the header for the previous best block. First try the block_hash, @@ -408,6 +425,7 @@ where )?; if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { found_header = Some(header); + self.header_cache.insert_during_diff(*block_hash, header); break; } } From 6bd7c74a04b205692cbdc319c9faeb3515d7543b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 17:22:06 +0000 Subject: [PATCH 16/16] Include recent blocks in the `synchronize_listeners`-returned cache When `synchronize_listeners` runs, it returns a cache of the headers it needed when doing chain difference-finding. This allows us to ensure that when we start running normally we have all the recent headers in case we need them to reorg. Sadly, in some cases it was returning a mostly-empty cache. Because it was only being filled during block difference reconciliation it would only get a block around each listener's fork point. Worse, because we were calling `disconnect_blocks` with the cache the cache would assume we were reorging against the main chain and drop blocks we actually want. Instead, we avoid dropping blocks on `disconnect_blocks` calls and ensure we always add connected blocks to the cache. --- lightning-block-sync/src/init.rs | 62 ++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 0a0ee3c4bc1..60804cc9f80 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -3,7 +3,7 @@ use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; -use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; +use crate::{BlockData, BlockSource, BlockSourceResult, Cache, ChainNotifier, HeaderCache}; use bitcoin::block::Header; use bitcoin::network::Network; @@ -153,8 +153,9 @@ where // Disconnect any stale blocks, but keep them in the cache for the next iteration. let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); + let mut cache_wrapper = HeaderCacheNoDisconnect(&mut header_cache); let mut chain_notifier = - ChainNotifier { header_cache: &mut header_cache, chain_listener }; + ChainNotifier { header_cache: &mut cache_wrapper, chain_listener }; let difference = chain_notifier .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) .await?; @@ -188,7 +189,9 @@ where let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE]; for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { - *result = Some((header.height, block_res?)); + let block = block_res?; + header_cache.block_connected(header.block_hash, *header); + *result = Some((header.height, block)); } debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); debug_assert!(fetched_blocks @@ -234,10 +237,34 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L } } +/// Wrapper around HeaderCache that ignores `blocks_disconnected` calls, retaining disconnected +/// blocks in the cache. This is useful during initial sync to keep headers available across +/// multiple listeners. +struct HeaderCacheNoDisconnect<'a>(&'a mut HeaderCache); + +impl<'a> crate::Cache for &mut HeaderCacheNoDisconnect<'a> { + fn look_up(&self, block_hash: &bitcoin::hash_types::BlockHash) -> Option<&ValidatedBlockHeader> { + self.0.look_up(block_hash) + } + + fn insert_during_diff(&mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert_during_diff(block_hash, block_header); + } + + fn block_connected(&mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader) { + self.0.block_connected(block_hash, block_header); + } + + fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) { + // Intentionally ignore disconnections to retain blocks in cache + } +} + #[cfg(test)] mod tests { use super::*; use crate::test_utils::{Blockchain, MockChainListener}; + use crate::Cache; #[tokio::test] async fn sync_from_same_chain() { @@ -258,7 +285,13 @@ mod tests { (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, chain.tip()); + assert!(cache.look_up(&chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(4).block_hash).is_some()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -289,7 +322,15 @@ mod tests { (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, main_chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_2.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_3.at_height(4).block_hash).is_none()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -323,7 +364,16 @@ mod tests { (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, main_chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(4).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(4).block_hash).is_none()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } }