diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs index aff904a5..005b981a 100644 --- a/src/block_range_scanner/reorg_handler.rs +++ b/src/block_range_scanner/reorg_handler.rs @@ -1,8 +1,8 @@ use alloy::{ consensus::BlockHeader, - eips::BlockNumberOrTag, + eips::{BlockNumHash, BlockNumberOrTag}, network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse}, - primitives::BlockHash, + primitives::{B256, BlockNumber}, }; use robust_provider::RobustProvider; @@ -29,47 +29,49 @@ pub trait ReorgHandler { ) -> Result, ScannerError>; } -/// Default implementation of [`ReorgHandler`] that uses an RPC provider. +/// Default implementation of [`ReorgHandler`] that combines on-chain hash +/// verification with parent hash validation against a ring buffer of +/// `(block_number, block_hash)` pairs. +/// +/// # Detection Strategy +/// +/// 1. **On-chain verification**: Every incoming block's hash is checked via `get_block_by_hash`. If +/// the hash no longer exists on the canonical chain, a reorg is detected immediately. This is +/// essential for stale/stored blocks (e.g., `previous_batch_end`) that may have been reorged +/// since last seen. +/// +/// 2. **Parent hash verification**: For sequential blocks (live subscription), the incoming block's +/// parent hash is compared against the buffer back. This avoids extra RPC calls in the common +/// case. +/// +/// # Scenarios Handled +/// +/// - **Happy path**: Next sequential block whose parent hash matches buffer back. +/// - **Scenario 1 (stream rewind)**: Block within buffer range, parent matches → truncate + push. +/// - **Scenario 2 (past fork)**: Block within buffer range, parent mismatch → walk back via RPC. +/// - **Scenario 3 (batch boundary)**: Non-consecutive block (already verified on-chain) → push. +/// - **Scenario 4 (deep reorg)**: Block before buffer range → reset to finalized. +/// - **Duplicate**: Same number and hash as buffer back → no-op. #[derive(Clone, Debug)] pub(crate) struct DefaultReorgHandler { provider: RobustProvider, - buffer: RingBuffer, + buffer: RingBuffer, } impl ReorgHandler for DefaultReorgHandler { - /// Checks if a block was reorged and returns the common ancestor if found. - /// - /// # Arguments + /// Checks an incoming block for reorgs using on-chain hash verification and + /// parent hash validation against a ring buffer. /// - /// * `block` - The block to check for reorg. - /// - /// # Returns - /// - /// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block. - /// * `Ok(None)` - If no reorg was detected, returns `None`. - /// * `Err(e)` - If an error occurred while checking for reorg. + /// Returns `Ok(None)` if no reorg, or `Ok(Some(common_ancestor))` if a reorg was detected. /// /// # Edge Cases /// - /// * **Duplicate block detection** - If the incoming block hash matches the last buffered hash, - /// it won't be added again to prevent buffer pollution from duplicate checks. - /// - /// * **Empty buffer on reorg** - If a reorg is detected but the buffer is empty (e.g., first - /// block after initialization), the function falls back to the finalized block as the common - /// ancestor. - /// - /// * **Deep reorg beyond buffer capacity** - If all buffered blocks are reorged (buffer - /// exhausted), the finalized block is used as a safe fallback to prevent data loss. - /// - /// * **Common ancestor beyond finalized** - This can happen if not all sequental blocks are - /// checked and stored. If the found common ancestor has a lower block number than the - /// finalized block, the finalized block is used instead and the buffer is cleared. - /// - /// * **Network errors during lookup** - Non-`BlockNotFound` errors (e.g., RPC failures) are - /// propagated immediately rather than being treated as reorgs. - /// - /// * **Finalized block unavailable** - If the finalized block cannot be fetched when needed as - /// a fallback, the error is propagated to the caller. + /// * **Empty buffer** - First block is simply added; no reorg detection possible. + /// * **Duplicate block** - Same number and hash as buffer back is a no-op. + /// * **Gap in block numbers** - Non-consecutive blocks (e.g., batch boundaries) are pushed + /// directly since the on-chain check already verified them. + /// * **Deep reorg beyond buffer capacity** - Falls back to the finalized block. + /// * **Network errors** - Propagated immediately, not treated as reorgs. #[cfg_attr( feature = "tracing", tracing::instrument(level = "trace", fields(block.hash = %block.header().hash(), block.number = block.header().number())) @@ -78,58 +80,67 @@ impl ReorgHandler for DefaultReorgHandler { &mut self, block: &N::BlockResponse, ) -> Result, ScannerError> { - let block = block.header(); - - if !self.reorg_detected(block).await? { - let block_hash = block.hash(); - // store the incoming block's hash for future reference - if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) { - self.buffer.push(block_hash); - trace!( - block_number = block.number(), - block_hash = %block_hash, - "Block hash added to reorg buffer" - ); - } + let header = block.header(); + let incoming = BlockNumHash::new(header.number(), header.hash()); + let parent_hash = header.parent_hash(); + + // ── On-chain verification ──────────────────────────────────── + // Essential when called with a stored block (e.g., previous_batch_end). + // If the hash no longer exists on the canonical chain, the block was reorged. + if self.reorg_detected(incoming.hash).await? { + return self.find_on_chain_ancestor().await; + } + + // ── Case 0: Empty buffer ───────────────────────────────────── + if self.buffer.is_empty() { + trace!(block_number = incoming.number, "Buffer empty, adding first block"); + self.buffer.push(incoming); return Ok(None); } - debug!( - block_number = block.number(), - block_hash = %block.hash(), - "Reorg detected, searching for common ancestor" - ); + // SAFETY: buffer is non-empty after the check above + let buffer_front = *self.buffer.front().unwrap(); + let buffer_back = *self.buffer.back().unwrap(); - while let Some(&block_hash) = self.buffer.back() { - trace!(block_hash = %block_hash, "Checking if buffered block exists on chain"); - match self.provider.get_block_by_hash(block_hash).await { - Ok(common_ancestor) => { - debug!( - common_ancestor_hash = %block_hash, - common_ancestor_number = common_ancestor.header().number(), - "Found common ancestor" - ); - return self.return_common_ancestor(common_ancestor).await; - } - Err(robust_provider::Error::BlockNotFound) => { - // block was reorged - trace!(block_hash = %block_hash, "Buffered block was reorged, removing from buffer"); - _ = self.buffer.pop_back(); - } - Err(e) => return Err(e.into()), - } + // ── Duplicate block (no-op) ────────────────────────────────── + if incoming.number == buffer_back.number && incoming.hash == buffer_back.hash { + trace!(block_number = incoming.number, "Duplicate block, skipping"); + return Ok(None); } - // return last finalized block as common ancestor + // ── Happy path: next sequential block ──────────────────────── + if incoming.number == buffer_back.number + 1 && parent_hash == buffer_back.hash { + trace!(block_number = incoming.number, "Next sequential block, parent hash matches"); + self.buffer.push(incoming); + return Ok(None); + } - // no need to store finalized block's hash in the buffer, as it is returned by default only - // if not buffered hashes exist on-chain + // ── Scenario 1 & 2: Block within buffer range ─────────────── + if incoming.number >= buffer_front.number && incoming.number <= buffer_back.number { + return self.handle_reorg_within_buffer(block, incoming, parent_hash).await; + } - info!("No common ancestors found in buffer, falling back to finalized block"); + // ── Scenario 3: Non-consecutive block (batch boundary) ─────── + // The block was already verified on-chain by reorg_detected() above. + // Just push it — no need to fetch intermediate blocks. + if incoming.number > buffer_back.number + 1 { + trace!( + incoming = incoming.number, + buffer_back = buffer_back.number, + "Non-consecutive block verified on-chain, pushing to buffer" + ); + self.buffer.push(incoming); + return Ok(None); + } - let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; + // ── Scenario 4: Block before buffer range (deep reorg) ────── + if incoming.number < buffer_front.number { + return self.handle_deep_reorg().await; + } - Ok(Some(finalized)) + // Fallback: next block but parent hash mismatch (reorg at tip) + // incoming.number == buffer_back.number + 1, but parent_hash != buffer_back.hash + self.handle_reorg_within_buffer(block, incoming, parent_hash).await } } @@ -138,42 +149,197 @@ impl DefaultReorgHandler { Self { provider, buffer: RingBuffer::new(capacity) } } - async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result { - match self.provider.get_block_by_hash(block.hash()).await { + /// Checks if a block hash still exists on the canonical chain via RPC. + async fn reorg_detected(&self, hash: B256) -> Result { + match self.provider.get_block_by_hash(hash).await { Ok(_) => Ok(false), Err(robust_provider::Error::BlockNotFound) => Ok(true), Err(e) => Err(e.into()), } } - async fn return_common_ancestor( + /// Walks backwards through the buffer, checking each hash on-chain to find + /// the common ancestor after a reorg is detected. + async fn find_on_chain_ancestor(&mut self) -> Result, ScannerError> { + debug!("Reorg detected via on-chain verification, searching for common ancestor"); + + let (front_number, back_number) = match (self.buffer.front(), self.buffer.back()) { + (Some(f), Some(b)) => (f.number, b.number), + _ => return self.handle_deep_reorg().await, + }; + + for number in (front_number..=back_number).rev() { + if let Some(entry) = self.buffer.get(number) { + match self.provider.get_block_by_hash(entry.hash).await { + Ok(_) => { + debug!( + common_ancestor_number = number, + common_ancestor_hash = %entry.hash, + "Found common ancestor in buffer" + ); + self.buffer.truncate_after(number); + return self.return_reorg_ancestor(number).await; + } + Err(robust_provider::Error::BlockNotFound) => { + trace!( + block_number = number, + block_hash = %entry.hash, + "Buffered block was reorged, continuing walk-back" + ); + } + Err(e) => return Err(e.into()), + } + } + } + + // Entire buffer exhausted — all buffered blocks were reorged + self.handle_deep_reorg().await + } + + /// Handles blocks that fall within the buffer range — the stream rewound to a block + /// we already have stored. Covers Scenario 1 (clean rewind) and Scenario 2 (past fork). + async fn handle_reorg_within_buffer( &mut self, - common_ancestor: ::BlockResponse, + incoming_block: &N::BlockResponse, + incoming: BlockNumHash, + parent_hash: B256, ) -> Result, ScannerError> { - let common_ancestor_header = common_ancestor.header(); + let fork_number = incoming.number.saturating_sub(1); + + // Check if parent is in our buffer and matches + if let Some(stored) = self.buffer.get(fork_number) && + parent_hash == stored.hash + { + // ── Scenario 1: Clean rewind, fork point found ─────── + debug!( + fork_point = fork_number, + incoming_block = incoming.number, + "Reorg detected (Scenario 1): stream rewound to fork point" + ); + self.buffer.truncate_after(fork_number); + self.buffer.push(incoming); + + return self.return_reorg_ancestor(fork_number).await; + } + + // ── Scenario 2: Parent mismatch, walk back via RPC ────────── + self.walk_back_to_find_fork(incoming_block, incoming).await + } + + /// Walks back via RPC to find the fork point when the incoming block's parent + /// doesn't match what's in our buffer (Scenario 2). + async fn walk_back_to_find_fork( + &mut self, + _incoming_block: &N::BlockResponse, + incoming: BlockNumHash, + ) -> Result, ScannerError> { + let buffer_front = match self.buffer.front() { + Some(f) => *f, + None => return self.handle_deep_reorg().await, + }; + + let mut new_blocks = vec![incoming]; + let mut current_number = incoming.number; + + loop { + if current_number == 0 { + return self.handle_deep_reorg().await; + } + + let parent_number = current_number.saturating_sub(1); + + // If we've walked past our buffer, we can't verify further + if parent_number < buffer_front.number { + debug!( + parent_number = parent_number, + buffer_front = buffer_front.number, + "Walked past buffer range during fork search" + ); + return self.handle_deep_reorg().await; + } + + // Fetch the parent block from the node + let parent_block = self.provider.get_block_by_number(parent_number.into()).await?; + let parent_header = parent_block.header(); + let parent_entry = BlockNumHash::new(parent_header.number(), parent_header.hash()); + let grandparent_hash = parent_header.parent_hash(); + + new_blocks.push(parent_entry); + + // Check if this block's parent matches our buffer + let grandparent_number = parent_number.saturating_sub(1); + if let Some(stored) = self.buffer.get(grandparent_number) && + grandparent_hash == stored.hash + { + // Found the fork point + let fork_point = grandparent_number; + debug!( + fork_point = fork_point, + walk_back_depth = incoming.number - parent_number, + "Reorg detected (Scenario 2): found fork point by walking back" + ); + + self.buffer.truncate_after(fork_point); + // Push new blocks in order (they were collected newest-first) + new_blocks.reverse(); + self.buffer.append(new_blocks); + + return self.return_reorg_ancestor(fork_point).await; + } + + current_number = parent_number; + } + } + + /// Handles deep reorg where the incoming block is before the buffer range (Scenario 4). + /// Falls back to the latest finalized block as a safe anchor. + async fn handle_deep_reorg(&mut self) -> Result, ScannerError> { + info!("Deep reorg detected (block before buffer range), falling back to finalized block"); let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; let finalized_header = finalized.header(); - let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() { + debug!( + finalized_number = finalized_header.number(), + finalized_hash = %finalized_header.hash(), + "Resetting buffer to finalized block" + ); + + self.buffer.clear(); + self.buffer.push(BlockNumHash::new(finalized_header.number(), finalized_header.hash())); + + Ok(Some(finalized)) + } + + /// Fetches the common ancestor block and returns it, after verifying it's not + /// before the finalized block. + async fn return_reorg_ancestor( + &mut self, + fork_block_number: BlockNumber, + ) -> Result, ScannerError> { + let ancestor = self.provider.get_block_by_number(fork_block_number.into()).await?; + let ancestor_number = ancestor.header().number(); + + // Verify ancestor is not before finalized + let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; + let finalized_number = finalized.header().number(); + + if ancestor_number >= finalized_number { debug!( - common_ancestor_number = common_ancestor_header.number(), - common_ancestor_hash = %common_ancestor_header.hash(), + common_ancestor_number = ancestor_number, + common_ancestor_hash = %ancestor.header().hash(), "Returning common ancestor" ); - common_ancestor + Ok(Some(ancestor)) } else { warn!( - common_ancestor_number = common_ancestor_header.number(), - common_ancestor_hash = %common_ancestor_header.hash(), - finalized_number = finalized_header.number(), - finalized_hash = %finalized_header.hash(), - "Found common ancestor predates finalized block, falling back to finalized" + ancestor_number = ancestor_number, + finalized_number = finalized_number, + "Common ancestor predates finalized block, falling back to finalized" ); - // all buffered blocks are finalized, so no more need to track them self.buffer.clear(); - finalized - }; - Ok(Some(common_ancestor)) + self.buffer.push(BlockNumHash::new(finalized_number, finalized.header().hash())); + Ok(Some(finalized)) + } } } diff --git a/src/block_range_scanner/ring_buffer.rs b/src/block_range_scanner/ring_buffer.rs index a5318408..7b2b0553 100644 --- a/src/block_range_scanner/ring_buffer.rs +++ b/src/block_range_scanner/ring_buffer.rs @@ -1,5 +1,7 @@ use std::collections::VecDeque; +use alloy::{eips::BlockNumHash, primitives::BlockNumber}; + /// Configuration for how many past block hashes to retain for reorg detection. /// /// This type is re-exported as `PastBlocksStorageCapacity` from the crate root. @@ -32,12 +34,12 @@ macro_rules! impl_from_unsigned { impl_from_unsigned!(RingBufferCapacity; u8, u16, u32, usize); #[derive(Clone, Debug)] -pub(crate) struct RingBuffer { - inner: VecDeque, +pub(crate) struct RingBuffer { + inner: VecDeque, capacity: RingBufferCapacity, } -impl RingBuffer { +impl RingBuffer { /// Creates an empty [`RingBuffer`] with a specific capacity. pub fn new(capacity: RingBufferCapacity) -> Self { if let RingBufferCapacity::Limited(limit) = capacity { @@ -47,13 +49,13 @@ impl RingBuffer { } } - /// Adds a new element to the buffer. + /// Adds a new `BlockNumHash` to the buffer. /// /// If limited capacity and the buffer is full, the oldest element is removed to make space. - pub fn push(&mut self, item: T) { + pub fn push(&mut self, item: BlockNumHash) { match self.capacity { RingBufferCapacity::Infinite => { - self.inner.push_back(item); // Add the new element + self.inner.push_back(item); } RingBufferCapacity::Limited(0) => { // Do nothing, reorg handling disabled @@ -62,21 +64,50 @@ impl RingBuffer { if self.inner.len() == limit { self.inner.pop_front(); // Remove the oldest element } - self.inner.push_back(item); // Add the new element + self.inner.push_back(item); } } } - /// Removes and returns the newest element from the buffer. - pub fn pop_back(&mut self) -> Option { - self.inner.pop_back() + /// Appends multiple `BlockNumHash` entries to the buffer. + pub fn append(&mut self, items: impl IntoIterator) { + for item in items { + self.push(item); + } + } + + /// Returns true if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Returns a reference to the oldest element in the buffer. + pub fn front(&self) -> Option<&BlockNumHash> { + self.inner.front() } /// Returns a reference to the newest element in the buffer. - pub fn back(&self) -> Option<&T> { + pub fn back(&self) -> Option<&BlockNumHash> { self.inner.back() } + /// Finds a block by its number. + /// + /// Since blocks are stored in sorted order, uses binary search for O(log n) lookup. + pub fn get(&self, number: BlockNumber) -> Option<&BlockNumHash> { + let idx = self.inner.partition_point(|entry| entry.number < number); + self.inner.get(idx).filter(|entry| entry.number == number) + } + + /// Truncates the buffer, keeping only blocks up to and including `keep_number`. + /// + /// Removes all blocks with number > `keep_number`. + /// Uses binary search since the buffer is sorted by block number. + pub fn truncate_after(&mut self, keep_number: BlockNumber) { + let idx = self.inner.partition_point(|entry| entry.number <= keep_number); + self.inner.truncate(idx); + } + /// Clears all elements currently stored in the buffer. pub fn clear(&mut self) { self.inner.clear(); @@ -85,12 +116,101 @@ impl RingBuffer { #[cfg(test)] mod tests { + use alloy::primitives::B256; + use super::*; + fn block(number: u64, byte: u8) -> BlockNumHash { + BlockNumHash::new(number, B256::repeat_byte(byte)) + } + #[test] fn zero_capacity_should_ignore_elements() { - let mut buf = RingBuffer::::new(RingBufferCapacity::Limited(0)); - buf.push(1); - assert!(buf.inner.is_empty()); + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(0)); + buf.push(block(1, 1)); + assert!(buf.is_empty()); + } + + #[test] + fn push_and_back_returns_newest() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + assert_eq!(buf.back().unwrap().number, 101); + } + + #[test] + fn front_returns_oldest() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + assert_eq!(buf.front().unwrap().number, 100); + } + + #[test] + fn get_finds_block_by_number() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + let h1 = B256::repeat_byte(1); + let h2 = B256::repeat_byte(2); + let h3 = B256::repeat_byte(3); + + buf.push(BlockNumHash::new(100, h1)); + buf.push(BlockNumHash::new(101, h2)); + buf.push(BlockNumHash::new(102, h3)); + + assert_eq!(buf.get(100).map(|b| b.hash), Some(h1)); + assert_eq!(buf.get(101).map(|b| b.hash), Some(h2)); + assert_eq!(buf.get(102).map(|b| b.hash), Some(h3)); + assert!(buf.get(99).is_none()); + assert!(buf.get(103).is_none()); + } + + #[test] + fn truncate_after_keeps_blocks_up_to_number() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + buf.push(block(102, 3)); + buf.push(block(103, 4)); + + buf.truncate_after(101); + + assert!(buf.get(100).is_some()); + assert!(buf.get(101).is_some()); + assert!(buf.get(102).is_none()); + assert!(buf.get(103).is_none()); + } + + #[test] + fn append_adds_multiple_blocks() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.append([block(101, 2), block(102, 3)]); + + assert_eq!(buf.back().unwrap().number, 102); + assert!(buf.get(101).is_some()); + } + + #[test] + fn is_empty_works_correctly() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + assert!(buf.is_empty()); + buf.push(block(100, 1)); + assert!(!buf.is_empty()); + buf.clear(); + assert!(buf.is_empty()); + } + + #[test] + fn limited_capacity_evicts_oldest() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(3)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + buf.push(block(102, 3)); + buf.push(block(103, 4)); + + assert!(buf.get(100).is_none()); // evicted + assert_eq!(buf.front().unwrap().number, 101); + assert_eq!(buf.back().unwrap().number, 103); } }