diff --git a/Cargo.toml b/Cargo.toml index 51baa85..2f8c351 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.8.0" +version = "0.8.1" edition = "2024" rust-version = "1.92" authors = ["init4"] @@ -35,13 +35,13 @@ incremental = false [workspace.dependencies] # internal -signet-hot = { version = "0.8.0", path = "./crates/hot" } -signet-hot-mdbx = { version = "0.8.0", path = "./crates/hot-mdbx" } -signet-cold = { version = "0.8.0", path = "./crates/cold" } -signet-cold-mdbx = { version = "0.8.0", path = "./crates/cold-mdbx" } -signet-cold-sql = { version = "0.8.0", path = "./crates/cold-sql" } -signet-storage = { version = "0.8.0", path = "./crates/storage" } -signet-storage-types = { version = "0.8.0", path = "./crates/types" } +signet-hot = { version = "0.8.1", path = "./crates/hot" } +signet-hot-mdbx = { version = "0.8.1", path = "./crates/hot-mdbx" } +signet-cold = { version = "0.8.1", path = "./crates/cold" } +signet-cold-mdbx = { version = "0.8.1", path = "./crates/cold-mdbx" } +signet-cold-sql = { version = "0.8.1", path = "./crates/cold-sql" } +signet-storage = { version = "0.8.1", path = "./crates/storage" } +signet-storage-types = { version = "0.8.1", path = "./crates/types" } # External, in-house signet-libmdbx = { version = "0.8.0" } diff --git a/crates/cold/src/conformance.rs b/crates/cold/src/conformance.rs index 72f13d2..8102430 100644 --- a/crates/cold/src/conformance.rs +++ b/crates/cold/src/conformance.rs @@ -5,8 +5,8 @@ //! a custom backend, call the test functions with your backend instance. use crate::{ - BlockData, ColdResult, ColdStorage, ColdStorageBackend, ColdStorageError, Filter, - HeaderSpecifier, ReceiptSpecifier, RpcLog, TransactionSpecifier, + BlockData, ColdResult, ColdStorage, ColdStorageBackend, ColdStorageError, ErasedBackend, + Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, TransactionSpecifier, }; use alloy::{ consensus::{ @@ -43,6 +43,32 @@ pub async fn conformance(backend: B) -> ColdResult<()> { Ok(()) } +/// Run the conformance suite against `backend` after erasing it +/// through [`crate::DynColdStorageBackend`]. +/// +/// Exercises the same contract as [`conformance`] but routes every +/// call through [`ErasedBackend`], validating that the erased +/// dispatch path upholds the trait contract. +pub async fn conformance_erased(backend: B) -> ColdResult<()> { + let erased = ErasedBackend::new(backend); + let cancel = CancellationToken::new(); + let handle = ColdStorage::new(erased, cancel.clone()); + test_empty_storage(&handle).await?; + test_append_and_read_header(&handle).await?; + test_header_hash_lookup(&handle).await?; + test_transaction_lookups(&handle).await?; + test_receipt_lookups(&handle).await?; + test_truncation(&handle).await?; + test_batch_append(&handle).await?; + test_confirmation_metadata(&handle).await?; + test_cold_receipt_metadata(&handle).await?; + test_get_logs(&handle).await?; + test_stream_logs(&handle).await?; + test_drain_above(&handle).await?; + cancel.cancel(); + Ok(()) +} + /// Create test block data for conformance tests. /// /// Creates a minimal valid block with the given block number. diff --git a/crates/cold/src/dyn_backend.rs b/crates/cold/src/dyn_backend.rs new file mode 100644 index 0000000..6c1bec1 --- /dev/null +++ b/crates/cold/src/dyn_backend.rs @@ -0,0 +1,491 @@ +//! Object-safe mirror of [`ColdStorageBackend`]. +//! +//! [`DynColdStorageBackend`] re-declares every method on +//! [`ColdStorageRead`], [`ColdStorageWrite`], and [`ColdStorageBackend`] +//! with an explicit [`StorageFuture`] return type so the trait is +//! object-safe. A blanket impl auto-implements it for every +//! `B: ColdStorageBackend`, and [`ErasedBackend`] re-implements the +//! strong traits by delegating to the boxed methods. +//! +//! # Plumbing, Not API +//! +//! This trait exists so [`ColdStorage`]'s default type parameter +//! ([`ErasedBackend`]) is nameable in error messages and downstream +//! signatures. Backends should implement [`ColdStorageBackend`] — the +//! blanket impl handles this trait. +//! +//! # Why a Newtype, Not a Type Alias +//! +//! [`ErasedBackend`] is a newtype wrapping `Arc` +//! rather than a plain alias. A type alias exposes the dyn trait-object +//! lifetime to trait resolution; when an `ErasedBackend` is captured +//! into a spawned future, rustc invents a fresh `'0` lifetime for the +//! dyn object and asks `for<'0> Arc: +//! ColdStorageRead`. The `'static`-bounded impl does not satisfy this +//! HRTB and downstream `Send` checks fail. A concrete newtype has no +//! dyn lifetime in its surface type, so resolution is trivial. +//! +//! # Filter Cloning on the Erased Path +//! +//! The [`ColdStorageRead`] impl for [`ErasedBackend`] clones the +//! [`Filter`] inside `get_logs` and `produce_log_stream`. The dyn +//! methods unify `&self` and `&Filter` into a single lifetime, which +//! cannot be expressed by the independent-lifetime trait signatures +//! without an owned bridge. The concrete `ColdStorage` path is +//! unaffected. +//! +//! # Maintainer Note: Recursion Hazard for Borrowed Arguments +//! +//! Any method on the [`ErasedBackend`] impls that cannot use the +//! direct `self.0.dyn_(...)` form (because a borrowed argument +//! forces it through a `self.clone()` + `async move` bridge) MUST +//! dispatch via qualified path on the inner trait object, e.g. +//! `DynColdStorageBackend::dyn_(this.0.as_ref(), ...)`. +//! +//! Writing `this.dyn_(...)` on a cloned [`ErasedBackend`] +//! resolves to the blanket impl (`ErasedBackend: ColdStorageBackend` +//! ⇒ `ErasedBackend: DynColdStorageBackend`), which calls back into +//! the strong-trait impl and recurses infinitely. See `get_logs` and +//! `produce_log_stream` for the canonical pattern. +//! +//! [`ColdStorage`]: crate::ColdStorage +//! [`ColdStorageBackend`]: crate::ColdStorageBackend +//! [`ColdStorageRead`]: crate::ColdStorageRead +//! [`ColdStorageWrite`]: crate::ColdStorageWrite + +use crate::{ + BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageRead, ColdStorageWrite, + Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, + StreamParams, TransactionSpecifier, ZenithHeaderSpecifier, +}; +use alloy::primitives::BlockNumber; +use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader}; +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; + +/// Boxed, pinned, `Send`-able future returned from object-safe +/// [`DynColdStorageBackend`] methods. +pub type StorageFuture<'a, T> = Pin + Send + 'a>>; + +/// Type-erased cold storage backend, shareable across tasks. +/// +/// This is the default `B` for [`ColdStorage`](crate::ColdStorage): a +/// handle written as plain `ColdStorage` uses this backend. Construct +/// one with [`ErasedBackend::new`] or +/// [`ColdStorage::new_erased`](crate::ColdStorage::new_erased). +/// +/// # Why a Newtype +/// +/// Wrapping the `Arc` in a struct keeps the trait-object +/// lifetime out of the public type signature. See the module-level +/// docs for the HRTB resolution problem this avoids. +pub struct ErasedBackend(Arc); + +impl ErasedBackend { + /// Erase a concrete backend behind `Arc`. + pub fn new(backend: B) -> Self { + Self(Arc::new(backend)) + } + + /// Wrap an existing trait object. + /// + /// Prefer [`ErasedBackend::new`] for concrete backends. Use this + /// only when you already hold an `Arc`, + /// e.g. when bridging from another type-erased channel. + pub const fn from_arc(arc: Arc) -> Self { + Self(arc) + } + + /// Borrow the inner trait object. + pub fn as_dyn(&self) -> &(dyn DynColdStorageBackend + 'static) { + &*self.0 + } + + /// Consume the newtype and return the inner `Arc`. + pub fn into_arc(self) -> Arc { + self.0 + } +} + +impl Clone for ErasedBackend { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl std::fmt::Debug for ErasedBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("ErasedBackend").finish() + } +} + +/// Object-safe mirror of [`ColdStorageBackend`]. Auto-implemented by a +/// blanket impl over every `B: ColdStorageBackend`; do not implement +/// directly. +/// +/// [`ColdStorageBackend`]: crate::ColdStorageBackend +pub trait DynColdStorageBackend: Send + Sync + 'static { + /// Get a header by specifier. + fn dyn_get_header<'a>( + &'a self, + spec: HeaderSpecifier, + ) -> StorageFuture<'a, ColdResult>>; + + /// Get multiple headers by specifiers. + fn dyn_get_headers<'a>( + &'a self, + specs: Vec, + ) -> StorageFuture<'a, ColdResult>>>; + + /// Get a transaction by specifier, with block confirmation metadata. + fn dyn_get_transaction<'a>( + &'a self, + spec: TransactionSpecifier, + ) -> StorageFuture<'a, ColdResult>>>; + + /// Get all transactions in a block. + fn dyn_get_transactions_in_block<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult>>; + + /// Get the number of transactions in a block. + fn dyn_get_transaction_count<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult>; + + /// Get a receipt by specifier. + fn dyn_get_receipt<'a>( + &'a self, + spec: ReceiptSpecifier, + ) -> StorageFuture<'a, ColdResult>>; + + /// Get all receipts in a block. + fn dyn_get_receipts_in_block<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult>>; + + /// Get signet events by specifier. + fn dyn_get_signet_events<'a>( + &'a self, + spec: SignetEventsSpecifier, + ) -> StorageFuture<'a, ColdResult>>; + + /// Get a zenith header by specifier. + fn dyn_get_zenith_header<'a>( + &'a self, + spec: ZenithHeaderSpecifier, + ) -> StorageFuture<'a, ColdResult>>; + + /// Get multiple zenith headers by specifier. + fn dyn_get_zenith_headers<'a>( + &'a self, + spec: ZenithHeaderSpecifier, + ) -> StorageFuture<'a, ColdResult>>; + + /// Get the latest block number in storage. + fn dyn_get_latest_block<'a>(&'a self) -> StorageFuture<'a, ColdResult>>; + + /// Filter logs by block range, address, and topics. + fn dyn_get_logs<'a>( + &'a self, + filter: &'a Filter, + max_logs: usize, + ) -> StorageFuture<'a, ColdResult>>; + + /// Produce a log stream by iterating blocks and sending matching logs. + fn dyn_produce_log_stream<'a>( + &'a self, + filter: &'a Filter, + params: StreamParams, + ) -> StorageFuture<'a, ()>; + + /// Append a single block to cold storage. + fn dyn_append_block<'a>(&'a self, data: BlockData) -> StorageFuture<'a, ColdResult<()>>; + + /// Append multiple blocks to cold storage. + fn dyn_append_blocks<'a>(&'a self, data: Vec) -> StorageFuture<'a, ColdResult<()>>; + + /// Truncate all data above the given block number (exclusive). + fn dyn_truncate_above<'a>(&'a self, block: BlockNumber) -> StorageFuture<'a, ColdResult<()>>; + + /// Read and remove all blocks above the given block number. + fn dyn_drain_above<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult>>>; + + /// Configured read deadline, if any. + fn dyn_read_timeout(&self) -> Option; + + /// Configured write deadline, if any. + fn dyn_write_timeout(&self) -> Option; +} + +impl DynColdStorageBackend for B { + fn dyn_get_header<'a>( + &'a self, + spec: HeaderSpecifier, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_header(self, spec)) + } + + fn dyn_get_headers<'a>( + &'a self, + specs: Vec, + ) -> StorageFuture<'a, ColdResult>>> { + Box::pin(::get_headers(self, specs)) + } + + fn dyn_get_transaction<'a>( + &'a self, + spec: TransactionSpecifier, + ) -> StorageFuture<'a, ColdResult>>> { + Box::pin(::get_transaction(self, spec)) + } + + fn dyn_get_transactions_in_block<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_transactions_in_block(self, block)) + } + + fn dyn_get_transaction_count<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult> { + Box::pin(::get_transaction_count(self, block)) + } + + fn dyn_get_receipt<'a>( + &'a self, + spec: ReceiptSpecifier, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_receipt(self, spec)) + } + + fn dyn_get_receipts_in_block<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_receipts_in_block(self, block)) + } + + fn dyn_get_signet_events<'a>( + &'a self, + spec: SignetEventsSpecifier, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_signet_events(self, spec)) + } + + fn dyn_get_zenith_header<'a>( + &'a self, + spec: ZenithHeaderSpecifier, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_zenith_header(self, spec)) + } + + fn dyn_get_zenith_headers<'a>( + &'a self, + spec: ZenithHeaderSpecifier, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_zenith_headers(self, spec)) + } + + fn dyn_get_latest_block<'a>(&'a self) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_latest_block(self)) + } + + fn dyn_get_logs<'a>( + &'a self, + filter: &'a Filter, + max_logs: usize, + ) -> StorageFuture<'a, ColdResult>> { + Box::pin(::get_logs(self, filter, max_logs)) + } + + fn dyn_produce_log_stream<'a>( + &'a self, + filter: &'a Filter, + params: StreamParams, + ) -> StorageFuture<'a, ()> { + Box::pin(::produce_log_stream(self, filter, params)) + } + + fn dyn_append_block<'a>(&'a self, data: BlockData) -> StorageFuture<'a, ColdResult<()>> { + Box::pin(::append_block(self, data)) + } + + fn dyn_append_blocks<'a>(&'a self, data: Vec) -> StorageFuture<'a, ColdResult<()>> { + Box::pin(::append_blocks(self, data)) + } + + fn dyn_truncate_above<'a>(&'a self, block: BlockNumber) -> StorageFuture<'a, ColdResult<()>> { + Box::pin(::truncate_above(self, block)) + } + + fn dyn_drain_above<'a>( + &'a self, + block: BlockNumber, + ) -> StorageFuture<'a, ColdResult>>> { + Box::pin(::drain_above(self, block)) + } + + fn dyn_read_timeout(&self) -> Option { + ::read_timeout(self) + } + + fn dyn_write_timeout(&self) -> Option { + ::write_timeout(self) + } +} + +// Compile-time check that the trait is object-safe. +const _: fn() = || { + fn _assert_object_safe(_: &dyn DynColdStorageBackend) {} +}; + +impl ColdStorageRead for ErasedBackend { + fn get_header( + &self, + spec: HeaderSpecifier, + ) -> impl Future>> + Send { + self.0.dyn_get_header(spec) + } + + fn get_headers( + &self, + specs: Vec, + ) -> impl Future>>> + Send { + self.0.dyn_get_headers(specs) + } + + fn get_transaction( + &self, + spec: TransactionSpecifier, + ) -> impl Future>>> + Send { + self.0.dyn_get_transaction(spec) + } + + fn get_transactions_in_block( + &self, + block: BlockNumber, + ) -> impl Future>> + Send { + self.0.dyn_get_transactions_in_block(block) + } + + fn get_transaction_count( + &self, + block: BlockNumber, + ) -> impl Future> + Send { + self.0.dyn_get_transaction_count(block) + } + + fn get_receipt( + &self, + spec: ReceiptSpecifier, + ) -> impl Future>> + Send { + self.0.dyn_get_receipt(spec) + } + + fn get_receipts_in_block( + &self, + block: BlockNumber, + ) -> impl Future>> + Send { + self.0.dyn_get_receipts_in_block(block) + } + + fn get_signet_events( + &self, + spec: SignetEventsSpecifier, + ) -> impl Future>> + Send { + self.0.dyn_get_signet_events(spec) + } + + fn get_zenith_header( + &self, + spec: ZenithHeaderSpecifier, + ) -> impl Future>> + Send { + self.0.dyn_get_zenith_header(spec) + } + + fn get_zenith_headers( + &self, + spec: ZenithHeaderSpecifier, + ) -> impl Future>> + Send { + self.0.dyn_get_zenith_headers(spec) + } + + fn get_latest_block(&self) -> impl Future>> + Send { + self.0.dyn_get_latest_block() + } + + fn get_logs( + &self, + filter: &Filter, + max_logs: usize, + ) -> impl Future>> + Send { + let this = self.clone(); + let filter = filter.clone(); + // Call dyn_get_logs via the inner trait object directly (not through + // the newtype's blanket DynColdStorageBackend impl), which would + // re-enter ColdStorageRead::get_logs on ErasedBackend and recurse + // infinitely. + async move { DynColdStorageBackend::dyn_get_logs(this.0.as_ref(), &filter, max_logs).await } + } + + fn produce_log_stream( + &self, + filter: &Filter, + params: StreamParams, + ) -> impl Future + Send { + let this = self.clone(); + let filter = filter.clone(); + // Same recursion hazard as `get_logs` above — call through the + // inner Arc + qualified path so dispatch lands on the trait + // object's vtable, not the newtype's blanket impl. + async move { + DynColdStorageBackend::dyn_produce_log_stream(this.0.as_ref(), &filter, params).await + } + } +} + +impl ColdStorageWrite for ErasedBackend { + fn append_block(&self, data: BlockData) -> impl Future> + Send { + self.0.dyn_append_block(data) + } + + fn append_blocks(&self, data: Vec) -> impl Future> + Send { + self.0.dyn_append_blocks(data) + } + + fn truncate_above(&self, block: BlockNumber) -> impl Future> + Send { + self.0.dyn_truncate_above(block) + } +} + +impl ColdStorageBackend for ErasedBackend { + fn read_timeout(&self) -> Option { + self.0.dyn_read_timeout() + } + + fn write_timeout(&self) -> Option { + self.0.dyn_write_timeout() + } + + fn drain_above( + &self, + block: BlockNumber, + ) -> impl Future>>> + Send { + self.0.dyn_drain_above(block) + } +} + +// Compile-time check that `ErasedBackend` satisfies the bound +// `ColdStorage` will require. +const _: fn() = || { + const fn _assert_bound() {} + _assert_bound::(); +}; diff --git a/crates/cold/src/handle.rs b/crates/cold/src/handle.rs index 6547b05..97d0c2a 100644 --- a/crates/cold/src/handle.rs +++ b/crates/cold/src/handle.rs @@ -14,9 +14,10 @@ //! participate in the drain barrier. use crate::{ - BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageError, Confirmed, Filter, - HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, StreamParams, - TransactionSpecifier, ZenithHeaderSpecifier, cache::ColdCache, metrics, + BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageError, Confirmed, + ErasedBackend, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog, + SignetEventsSpecifier, StreamParams, TransactionSpecifier, ZenithHeaderSpecifier, + cache::ColdCache, metrics, }; use alloy::primitives::{B256, BlockNumber}; use parking_lot::Mutex; @@ -111,7 +112,7 @@ pub(crate) struct Inner { /// `ColdStorage` is cheap to [`Clone`] — it is just an `Arc` around the /// shared inner state. All operations dispatch through semaphore-gated /// [`TaskTracker`]-spawned tasks. -pub struct ColdStorage { +pub struct ColdStorage { inner: Arc>, } @@ -692,3 +693,19 @@ impl ColdStorage { .await } } + +impl ColdStorage { + /// Construct a type-erased cold storage handle. + /// + /// Wraps `backend` in [`ErasedBackend`] so the resulting handle + /// has no `B` type parameter to propagate through downstream + /// signatures. Equivalent to + /// `ColdStorage::new(ErasedBackend::new(backend), cancel)`. + /// + /// Choose this constructor when you want runtime swappability of + /// the backend; use [`new`](Self::new) directly for fully + /// monomorphized call sites. + pub fn new_erased(backend: B, cancel: CancellationToken) -> Self { + Self::new(ErasedBackend::new(backend), cancel) + } +} diff --git a/crates/cold/src/lib.rs b/crates/cold/src/lib.rs index 7a57891..2a8b3c9 100644 --- a/crates/cold/src/lib.rs +++ b/crates/cold/src/lib.rs @@ -135,28 +135,38 @@ #![cfg_attr(docsrs, feature(doc_cfg))] mod cache; + +mod cold_receipt; +pub use cold_receipt::ColdReceipt; + +pub mod connect; +pub use connect::ColdConnect; + mod error; -mod metrics; pub use error::{ColdResult, ColdStorageError}; + mod handle; pub use handle::ColdStorage; + +mod metrics; + mod specifier; -pub use alloy::rpc::types::{Filter, Log as RpcLog}; -pub use signet_storage_types::{Confirmed, Recovered}; -pub use specifier::{ - HeaderSpecifier, ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, - ZenithHeaderSpecifier, -}; -mod cold_receipt; -pub use cold_receipt::ColdReceipt; mod stream; pub use stream::{StreamParams, produce_log_stream_default}; + +mod dyn_backend; +pub use dyn_backend::{DynColdStorageBackend, ErasedBackend, StorageFuture}; + mod traits; pub use traits::{BlockData, ColdStorageBackend, ColdStorageRead, ColdStorageWrite, LogStream}; -pub mod connect; -pub use connect::ColdConnect; +pub use alloy::rpc::types::{Filter, Log as RpcLog}; +pub use signet_storage_types::{Confirmed, Recovered}; +pub use specifier::{ + HeaderSpecifier, ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, + ZenithHeaderSpecifier, +}; /// Conformance tests for cold storage backends. #[cfg(any(test, feature = "test-utils"))] diff --git a/crates/cold/src/mem.rs b/crates/cold/src/mem.rs index 37d0a99..88f1a7b 100644 --- a/crates/cold/src/mem.rs +++ b/crates/cold/src/mem.rs @@ -387,7 +387,7 @@ impl ColdStorageBackend for MemColdBackend { mod test { use super::*; - use crate::conformance::conformance; + use crate::conformance::{conformance, conformance_erased}; #[tokio::test] async fn mem_backend_conformance() { @@ -395,6 +395,173 @@ mod test { conformance(backend).await.unwrap(); } + #[tokio::test] + async fn erased_conformance() { + conformance_erased(MemColdBackend::new()).await.unwrap(); + } + + #[tokio::test] + async fn dyn_drain_above_invokes_backend_override() { + use crate::{ + BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageRead, + ColdStorageWrite, DynColdStorageBackend, Filter, HeaderSpecifier, ReceiptSpecifier, + RpcLog, SignetEventsSpecifier, StreamParams, TransactionSpecifier, + ZenithHeaderSpecifier, + }; + use alloy::primitives::BlockNumber; + use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader}; + use std::{ + future::Future, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + }; + + #[derive(Clone)] + struct DrainTracker { + inner: MemColdBackend, + called: Arc, + } + + impl ColdStorageRead for DrainTracker { + fn get_header( + &self, + spec: HeaderSpecifier, + ) -> impl Future>> + Send { + self.inner.get_header(spec) + } + + fn get_headers( + &self, + specs: Vec, + ) -> impl Future>>> + Send { + self.inner.get_headers(specs) + } + + fn get_transaction( + &self, + spec: TransactionSpecifier, + ) -> impl Future>>> + Send + { + self.inner.get_transaction(spec) + } + + fn get_transactions_in_block( + &self, + block: BlockNumber, + ) -> impl Future>> + Send { + self.inner.get_transactions_in_block(block) + } + + fn get_transaction_count( + &self, + block: BlockNumber, + ) -> impl Future> + Send { + self.inner.get_transaction_count(block) + } + + fn get_receipt( + &self, + spec: ReceiptSpecifier, + ) -> impl Future>> + Send { + self.inner.get_receipt(spec) + } + + fn get_receipts_in_block( + &self, + block: BlockNumber, + ) -> impl Future>> + Send { + self.inner.get_receipts_in_block(block) + } + + fn get_signet_events( + &self, + spec: SignetEventsSpecifier, + ) -> impl Future>> + Send { + self.inner.get_signet_events(spec) + } + + fn get_zenith_header( + &self, + spec: ZenithHeaderSpecifier, + ) -> impl Future>> + Send { + self.inner.get_zenith_header(spec) + } + + fn get_zenith_headers( + &self, + spec: ZenithHeaderSpecifier, + ) -> impl Future>> + Send { + self.inner.get_zenith_headers(spec) + } + + fn get_latest_block( + &self, + ) -> impl Future>> + Send { + self.inner.get_latest_block() + } + + fn get_logs( + &self, + filter: &Filter, + max_logs: usize, + ) -> impl Future>> + Send { + self.inner.get_logs(filter, max_logs) + } + + fn produce_log_stream( + &self, + filter: &Filter, + params: StreamParams, + ) -> impl Future + Send { + self.inner.produce_log_stream(filter, params) + } + } + + impl ColdStorageWrite for DrainTracker { + fn append_block(&self, data: BlockData) -> impl Future> + Send { + self.inner.append_block(data) + } + + fn append_blocks( + &self, + data: Vec, + ) -> impl Future> + Send { + self.inner.append_blocks(data) + } + + fn truncate_above( + &self, + block: BlockNumber, + ) -> impl Future> + Send { + self.inner.truncate_above(block) + } + } + + impl ColdStorageBackend for DrainTracker { + fn drain_above( + &self, + block: BlockNumber, + ) -> impl Future>>> + Send { + let called = self.called.clone(); + let fut = self.inner.drain_above(block); + async move { + called.store(true, Ordering::SeqCst); + fut.await + } + } + } + + let called = Arc::new(AtomicBool::new(false)); + let backend = DrainTracker { inner: MemColdBackend::new(), called: called.clone() }; + let erased: Arc = Arc::new(backend); + + let _ = erased.dyn_drain_above(0).await.unwrap(); + + assert!(called.load(Ordering::SeqCst), "overridden drain_above must run through erasure"); + } + #[tokio::test] async fn write_trait_takes_self_shared_ref() { // Compile-fence: proves ColdStorageWrite can be called on &self. diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 75b6d5d..c389e22 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -83,6 +83,7 @@ pub use signet_cold_sql::SqlConnector; // Re-export key types for convenience pub use signet_cold::{ ColdStorage, ColdStorageBackend, ColdStorageError, ColdStorageRead, ColdStorageWrite, + DynColdStorageBackend, ErasedBackend, }; pub use signet_cold_mdbx::MdbxColdBackend; pub use signet_hot::{ diff --git a/crates/storage/src/unified.rs b/crates/storage/src/unified.rs index e8c1b8d..f525d12 100644 --- a/crates/storage/src/unified.rs +++ b/crates/storage/src/unified.rs @@ -72,12 +72,17 @@ pub struct DrainedBlock { /// // Handle reorgs /// storage.unwind_above(reorg_block).await?; /// ``` -#[derive(Debug)] -pub struct UnifiedStorage { +pub struct UnifiedStorage { hot: H, cold: ColdStorage, } +impl std::fmt::Debug for UnifiedStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UnifiedStorage").finish_non_exhaustive() + } +} + impl UnifiedStorage { /// Create a new unified storage instance. pub const fn new(hot: H, cold: ColdStorage) -> Self { @@ -93,7 +98,25 @@ impl UnifiedStorage { let cold = ColdStorage::new(cold_backend, cancel_token); Self::new(hot, cold) } +} + +impl UnifiedStorage { + /// Spawn a unified storage with a type-erased cold backend. + /// + /// Erases the concrete cold backend behind + /// [`signet_cold::ErasedBackend`], so callers can hold a + /// `UnifiedStorage` without propagating a backend generic. + pub fn spawn_erased( + hot: H, + cold_backend: B, + cancel_token: CancellationToken, + ) -> Self { + let cold = ColdStorage::new_erased(cold_backend, cancel_token); + Self::new(hot, cold) + } +} +impl UnifiedStorage { /// Get a reference to the hot storage backend. pub const fn hot(&self) -> &H { &self.hot