diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c38d6dfe080..f5434a90cb3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -53,9 +53,9 @@ use lightning::routing::utxo::UtxoLookup; use lightning::sign::{ ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender, }; -#[cfg(not(c_bindings))] -use lightning::util::async_poll::MaybeSend; use lightning::util::logger::Logger; +#[cfg(not(c_bindings))] +use lightning::util::native_async::MaybeSend; use lightning::util::persist::{ KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, diff --git a/lightning-block-sync/src/async_poll.rs b/lightning-block-sync/src/async_poll.rs new file mode 120000 index 00000000000..eb85cdac697 --- /dev/null +++ b/lightning-block-sync/src/async_poll.rs @@ -0,0 +1 @@ +../../lightning/src/util/async_poll.rs \ No newline at end of file diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index a870f8ca88c..60804cc9f80 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -1,11 +1,11 @@ //! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects //! from disk. -use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; +use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; +use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; +use crate::{BlockData, BlockSource, BlockSourceResult, Cache, ChainNotifier, HeaderCache}; use bitcoin::block::Header; -use bitcoin::hash_types::BlockHash; use bitcoin::network::Network; use lightning::chain; @@ -32,19 +32,18 @@ where /// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each /// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. /// -/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of -/// failure, each listener may be left at a different block hash than the one it was originally -/// paired with. +/// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In +/// the case of failure, each listener may be left at a different block hash than the one it was +/// originally paired with. /// /// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before /// switching to [`SpvClient`]. For example: /// /// ``` -/// use bitcoin::hash_types::BlockHash; /// use bitcoin::network::Network; /// /// use lightning::chain; -/// use lightning::chain::Watch; +/// use lightning::chain::{BestBlock, Watch}; /// use lightning::chain::chainmonitor; /// use lightning::chain::chainmonitor::ChainMonitor; /// use lightning::chain::channelmonitor::ChannelMonitor; @@ -89,14 +88,14 @@ where /// logger: &L, /// persister: &P, /// ) { -/// // Read a serialized channel monitor paired with the block hash when it was persisted. +/// // Read a serialized channel monitor paired with the best block when it was persisted. /// let serialized_monitor = "..."; -/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor)>::read( +/// let (monitor_best_block, mut monitor) = <(BestBlock, ChannelMonitor)>::read( /// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap(); /// -/// // Read the channel manager paired with the block hash when it was persisted. +/// // Read the channel manager paired with the best block when it was persisted. /// let serialized_manager = "..."; -/// let (manager_block_hash, mut manager) = { +/// let (manager_best_block, mut manager) = { /// let read_args = ChannelManagerReadArgs::new( /// entropy_source, /// node_signer, @@ -110,19 +109,18 @@ where /// config, /// vec![&mut monitor], /// ); -/// <(BlockHash, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( +/// <(BestBlock, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( /// &mut Cursor::new(&serialized_manager), read_args).unwrap() /// }; /// /// // Synchronize any channel monitors and the channel manager to be on the best block. -/// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ -/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen), -/// (manager_block_hash, &manager as &dyn chain::Listen), +/// (monitor_best_block, &monitor_listener as &dyn chain::Listen), +/// (manager_best_block, &manager as &dyn chain::Listen), /// ]; -/// let chain_tip = init::synchronize_listeners( -/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); +/// let (chain_cache, chain_tip) = init::synchronize_listeners( +/// block_source, Network::Bitcoin, listeners).await.unwrap(); /// /// // Allow the chain monitor to watch any channels. /// let monitor = monitor_listener.0; @@ -131,94 +129,97 @@ where /// // Create an SPV client to notify the chain monitor and channel manager of block events. /// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); /// let mut chain_listener = (chain_monitor, &manager); -/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); +/// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener); /// } /// ``` /// /// [`SpvClient`]: crate::SpvClient /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor -pub async fn synchronize_listeners< - B: Deref + Sized + Send + Sync, - C: Cache, - L: chain::Listen + ?Sized, ->( - block_source: B, network: Network, header_cache: &mut C, - mut chain_listeners: Vec<(BlockHash, &L)>, -) -> BlockSourceResult +pub async fn synchronize_listeners( + block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, +) -> BlockSourceResult<(HeaderCache, ValidatedBlockHeader)> where B::Target: BlockSource, { let best_header = validate_best_block_header(&*block_source).await?; - // Fetch the header for the block hash paired with each listener. - let mut chain_listeners_with_old_headers = Vec::new(); - for (old_block_hash, chain_listener) in chain_listeners.drain(..) { - let old_header = match header_cache.look_up(&old_block_hash) { - Some(header) => *header, - None => { - block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)? - }, - }; - chain_listeners_with_old_headers.push((old_header, chain_listener)) - } - // Find differences and disconnect blocks for each listener individually. let mut chain_poller = ChainPoller::new(block_source, network); let mut chain_listeners_at_height = Vec::new(); - let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); - for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { + let mut header_cache = HeaderCache::new(); + for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. - let header_cache = &mut ReadOnlyCache(header_cache); let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; - let difference = - chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; - chain_notifier.disconnect_blocks(difference.disconnected_blocks); + let mut cache_wrapper = HeaderCacheNoDisconnect(&mut header_cache); + let mut chain_notifier = + ChainNotifier { header_cache: &mut cache_wrapper, chain_listener }; + let difference = chain_notifier + .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) + .await?; + if difference.common_ancestor.block_hash != old_best_block.block_hash { + chain_notifier.disconnect_blocks(difference.common_ancestor); + } (difference.common_ancestor, difference.connected_blocks) }; // Keep track of the most common ancestor and all blocks connected across all listeners. chain_listeners_at_height.push((common_ancestor.height, chain_listener)); if connected_blocks.len() > most_connected_blocks.len() { - most_common_ancestor = Some(common_ancestor); most_connected_blocks = connected_blocks; } } - // Connect new blocks for all listeners at once to avoid re-fetching blocks. - if let Some(common_ancestor) = most_common_ancestor { - let chain_listener = &ChainListenerSet(chain_listeners_at_height); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; - chain_notifier - .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) - .await - .map_err(|(e, _)| e)?; - } - - Ok(best_header) -} - -/// A wrapper to make a cache read-only. -/// -/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one -/// listener. -struct ReadOnlyCache<'a, C: Cache>(&'a mut C); + while !most_connected_blocks.is_empty() { + #[cfg(not(test))] + const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded + #[cfg(test)] + const MAX_BLOCKS_AT_ONCE: usize = 2; + + let mut fetch_block_futures = + Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len())); + for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) { + let fetch_future = chain_poller.fetch_block(header); + fetch_block_futures + .push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) }))); + } + let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter(); -impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { - fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.0.look_up(block_hash) - } + let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE]; + for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { + let block = block_res?; + header_cache.block_connected(header.block_hash, *header); + *result = Some((header.height, block)); + } + debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); + debug_assert!(fetched_blocks + .is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX))); + + for (listener_height, listener) in chain_listeners_at_height.iter() { + // Connect blocks for this listener. + for result in fetched_blocks.iter() { + if let Some((height, block_data)) = result { + if *height > *listener_height { + match &**block_data { + BlockData::FullBlock(block) => { + listener.block_connected(&block, *height); + }, + BlockData::HeaderOnly(header_data) => { + listener.filtered_block_connected(&header_data, &[], *height); + }, + } + } + } + } + } - fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { - unreachable!() + most_connected_blocks + .truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE)); } - fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { - None - } + Ok((header_cache, best_header)) } /// Wrapper for supporting dynamically sized chain listeners. @@ -236,30 +237,26 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L } } -/// A set of dynamically sized chain listeners, each paired with a starting block height. -struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>); +/// Wrapper around HeaderCache that ignores `blocks_disconnected` calls, retaining disconnected +/// blocks in the cache. This is useful during initial sync to keep headers available across +/// multiple listeners. +struct HeaderCacheNoDisconnect<'a>(&'a mut HeaderCache); -impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { - fn block_connected(&self, block: &bitcoin::Block, height: u32) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.block_connected(block, height); - } - } +impl<'a> crate::Cache for &mut HeaderCacheNoDisconnect<'a> { + fn look_up(&self, block_hash: &bitcoin::hash_types::BlockHash) -> Option<&ValidatedBlockHeader> { + self.0.look_up(block_hash) } - fn filtered_block_connected( - &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, - ) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.filtered_block_connected(header, txdata, height); - } - } + fn insert_during_diff(&mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert_during_diff(block_hash, block_header); } - fn blocks_disconnected(&self, _fork_point: BestBlock) { - unreachable!() + fn block_connected(&mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader) { + self.0.block_connected(block_hash, block_header); + } + + fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) { + // Intentionally ignore disconnections to retain blocks in cache } } @@ -267,6 +264,7 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { mod tests { use super::*; use crate::test_utils::{Blockchain, MockChainListener}; + use crate::Cache; #[tokio::test] async fn sync_from_same_chain() { @@ -282,13 +280,18 @@ mod tests { let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4)); let listeners = vec![ - (chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen), - (chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen), - (chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen), + (chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen), + (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), + (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; - let mut cache = chain.header_cache(0..=4); - match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, chain.tip()), + match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { + Ok((cache, header)) => { + assert_eq!(header, chain.tip()); + assert!(cache.look_up(&chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(4).block_hash).is_some()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -314,15 +317,20 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_2.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_3.at_height(4).block_hash).is_none()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -351,36 +359,20 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), - Err(e) => panic!("Unexpected error: {:?}", e), - } - } - - #[tokio::test] - async fn cache_connected_and_keep_disconnected_blocks() { - let main_chain = Blockchain::default().with_height(2); - let fork_chain = main_chain.fork_at_height(1); - let new_tip = main_chain.tip(); - let old_tip = fork_chain.tip(); - - let listener = MockChainListener::new() - .expect_blocks_disconnected(*fork_chain.at_height(1)) - .expect_block_connected(*new_tip); - - let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)]; - let mut cache = fork_chain.header_cache(2..=2); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(_) => { - assert!(cache.contains_key(&new_tip.block_hash)); - assert!(cache.contains_key(&old_tip.block_hash)); + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(4).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(4).block_hash).is_none()); }, Err(e) => panic!("Unexpected error: {:?}", e), } diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 02593047658..bb3fcc7f1ff 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -16,9 +16,11 @@ #![deny(rustdoc::broken_intra_doc_links)] #![deny(rustdoc::private_intra_doc_links)] #![deny(missing_docs)] -#![deny(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] +extern crate alloc; +extern crate core; + #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; @@ -42,6 +44,9 @@ mod test_utils; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; +#[allow(unused)] +mod async_poll; + use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; use bitcoin::block::{Block, Header}; @@ -170,18 +175,13 @@ pub enum BlockData { /// sources for the best chain tip. During this process it detects any chain forks, determines which /// constitutes the best chain, and updates the listener accordingly with any blocks that were /// connected or disconnected since the last poll. -/// -/// Block headers for the best chain are maintained in the parameterized cache, allowing for a -/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. -/// Hence, there is a trade-off between a lower memory footprint and potentially increased network -/// I/O as headers are re-fetched during fork detection. -pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> +pub struct SpvClient where L::Target: chain::Listen, { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier<'a, C, L>, + chain_notifier: ChainNotifier, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -194,37 +194,86 @@ where /// Implementations may define how long to retain headers such that it's unlikely they will ever be /// needed to disconnect a block. In cases where block sources provide access to headers on stale /// forks reliably, caches may be entirely unnecessary. -pub trait Cache { +pub(crate) trait Cache { /// Retrieves the block header keyed by the given block hash. fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; + /// Inserts the given block header during a find_difference operation, implying it might not be + /// the best header. + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); + /// Called when a block has been connected to the best chain to ensure it is available to be /// disconnected later if needed. fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); - /// Called when a block has been disconnected from the best chain. Once disconnected, a block's - /// header is no longer needed and thus can be removed. - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option; + /// Called when blocks have been disconnected from the best chain. Only the fork point + /// (best comon ancestor) is provided. + /// + /// Once disconnected, a block's header is no longer needed and thus can be removed. + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); +} + +/// The maximum number of [`ValidatedBlockHeader`]s stored in a [`HeaderCache`]. +pub const HEADER_CACHE_LIMIT: u32 = 6 * 24 * 7; + +/// Bounded cache of block headers keyed by block hash. +/// +/// Retains only the latest [`HEADER_CACHE_LIMIT`] block headers based on height. +pub struct HeaderCache(std::collections::HashMap); + +impl HeaderCache { + /// Creates a new empty header cache. + pub fn new() -> Self { + Self(std::collections::HashMap::new()) + } } -/// Unbounded cache of block headers keyed by block hash. -pub type UnboundedCache = std::collections::HashMap; +impl Cache for HeaderCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.0.get(block_hash) + } + + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert(block_hash, block_header); + + // Remove headers older than our newest header minus a week. + let best_height = self.0.iter().map(|(_, header)| header.height).max().unwrap_or(0); + let cutoff_height = best_height.saturating_sub(HEADER_CACHE_LIMIT); + self.0.retain(|_, header| header.height >= cutoff_height); + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert(block_hash, block_header); + + // Remove headers older than a week. + let cutoff_height = block_header.height.saturating_sub(HEADER_CACHE_LIMIT); + self.0.retain(|_, header| header.height >= cutoff_height); + } + + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.0.retain(|_, block_info| block_info.height <= fork_point.height); + } +} -impl Cache for UnboundedCache { +impl Cache for &mut HeaderCache { fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) + self.0.get(block_hash) + } + + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + (*self).insert_during_diff(block_hash, block_header); } fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); + (*self).block_connected(block_hash, block_header); } - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { - self.remove(block_hash) + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.0.retain(|_, block_info| block_info.height <= fork_point.height); } } -impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> +impl SpvClient where L::Target: chain::Listen, { @@ -239,7 +288,7 @@ where /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: HeaderCache, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -293,15 +342,15 @@ where /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: lightning::chain::Listen -pub struct ChainNotifier<'a, C: Cache, L: Deref> +pub(crate) struct ChainNotifier where L::Target: chain::Listen, { /// Cache for looking up headers before fetching from a block source. - header_cache: &'a mut C, + pub(crate) header_cache: C, /// Listener that will be notified of connected or disconnected blocks. - chain_listener: L, + pub(crate) chain_listener: L, } /// Changes made to the chain between subsequent polls that transformed it from having one chain tip @@ -315,14 +364,11 @@ struct ChainDifference { /// If there are any disconnected blocks, this is where the chain forked. common_ancestor: ValidatedBlockHeader, - /// Blocks that were disconnected from the chain since the last poll. - disconnected_blocks: Vec, - /// Blocks that were connected to the chain since the last poll. connected_blocks: Vec, } -impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> +impl ChainNotifier where L::Target: chain::Listen, { @@ -338,23 +384,66 @@ where chain_poller: &mut P, ) -> Result<(), (BlockSourceError, Option)> { let difference = self - .find_difference(new_header, old_header, chain_poller) + .find_difference_from_header(new_header, old_header, chain_poller) .await .map_err(|e| (e, None))?; - self.disconnect_blocks(difference.disconnected_blocks); + if difference.common_ancestor != *old_header { + self.disconnect_blocks(difference.common_ancestor); + } self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller) .await } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the + /// chain with `prev_best_block` as its tip. + /// + /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` + /// field as fallback if needed, then finds the common ancestor. + async fn find_difference_from_best_block( + &mut self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, + chain_poller: &mut P, + ) -> BlockSourceResult { + // Try to resolve the header for the previous best block. First try the block_hash, + // then fall back to previous_blocks if that fails. + let cur_tip = core::iter::once((0, &prev_best_block.block_hash)); + let prev_tips = + prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| { + if let Some(block_hash) = hash_opt { + Some((idx as u32 + 1, block_hash)) + } else { + None + } + }); + let mut found_header = None; + for (height_diff, block_hash) in cur_tip.chain(prev_tips) { + if let Some(header) = self.header_cache.look_up(block_hash) { + found_header = Some(*header); + break; + } + let height = prev_best_block.height.checked_sub(height_diff).ok_or( + BlockSourceError::persistent("BestBlock had more previous_blocks than its height"), + )?; + if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { + found_header = Some(header); + self.header_cache.insert_during_diff(*block_hash, header); + break; + } + } + let found_header = found_header.ok_or_else(|| { + BlockSourceError::persistent("could not resolve any block from BestBlock") + })?; + + self.find_difference_from_header(current_header, &found_header, chain_poller).await + } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the /// chain with `prev_header` as its tip. /// /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. - async fn find_difference( + async fn find_difference_from_header( &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { - let mut disconnected_blocks = Vec::new(); let mut connected_blocks = Vec::new(); let mut current = current_header; let mut previous = *prev_header; @@ -369,7 +458,6 @@ where let current_height = current.height; let previous_height = previous.height; if current_height <= previous_height { - disconnected_blocks.push(previous); previous = self.look_up_previous_header(chain_poller, &previous).await?; } if current_height >= previous_height { @@ -379,7 +467,7 @@ where } let common_ancestor = current; - Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) + Ok(ChainDifference { common_ancestor, connected_blocks }) } /// Returns the previous header for the given header, either by looking it up in the cache or @@ -394,16 +482,10 @@ where } /// Notifies the chain listeners of disconnected blocks. - fn disconnect_blocks(&mut self, disconnected_blocks: Vec) { - for header in disconnected_blocks.iter() { - if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { - assert_eq!(cached_header, *header); - } - } - if let Some(block) = disconnected_blocks.last() { - let fork_point = BestBlock::new(block.header.prev_blockhash, block.height - 1); - self.chain_listener.blocks_disconnected(fork_point); - } + fn disconnect_blocks(&mut self, fork_point: ValidatedBlockHeader) { + self.header_cache.blocks_disconnected(&fork_point); + let best_block = BestBlock::new(fork_point.block_hash, fork_point.height); + self.chain_listener.blocks_disconnected(best_block); } /// Notifies the chain listeners of connected blocks. @@ -447,9 +529,9 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -466,9 +548,9 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(common_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -486,9 +568,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -506,9 +588,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -526,9 +608,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -547,9 +629,9 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 13e0403c3b6..fd8c546c56f 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -31,6 +31,11 @@ pub trait Poll { fn fetch_block<'a>( &'a self, header: &'a ValidatedBlockHeader, ) -> impl Future> + Send + 'a; + + /// Returns the header for a given hash and optional height hint. + fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a; } /// A chain tip relative to another chain tip in terms of block hash and chainwork. @@ -258,6 +263,14 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll ) -> impl Future> + Send + 'a { async move { self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) } } + + fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a { + Box::pin(async move { + self.block_source.get_header(block_hash, height_hint).await?.validate(*block_hash) + }) + } } #[cfg(test)] diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 40788e4d08c..89cb3e81d60 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,6 +1,7 @@ use crate::poll::{Validate, ValidatedBlockHeader}; use crate::{ - BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, UnboundedCache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, Cache, + HeaderCache, }; use bitcoin::block::{Block, Header, Version}; @@ -104,6 +105,18 @@ impl Blockchain { block_header.validate(block_hash).unwrap() } + pub fn best_block_at_height(&self, height: usize) -> BestBlock { + let mut previous_blocks = [None; 12]; + for (i, height) in (0..height).rev().take(12).enumerate() { + previous_blocks[i] = Some(self.blocks[height].block_hash()); + } + BestBlock { + height: height as u32, + block_hash: self.blocks[height].block_hash(), + previous_blocks, + } + } + fn at_height_unvalidated(&self, height: usize) -> BlockHeaderData { assert!(!self.blocks.is_empty()); assert!(height < self.blocks.len()); @@ -123,16 +136,21 @@ impl Blockchain { self.at_height(self.blocks.len() - 1) } + pub fn best_block(&self) -> BestBlock { + assert!(!self.blocks.is_empty()); + self.best_block_at_height(self.blocks.len() - 1) + } + pub fn disconnect_tip(&mut self) -> Option { self.blocks.pop() } - pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> UnboundedCache { - let mut cache = UnboundedCache::new(); + pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> HeaderCache { + let mut cache = HeaderCache::new(); for i in heights { let value = self.at_height(i); let key = value.header.block_hash(); - assert!(cache.insert(key, value).is_none()); + cache.block_connected(key, value); } cache } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9fd6383cf7e..93c5a371128 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -51,10 +51,9 @@ use crate::sign::ecdsa::EcdsaChannelSigner; use crate::sign::{EntropySource, PeerStorageKey, SignerProvider}; use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::types::features::{InitFeatures, NodeFeatures}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 515a3dc5f1d..1b63d9238f0 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1059,7 +1059,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. /// -/// Like the [`ChannelManager`], deserialization is implemented for `(BlockHash, ChannelMonitor)`, +/// Like the [`ChannelManager`], deserialization is implemented for `(BestBlock, ChannelMonitor)`, /// providing you with the last block hash which was connected before shutting down. You must begin /// syncing the chain from that point, disconnecting and connecting blocks as required to get to /// the best chain on startup. Note that all [`ChannelMonitor`]s passed to a [`ChainMonitor`] must @@ -1067,7 +1067,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// initialization. /// /// For those loading potentially-ancient [`ChannelMonitor`]s, deserialization is also implemented -/// for `Option<(BlockHash, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] +/// for `Option<(BestBlock, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] /// that was first created in LDK prior to 0.0.110 and last updated prior to LDK 0.0.119. In such /// cases, the `Option<(..)>` deserialization option may return `Ok(None)` rather than failing to /// deserialize, allowing you to differentiate between the two cases. @@ -6419,7 +6419,7 @@ where const MAX_ALLOC_SIZE: usize = 64 * 1024; impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for (BlockHash, ChannelMonitor) + for (BestBlock, ChannelMonitor) { fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { match >::read(reader, args) { @@ -6431,7 +6431,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for Option<(BlockHash, ChannelMonitor)> + for Option<(BestBlock, ChannelMonitor)> { #[rustfmt::skip] fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { @@ -6856,14 +6856,14 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP To continue, run a v0.1 release, send/route a payment over the channel or close it."); } } - Ok(Some((best_block.block_hash, monitor))) + Ok(Some((best_block, monitor))) } } #[cfg(test)] mod tests { use bitcoin::amount::Amount; - use bitcoin::hash_types::{BlockHash, Txid}; + use bitcoin::hash_types::Txid; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; use bitcoin::hex::FromHex; @@ -6960,7 +6960,7 @@ mod tests { nodes[1].chain_monitor.chain_monitor.transactions_confirmed(&new_header, &[(0, broadcast_tx)], conf_height); - let (_, pre_update_monitor) = <(BlockHash, ChannelMonitor<_>)>::read( + let (_, pre_update_monitor) = <(BestBlock, ChannelMonitor<_>)>::read( &mut io::Cursor::new(&get_monitor!(nodes[1], channel.2).encode()), (&nodes[1].keys_manager.backing, &nodes[1].keys_manager.backing)).unwrap(); diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index b4cc6a302ae..4ebff2ef81b 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,9 +18,10 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent}; +use crate::chain::channelmonitor::{ + ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, ANTI_REORG_DELAY, +}; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::impl_writeable_tlv_based; use crate::ln::types::ChannelId; use crate::sign::ecdsa::EcdsaChannelSigner; use crate::sign::HTLCDescriptor; @@ -42,13 +43,20 @@ pub struct BestBlock { pub block_hash: BlockHash, /// The height at which the block was confirmed. pub height: u32, + /// Previous blocks immediately before [`Self::block_hash`], in reverse chronological order. + /// + /// These ensure we can find the fork point of a reorg if our block source no longer has the + /// previous best tip after a restart. + pub previous_blocks: [Option; ANTI_REORG_DELAY as usize * 2], } impl BestBlock { /// Constructs a `BestBlock` that represents the genesis block at height 0 of the given /// network. pub fn from_network(network: Network) -> Self { - BestBlock { block_hash: genesis_block(network).header.block_hash(), height: 0 } + let block_hash = genesis_block(network).header.block_hash(); + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height: 0, previous_blocks } } /// Returns a `BestBlock` as identified by the given block hash and height. @@ -56,13 +64,77 @@ impl BestBlock { /// This is not exported to bindings users directly as the bindings auto-generate an /// equivalent `new`. pub fn new(block_hash: BlockHash, height: u32) -> Self { - BestBlock { block_hash, height } + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height, previous_blocks } + } + + /// Advances to a new block at height [`Self::height`] + 1. + pub fn advance(&mut self, new_hash: BlockHash) { + // Shift all block hashes to the right (making room for the old tip at index 0) + for i in (1..self.previous_blocks.len()).rev() { + self.previous_blocks[i] = self.previous_blocks[i - 1]; + } + + // The old tip becomes the new index 0 (tip-1) + self.previous_blocks[0] = Some(self.block_hash); + + // Update to the new tip + self.block_hash = new_hash; + self.height += 1; + } + + /// Returns the block hash at the given height, if available in our history. + pub fn get_hash_at_height(&self, height: u32) -> Option { + if height > self.height { + return None; + } + if height == self.height { + return Some(self.block_hash); + } + + // offset = 1 means we want tip-1, which is block_hashes[0] + // offset = 2 means we want tip-2, which is block_hashes[1], etc. + let offset = self.height.saturating_sub(height) as usize; + if offset >= 1 && offset <= self.previous_blocks.len() { + self.previous_blocks[offset - 1] + } else { + None + } + } + + /// Find the most recent common ancestor between two BestBlocks by searching their block hash + /// histories. + /// + /// Returns the common block hash and height, or None if no common block is found in the + /// available histories. + pub fn find_common_ancestor(&self, other: &BestBlock) -> Option<(BlockHash, u32)> { + // First check if either tip matches + if self.block_hash == other.block_hash && self.height == other.height { + return Some((self.block_hash, self.height)); + } + + // Check all heights covered by self's history + let min_height = self.height.saturating_sub(self.previous_blocks.len() as u32); + for check_height in (min_height..=self.height).rev() { + if let Some(self_hash) = self.get_hash_at_height(check_height) { + if let Some(other_hash) = other.get_hash_at_height(check_height) { + if self_hash == other_hash { + return Some((self_hash, check_height)); + } + } + } + } + None } } impl_writeable_tlv_based!(BestBlock, { (0, block_hash, required), + // Note that any change to the previous_blocks array length will change the serialization + // format and thus it is specified without constants here. + (1, previous_blocks_read, (legacy, [Option; 6 * 2], |us: &BestBlock| Some(us.previous_blocks))), (2, height, required), + (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; 6 * 2]))), }); /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the @@ -462,3 +534,45 @@ impl ClaimId { ClaimId(Sha256::from_engine(engine).to_byte_array()) } } + +#[cfg(test)] +mod tests { + use super::*; + use bitcoin::hashes::Hash; + + #[test] + fn test_best_block() { + let hash1 = BlockHash::from_slice(&[1; 32]).unwrap(); + let mut chain_a = BestBlock::new(hash1, 100); + let mut chain_b = BestBlock::new(hash1, 100); + + // Test get_hash_at_height on initial block + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), None); + assert_eq!(chain_a.get_hash_at_height(99), None); + + // Test find_common_ancestor with identical blocks + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + let hash2 = BlockHash::from_slice(&[2; 32]).unwrap(); + chain_a.advance(hash2); + assert_eq!(chain_a.height, 101); + assert_eq!(chain_a.block_hash, hash2); + assert_eq!(chain_a.previous_blocks[0], Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), Some(hash2)); + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + + // Test find_common_ancestor with different heights + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with diverged chains but the same height + let hash_b3 = BlockHash::from_slice(&[33; 32]).unwrap(); + chain_b.advance(hash_b3); + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with no common history + let hash_other = BlockHash::from_slice(&[99; 32]).unwrap(); + let chain_c = BestBlock::new(hash_other, 200); + assert_eq!(chain_a.find_common_ancestor(&chain_c), None); + } +} diff --git a/lightning/src/events/bump_transaction/mod.rs b/lightning/src/events/bump_transaction/mod.rs index e141d9b8abc..4240a27f27f 100644 --- a/lightning/src/events/bump_transaction/mod.rs +++ b/lightning/src/events/bump_transaction/mod.rs @@ -37,8 +37,8 @@ use crate::sign::{ ChannelDerivationParameters, HTLCDescriptor, SignerProvider, P2WPKH_WITNESS_WEIGHT, }; use crate::sync::Mutex; -use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::logger::Logger; +use crate::util::native_async::{MaybeSend, MaybeSync}; use bitcoin::amount::Amount; use bitcoin::consensus::Encodable; diff --git a/lightning/src/events/bump_transaction/sync.rs b/lightning/src/events/bump_transaction/sync.rs index 1328c2c1b3a..03ffa479097 100644 --- a/lightning/src/events/bump_transaction/sync.rs +++ b/lightning/src/events/bump_transaction/sync.rs @@ -18,8 +18,9 @@ use crate::chain::chaininterface::BroadcasterInterface; use crate::chain::ClaimId; use crate::prelude::*; use crate::sign::SignerProvider; -use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; +use crate::util::async_poll::dummy_waker; use crate::util::logger::Logger; +use crate::util::native_async::{MaybeSend, MaybeSync}; use bitcoin::{Psbt, ScriptBuf, Transaction, TxOut}; diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index ff499d049d4..4e32ad46ebd 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -16,7 +16,7 @@ use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::chainmonitor::ChainMonitor; use crate::chain::channelmonitor::{ChannelMonitor, MonitorEvent, ANTI_REORG_DELAY}; use crate::chain::transaction::OutPoint; -use crate::chain::{ChannelMonitorUpdateStatus, Listen, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Listen, Watch}; use crate::events::{ClosureReason, Event, HTLCHandlingFailureType, PaymentPurpose}; use crate::ln::channel::AnnouncementSigsState; use crate::ln::channelmanager::{PaymentId, RAACommitmentOrder, RecipientOnionFields, Retry}; @@ -94,7 +94,7 @@ fn test_monitor_and_persister_update_fail() { let chain_mon = { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan.2).unwrap(); - let (_, new_monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, new_monitor) = <(BestBlock, ChannelMonitor)>::read( &mut &monitor.encode()[..], (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 10c77505408..1b881213de2 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1942,7 +1942,6 @@ where /// detailed in the [`ChannelManagerReadArgs`] documentation. /// /// ``` -/// use bitcoin::BlockHash; /// use bitcoin::network::Network; /// use lightning::chain::BestBlock; /// # use lightning::chain::channelmonitor::ChannelMonitor; @@ -1991,8 +1990,8 @@ where /// entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster, /// router, message_router, logger, config, channel_monitors.iter().collect(), /// ); -/// let (block_hash, channel_manager) = -/// <(BlockHash, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; +/// let (best_block, channel_manager) = +/// <(BestBlock, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; /// /// // Update the ChannelManager and ChannelMonitors with the latest chain data /// // ... @@ -2558,7 +2557,7 @@ where /// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds /// will be lost (modulo on-chain transaction fees). /// -/// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which +/// Note that the deserializer is only implemented for `(`[`BestBlock`]`, `[`ChannelManager`]`)`, which /// tells you the last block hash which was connected. You should get the best block tip before using the manager. /// See [`chain::Listen`] and [`chain::Confirm`] for more details. /// @@ -2625,7 +2624,6 @@ where /// [`peer_disconnected`]: msgs::BaseMessageHandler::peer_disconnected /// [`funding_created`]: msgs::FundingCreated /// [`funding_transaction_generated`]: Self::funding_transaction_generated -/// [`BlockHash`]: bitcoin::hash_types::BlockHash /// [`update_channel`]: chain::Watch::update_channel /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`read`]: ReadableArgs::read @@ -17535,7 +17533,7 @@ impl< MR: Deref, L: Deref + Clone, > ReadableArgs> - for (BlockHash, Arc>) + for (BestBlock, Arc>) where M::Target: chain::Watch<::EcdsaSigner>, T::Target: BroadcasterInterface, @@ -17550,9 +17548,9 @@ where fn read( reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, ) -> Result { - let (blockhash, chan_manager) = - <(BlockHash, ChannelManager)>::read(reader, args)?; - Ok((blockhash, Arc::new(chan_manager))) + let (best_block, chan_manager) = + <(BestBlock, ChannelManager)>::read(reader, args)?; + Ok((best_block, Arc::new(chan_manager))) } } @@ -17568,7 +17566,7 @@ impl< MR: Deref, L: Deref + Clone, > ReadableArgs> - for (BlockHash, ChannelManager) + for (BestBlock, ChannelManager) where M::Target: chain::Watch<::EcdsaSigner>, T::Target: BroadcasterInterface, @@ -19369,7 +19367,7 @@ where //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. - Ok((best_block_hash.clone(), channel_manager)) + Ok((best_block, channel_manager)) } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 2cf5ea96acb..c3c657f226e 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -848,7 +848,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mon = self.chain_monitor.chain_monitor.get_monitor(channel_id).unwrap(); mon.write(&mut w).unwrap(); let (_, deserialized_monitor) = - <(BlockHash, ChannelMonitor)>::read( + <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -876,7 +876,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mut w = test_utils::TestVecWriter(Vec::new()); self.node.write(&mut w).unwrap(); <( - BlockHash, + BestBlock, ChannelManager< &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, @@ -1305,7 +1305,7 @@ pub fn _reload_node<'a, 'b, 'c>( let mut monitors_read = Vec::with_capacity(monitors_encoded.len()); for encoded in monitors_encoded { let mut monitor_read = &encoded[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read( &mut monitor_read, (node.keys_manager, node.keys_manager), ) @@ -1320,7 +1320,7 @@ pub fn _reload_node<'a, 'b, 'c>( for monitor in monitors_read.iter() { assert!(channel_monitors.insert(monitor.channel_id(), monitor).is_none()); } - <(BlockHash, TestChannelManager<'b, 'c>)>::read( + <(BestBlock, TestChannelManager<'b, 'c>)>::read( &mut node_read, ChannelManagerReadArgs { config, diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index fcb348c690d..66c37217adf 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -19,6 +19,7 @@ use crate::chain::channelmonitor::{ LATENCY_GRACE_PERIOD_BLOCKS, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch}; use crate::events::{ ClosureReason, Event, HTLCHandlingFailureType, PathFailure, PaymentFailureReason, @@ -7280,7 +7281,7 @@ pub fn test_update_err_monitor_lockdown() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7386,7 +7387,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7436,7 +7437,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index a38262e6952..9f8e25b21f4 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -11,7 +11,7 @@ //! Functional tests which test for correct behavior across node restarts. -use crate::chain::{ChannelMonitorUpdateStatus, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Watch}; use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateStep}; use crate::routing::router::{PaymentParameters, RouteParameters}; @@ -29,7 +29,6 @@ use crate::util::ser::{Writeable, ReadableArgs}; use crate::util::config::UserConfig; use bitcoin::hashes::Hash; -use bitcoin::hash_types::BlockHash; use types::payment::{PaymentHash, PaymentPreimage}; use crate::prelude::*; @@ -410,7 +409,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_stale_monitors = Vec::new(); for serialized in node_0_stale_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_stale_monitors.push(monitor); } @@ -418,14 +417,14 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_monitors = Vec::new(); for serialized in node_0_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_monitors.push(monitor); } let mut nodes_0_read = &nodes_0_serialized[..]; if let Err(msgs::DecodeError::DangerousValue) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, @@ -443,7 +442,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut nodes_0_read = &nodes_0_serialized[..]; let (_, nodes_0_deserialized_tmp) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 26252c74dd2..f90f966331d 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -58,7 +58,7 @@ use crate::ln::script::ShutdownScript; use crate::offers::invoice::UnsignedBolt12Invoice; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentPreimage; -use crate::util::async_poll::MaybeSend; +use crate::util::native_async::MaybeSend; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::transaction_utils; diff --git a/lightning/src/sync/nostd_sync.rs b/lightning/src/sync/nostd_sync.rs index 12070741918..18055d1ebe4 100644 --- a/lightning/src/sync/nostd_sync.rs +++ b/lightning/src/sync/nostd_sync.rs @@ -61,7 +61,7 @@ impl<'a, T: 'a> LockTestExt<'a> for Mutex { } type ExclLock = MutexGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard<'a, T> { self.lock().unwrap() } } @@ -132,7 +132,7 @@ impl<'a, T: 'a> LockTestExt<'a> for RwLock { } type ExclLock = RwLockWriteGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard<'a, T> { self.write().unwrap() } } diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 57df5b26cb0..23ca1aad603 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -164,31 +164,3 @@ const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } - -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Sync as MaybeSync; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub trait MaybeSync {} -#[cfg(not(feature = "std"))] -impl MaybeSync for T where T: ?Sized {} - -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Send as MaybeSend; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub trait MaybeSend {} -#[cfg(not(feature = "std"))] -impl MaybeSend for T where T: ?Sized {} diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index dcbea904b51..51e608d185f 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -20,7 +20,7 @@ pub mod mut_global; pub mod anchor_channel_reserves; -pub mod async_poll; +pub(crate) mod async_poll; #[cfg(fuzzing)] pub mod base32; #[cfg(not(fuzzing))] diff --git a/lightning/src/util/native_async.rs b/lightning/src/util/native_async.rs index 0c380f2b1d1..31b07c2f3b5 100644 --- a/lightning/src/util/native_async.rs +++ b/lightning/src/util/native_async.rs @@ -9,8 +9,9 @@ #[cfg(all(test, feature = "std"))] use crate::sync::{Arc, Mutex}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; +#[cfg(test)] +use alloc::boxed::Box; #[cfg(all(test, not(feature = "std")))] use alloc::rc::Rc; @@ -53,6 +54,34 @@ trait MaybeSendableFuture: Future + MaybeSend + 'static {} #[cfg(test)] impl + MaybeSend + 'static> MaybeSendableFuture for F {} +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Sync as MaybeSync; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait MaybeSync {} +#[cfg(not(feature = "std"))] +impl MaybeSync for T where T: ?Sized {} + +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Send as MaybeSend; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait MaybeSend {} +#[cfg(not(feature = "std"))] +impl MaybeSend for T where T: ?Sized {} + /// A simple [`FutureSpawner`] which holds [`Future`]s until they are manually polled via /// [`Self::poll_futures`]. #[cfg(all(test, feature = "std"))] diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 2e1e8805d0a..5b3a88c856d 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -14,7 +14,7 @@ use alloc::sync::Arc; use bitcoin::hashes::hex::FromHex; -use bitcoin::{BlockHash, Txid}; +use bitcoin::Txid; use core::convert::Infallible; use core::future::Future; @@ -32,14 +32,15 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; use crate::util::async_poll::{ - dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, + dummy_waker, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, }; use crate::util::logger::Logger; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::util::wakers::Notifier; @@ -447,7 +448,7 @@ impl Persist( kv_store: K, entropy_source: ES, signer_provider: SP, -) -> Result::EcdsaSigner>)>, io::Error> +) -> Result::EcdsaSigner>)>, io::Error> where K::Target: KVStoreSync, ES::Target: EntropySource + Sized, @@ -459,7 +460,7 @@ where CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, )? { - match ::EcdsaSigner>)>>::read( + match ::EcdsaSigner>)>>::read( &mut io::Cursor::new(kv_store.read( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, @@ -467,7 +468,7 @@ where )?), (&*entropy_source, &*signer_provider), ) { - Ok(Some((block_hash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { let monitor_name = MonitorName::from_str(&stored_key)?; if channel_monitor.persistence_key() != monitor_name { return Err(io::Error::new( @@ -476,7 +477,7 @@ where )); } - res.push((block_hash, channel_monitor)); + res.push((best_block, channel_monitor)); }, Ok(None) => {}, Err(_) => { @@ -652,7 +653,7 @@ where pub fn read_all_channel_monitors_with_updates( &self, ) -> Result< - Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Vec<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { poll_sync_future(self.0.read_all_channel_monitors_with_updates()) @@ -675,7 +676,7 @@ where /// function to accomplish this. Take care to limit the number of parallel readers. pub fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + ) -> Result<(BestBlock, ChannelMonitor<::EcdsaSigner>), io::Error> { poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key)) } @@ -863,7 +864,7 @@ where pub async fn read_all_channel_monitors_with_updates( &self, ) -> Result< - Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Vec<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; @@ -897,7 +898,7 @@ where pub async fn read_all_channel_monitors_with_updates_parallel( self: &Arc, ) -> Result< - Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Vec<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > where @@ -949,7 +950,7 @@ where /// function to accomplish this. Take care to limit the number of parallel readers. pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + ) -> Result<(BestBlock, ChannelMonitor<::EcdsaSigner>), io::Error> { self.0.read_channel_monitor_with_updates(monitor_key).await } @@ -1069,7 +1070,7 @@ where { pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + ) -> Result<(BestBlock, ChannelMonitor<::EcdsaSigner>), io::Error> { match self.maybe_read_channel_monitor_with_updates(monitor_key).await? { Some(res) => Ok(res), @@ -1088,7 +1089,7 @@ where async fn maybe_read_channel_monitor_with_updates( &self, monitor_key: &str, ) -> Result< - Option<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Option<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { let monitor_name = MonitorName::from_str(monitor_key)?; @@ -1097,7 +1098,7 @@ where .kv_store .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key)); let (read_res, list_res) = TwoFutureJoiner::new(read_future, list_future).await; - let (block_hash, monitor) = match read_res? { + let (best_block, monitor) = match read_res? { Some(res) => res, None => return Ok(None), }; @@ -1128,14 +1129,14 @@ where io::Error::new(io::ErrorKind::Other, "Monitor update failed") })?; } - Ok(Some((block_hash, monitor))) + Ok(Some((best_block, monitor))) } /// Read a channel monitor. async fn maybe_read_monitor( &self, monitor_name: &MonitorName, monitor_key: &str, ) -> Result< - Option<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + Option<(BestBlock, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; @@ -1146,12 +1147,12 @@ where if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) { monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64); } - match ::EcdsaSigner>)>>::read( + match ::EcdsaSigner>)>>::read( &mut monitor_cursor, (&*self.entropy_source, &*self.signer_provider), ) { Ok(None) => Ok(None), - Ok(Some((blockhash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { if channel_monitor.persistence_key() != *monitor_name { log_error!( self.logger, @@ -1163,7 +1164,7 @@ where "ChannelMonitor was stored under the wrong key", )) } else { - Ok(Some((blockhash, channel_monitor))) + Ok(Some((best_block, channel_monitor))) } }, Err(e) => { @@ -1342,7 +1343,7 @@ where async fn archive_persisted_channel(&self, monitor_name: MonitorName) { let monitor_key = monitor_name.to_string(); let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await { - Ok((_block_hash, monitor)) => monitor, + Ok((_best_block, monitor)) => monitor, Err(_) => return, }; let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index f821aa5afc0..2bd8a9eb879 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -1448,6 +1448,33 @@ impl Readable for BlockHash { } } +impl Writeable for [Option; 12] { + fn write(&self, w: &mut W) -> Result<(), io::Error> { + for hash_opt in self { + match hash_opt { + Some(hash) => hash.write(w)?, + None => ([0u8; 32]).write(w)?, + } + } + Ok(()) + } +} + +impl Readable for [Option; 12] { + fn read(r: &mut R) -> Result { + use bitcoin::hashes::Hash; + + let mut res = [None; 12]; + for hash_opt in res.iter_mut() { + let buf: [u8; 32] = Readable::read(r)?; + if buf != [0; 32] { + *hash_opt = Some(BlockHash::from_slice(&buf[..]).unwrap()); + } + } + Ok(res) + } +} + impl Writeable for ChainHash { fn write(&self, w: &mut W) -> Result<(), io::Error> { w.write_all(self.as_bytes()) diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..b873ba48810 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -20,6 +20,7 @@ use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::chain::WatchedOutput; use crate::events::bump_transaction::sync::WalletSourceSync; use crate::events::bump_transaction::Utxo; @@ -50,7 +51,6 @@ use crate::sign::{self, ReceiveAuthKey}; use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; -use crate::util::async_poll::MaybeSend; use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -58,6 +58,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; +use crate::util::native_async::MaybeSend; use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; @@ -67,7 +68,7 @@ use bitcoin::amount::Amount; use bitcoin::block::Block; use bitcoin::constants::genesis_block; use bitcoin::constants::ChainHash; -use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::hash_types::Txid; use bitcoin::hashes::{hex::FromHex, Hash}; use bitcoin::network::Network; use bitcoin::script::{Builder, Script, ScriptBuf}; @@ -564,7 +565,7 @@ impl<'a> TestChainMonitor<'a> { // underlying `ChainMonitor`. let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -596,7 +597,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // monitor to a serialized copy and get he same one back. let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -652,7 +653,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let monitor = self.chain_monitor.get_monitor(channel_id).unwrap(); w.0.clear(); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), )