-
Notifications
You must be signed in to change notification settings - Fork 432
Use BestBlock for chain state serialization (and somewhat parallelize init)
#4266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a147ba1
0532501
0618934
9ce8da5
513c57a
7453fbe
2073860
63df681
2e75824
de82976
c91cdac
f36e1ff
1eabd3f
c12f01c
458f709
6bd7c74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| ../../lightning/src/util/async_poll.rs | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this symlinking work on Windows? I can't believe that it is necessary to use this hack. I think it can be called a hack?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIU yes it does. git used to default to not allowing it but i think that has since changed. We already have other symlinks in the repo for our lockorder stuff and maybe crypto.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Saw that indeed. I don't think it is ideal, but apparently that's what rust wants you to do... |
||
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,9 +16,11 @@ | |
| #![deny(rustdoc::broken_intra_doc_links)] | ||
| #![deny(rustdoc::private_intra_doc_links)] | ||
| #![deny(missing_docs)] | ||
| #![deny(unsafe_code)] | ||
| #![cfg_attr(docsrs, feature(doc_cfg))] | ||
|
|
||
| extern crate alloc; | ||
| extern crate core; | ||
|
|
||
| #[cfg(any(feature = "rest-client", feature = "rpc-client"))] | ||
| pub mod http; | ||
|
|
||
|
|
@@ -42,6 +44,9 @@ mod test_utils; | |
| #[cfg(any(feature = "rest-client", feature = "rpc-client"))] | ||
| mod utils; | ||
|
|
||
| #[allow(unused)] | ||
| mod async_poll; | ||
|
|
||
| use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; | ||
|
|
||
| use bitcoin::block::{Block, Header}; | ||
|
|
@@ -170,18 +175,13 @@ pub enum BlockData { | |
| /// sources for the best chain tip. During this process it detects any chain forks, determines which | ||
| /// constitutes the best chain, and updates the listener accordingly with any blocks that were | ||
| /// connected or disconnected since the last poll. | ||
| /// | ||
| /// Block headers for the best chain are maintained in the parameterized cache, allowing for a | ||
| /// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. | ||
| /// Hence, there is a trade-off between a lower memory footprint and potentially increased network | ||
| /// I/O as headers are re-fetched during fork detection. | ||
| pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> | ||
| pub struct SpvClient<P: Poll, L: Deref> | ||
| where | ||
| L::Target: chain::Listen, | ||
| { | ||
| chain_tip: ValidatedBlockHeader, | ||
| chain_poller: P, | ||
| chain_notifier: ChainNotifier<'a, C, L>, | ||
| chain_notifier: ChainNotifier<HeaderCache, L>, | ||
| } | ||
|
|
||
| /// The `Cache` trait defines behavior for managing a block header cache, where block headers are | ||
|
|
@@ -194,37 +194,86 @@ where | |
| /// Implementations may define how long to retain headers such that it's unlikely they will ever be | ||
| /// needed to disconnect a block. In cases where block sources provide access to headers on stale | ||
| /// forks reliably, caches may be entirely unnecessary. | ||
| pub trait Cache { | ||
| pub(crate) trait Cache { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the trait still needed or can it be removed if fixed types are used internally?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually yea, see new commit, we do :/
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But the |
||
| /// Retrieves the block header keyed by the given block hash. | ||
| fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; | ||
|
|
||
| /// Inserts the given block header during a find_difference operation, implying it might not be | ||
| /// the best header. | ||
| fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); | ||
|
|
||
| /// Called when a block has been connected to the best chain to ensure it is available to be | ||
| /// disconnected later if needed. | ||
| fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); | ||
|
|
||
| /// Called when a block has been disconnected from the best chain. Once disconnected, a block's | ||
| /// header is no longer needed and thus can be removed. | ||
| fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader>; | ||
| /// Called when blocks have been disconnected from the best chain. Only the fork point | ||
| /// (best comon ancestor) is provided. | ||
| /// | ||
| /// Once disconnected, a block's header is no longer needed and thus can be removed. | ||
| fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); | ||
| } | ||
|
|
||
| /// The maximum number of [`ValidatedBlockHeader`]s stored in a [`HeaderCache`]. | ||
| pub const HEADER_CACHE_LIMIT: u32 = 6 * 24 * 7; | ||
|
|
||
| /// Bounded cache of block headers keyed by block hash. | ||
| /// | ||
| /// Retains only the latest [`HEADER_CACHE_LIMIT`] block headers based on height. | ||
| pub struct HeaderCache(std::collections::HashMap<BlockHash, ValidatedBlockHeader>); | ||
|
|
||
| impl HeaderCache { | ||
| /// Creates a new empty header cache. | ||
| pub fn new() -> Self { | ||
| Self(std::collections::HashMap::new()) | ||
| } | ||
| } | ||
|
|
||
| /// Unbounded cache of block headers keyed by block hash. | ||
| pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>; | ||
| impl Cache for HeaderCache { | ||
| fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { | ||
| self.0.get(block_hash) | ||
| } | ||
|
|
||
| fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { | ||
| self.0.insert(block_hash, block_header); | ||
|
|
||
| // Remove headers older than our newest header minus a week. | ||
| let best_height = self.0.iter().map(|(_, header)| header.height).max().unwrap_or(0); | ||
| let cutoff_height = best_height.saturating_sub(HEADER_CACHE_LIMIT); | ||
| self.0.retain(|_, header| header.height >= cutoff_height); | ||
| } | ||
|
|
||
| fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { | ||
| self.0.insert(block_hash, block_header); | ||
|
|
||
| // Remove headers older than a week. | ||
| let cutoff_height = block_header.height.saturating_sub(HEADER_CACHE_LIMIT); | ||
| self.0.retain(|_, header| header.height >= cutoff_height); | ||
| } | ||
|
|
||
| fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { | ||
| self.0.retain(|_, block_info| block_info.height <= fork_point.height); | ||
| } | ||
| } | ||
|
|
||
| impl Cache for UnboundedCache { | ||
| impl Cache for &mut HeaderCache { | ||
| fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { | ||
| self.get(block_hash) | ||
| self.0.get(block_hash) | ||
| } | ||
|
|
||
| fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { | ||
| (*self).insert_during_diff(block_hash, block_header); | ||
| } | ||
|
|
||
| fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { | ||
| self.insert(block_hash, block_header); | ||
| (*self).block_connected(block_hash, block_header); | ||
| } | ||
|
|
||
| fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> { | ||
| self.remove(block_hash) | ||
| fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { | ||
| self.0.retain(|_, block_info| block_info.height <= fork_point.height); | ||
| } | ||
| } | ||
|
|
||
| impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> | ||
| impl<P: Poll, L: Deref> SpvClient<P, L> | ||
| where | ||
| L::Target: chain::Listen, | ||
| { | ||
|
|
@@ -239,7 +288,7 @@ where | |
| /// | ||
| /// [`poll_best_tip`]: SpvClient::poll_best_tip | ||
| pub fn new( | ||
| chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C, | ||
| chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: HeaderCache, | ||
| chain_listener: L, | ||
| ) -> Self { | ||
| let chain_notifier = ChainNotifier { header_cache, chain_listener }; | ||
|
|
@@ -293,15 +342,15 @@ where | |
| /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. | ||
| /// | ||
| /// [listeners]: lightning::chain::Listen | ||
| pub struct ChainNotifier<'a, C: Cache, L: Deref> | ||
| pub(crate) struct ChainNotifier<C: Cache, L: Deref> | ||
| where | ||
| L::Target: chain::Listen, | ||
| { | ||
| /// Cache for looking up headers before fetching from a block source. | ||
| header_cache: &'a mut C, | ||
| pub(crate) header_cache: C, | ||
|
|
||
| /// Listener that will be notified of connected or disconnected blocks. | ||
| chain_listener: L, | ||
| pub(crate) chain_listener: L, | ||
| } | ||
|
|
||
| /// Changes made to the chain between subsequent polls that transformed it from having one chain tip | ||
|
|
@@ -315,14 +364,11 @@ struct ChainDifference { | |
| /// If there are any disconnected blocks, this is where the chain forked. | ||
| common_ancestor: ValidatedBlockHeader, | ||
|
|
||
| /// Blocks that were disconnected from the chain since the last poll. | ||
| disconnected_blocks: Vec<ValidatedBlockHeader>, | ||
|
|
||
| /// Blocks that were connected to the chain since the last poll. | ||
| connected_blocks: Vec<ValidatedBlockHeader>, | ||
| } | ||
|
|
||
| impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> | ||
| impl<C: Cache, L: Deref> ChainNotifier<C, L> | ||
| where | ||
| L::Target: chain::Listen, | ||
| { | ||
|
|
@@ -338,23 +384,66 @@ where | |
| chain_poller: &mut P, | ||
| ) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> { | ||
| let difference = self | ||
| .find_difference(new_header, old_header, chain_poller) | ||
| .find_difference_from_header(new_header, old_header, chain_poller) | ||
| .await | ||
| .map_err(|e| (e, None))?; | ||
| self.disconnect_blocks(difference.disconnected_blocks); | ||
| if difference.common_ancestor != *old_header { | ||
| self.disconnect_blocks(difference.common_ancestor); | ||
| } | ||
| self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller) | ||
| .await | ||
| } | ||
|
|
||
| /// Returns the changes needed to produce the chain with `current_header` as its tip from the | ||
| /// chain with `prev_best_block` as its tip. | ||
| /// | ||
| /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` | ||
| /// field as fallback if needed, then finds the common ancestor. | ||
| async fn find_difference_from_best_block<P: Poll>( | ||
| &mut self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, | ||
| chain_poller: &mut P, | ||
| ) -> BlockSourceResult<ChainDifference> { | ||
| // Try to resolve the header for the previous best block. First try the block_hash, | ||
| // then fall back to previous_blocks if that fails. | ||
| let cur_tip = core::iter::once((0, &prev_best_block.block_hash)); | ||
| let prev_tips = | ||
| prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| { | ||
| if let Some(block_hash) = hash_opt { | ||
| Some((idx as u32 + 1, block_hash)) | ||
| } else { | ||
| None | ||
| } | ||
| }); | ||
| let mut found_header = None; | ||
| for (height_diff, block_hash) in cur_tip.chain(prev_tips) { | ||
| if let Some(header) = self.header_cache.look_up(block_hash) { | ||
| found_header = Some(*header); | ||
| break; | ||
| } | ||
| let height = prev_best_block.height.checked_sub(height_diff).ok_or( | ||
joostjager marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| BlockSourceError::persistent("BestBlock had more previous_blocks than its height"), | ||
| )?; | ||
| if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { | ||
| found_header = Some(header); | ||
| self.header_cache.insert_during_diff(*block_hash, header); | ||
| break; | ||
| } | ||
| } | ||
| let found_header = found_header.ok_or_else(|| { | ||
| BlockSourceError::persistent("could not resolve any block from BestBlock") | ||
| })?; | ||
|
|
||
| self.find_difference_from_header(current_header, &found_header, chain_poller).await | ||
| } | ||
|
|
||
| /// Returns the changes needed to produce the chain with `current_header` as its tip from the | ||
| /// chain with `prev_header` as its tip. | ||
| /// | ||
| /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. | ||
| async fn find_difference<P: Poll>( | ||
| async fn find_difference_from_header<P: Poll>( | ||
| &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, | ||
| chain_poller: &mut P, | ||
| ) -> BlockSourceResult<ChainDifference> { | ||
| let mut disconnected_blocks = Vec::new(); | ||
| let mut connected_blocks = Vec::new(); | ||
| let mut current = current_header; | ||
| let mut previous = *prev_header; | ||
|
|
@@ -369,7 +458,6 @@ where | |
| let current_height = current.height; | ||
| let previous_height = previous.height; | ||
| if current_height <= previous_height { | ||
| disconnected_blocks.push(previous); | ||
| previous = self.look_up_previous_header(chain_poller, &previous).await?; | ||
| } | ||
| if current_height >= previous_height { | ||
|
|
@@ -379,7 +467,7 @@ where | |
| } | ||
|
|
||
| let common_ancestor = current; | ||
| Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) | ||
| Ok(ChainDifference { common_ancestor, connected_blocks }) | ||
| } | ||
|
|
||
| /// Returns the previous header for the given header, either by looking it up in the cache or | ||
|
|
@@ -394,16 +482,10 @@ where | |
| } | ||
|
|
||
| /// Notifies the chain listeners of disconnected blocks. | ||
| fn disconnect_blocks(&mut self, disconnected_blocks: Vec<ValidatedBlockHeader>) { | ||
| for header in disconnected_blocks.iter() { | ||
| if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { | ||
| assert_eq!(cached_header, *header); | ||
| } | ||
| } | ||
| if let Some(block) = disconnected_blocks.last() { | ||
| let fork_point = BestBlock::new(block.header.prev_blockhash, block.height - 1); | ||
| self.chain_listener.blocks_disconnected(fork_point); | ||
| } | ||
| fn disconnect_blocks(&mut self, fork_point: ValidatedBlockHeader) { | ||
| self.header_cache.blocks_disconnected(&fork_point); | ||
| let best_block = BestBlock::new(fork_point.block_hash, fork_point.height); | ||
| self.chain_listener.blocks_disconnected(best_block); | ||
| } | ||
|
|
||
| /// Notifies the chain listeners of connected blocks. | ||
|
|
@@ -447,9 +529,9 @@ mod spv_client_tests { | |
| let best_tip = chain.at_height(1); | ||
|
|
||
| let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); | ||
| let mut cache = UnboundedCache::new(); | ||
| let cache = HeaderCache::new(); | ||
| let mut listener = NullChainListener {}; | ||
| let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); | ||
| let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); | ||
| match client.poll_best_tip().await { | ||
| Err(e) => { | ||
| assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); | ||
|
|
@@ -466,9 +548,9 @@ mod spv_client_tests { | |
| let common_tip = chain.tip(); | ||
|
|
||
| let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); | ||
| let mut cache = UnboundedCache::new(); | ||
| let cache = HeaderCache::new(); | ||
| let mut listener = NullChainListener {}; | ||
| let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener); | ||
| let mut client = SpvClient::new(common_tip, poller, cache, &mut listener); | ||
| match client.poll_best_tip().await { | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| Ok((chain_tip, blocks_connected)) => { | ||
|
|
@@ -486,9 +568,9 @@ mod spv_client_tests { | |
| let old_tip = chain.at_height(1); | ||
|
|
||
| let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); | ||
| let mut cache = UnboundedCache::new(); | ||
| let cache = HeaderCache::new(); | ||
| let mut listener = NullChainListener {}; | ||
| let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); | ||
| let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); | ||
| match client.poll_best_tip().await { | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| Ok((chain_tip, blocks_connected)) => { | ||
|
|
@@ -506,9 +588,9 @@ mod spv_client_tests { | |
| let old_tip = chain.at_height(1); | ||
|
|
||
| let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); | ||
| let mut cache = UnboundedCache::new(); | ||
| let cache = HeaderCache::new(); | ||
| let mut listener = NullChainListener {}; | ||
| let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); | ||
| let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); | ||
| match client.poll_best_tip().await { | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| Ok((chain_tip, blocks_connected)) => { | ||
|
|
@@ -526,9 +608,9 @@ mod spv_client_tests { | |
| let old_tip = chain.at_height(1); | ||
|
|
||
| let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); | ||
| let mut cache = UnboundedCache::new(); | ||
| let cache = HeaderCache::new(); | ||
| let mut listener = NullChainListener {}; | ||
| let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); | ||
| let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); | ||
| match client.poll_best_tip().await { | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| Ok((chain_tip, blocks_connected)) => { | ||
|
|
@@ -547,9 +629,9 @@ mod spv_client_tests { | |
| let worse_tip = chain.tip(); | ||
|
|
||
| let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); | ||
| let mut cache = UnboundedCache::new(); | ||
| let cache = HeaderCache::new(); | ||
| let mut listener = NullChainListener {}; | ||
| let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); | ||
| let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); | ||
| match client.poll_best_tip().await { | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| Ok((chain_tip, blocks_connected)) => { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commit msg typo