Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,12 @@ Notes:

- Ordering is guaranteed only within a single subscription stream. There is no global ordering guarantee across multiple subscriptions.
- When the scanner detects a reorg, it emits `Notification::ReorgDetected`. Consumers should assume the same events might be delivered more than once around reorgs (i.e. benign duplicates are possible). Depending on the application's needs, this could be handled via idempotency/deduplication or by rolling back application state on reorg notifications.
- In **Historic** mode specifically, reorg checks are only performed while streaming the **non-finalized** portion of the requested range. Blocks at or below the chain's `finalized` height are streamed without reorg checks.

### Scanning Modes

- **Live** – scanner that streams new blocks as they arrive.
- **Historic** – scanner for streaming events from a past block range (default: genesis..=latest).
- **Historic** – scanner for streaming events from a past block range (default: genesis..=latest). For non-finalized blocks, the scanner may re-stream parts of the range if it detects a reorg, and will emit `Notification::ReorgDetected`.
- **Latest Events** – scanner that collects up to `count` most recent events per listener. Final delivery is in chronological order (oldest to newest).
- **Sync from Block** – scanner that streams events from a given start block up to the current confirmed tip, then automatically transitions to live streaming.
- **Sync from Latest Events** - scanner that collects the most recent `count` events, then automatically transitions to live streaming.
Expand Down
74 changes: 0 additions & 74 deletions src/block_range_scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
};
use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Network},
primitives::BlockNumber,
};
Expand Down Expand Up @@ -385,79 +384,6 @@ struct LiveStreamingState<N: Network> {
previous_batch_end: Option<N::BlockResponse>,
}

#[must_use]
#[cfg_attr(
feature = "tracing",
tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
)]
pub(crate) async fn stream_historical_range<N: Network, R: ReorgHandler<N>>(
start: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut R,
) -> Option<()> {
// NOTE: Edge case - If the chain is too young to expose finalized blocks (height < finalized
// depth) just use zero.
// Since we use the finalized block number only to determine whether to run reorg checks
// or not, this is a "low-stakes" RPC call, for which, for simplicity, we can default to `0`
// even on errors. Here `0` is used because it effectively just enables reorg checks.
// If there was actually a provider problem, any subsequent provider call will catch and
// properly log it and return the error to the caller.
let finalized_block_num =
provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await.unwrap_or(0);

// no reorg check for finalized blocks
let finalized_batch_end = finalized_block_num.min(end);
trace!(
start = start,
finalized_batch_end = finalized_batch_end,
batch_count = RangeIterator::forward(start, finalized_batch_end, max_block_range).count(),
"Streaming finalized blocks (no reorg check)"
);

for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) {
trace!(range_start = *range.start(), range_end = *range.end(), "Streaming finalized range");
if sender.try_stream(range).await.is_closed() {
return None; // channel closed
}
}

// If start > finalized_batch_end, the loop above was empty and we should
// continue from start. Otherwise, continue from after finalized_batch_end.
let batch_start = start.max(finalized_batch_end + 1);

// covers case when `end <= finalized`
if batch_start > end {
return Some(()); // we're done
}

// we have non-finalized block numbers to stream, a reorg can occur

// Possible minimal common ancestors when a reorg occurs:
// * start > finalized -> the common ancestor we care about is the block before `start`, that's
// where the stream should restart -> this is why we used `start - 1`
// * start == finalized -> `start` should never be re-streamed on reorgs; stream should restart
// on `start + 1`
// * start < finalized -> if we got here, then `end > finalized`; on reorg, we should only
// re-stream non-finalized blocks
let min_common_ancestor = (start.saturating_sub(1)).max(finalized_block_num);

stream_range_with_reorg_handling(
min_common_ancestor,
batch_start,
end,
max_block_range,
sender,
provider,
reorg_handler,
)
.await?;

Some(())
}

/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
#[cfg_attr(
feature = "tracing",
Expand Down
200 changes: 200 additions & 0 deletions src/block_range_scanner/historical_range_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::BlockNumber,
};
use robust_provider::RobustProvider;
use tokio::sync::mpsc;

use crate::{
Notification,
block_range_scanner::{common::BlockScannerResult, range_iterator::RangeIterator},
types::{ChannelState, TryStream},
};

pub(crate) struct HistoricalRangeHandler<N: Network> {
provider: RobustProvider<N>,
max_block_range: u64,
start: BlockNumber,
end: BlockNumber,
sender: mpsc::Sender<BlockScannerResult>,
}

impl<N: Network> HistoricalRangeHandler<N> {
pub fn new(
provider: RobustProvider<N>,
max_block_range: u64,
start: BlockNumber,
end: BlockNumber,
sender: mpsc::Sender<BlockScannerResult>,
) -> Self {
Self { provider, max_block_range, start, end, sender }
}

pub fn run(self) {
tokio::spawn(async move {
_ = self.handle_stream_historical_range().await;
debug!("Historical range stream ended");
});
}

/// Run the handler in the current task.
///
/// Returns `ChannelState::Closed` if the channel is closed, `ChannelState::Open` otherwise.
#[must_use]
pub async fn run_sync(self) -> ChannelState {
self.handle_stream_historical_range().await
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
async fn handle_stream_historical_range(self) -> ChannelState {
// Phase 1: Stream all finalized blocks without any reorg checks
let Some((non_finalized_start, finalized_block_num)) = self.stream_finalized_blocks().await
else {
return ChannelState::Closed;
};

// All blocks already finalized
if non_finalized_start > self.end {
return ChannelState::Open;
}

// Phase 2: Stream non-finalized blocks with reorg detection
self.stream_non_finalized_blocks(non_finalized_start, finalized_block_num).await
}

/// Streams finalized blocks without reorg checks.
///
/// Returns `(non_finalized_start, finalized_block_num)`, or `None` if the channel closed.
async fn stream_finalized_blocks(&self) -> Option<(BlockNumber, BlockNumber)> {
// NOTE: Edge case - If the chain is too young to expose finalized blocks (height <
// finalized depth) just use zero. Since we use the finalized block number only to
// determine whether to run reorg checks or not, this is a "low-stakes" RPC call.
let finalized_block_num = self
.provider
.get_block_number_by_id(BlockNumberOrTag::Finalized.into())
.await
.unwrap_or(0);

let finalized_batch_end = finalized_block_num.min(self.end);

let iter = RangeIterator::forward(self.start, finalized_batch_end, self.max_block_range);

for range in iter {
trace!(
range_start = *range.start(),
range_end = *range.end(),
"Streaming finalized range"
);
if self.sender.try_stream(range).await.is_closed() {
return None;
}
}

// If start > finalized_batch_end, the loop above was empty and we should
// continue from start. Otherwise, continue from after finalized_batch_end.
Some((self.start.max(finalized_batch_end + 1), finalized_block_num))
}

/// Streams non-finalized blocks with end-of-range reorg detection.
///
/// The handler takes a snapshot of the requested end block before streaming the
/// non-finalized portion of the range. After streaming that portion, it re-fetches the end
/// block and compares hashes. If a reorg is detected, it emits a
/// [`Notification::ReorgDetected`] with the common ancestor and re-streams the non-finalized
/// portion starting from `min_common_ancestor + 1`. This process repeats until the end block
/// remains stable.
///
/// Returns `ChannelState::Closed` if the channel is closed, `ChannelState::Open` otherwise.
async fn stream_non_finalized_blocks(
&self,
non_finalized_start: BlockNumber,
finalized_block_num: BlockNumber,
) -> ChannelState {
let min_common_ancestor = non_finalized_start.saturating_sub(1).max(finalized_block_num);

let mut end_block = match self.provider.get_block_by_number(self.end.into()).await {
Ok(block) => block,
Err(e) => {
error!("Failed to get end block");
_ = self.sender.try_stream(e).await;
return ChannelState::Closed;
}
};

// the only way to break out of the loop is for no reorg to happen while streaming
loop {
let iter = RangeIterator::forward(non_finalized_start, self.end, self.max_block_range);

for range in iter {
trace!(
range_start = *range.start(),
range_end = *range.end(),
"Streaming non-finalized range"
);
if self.sender.try_stream(range).await.is_closed() {
return ChannelState::Closed;
}
}

let reorged_end_block = self.get_reorged_end_block(&end_block).await;

match reorged_end_block {
None => {
break;
}
Some(new_end_block) => {
// notify the receiver and update the tracked end block

if self
.sender
.try_stream(Notification::ReorgDetected {
common_ancestor: min_common_ancestor,
})
.await
.is_closed()
{
return ChannelState::Closed;
}
end_block = new_end_block;
}
}
}

debug!(end_block_hash = %end_block.header().hash(), "Historical sync completed");
ChannelState::Open
}

/// Checks if a reorg occurred by comparing block hashes.
///
/// Returns `Some(new_hash)` if a reorg was detected, `None` otherwise.
async fn get_reorged_end_block(
&self,
end_block: &N::BlockResponse,
) -> Option<N::BlockResponse> {
let new_end_block =
match self.provider.get_block_by_number(end_block.header().number().into()).await {
Ok(block) => block,
Err(e) => {
error!("Failed to fetch end block for reorg check");
_ = self.sender.try_stream(e).await;
return None;
}
};

let new_block_hash = new_end_block.header().hash();
if new_block_hash == end_block.header().hash() {
return None;
}

warn!(
end_block = end_block.header().number(),
old_hash = %end_block.header().hash(),
new_hash = %new_block_hash,
"Reorg detected, re-streaming non-finalized blocks"
);

Some(new_end_block)
}
}
1 change: 1 addition & 0 deletions src/block_range_scanner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod builder;
mod common;
mod historical_range_handler;
mod range_iterator;
mod reorg_handler;
mod rewind_handler;
Expand Down
26 changes: 9 additions & 17 deletions src/block_range_scanner/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ use crate::{
block_range_scanner::{
RingBufferCapacity,
common::{self, BlockScannerResult},
historical_range_handler::HistoricalRangeHandler,
reorg_handler::DefaultReorgHandler,
rewind_handler::RewindHandler,
sync_handler::SyncHandler,
Expand Down Expand Up @@ -219,7 +220,6 @@ impl<N: Network> BlockRangeScanner<N> {
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);

let max_block_range = self.max_block_range;
let past_blocks_storage_capacity = self.past_blocks_storage_capacity;
let provider = self.provider.clone();

let (start_block, end_block) = tokio::try_join!(
Expand All @@ -243,22 +243,14 @@ impl<N: Network> BlockRangeScanner<N> {
"Starting historical block stream"
);

tokio::spawn(async move {
let mut reorg_handler =
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);

_ = common::stream_historical_range(
start_block_num,
end_block_num,
max_block_range,
&blocks_sender,
&provider,
&mut reorg_handler,
)
.await;

debug!("Historical block stream completed");
});
let handler = HistoricalRangeHandler::new(
provider,
max_block_range,
start_block_num,
end_block_num,
blocks_sender,
);
handler.run();

Ok(ReceiverStream::new(blocks_receiver))
}
Expand Down
Loading