From 8ca5a1de52c353ba25910a3ae347b13b905deb3c Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Mon, 23 Mar 2026 13:23:04 +0530 Subject: [PATCH 1/4] refactor: rewrite reorg detection with parent hash verification --- src/block_range_scanner/common.rs | 4 +- src/block_range_scanner/reorg_handler.rs | 350 ++++++++++++++++------- src/block_range_scanner/ring_buffer.rs | 148 +++++++++- src/event_scanner/block_range_handler.rs | 4 +- 4 files changed, 392 insertions(+), 114 deletions(-) diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 2d3764d3..412ee9bc 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -82,8 +82,8 @@ pub(crate) async fn stream_live_blocks>( // only once the first relevant block is received from the subscription, and not before that; // otherwise callers might perform certain operations expecting the relevant blocks to start // coming, when in fact they are not. - if notify_after_first_block && - sender.try_stream(Notification::SwitchingToLive).await.is_closed() + if notify_after_first_block + && sender.try_stream(Notification::SwitchingToLive).await.is_closed() { return; } diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs index aff904a5..12180dea 100644 --- a/src/block_range_scanner/reorg_handler.rs +++ b/src/block_range_scanner/reorg_handler.rs @@ -1,8 +1,8 @@ use alloy::{ consensus::BlockHeader, - eips::BlockNumberOrTag, + eips::{BlockNumHash, BlockNumberOrTag}, network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse}, - primitives::BlockHash, + primitives::{B256, BlockNumber}, }; use robust_provider::RobustProvider; @@ -29,47 +29,42 @@ pub trait ReorgHandler { ) -> Result, ScannerError>; } -/// Default implementation of [`ReorgHandler`] that uses an RPC provider. +/// Default implementation of [`ReorgHandler`] that uses parent hash verification +/// against a ring buffer of `(block_number, block_hash)` pairs. +/// +/// # Core Invariant +/// +/// Every incoming block's parent hash is verified against the buffer, rather than +/// checking on-chain hash existence. This is cleaner and more performant for common +/// reorg scenarios. +/// +/// # Scenarios Handled +/// +/// - **Happy path**: Next sequential block whose parent hash matches buffer back. +/// - **Scenario 1 (stream rewind)**: Block within buffer range, parent matches → truncate + push. +/// - **Scenario 2 (past fork)**: Block within buffer range, parent mismatch → walk back via RPC. +/// - **Scenario 3 (gap)**: Block beyond buffer head → fetch gap blocks, verify chain. +/// - **Scenario 4 (deep reorg)**: Block before buffer range → reset to finalized. +/// - **Duplicate**: Same number and hash as buffer back → no-op. #[derive(Clone, Debug)] pub(crate) struct DefaultReorgHandler { provider: RobustProvider, - buffer: RingBuffer, + buffer: RingBuffer, } impl ReorgHandler for DefaultReorgHandler { - /// Checks if a block was reorged and returns the common ancestor if found. - /// - /// # Arguments + /// Checks an incoming block for reorgs using parent hash verification. /// - /// * `block` - The block to check for reorg. - /// - /// # Returns - /// - /// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block. - /// * `Ok(None)` - If no reorg was detected, returns `None`. - /// * `Err(e)` - If an error occurred while checking for reorg. + /// Returns `Ok(None)` if no reorg, or `Ok(Some(common_ancestor))` if a reorg was detected. /// /// # Edge Cases /// - /// * **Duplicate block detection** - If the incoming block hash matches the last buffered hash, - /// it won't be added again to prevent buffer pollution from duplicate checks. - /// - /// * **Empty buffer on reorg** - If a reorg is detected but the buffer is empty (e.g., first - /// block after initialization), the function falls back to the finalized block as the common - /// ancestor. - /// - /// * **Deep reorg beyond buffer capacity** - If all buffered blocks are reorged (buffer - /// exhausted), the finalized block is used as a safe fallback to prevent data loss. - /// - /// * **Common ancestor beyond finalized** - This can happen if not all sequental blocks are - /// checked and stored. If the found common ancestor has a lower block number than the - /// finalized block, the finalized block is used instead and the buffer is cleared. - /// - /// * **Network errors during lookup** - Non-`BlockNotFound` errors (e.g., RPC failures) are - /// propagated immediately rather than being treated as reorgs. - /// - /// * **Finalized block unavailable** - If the finalized block cannot be fetched when needed as - /// a fallback, the error is propagated to the caller. + /// * **Empty buffer** - First block is simply added; no reorg detection possible. + /// * **Duplicate block** - Same number and hash as buffer back is a no-op. + /// * **Gap in block numbers** - Intermediate blocks are fetched and verified for chain + /// continuity. + /// * **Deep reorg beyond buffer capacity** - Falls back to the finalized block. + /// * **Network errors** - Propagated immediately, not treated as reorgs. #[cfg_attr( feature = "tracing", tracing::instrument(level = "trace", fields(block.hash = %block.header().hash(), block.number = block.header().number())) @@ -78,58 +73,52 @@ impl ReorgHandler for DefaultReorgHandler { &mut self, block: &N::BlockResponse, ) -> Result, ScannerError> { - let block = block.header(); - - if !self.reorg_detected(block).await? { - let block_hash = block.hash(); - // store the incoming block's hash for future reference - if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) { - self.buffer.push(block_hash); - trace!( - block_number = block.number(), - block_hash = %block_hash, - "Block hash added to reorg buffer" - ); - } + let header = block.header(); + let incoming = BlockNumHash::new(header.number(), header.hash()); + let parent_hash = header.parent_hash(); + + // ── Case 0: Empty buffer ───────────────────────────────────── + if self.buffer.is_empty() { + trace!(block_number = incoming.number, "Buffer empty, adding first block"); + self.buffer.push(incoming); return Ok(None); } - debug!( - block_number = block.number(), - block_hash = %block.hash(), - "Reorg detected, searching for common ancestor" - ); + // SAFETY: buffer is non-empty after the check above + let buffer_front = *self.buffer.front().unwrap(); + let buffer_back = *self.buffer.back().unwrap(); - while let Some(&block_hash) = self.buffer.back() { - trace!(block_hash = %block_hash, "Checking if buffered block exists on chain"); - match self.provider.get_block_by_hash(block_hash).await { - Ok(common_ancestor) => { - debug!( - common_ancestor_hash = %block_hash, - common_ancestor_number = common_ancestor.header().number(), - "Found common ancestor" - ); - return self.return_common_ancestor(common_ancestor).await; - } - Err(robust_provider::Error::BlockNotFound) => { - // block was reorged - trace!(block_hash = %block_hash, "Buffered block was reorged, removing from buffer"); - _ = self.buffer.pop_back(); - } - Err(e) => return Err(e.into()), - } + // ── Duplicate block (no-op) ────────────────────────────────── + if incoming.number == buffer_back.number && incoming.hash == buffer_back.hash { + trace!(block_number = incoming.number, "Duplicate block, skipping"); + return Ok(None); } - // return last finalized block as common ancestor + // ── Happy path: next sequential block ──────────────────────── + if incoming.number == buffer_back.number + 1 && parent_hash == buffer_back.hash { + trace!(block_number = incoming.number, "Next sequential block, parent hash matches"); + self.buffer.push(incoming); + return Ok(None); + } - // no need to store finalized block's hash in the buffer, as it is returned by default only - // if not buffered hashes exist on-chain + // ── Scenario 1 & 2: Block within buffer range ─────────────── + if incoming.number >= buffer_front.number && incoming.number <= buffer_back.number { + return self.handle_reorg_within_buffer(block, incoming, parent_hash).await; + } - info!("No common ancestors found in buffer, falling back to finalized block"); + // ── Scenario 3: Gap (block beyond buffer head) ────────────── + if incoming.number > buffer_back.number + 1 { + return self.handle_gap(block, incoming, parent_hash).await; + } - let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; + // ── Scenario 4: Block before buffer range (deep reorg) ────── + if incoming.number < buffer_front.number { + return self.handle_deep_reorg().await; + } - Ok(Some(finalized)) + // Fallback: next block but parent hash mismatch (reorg at tip) + // incoming.number == buffer_back.number + 1, but parent_hash != buffer_back.hash + self.handle_reorg_within_buffer(block, incoming, parent_hash).await } } @@ -138,42 +127,211 @@ impl DefaultReorgHandler { Self { provider, buffer: RingBuffer::new(capacity) } } - async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result { - match self.provider.get_block_by_hash(block.hash()).await { - Ok(_) => Ok(false), - Err(robust_provider::Error::BlockNotFound) => Ok(true), - Err(e) => Err(e.into()), + /// Handles blocks that fall within the buffer range — the stream rewound to a block + /// we already have stored. Covers Scenario 1 (clean rewind) and Scenario 2 (past fork). + async fn handle_reorg_within_buffer( + &mut self, + incoming_block: &N::BlockResponse, + incoming: BlockNumHash, + parent_hash: B256, + ) -> Result, ScannerError> { + let fork_number = incoming.number.saturating_sub(1); + + // Check if parent is in our buffer and matches + if let Some(stored) = self.buffer.get(fork_number) + && parent_hash == stored.hash + { + // ── Scenario 1: Clean rewind, fork point found ─────── + debug!( + fork_point = fork_number, + incoming_block = incoming.number, + "Reorg detected (Scenario 1): stream rewound to fork point" + ); + self.buffer.truncate_after(fork_number); + self.buffer.push(incoming); + + return self.return_reorg_ancestor(fork_number).await; + } + + // ── Scenario 2: Parent mismatch, walk back via RPC ────────── + self.walk_back_to_find_fork(incoming_block, incoming).await + } + + /// Walks back via RPC to find the fork point when the incoming block's parent + /// doesn't match what's in our buffer (Scenario 2). + async fn walk_back_to_find_fork( + &mut self, + _incoming_block: &N::BlockResponse, + incoming: BlockNumHash, + ) -> Result, ScannerError> { + let buffer_front = match self.buffer.front() { + Some(f) => *f, + None => return self.handle_deep_reorg().await, + }; + + let mut new_blocks = vec![incoming]; + let mut current_number = incoming.number; + + loop { + let parent_number = current_number.saturating_sub(1); + + // If we've walked past our buffer, we can't verify further + if parent_number < buffer_front.number { + debug!( + parent_number = parent_number, + buffer_front = buffer_front.number, + "Walked past buffer range during fork search" + ); + return self.handle_deep_reorg().await; + } + + // Fetch the parent block from the node + let parent_block = self.provider.get_block_by_number(parent_number.into()).await?; + let parent_header = parent_block.header(); + let parent_entry = BlockNumHash::new(parent_header.number(), parent_header.hash()); + let grandparent_hash = parent_header.parent_hash(); + + new_blocks.push(parent_entry); + + // Check if this block's parent matches our buffer + let grandparent_number = parent_number.saturating_sub(1); + if let Some(stored) = self.buffer.get(grandparent_number) + && grandparent_hash == stored.hash + { + // Found the fork point + let fork_point = grandparent_number; + debug!( + fork_point = fork_point, + walk_back_depth = incoming.number - parent_number, + "Reorg detected (Scenario 2): found fork point by walking back" + ); + + self.buffer.truncate_after(fork_point); + // Push new blocks in order (they were collected newest-first) + new_blocks.reverse(); + self.buffer.append(new_blocks); + + return self.return_reorg_ancestor(fork_point).await; + } + + current_number = parent_number; } } - async fn return_common_ancestor( + /// Handles the case where the stream jumped ahead (gap between buffer back and incoming). + /// Fetches missing blocks and verifies chain continuity (Scenario 3). + async fn handle_gap( &mut self, - common_ancestor: ::BlockResponse, + incoming_block: &N::BlockResponse, + incoming: BlockNumHash, + parent_hash: B256, ) -> Result, ScannerError> { - let common_ancestor_header = common_ancestor.header(); + let buffer_back = *self.buffer.back().unwrap(); + let gap_start = buffer_back.number + 1; + let gap_end = incoming.number; // exclusive: we handle incoming separately + + debug!( + buffer_tip = buffer_back.number, + incoming = incoming.number, + gap_size = gap_end - gap_start, + "Gap detected, fetching intermediate blocks" + ); + + // Fetch all gap blocks and record their parent hashes for verification + let mut gap_blocks = Vec::new(); + let mut gap_parent_hashes: Vec = Vec::new(); + for num in gap_start..gap_end { + let block = self.provider.get_block_by_number(num.into()).await?; + let h = block.header(); + gap_parent_hashes.push(h.parent_hash()); + gap_blocks.push(BlockNumHash::new(h.number(), h.hash())); + } + + // Verify the first gap block connects to our buffer + if let Some(first_gap_parent) = gap_parent_hashes.first() + && *first_gap_parent != buffer_back.hash + { + // A reorg happened that affected our buffer tail. + // Treat the first gap block as a reorg block. + debug!( + expected_parent = %buffer_back.hash, + actual_parent = %first_gap_parent, + "Gap block doesn't connect to buffer, reorg in gap detected" + ); + let first_entry = gap_blocks[0]; + return self + .handle_reorg_within_buffer(incoming_block, first_entry, *first_gap_parent) + .await; + } + + // Verify incoming block connects to the gap chain + let expected_parent = gap_blocks.last().map_or(buffer_back.hash, |b| b.hash); + if parent_hash != expected_parent { + debug!( + expected = %expected_parent, + actual = %parent_hash, + "Incoming block doesn't connect to gap chain" + ); + return self.walk_back_to_find_fork(incoming_block, incoming).await; + } + + // Everything connects — push gap blocks and incoming into buffer + self.buffer.append(gap_blocks); + self.buffer.push(incoming); + + trace!(buffer_tip = incoming.number, "Gap filled successfully, no reorg"); + Ok(None) + } + + /// Handles deep reorg where the incoming block is before the buffer range (Scenario 4). + /// Falls back to the latest finalized block as a safe anchor. + async fn handle_deep_reorg(&mut self) -> Result, ScannerError> { + info!("Deep reorg detected (block before buffer range), falling back to finalized block"); let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; let finalized_header = finalized.header(); - let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() { + debug!( + finalized_number = finalized_header.number(), + finalized_hash = %finalized_header.hash(), + "Resetting buffer to finalized block" + ); + + self.buffer.clear(); + self.buffer.push(BlockNumHash::new(finalized_header.number(), finalized_header.hash())); + + Ok(Some(finalized)) + } + + /// Fetches the common ancestor block and returns it, after verifying it's not + /// before the finalized block. + async fn return_reorg_ancestor( + &mut self, + fork_block_number: BlockNumber, + ) -> Result, ScannerError> { + let ancestor = self.provider.get_block_by_number(fork_block_number.into()).await?; + let ancestor_number = ancestor.header().number(); + + // Verify ancestor is not before finalized + let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?; + let finalized_number = finalized.header().number(); + + if ancestor_number >= finalized_number { debug!( - common_ancestor_number = common_ancestor_header.number(), - common_ancestor_hash = %common_ancestor_header.hash(), + common_ancestor_number = ancestor_number, + common_ancestor_hash = %ancestor.header().hash(), "Returning common ancestor" ); - common_ancestor + Ok(Some(ancestor)) } else { warn!( - common_ancestor_number = common_ancestor_header.number(), - common_ancestor_hash = %common_ancestor_header.hash(), - finalized_number = finalized_header.number(), - finalized_hash = %finalized_header.hash(), - "Found common ancestor predates finalized block, falling back to finalized" + ancestor_number = ancestor_number, + finalized_number = finalized_number, + "Common ancestor predates finalized block, falling back to finalized" ); - // all buffered blocks are finalized, so no more need to track them self.buffer.clear(); - finalized - }; - Ok(Some(common_ancestor)) + self.buffer.push(BlockNumHash::new(finalized_number, finalized.header().hash())); + Ok(Some(finalized)) + } } } diff --git a/src/block_range_scanner/ring_buffer.rs b/src/block_range_scanner/ring_buffer.rs index a5318408..7b2b0553 100644 --- a/src/block_range_scanner/ring_buffer.rs +++ b/src/block_range_scanner/ring_buffer.rs @@ -1,5 +1,7 @@ use std::collections::VecDeque; +use alloy::{eips::BlockNumHash, primitives::BlockNumber}; + /// Configuration for how many past block hashes to retain for reorg detection. /// /// This type is re-exported as `PastBlocksStorageCapacity` from the crate root. @@ -32,12 +34,12 @@ macro_rules! impl_from_unsigned { impl_from_unsigned!(RingBufferCapacity; u8, u16, u32, usize); #[derive(Clone, Debug)] -pub(crate) struct RingBuffer { - inner: VecDeque, +pub(crate) struct RingBuffer { + inner: VecDeque, capacity: RingBufferCapacity, } -impl RingBuffer { +impl RingBuffer { /// Creates an empty [`RingBuffer`] with a specific capacity. pub fn new(capacity: RingBufferCapacity) -> Self { if let RingBufferCapacity::Limited(limit) = capacity { @@ -47,13 +49,13 @@ impl RingBuffer { } } - /// Adds a new element to the buffer. + /// Adds a new `BlockNumHash` to the buffer. /// /// If limited capacity and the buffer is full, the oldest element is removed to make space. - pub fn push(&mut self, item: T) { + pub fn push(&mut self, item: BlockNumHash) { match self.capacity { RingBufferCapacity::Infinite => { - self.inner.push_back(item); // Add the new element + self.inner.push_back(item); } RingBufferCapacity::Limited(0) => { // Do nothing, reorg handling disabled @@ -62,21 +64,50 @@ impl RingBuffer { if self.inner.len() == limit { self.inner.pop_front(); // Remove the oldest element } - self.inner.push_back(item); // Add the new element + self.inner.push_back(item); } } } - /// Removes and returns the newest element from the buffer. - pub fn pop_back(&mut self) -> Option { - self.inner.pop_back() + /// Appends multiple `BlockNumHash` entries to the buffer. + pub fn append(&mut self, items: impl IntoIterator) { + for item in items { + self.push(item); + } + } + + /// Returns true if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Returns a reference to the oldest element in the buffer. + pub fn front(&self) -> Option<&BlockNumHash> { + self.inner.front() } /// Returns a reference to the newest element in the buffer. - pub fn back(&self) -> Option<&T> { + pub fn back(&self) -> Option<&BlockNumHash> { self.inner.back() } + /// Finds a block by its number. + /// + /// Since blocks are stored in sorted order, uses binary search for O(log n) lookup. + pub fn get(&self, number: BlockNumber) -> Option<&BlockNumHash> { + let idx = self.inner.partition_point(|entry| entry.number < number); + self.inner.get(idx).filter(|entry| entry.number == number) + } + + /// Truncates the buffer, keeping only blocks up to and including `keep_number`. + /// + /// Removes all blocks with number > `keep_number`. + /// Uses binary search since the buffer is sorted by block number. + pub fn truncate_after(&mut self, keep_number: BlockNumber) { + let idx = self.inner.partition_point(|entry| entry.number <= keep_number); + self.inner.truncate(idx); + } + /// Clears all elements currently stored in the buffer. pub fn clear(&mut self) { self.inner.clear(); @@ -85,12 +116,101 @@ impl RingBuffer { #[cfg(test)] mod tests { + use alloy::primitives::B256; + use super::*; + fn block(number: u64, byte: u8) -> BlockNumHash { + BlockNumHash::new(number, B256::repeat_byte(byte)) + } + #[test] fn zero_capacity_should_ignore_elements() { - let mut buf = RingBuffer::::new(RingBufferCapacity::Limited(0)); - buf.push(1); - assert!(buf.inner.is_empty()); + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(0)); + buf.push(block(1, 1)); + assert!(buf.is_empty()); + } + + #[test] + fn push_and_back_returns_newest() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + assert_eq!(buf.back().unwrap().number, 101); + } + + #[test] + fn front_returns_oldest() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + assert_eq!(buf.front().unwrap().number, 100); + } + + #[test] + fn get_finds_block_by_number() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + let h1 = B256::repeat_byte(1); + let h2 = B256::repeat_byte(2); + let h3 = B256::repeat_byte(3); + + buf.push(BlockNumHash::new(100, h1)); + buf.push(BlockNumHash::new(101, h2)); + buf.push(BlockNumHash::new(102, h3)); + + assert_eq!(buf.get(100).map(|b| b.hash), Some(h1)); + assert_eq!(buf.get(101).map(|b| b.hash), Some(h2)); + assert_eq!(buf.get(102).map(|b| b.hash), Some(h3)); + assert!(buf.get(99).is_none()); + assert!(buf.get(103).is_none()); + } + + #[test] + fn truncate_after_keeps_blocks_up_to_number() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + buf.push(block(102, 3)); + buf.push(block(103, 4)); + + buf.truncate_after(101); + + assert!(buf.get(100).is_some()); + assert!(buf.get(101).is_some()); + assert!(buf.get(102).is_none()); + assert!(buf.get(103).is_none()); + } + + #[test] + fn append_adds_multiple_blocks() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + buf.push(block(100, 1)); + buf.append([block(101, 2), block(102, 3)]); + + assert_eq!(buf.back().unwrap().number, 102); + assert!(buf.get(101).is_some()); + } + + #[test] + fn is_empty_works_correctly() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(10)); + assert!(buf.is_empty()); + buf.push(block(100, 1)); + assert!(!buf.is_empty()); + buf.clear(); + assert!(buf.is_empty()); + } + + #[test] + fn limited_capacity_evicts_oldest() { + let mut buf = RingBuffer::new(RingBufferCapacity::Limited(3)); + buf.push(block(100, 1)); + buf.push(block(101, 2)); + buf.push(block(102, 3)); + buf.push(block(103, 4)); + + assert!(buf.get(100).is_none()); // evicted + assert_eq!(buf.front().unwrap().number, 101); + assert_eq!(buf.back().unwrap().number, 103); } } diff --git a/src/event_scanner/block_range_handler.rs b/src/event_scanner/block_range_handler.rs index c6fe164d..7d86feaa 100644 --- a/src/event_scanner/block_range_handler.rs +++ b/src/event_scanner/block_range_handler.rs @@ -117,8 +117,8 @@ impl StreamHandler { // process all of the buffered results while let Some(result) = stream.next().await { - if let Ok(ScannerMessage::Data(logs)) = result.as_ref() && - logs.is_empty() + if let Ok(ScannerMessage::Data(logs)) = result.as_ref() + && logs.is_empty() { continue; } From 3259493c319be9b384a66d0b38b6fd6209d83ce6 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Mon, 23 Mar 2026 15:25:11 +0530 Subject: [PATCH 2/4] fix: add on-chain hash verification for reorg detection --- src/block_range_scanner/common.rs | 4 +- src/block_range_scanner/reorg_handler.rs | 69 +++++++++++++++++++++--- src/event_scanner/block_range_handler.rs | 4 +- 3 files changed, 66 insertions(+), 11 deletions(-) diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 412ee9bc..2d3764d3 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -82,8 +82,8 @@ pub(crate) async fn stream_live_blocks>( // only once the first relevant block is received from the subscription, and not before that; // otherwise callers might perform certain operations expecting the relevant blocks to start // coming, when in fact they are not. - if notify_after_first_block - && sender.try_stream(Notification::SwitchingToLive).await.is_closed() + if notify_after_first_block && + sender.try_stream(Notification::SwitchingToLive).await.is_closed() { return; } diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs index 12180dea..5db78512 100644 --- a/src/block_range_scanner/reorg_handler.rs +++ b/src/block_range_scanner/reorg_handler.rs @@ -53,7 +53,8 @@ pub(crate) struct DefaultReorgHandler { } impl ReorgHandler for DefaultReorgHandler { - /// Checks an incoming block for reorgs using parent hash verification. + /// Checks an incoming block for reorgs using on-chain hash verification and + /// parent hash validation against a ring buffer. /// /// Returns `Ok(None)` if no reorg, or `Ok(Some(common_ancestor))` if a reorg was detected. /// @@ -77,6 +78,13 @@ impl ReorgHandler for DefaultReorgHandler { let incoming = BlockNumHash::new(header.number(), header.hash()); let parent_hash = header.parent_hash(); + // ── On-chain verification ──────────────────────────────────── + // Essential when called with a stored block (e.g., previous_batch_end). + // If the hash no longer exists on the canonical chain, the block was reorged. + if self.reorg_detected(incoming.hash).await? { + return self.find_on_chain_ancestor().await; + } + // ── Case 0: Empty buffer ───────────────────────────────────── if self.buffer.is_empty() { trace!(block_number = incoming.number, "Buffer empty, adding first block"); @@ -127,6 +135,53 @@ impl DefaultReorgHandler { Self { provider, buffer: RingBuffer::new(capacity) } } + /// Checks if a block hash still exists on the canonical chain via RPC. + async fn reorg_detected(&self, hash: B256) -> Result { + match self.provider.get_block_by_hash(hash).await { + Ok(_) => Ok(false), + Err(robust_provider::Error::BlockNotFound) => Ok(true), + Err(e) => Err(e.into()), + } + } + + /// Walks backwards through the buffer, checking each hash on-chain to find + /// the common ancestor after a reorg is detected. + async fn find_on_chain_ancestor(&mut self) -> Result, ScannerError> { + debug!("Reorg detected via on-chain verification, searching for common ancestor"); + + let (front_number, back_number) = match (self.buffer.front(), self.buffer.back()) { + (Some(f), Some(b)) => (f.number, b.number), + _ => return self.handle_deep_reorg().await, + }; + + for number in (front_number..=back_number).rev() { + if let Some(entry) = self.buffer.get(number) { + match self.provider.get_block_by_hash(entry.hash).await { + Ok(_) => { + debug!( + common_ancestor_number = number, + common_ancestor_hash = %entry.hash, + "Found common ancestor in buffer" + ); + self.buffer.truncate_after(number); + return self.return_reorg_ancestor(number).await; + } + Err(robust_provider::Error::BlockNotFound) => { + trace!( + block_number = number, + block_hash = %entry.hash, + "Buffered block was reorged, continuing walk-back" + ); + } + Err(e) => return Err(e.into()), + } + } + } + + // Entire buffer exhausted — all buffered blocks were reorged + self.handle_deep_reorg().await + } + /// Handles blocks that fall within the buffer range — the stream rewound to a block /// we already have stored. Covers Scenario 1 (clean rewind) and Scenario 2 (past fork). async fn handle_reorg_within_buffer( @@ -138,8 +193,8 @@ impl DefaultReorgHandler { let fork_number = incoming.number.saturating_sub(1); // Check if parent is in our buffer and matches - if let Some(stored) = self.buffer.get(fork_number) - && parent_hash == stored.hash + if let Some(stored) = self.buffer.get(fork_number) && + parent_hash == stored.hash { // ── Scenario 1: Clean rewind, fork point found ─────── debug!( @@ -195,8 +250,8 @@ impl DefaultReorgHandler { // Check if this block's parent matches our buffer let grandparent_number = parent_number.saturating_sub(1); - if let Some(stored) = self.buffer.get(grandparent_number) - && grandparent_hash == stored.hash + if let Some(stored) = self.buffer.get(grandparent_number) && + grandparent_hash == stored.hash { // Found the fork point let fork_point = grandparent_number; @@ -248,8 +303,8 @@ impl DefaultReorgHandler { } // Verify the first gap block connects to our buffer - if let Some(first_gap_parent) = gap_parent_hashes.first() - && *first_gap_parent != buffer_back.hash + if let Some(first_gap_parent) = gap_parent_hashes.first() && + *first_gap_parent != buffer_back.hash { // A reorg happened that affected our buffer tail. // Treat the first gap block as a reorg block. diff --git a/src/event_scanner/block_range_handler.rs b/src/event_scanner/block_range_handler.rs index 7d86feaa..c6fe164d 100644 --- a/src/event_scanner/block_range_handler.rs +++ b/src/event_scanner/block_range_handler.rs @@ -117,8 +117,8 @@ impl StreamHandler { // process all of the buffered results while let Some(result) = stream.next().await { - if let Ok(ScannerMessage::Data(logs)) = result.as_ref() - && logs.is_empty() + if let Ok(ScannerMessage::Data(logs)) = result.as_ref() && + logs.is_empty() { continue; } From 4c63dde1416eeb514bc2109c53e5b1d774177620 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Mon, 23 Mar 2026 15:45:12 +0530 Subject: [PATCH 3/4] fix: add genesis guard in walk-back and gap continuity check --- src/block_range_scanner/reorg_handler.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs index 5db78512..8c1d6644 100644 --- a/src/block_range_scanner/reorg_handler.rs +++ b/src/block_range_scanner/reorg_handler.rs @@ -228,6 +228,10 @@ impl DefaultReorgHandler { let mut current_number = incoming.number; loop { + if current_number == 0 { + return self.handle_deep_reorg().await; + } + let parent_number = current_number.saturating_sub(1); // If we've walked past our buffer, we can't verify further @@ -293,11 +297,28 @@ impl DefaultReorgHandler { ); // Fetch all gap blocks and record their parent hashes for verification - let mut gap_blocks = Vec::new(); + let mut gap_blocks: Vec = Vec::new(); let mut gap_parent_hashes: Vec = Vec::new(); for num in gap_start..gap_end { let block = self.provider.get_block_by_number(num.into()).await?; let h = block.header(); + + // Verify each gap block links to the previous one + if let Some(prev) = gap_blocks.last() && + h.parent_hash() != prev.hash + { + debug!( + expected_parent = %prev.hash, + actual_parent = %h.parent_hash(), + gap_block_number = h.number(), + "Gap chain discontinuity detected, treating as reorg" + ); + let entry = BlockNumHash::new(h.number(), h.hash()); + return self + .handle_reorg_within_buffer(incoming_block, entry, h.parent_hash()) + .await; + } + gap_parent_hashes.push(h.parent_hash()); gap_blocks.push(BlockNumHash::new(h.number(), h.hash())); } From 70e9610595928711fea2624805b8dc69a30bef62 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Mon, 23 Mar 2026 16:14:46 +0530 Subject: [PATCH 4/4] refactor: remove gap-fill RPC calls, simplify batch boundary handling --- src/block_range_scanner/reorg_handler.rs | 118 +++++------------------ 1 file changed, 25 insertions(+), 93 deletions(-) diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs index 8c1d6644..005b981a 100644 --- a/src/block_range_scanner/reorg_handler.rs +++ b/src/block_range_scanner/reorg_handler.rs @@ -29,21 +29,27 @@ pub trait ReorgHandler { ) -> Result, ScannerError>; } -/// Default implementation of [`ReorgHandler`] that uses parent hash verification -/// against a ring buffer of `(block_number, block_hash)` pairs. +/// Default implementation of [`ReorgHandler`] that combines on-chain hash +/// verification with parent hash validation against a ring buffer of +/// `(block_number, block_hash)` pairs. /// -/// # Core Invariant +/// # Detection Strategy /// -/// Every incoming block's parent hash is verified against the buffer, rather than -/// checking on-chain hash existence. This is cleaner and more performant for common -/// reorg scenarios. +/// 1. **On-chain verification**: Every incoming block's hash is checked via `get_block_by_hash`. If +/// the hash no longer exists on the canonical chain, a reorg is detected immediately. This is +/// essential for stale/stored blocks (e.g., `previous_batch_end`) that may have been reorged +/// since last seen. +/// +/// 2. **Parent hash verification**: For sequential blocks (live subscription), the incoming block's +/// parent hash is compared against the buffer back. This avoids extra RPC calls in the common +/// case. /// /// # Scenarios Handled /// /// - **Happy path**: Next sequential block whose parent hash matches buffer back. /// - **Scenario 1 (stream rewind)**: Block within buffer range, parent matches → truncate + push. /// - **Scenario 2 (past fork)**: Block within buffer range, parent mismatch → walk back via RPC. -/// - **Scenario 3 (gap)**: Block beyond buffer head → fetch gap blocks, verify chain. +/// - **Scenario 3 (batch boundary)**: Non-consecutive block (already verified on-chain) → push. /// - **Scenario 4 (deep reorg)**: Block before buffer range → reset to finalized. /// - **Duplicate**: Same number and hash as buffer back → no-op. #[derive(Clone, Debug)] @@ -62,8 +68,8 @@ impl ReorgHandler for DefaultReorgHandler { /// /// * **Empty buffer** - First block is simply added; no reorg detection possible. /// * **Duplicate block** - Same number and hash as buffer back is a no-op. - /// * **Gap in block numbers** - Intermediate blocks are fetched and verified for chain - /// continuity. + /// * **Gap in block numbers** - Non-consecutive blocks (e.g., batch boundaries) are pushed + /// directly since the on-chain check already verified them. /// * **Deep reorg beyond buffer capacity** - Falls back to the finalized block. /// * **Network errors** - Propagated immediately, not treated as reorgs. #[cfg_attr( @@ -114,9 +120,17 @@ impl ReorgHandler for DefaultReorgHandler { return self.handle_reorg_within_buffer(block, incoming, parent_hash).await; } - // ── Scenario 3: Gap (block beyond buffer head) ────────────── + // ── Scenario 3: Non-consecutive block (batch boundary) ─────── + // The block was already verified on-chain by reorg_detected() above. + // Just push it — no need to fetch intermediate blocks. if incoming.number > buffer_back.number + 1 { - return self.handle_gap(block, incoming, parent_hash).await; + trace!( + incoming = incoming.number, + buffer_back = buffer_back.number, + "Non-consecutive block verified on-chain, pushing to buffer" + ); + self.buffer.push(incoming); + return Ok(None); } // ── Scenario 4: Block before buffer range (deep reorg) ────── @@ -277,88 +291,6 @@ impl DefaultReorgHandler { } } - /// Handles the case where the stream jumped ahead (gap between buffer back and incoming). - /// Fetches missing blocks and verifies chain continuity (Scenario 3). - async fn handle_gap( - &mut self, - incoming_block: &N::BlockResponse, - incoming: BlockNumHash, - parent_hash: B256, - ) -> Result, ScannerError> { - let buffer_back = *self.buffer.back().unwrap(); - let gap_start = buffer_back.number + 1; - let gap_end = incoming.number; // exclusive: we handle incoming separately - - debug!( - buffer_tip = buffer_back.number, - incoming = incoming.number, - gap_size = gap_end - gap_start, - "Gap detected, fetching intermediate blocks" - ); - - // Fetch all gap blocks and record their parent hashes for verification - let mut gap_blocks: Vec = Vec::new(); - let mut gap_parent_hashes: Vec = Vec::new(); - for num in gap_start..gap_end { - let block = self.provider.get_block_by_number(num.into()).await?; - let h = block.header(); - - // Verify each gap block links to the previous one - if let Some(prev) = gap_blocks.last() && - h.parent_hash() != prev.hash - { - debug!( - expected_parent = %prev.hash, - actual_parent = %h.parent_hash(), - gap_block_number = h.number(), - "Gap chain discontinuity detected, treating as reorg" - ); - let entry = BlockNumHash::new(h.number(), h.hash()); - return self - .handle_reorg_within_buffer(incoming_block, entry, h.parent_hash()) - .await; - } - - gap_parent_hashes.push(h.parent_hash()); - gap_blocks.push(BlockNumHash::new(h.number(), h.hash())); - } - - // Verify the first gap block connects to our buffer - if let Some(first_gap_parent) = gap_parent_hashes.first() && - *first_gap_parent != buffer_back.hash - { - // A reorg happened that affected our buffer tail. - // Treat the first gap block as a reorg block. - debug!( - expected_parent = %buffer_back.hash, - actual_parent = %first_gap_parent, - "Gap block doesn't connect to buffer, reorg in gap detected" - ); - let first_entry = gap_blocks[0]; - return self - .handle_reorg_within_buffer(incoming_block, first_entry, *first_gap_parent) - .await; - } - - // Verify incoming block connects to the gap chain - let expected_parent = gap_blocks.last().map_or(buffer_back.hash, |b| b.hash); - if parent_hash != expected_parent { - debug!( - expected = %expected_parent, - actual = %parent_hash, - "Incoming block doesn't connect to gap chain" - ); - return self.walk_back_to_find_fork(incoming_block, incoming).await; - } - - // Everything connects — push gap blocks and incoming into buffer - self.buffer.append(gap_blocks); - self.buffer.push(incoming); - - trace!(buffer_tip = incoming.number, "Gap filled successfully, no reorg"); - Ok(None) - } - /// Handles deep reorg where the incoming block is before the buffer range (Scenario 4). /// Falls back to the latest finalized block as a safe anchor. async fn handle_deep_reorg(&mut self) -> Result, ScannerError> {