diff --git a/Cargo.lock b/Cargo.lock index 0e6914410..a421b378f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -999,6 +999,7 @@ dependencies = [ "datafusion", "datasets-common", "datasets-derived", + "datasets-raw", "evm-rpc-datasets", "firehose-datasets", "futures", @@ -2720,6 +2721,7 @@ dependencies = [ "datafusion-datasource", "datafusion-tracing", "datasets-common", + "datasets-raw", "futures", "governor 0.10.4", "itertools 0.14.0", @@ -4030,6 +4032,7 @@ dependencies = [ name = "datasets-common" version = "0.1.0" dependencies = [ + "alloy", "datafusion", "hex", "js-runtime", @@ -4091,8 +4094,18 @@ dependencies = [ name = "datasets-raw" version = "0.1.0" dependencies = [ + "alloy", + "arrow", + "async-stream", + "backon", "datafusion", "datasets-common", + "futures", + "monitoring", + "serde", + "thiserror 2.0.18", + "tokio", + "tracing", ] [[package]] @@ -4398,6 +4411,7 @@ dependencies = [ "datafusion", "datasets-common", "datasets-derived", + "datasets-raw", "futures", "metadata-db", "monitoring", @@ -4652,6 +4666,7 @@ dependencies = [ "async-stream", "common", "datasets-common", + "datasets-raw", "futures", "monitoring", "schemars 1.2.0", @@ -4792,6 +4807,7 @@ dependencies = [ "async-stream", "common", "datasets-common", + "datasets-raw", "futures", "hex", "monitoring", @@ -9752,6 +9768,7 @@ dependencies = [ "bs58", "common", "datasets-common", + "datasets-raw", "fs-err", "futures", "governor 0.10.4", diff --git a/crates/core/common/Cargo.toml b/crates/core/common/Cargo.toml index ca7330ef3..421b7c00c 100644 --- a/crates/core/common/Cargo.toml +++ b/crates/core/common/Cargo.toml @@ -18,6 +18,7 @@ datafusion.workspace = true datafusion-datasource.workspace = true datafusion-tracing.workspace = true datasets-common = { path = "../datasets-common" } +datasets-raw = { path = "../datasets-raw" } futures.workspace = true governor.workspace = true itertools.workspace = true diff --git a/crates/core/common/src/catalog/physical.rs b/crates/core/common/src/catalog/physical.rs index 49087333f..ce9ab9484 100644 --- a/crates/core/common/src/catalog/physical.rs +++ b/crates/core/common/src/catalog/physical.rs @@ -28,11 +28,11 @@ use object_store::ObjectStore; use url::Url; use crate::{ - BlockNum, BoxError, LogicalCatalog, + BlockNum, BlockRange, BoxError, LogicalCatalog, metadata::{ FileMetadata, parquet::ParquetMeta, - segments::{BlockRange, Chain, Segment, canonical_chain, missing_ranges}, + segments::{Chain, Segment, canonical_chain, missing_ranges}, }, sql::TableReference, }; diff --git a/crates/core/common/src/evm/tables/blocks.rs b/crates/core/common/src/evm/tables/blocks.rs index 17f97c9b7..7c93a90e5 100644 --- a/crates/core/common/src/evm/tables/blocks.rs +++ b/crates/core/common/src/evm/tables/blocks.rs @@ -4,12 +4,14 @@ use arrow::{ array::{ArrayRef, BinaryBuilder, UInt64Builder}, datatypes::{DataType, Field, Schema, SchemaRef}, }; +use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; use crate::{ - BYTES32_TYPE, BoxError, Bytes32, Bytes32ArrayBuilder, EVM_ADDRESS_TYPE as ADDRESS_TYPE, - EVM_CURRENCY_TYPE, EvmAddress as Address, EvmAddressArrayBuilder, EvmCurrency, - EvmCurrencyArrayBuilder, RawTableRows, SPECIAL_BLOCK_NUM, Table, Timestamp, - TimestampArrayBuilder, arrow, metadata::segments::BlockRange, timestamp_type, + BYTES32_TYPE, BlockRange, BoxError, Bytes32, Bytes32ArrayBuilder, + EVM_ADDRESS_TYPE as ADDRESS_TYPE, EVM_CURRENCY_TYPE, EvmAddress as Address, + EvmAddressArrayBuilder, EvmCurrency, EvmCurrencyArrayBuilder, SPECIAL_BLOCK_NUM, Timestamp, + 
TimestampArrayBuilder, arrow, timestamp_type, }; static SCHEMA: LazyLock = LazyLock::new(|| Arc::new(schema())); @@ -197,7 +199,7 @@ impl BlockRowsBuilder { self.requests_hash.append_option(*requests_hash); } - pub fn build(self, range: BlockRange) -> Result { + pub fn build(self, range: BlockRange) -> Result { let Self { mut special_block_num, mut block_num, @@ -252,7 +254,7 @@ impl BlockRowsBuilder { Arc::new(requests_hash.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/core/common/src/evm/tables/logs.rs b/crates/core/common/src/evm/tables/logs.rs index 1662b85de..5585dfd74 100644 --- a/crates/core/common/src/evm/tables/logs.rs +++ b/crates/core/common/src/evm/tables/logs.rs @@ -4,11 +4,13 @@ use arrow::{ array::{ArrayRef, BinaryBuilder, UInt32Builder, UInt64Builder}, datatypes::{DataType, Field, Schema, SchemaRef}, }; +use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; use crate::{ - BYTES32_TYPE, BoxError, Bytes32, Bytes32ArrayBuilder, EVM_ADDRESS_TYPE as ADDRESS_TYPE, - EvmAddress as Address, EvmAddressArrayBuilder, RawTableRows, SPECIAL_BLOCK_NUM, Table, - Timestamp, TimestampArrayBuilder, arrow, metadata::segments::BlockRange, timestamp_type, + BYTES32_TYPE, BlockRange, BoxError, Bytes32, Bytes32ArrayBuilder, + EVM_ADDRESS_TYPE as ADDRESS_TYPE, EvmAddress as Address, EvmAddressArrayBuilder, + SPECIAL_BLOCK_NUM, Timestamp, TimestampArrayBuilder, arrow, timestamp_type, }; static SCHEMA: LazyLock = LazyLock::new(|| Arc::new(schema())); @@ -146,7 +148,7 @@ impl LogRowsBuilder { self.log_index.append_value(*log_index); } - pub fn build(self, range: BlockRange) -> Result { + pub fn build(self, range: BlockRange) -> Result { let Self { mut special_block_num, block_hash, @@ -179,7 +181,7 @@ impl LogRowsBuilder { Arc::new(data.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/core/common/src/lib.rs b/crates/core/common/src/lib.rs index bc16eb7e5..9e3ca44de 100644 --- a/crates/core/common/src/lib.rs +++ b/crates/core/common/src/lib.rs @@ -13,7 +13,6 @@ pub mod stream_helpers; pub mod utils; use std::{ - future::Future, ops::RangeInclusive, time::{Duration, SystemTime}, }; @@ -22,26 +21,18 @@ use arrow::{array::FixedSizeBinaryArray, datatypes::DataType}; pub use arrow_helpers::*; pub use catalog::logical::*; use datafusion::arrow::{ - array::{ArrayRef, AsArray as _, RecordBatch}, - datatypes::{DECIMAL128_MAX_PRECISION, TimeUnit, UInt64Type}, + datatypes::{DECIMAL128_MAX_PRECISION, TimeUnit}, error::ArrowError, }; pub use datafusion::{arrow, parquet}; -use datasets_common::dataset::Table; -use futures::{Stream, StreamExt}; -use metadata::segments::BlockRange; +pub use datasets_common::{BlockNum, SPECIAL_BLOCK_NUM, block_range::BlockRange}; +pub use metadata::segments::{ResumeWatermark, Watermark}; pub use planning_context::{DetachedLogicalPlan, PlanningContext}; pub use query_context::{Error as QueryError, QueryContext}; use serde::{Deserialize, Serialize}; pub type BoxError = Box; pub type BoxResult = Result; - -/// Special column name for block numbers. These are implicitly selected when doing streaming -/// queries, and in some other cases. 
-pub const SPECIAL_BLOCK_NUM: &str = "_block_num"; - -pub type BlockNum = u64; pub type Bytes32 = [u8; 32]; pub type EvmAddress = [u8; 20]; pub type EvmCurrency = i128; // Payment amount in the EVM. Used for gas or value transfers. @@ -83,227 +74,6 @@ pub fn timestamp_type() -> DataType { /// Remember to call `.with_timezone_utc()` after creating a Timestamp array. pub(crate) type TimestampArrayType = arrow::array::TimestampNanosecondArray; -/// A record batch associated with a single block of chain data, for populating raw datasets. -pub struct RawTableRows { - pub table: Table, - pub rows: RecordBatch, - pub range: BlockRange, -} - -impl RawTableRows { - pub fn new(table: Table, range: BlockRange, columns: Vec) -> Result { - let schema = table.schema().clone(); - let rows = RecordBatch::try_new(schema, columns)?; - Self::check_invariants(&range, &rows) - .map_err(|err| format!("malformed table {}: {}", table.name(), err))?; - Ok(RawTableRows { table, rows, range }) - } - - pub fn block_num(&self) -> BlockNum { - self.range.start() - } - - fn check_invariants(range: &BlockRange, rows: &RecordBatch) -> Result<(), BoxError> { - if range.start() != range.end() { - return Err("block range must contain a single block number".into()); - } - if rows.num_rows() == 0 { - return Ok(()); - } - - let block_nums = rows - .column_by_name(SPECIAL_BLOCK_NUM) - .ok_or("missing _block_num column")?; - let block_nums = block_nums - .as_primitive_opt::() - .ok_or("_block_num column is not uint64")?; - - // Unwrap: `rows` is not empty. - let start = arrow::compute::kernels::aggregate::min(block_nums).unwrap(); - let end = arrow::compute::kernels::aggregate::max(block_nums).unwrap(); - if start != range.start() { - return Err(format!("contains unexpected block_num: {}", start).into()); - }; - if end != range.start() { - return Err(format!("contains unexpected block_num: {}", end).into()); - }; - - Ok(()) - } -} - -pub struct RawDatasetRows(Vec); - -impl RawDatasetRows { - pub fn new(rows: Vec) -> Self { - assert!(!rows.is_empty()); - assert!(rows.iter().skip(1).all(|r| r.range == rows[0].range)); - Self(rows) - } - - pub fn block_num(&self) -> BlockNum { - self.0[0].block_num() - } -} - -impl IntoIterator for RawDatasetRows { - type Item = RawTableRows; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -pub trait BlockStreamer: Clone + 'static { - fn block_stream( - self, - start: BlockNum, - end: BlockNum, - ) -> impl Future> + Send> + Send; - - fn latest_block( - &mut self, - finalized: bool, - ) -> impl Future, BoxError>> + Send; - - /// Waits for any background work and resources associated with this [`BlockStreamer`] - /// to be cleaned up. - /// - /// This should be called once the user no longer needs to create new block streams - /// to allow implementations to terminate internal tasks, flush or release network - /// connections, and free any other resources. - /// - /// After requesting cleanup, callers should not call [BlockStreamer::block_stream] - /// again on the same instance. Behavior when creating new streams after cleanup is - /// implementation-defined and must not be relied on. 
- fn wait_for_cleanup(self) -> impl Future> + Send; - - fn provider_name(&self) -> &str; -} - -impl BlockStreamerExt for T {} - -pub trait BlockStreamerExt: BlockStreamer { - fn with_retry(self) -> BlockStreamerWithRetry { - BlockStreamerWithRetry(self) - } -} - -#[derive(Clone)] -pub struct BlockStreamerWithRetry(T); - -impl BlockStreamer for BlockStreamerWithRetry { - async fn block_stream( - self, - start: BlockNum, - end: BlockNum, - ) -> impl Stream> + Send { - const DEBUG_RETRY_LIMIT: u16 = 8; - const DEBUG_RETRY_DELAY: Duration = Duration::from_millis(50); - const WARN_RETRY_LIMIT: u16 = 16; - const WARN_RETRY_DELAY: Duration = Duration::from_millis(100); - const ERROR_RETRY_DELAY: Duration = Duration::from_millis(300); - - let mut current_block = start; - let mut num_retries = 0; - - async_stream::stream! { - 'retry: loop { - let inner_stream = self.0.clone().block_stream(current_block, end).await; - futures::pin_mut!(inner_stream); - while let Some(block) = inner_stream.next().await { - match &block { - Ok(_) => { - num_retries = 0; - current_block += 1; - yield block; - } - Err(e) => { - let error_source = monitoring::logging::error_source(e.as_ref()); - // Progressively more severe logging and longer retry interval. - match num_retries { - 0 => { - // First error, make sure it is visible in info (default) logs. - num_retries += 1; - tracing::info!( - block = %current_block, - error = %e, - error_source, - "Block streaming failed, retrying" - ); - tokio::time::sleep(DEBUG_RETRY_DELAY).await; - } - 1..DEBUG_RETRY_LIMIT => { - num_retries += 1; - tracing::debug!( - block = %current_block, - error = %e, - error_source, - "Block streaming failed, retrying"); - tokio::time::sleep(DEBUG_RETRY_DELAY).await; - } - DEBUG_RETRY_LIMIT..WARN_RETRY_LIMIT => { - num_retries += 1; - tracing::warn!( - block = %current_block, - error = %e, - error_source, - "Block streaming failed, retrying" - ); - tokio::time::sleep(WARN_RETRY_DELAY).await; - } - _ => { - tracing::error!( - block = %current_block, - error = %e, - error_source, - "Block streaming failed, retrying" - ); - tokio::time::sleep(ERROR_RETRY_DELAY).await; - } - } - continue 'retry; - } - } - } - break 'retry; - } - } - } - - async fn latest_block(&mut self, finalized: bool) -> Result, BoxError> { - use backon::{ExponentialBuilder, Retryable}; - - (|| async { - let mut inner = self.0.clone(); - inner.latest_block(finalized).await - }) - .retry( - ExponentialBuilder::default() - .with_min_delay(Duration::from_secs(2)) - .with_max_delay(Duration::from_secs(20)) - .with_max_times(10), - ) - .notify(|err, dur| { - tracing::warn!( - error = %err, - "Failed to get latest block. Retrying in {:.1}s", - dur.as_secs_f32() - ); - }) - .await - } - - fn wait_for_cleanup(self) -> impl Future> + Send { - self.0.wait_for_cleanup() - } - - fn provider_name(&self) -> &str { - self.0.provider_name() - } -} - pub fn block_range_intersection( a: RangeInclusive, b: RangeInclusive, diff --git a/crates/core/common/src/metadata/parquet.rs b/crates/core/common/src/metadata/parquet.rs index 15ecc84fe..0f92155d8 100644 --- a/crates/core/common/src/metadata/parquet.rs +++ b/crates/core/common/src/metadata/parquet.rs @@ -19,9 +19,10 @@ //! 
See also: metadata-consistency use amp_data_store::file_name::FileName; +use datasets_common::block_range::BlockRange; use serde::{Deserialize, Serialize}; -use crate::{Timestamp, metadata::segments::BlockRange}; +use crate::Timestamp; pub const PARQUET_METADATA_KEY: &str = "nozzle_metadata"; pub const PARENT_FILE_ID_METADATA_KEY: &str = "parent_file_ids"; diff --git a/crates/core/common/src/metadata/segments.rs b/crates/core/common/src/metadata/segments.rs index 14182212d..59dada1ee 100644 --- a/crates/core/common/src/metadata/segments.rs +++ b/crates/core/common/src/metadata/segments.rs @@ -4,50 +4,24 @@ use alloy::primitives::BlockHash; use metadata_db::files::FileId; use object_store::ObjectMeta; -use crate::{BlockNum, BoxError, block_range_intersection}; +use crate::{BlockNum, BlockRange, BoxError, block_range_intersection}; -#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] -pub struct BlockRange { - pub numbers: RangeInclusive, - pub network: String, +/// A watermark representing a specific block in the chain. +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct Watermark { + /// The segment end block. + pub number: BlockNum, + /// The hash associated with the segment end block. pub hash: BlockHash, - pub prev_hash: Option, } -impl BlockRange { - #[inline] - pub fn start(&self) -> BlockNum { - *self.numbers.start() - } - - #[inline] - pub fn end(&self) -> BlockNum { - *self.numbers.end() - } - - #[inline] - pub fn watermark(&self) -> Watermark { - Watermark { - number: self.end(), - hash: self.hash, +impl From<&BlockRange> for Watermark { + fn from(range: &BlockRange) -> Self { + Self { + number: range.end(), + hash: range.hash, } } - - /// Return true iff `self` is sequenced immediately before `other`. - #[inline] - fn adjacent(&self, other: &Self) -> bool { - self.network == other.network - && (self.end() + 1) == other.start() - && other.prev_hash.map(|h| h == self.hash).unwrap_or(true) - } -} - -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct Watermark { - /// The segment end block - pub number: BlockNum, - /// The hash associated with the segment end block - pub hash: BlockHash, } /// Public interface for resuming a stream from a watermark. @@ -56,17 +30,16 @@ pub struct Watermark { pub struct ResumeWatermark(pub BTreeMap); impl ResumeWatermark { + /// Create a ResumeWatermark from a slice of BlockRanges. pub fn from_ranges(ranges: &[BlockRange]) -> Self { let watermark = ranges .iter() - .map(|r| { - let watermark = r.watermark(); - (r.network.clone(), watermark) - }) + .map(|r| (r.network.clone(), r.into())) .collect(); Self(watermark) } + /// Extract the watermark for a specific network. 
pub fn to_watermark(self, network: &str) -> Result { self.0 .into_iter() @@ -388,14 +361,12 @@ mod test { use alloy::primitives::BlockHash; use chrono::DateTime; + use datasets_common::BlockNum; use metadata_db::files::FileId; use object_store::ObjectMeta; use rand::{Rng as _, RngCore as _, SeedableRng as _, rngs::StdRng, seq::SliceRandom}; - use crate::{ - BlockNum, BlockRange, - metadata::segments::{Chain, Chains, Segment}, - }; + use super::{BlockRange, Chain, Chains, Segment}; fn test_range(numbers: RangeInclusive, fork: (u8, u8)) -> BlockRange { fn test_hash(number: u8, fork: u8) -> BlockHash { diff --git a/crates/core/common/src/query_context.rs b/crates/core/common/src/query_context.rs index f298d4144..9e35fc63a 100644 --- a/crates/core/common/src/query_context.rs +++ b/crates/core/common/src/query_context.rs @@ -37,14 +37,13 @@ use thiserror::Error; use tracing::{field, instrument}; use crate::{ - BlockNum, BoxError, arrow, + BlockNum, BlockRange, BoxError, arrow, catalog::physical::{Catalog, CatalogSnapshot, TableSnapshot}, evm::udfs::{ EvmDecodeHex, EvmDecodeLog, EvmDecodeParams, EvmDecodeType, EvmEncodeHex, EvmEncodeParams, EvmEncodeType, EvmTopic, ShiftUnits, }, memory_pool::{MemoryPoolKind, TieredMemoryPool, make_memory_pool}, - metadata::segments::BlockRange, plan_visitors::{ extract_table_references_from_plan, forbid_duplicate_field_names, forbid_underscore_prefixed_aliases, diff --git a/crates/core/dataset-store/Cargo.toml b/crates/core/dataset-store/Cargo.toml index 029c562a1..0dcc43eba 100644 --- a/crates/core/dataset-store/Cargo.toml +++ b/crates/core/dataset-store/Cargo.toml @@ -13,6 +13,7 @@ common = { path = "../common" } datafusion.workspace = true datasets-common = { path = "../datasets-common" } datasets-derived = { path = "../datasets-derived" } +datasets-raw = { path = "../datasets-raw" } evm-rpc-datasets = { path = "../../extractors/evm-rpc" } firehose-datasets = { path = "../../extractors/firehose" } futures.workspace = true diff --git a/crates/core/dataset-store/src/block_stream_client.rs b/crates/core/dataset-store/src/client.rs similarity index 82% rename from crates/core/dataset-store/src/block_stream_client.rs rename to crates/core/dataset-store/src/client.rs index dad1e717e..dc2c6689c 100644 --- a/crates/core/dataset-store/src/block_stream_client.rs +++ b/crates/core/dataset-store/src/client.rs @@ -1,9 +1,13 @@ use async_stream::stream; -use common::{BlockNum, BlockStreamer, BoxError, RawDatasetRows}; +use datasets_common::BlockNum; +use datasets_raw::{ + client::{BlockStreamError, BlockStreamer, CleanupError, LatestBlockError}, + rows::Rows, +}; use futures::Stream; #[derive(Clone)] -pub(crate) enum BlockStreamClient { +pub enum BlockStreamClient { EvmRpc(evm_rpc_datasets::JsonRpcClient), Solana(solana_datasets::SolanaExtractor), Firehose(Box), @@ -14,7 +18,7 @@ impl BlockStreamer for BlockStreamClient { self, start_block: BlockNum, end_block: BlockNum, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { // Each client returns a different concrete stream type, so we // use `stream!` to unify them into a wrapper stream stream! 
{ @@ -41,7 +45,10 @@ impl BlockStreamer for BlockStreamClient { } } - async fn latest_block(&mut self, finalized: bool) -> Result, BoxError> { + async fn latest_block( + &mut self, + finalized: bool, + ) -> Result, LatestBlockError> { match self { Self::EvmRpc(client) => client.latest_block(finalized).await, Self::Solana(client) => client.latest_block(finalized).await, @@ -49,7 +56,7 @@ impl BlockStreamer for BlockStreamClient { } } - async fn wait_for_cleanup(self) -> Result<(), BoxError> { + async fn wait_for_cleanup(self) -> Result<(), CleanupError> { match self { Self::EvmRpc(client) => client.wait_for_cleanup().await, Self::Solana(client) => client.wait_for_cleanup().await, diff --git a/crates/core/dataset-store/src/lib.rs b/crates/core/dataset-store/src/lib.rs index 0b16b089b..5e93a16a8 100644 --- a/crates/core/dataset-store/src/lib.rs +++ b/crates/core/dataset-store/src/lib.rs @@ -8,7 +8,7 @@ use std::{ use amp_datasets_registry::{DatasetsRegistry, error::ResolveRevisionError}; use amp_providers_registry::{ProviderConfig, ProvidersRegistry}; use common::{ - BlockStreamer, BlockStreamerExt, BoxError, + BoxError, evm::{self, udfs::EthCall}, }; use datafusion::{ @@ -20,6 +20,7 @@ use datasets_common::{ reference::Reference, }; use datasets_derived::{DerivedDatasetKind, Manifest as DerivedManifest}; +use datasets_raw::client::{BlockStreamer, BlockStreamerExt}; use evm_rpc_datasets::{ EvmRpcDatasetKind, Manifest as EvmRpcManifest, ProviderConfig as EvmRpcProviderConfig, }; @@ -33,12 +34,12 @@ use solana_datasets::{Manifest as SolanaManifest, ProviderConfig as SolanaProvid use tracing::instrument; use url::Url; -mod block_stream_client; +mod client; mod dataset_kind; mod env_substitute; mod error; -use self::block_stream_client::BlockStreamClient; +use self::client::BlockStreamClient; pub use self::{ dataset_kind::DatasetKind, error::{ diff --git a/crates/core/datasets-common/Cargo.toml b/crates/core/datasets-common/Cargo.toml index ee4d7eb49..441f1a152 100644 --- a/crates/core/datasets-common/Cargo.toml +++ b/crates/core/datasets-common/Cargo.toml @@ -9,6 +9,7 @@ license-file.workspace = true default = ["metadata-db"] [dependencies] +alloy = { workspace = true } datafusion = { workspace = true } hex = { workspace = true } metadata-db = { path = "../metadata-db", optional = true } diff --git a/crates/core/datasets-common/src/block_range.rs b/crates/core/datasets-common/src/block_range.rs new file mode 100644 index 000000000..ff01e6371 --- /dev/null +++ b/crates/core/datasets-common/src/block_range.rs @@ -0,0 +1,45 @@ +//! Block range types for dataset management. + +use std::ops::RangeInclusive; + +use alloy::primitives::BlockHash; +use serde::{Deserialize, Serialize}; + +use crate::BlockNum; + +/// Block range for data extraction and segment management. +/// +/// This type contains all the information needed for data extraction and segment management. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct BlockRange { + /// Inclusive range of block numbers. + pub numbers: RangeInclusive, + /// Network identifier (e.g., "mainnet", "sepolia"). + pub network: String, + /// Hash of the end block. + pub hash: BlockHash, + /// Hash of the block before the start block (for reorg detection). + pub prev_hash: Option, +} + +impl BlockRange { + /// Get the start block number. + #[inline] + pub fn start(&self) -> BlockNum { + *self.numbers.start() + } + + /// Get the end block number. 
+ #[inline] + pub fn end(&self) -> BlockNum { + *self.numbers.end() + } + + /// Return true iff `self` is sequenced immediately before `other`. + #[inline] + pub fn adjacent(&self, other: &Self) -> bool { + self.network == other.network + && (self.end() + 1) == other.start() + && other.prev_hash.map(|h| h == self.hash).unwrap_or(true) + } +} diff --git a/crates/core/datasets-common/src/lib.rs b/crates/core/datasets-common/src/lib.rs index 56d9cb482..a11257851 100644 --- a/crates/core/datasets-common/src/lib.rs +++ b/crates/core/datasets-common/src/lib.rs @@ -3,6 +3,7 @@ //! This module provides shared structures used across different dataset definition formats, //! including serializable schema representations and common dataset metadata. +pub mod block_range; pub mod dataset; pub mod deps; pub mod fqn; @@ -19,6 +20,8 @@ pub mod revision; pub mod table_name; pub mod version; +pub use self::dataset::{BlockNum, SPECIAL_BLOCK_NUM}; + /// Re-exports of UDF-related types for derived datasets and the common crate. /// /// This module provides convenient access to types needed for implementing diff --git a/crates/core/datasets-raw/Cargo.toml b/crates/core/datasets-raw/Cargo.toml index 9b245e564..17acc55ac 100644 --- a/crates/core/datasets-raw/Cargo.toml +++ b/crates/core/datasets-raw/Cargo.toml @@ -11,5 +11,17 @@ gen-schema = ["dep:datafusion"] # Always available - provides Table type datasets-common = { path = "../datasets-common" } +# For BlockStreamer and RawBlockRange +alloy = { workspace = true } +arrow = { workspace = true } +async-stream = { workspace = true } +backon = { workspace = true } +futures = { workspace = true } +monitoring = { path = "../monitoring" } +serde = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + # Only for schema generation datafusion = { workspace = true, optional = true } diff --git a/crates/core/datasets-raw/src/client.rs b/crates/core/datasets-raw/src/client.rs new file mode 100644 index 000000000..ba8ca4b13 --- /dev/null +++ b/crates/core/datasets-raw/src/client.rs @@ -0,0 +1,172 @@ +//! BlockStreamer trait and retry wrapper for extracting raw blockchain data. + +use std::{future::Future, time::Duration}; + +use datasets_common::BlockNum; +use futures::{Stream, StreamExt as _}; + +use crate::rows::Rows; + +/// Error type for [`BlockStreamer::block_stream`]. +pub type BlockStreamError = Box; + +/// Error type for [`BlockStreamer::latest_block`]. +pub type LatestBlockError = Box; + +/// Error type for [`BlockStreamer::wait_for_cleanup`]. +pub type CleanupError = Box; + +pub trait BlockStreamer: Clone + 'static { + fn block_stream( + self, + start: BlockNum, + end: BlockNum, + ) -> impl Future> + Send> + Send; + + fn latest_block( + &mut self, + finalized: bool, + ) -> impl Future, LatestBlockError>> + Send; + + /// Waits for any background work and resources associated with this [`BlockStreamer`] + /// to be cleaned up. + /// + /// This should be called once the user no longer needs to create new block streams + /// to allow implementations to terminate internal tasks, flush or release network + /// connections, and free any other resources. + /// + /// After requesting cleanup, callers should not call [BlockStreamer::block_stream] + /// again on the same instance. Behavior when creating new streams after cleanup is + /// implementation-defined and must not be relied on. 
+ fn wait_for_cleanup(self) -> impl Future> + Send; + + fn provider_name(&self) -> &str; +} + +pub trait BlockStreamerExt: BlockStreamer { + fn with_retry(self) -> BlockStreamerWithRetry { + BlockStreamerWithRetry(self) + } +} + +impl BlockStreamerExt for T where T: BlockStreamer {} + +#[derive(Clone)] +pub struct BlockStreamerWithRetry(T); + +impl BlockStreamer for BlockStreamerWithRetry +where + T: BlockStreamer + Send + Sync, +{ + async fn block_stream( + self, + start: BlockNum, + end: BlockNum, + ) -> impl Stream> + Send { + const DEBUG_RETRY_LIMIT: u16 = 8; + const DEBUG_RETRY_DELAY: Duration = Duration::from_millis(50); + const WARN_RETRY_LIMIT: u16 = 16; + const WARN_RETRY_DELAY: Duration = Duration::from_millis(100); + const ERROR_RETRY_DELAY: Duration = Duration::from_millis(300); + + let mut current_block = start; + let mut num_retries = 0; + + async_stream::stream! { + 'retry: loop { + let inner_stream = self.0.clone().block_stream(current_block, end).await; + futures::pin_mut!(inner_stream); + while let Some(block) = inner_stream.next().await { + match &block { + Ok(_) => { + num_retries = 0; + current_block += 1; + yield block; + } + Err(e) => { + let error_source = monitoring::logging::error_source(e.as_ref()); + // Progressively more severe logging and longer retry interval. + match num_retries { + 0 => { + // First error, make sure it is visible in info (default) logs. + num_retries += 1; + tracing::info!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying" + ); + tokio::time::sleep(DEBUG_RETRY_DELAY).await; + } + 1..DEBUG_RETRY_LIMIT => { + num_retries += 1; + tracing::debug!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying"); + tokio::time::sleep(DEBUG_RETRY_DELAY).await; + } + DEBUG_RETRY_LIMIT..WARN_RETRY_LIMIT => { + num_retries += 1; + tracing::warn!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying" + ); + tokio::time::sleep(WARN_RETRY_DELAY).await; + } + _ => { + tracing::error!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying" + ); + tokio::time::sleep(ERROR_RETRY_DELAY).await; + } + } + continue 'retry; + } + } + } + break 'retry; + } + } + } + + async fn latest_block( + &mut self, + finalized: bool, + ) -> Result, LatestBlockError> { + use backon::{ExponentialBuilder, Retryable}; + + (|| async { + let mut inner = self.0.clone(); + inner.latest_block(finalized).await + }) + .retry( + ExponentialBuilder::default() + .with_min_delay(Duration::from_secs(2)) + .with_max_delay(Duration::from_secs(20)) + .with_max_times(10), + ) + .notify(|err, dur| { + tracing::warn!( + error = %err, + "Failed to get latest block. Retrying in {:.1}s", + dur.as_secs_f32() + ); + }) + .await + } + + fn wait_for_cleanup(self) -> impl Future> + Send { + self.0.wait_for_cleanup() + } + + fn provider_name(&self) -> &str { + self.0.provider_name() + } +} diff --git a/crates/core/datasets-raw/src/lib.rs b/crates/core/datasets-raw/src/lib.rs index 2fd2f848a..07331050c 100644 --- a/crates/core/datasets-raw/src/lib.rs +++ b/crates/core/datasets-raw/src/lib.rs @@ -3,5 +3,7 @@ //! This crate provides utilities for working with raw datasets, including //! build-time schema generation for documentation purposes. 
+pub mod client;
+pub mod rows;
 #[cfg(feature = "gen-schema")]
 pub mod schema;
diff --git a/crates/core/datasets-raw/src/rows.rs b/crates/core/datasets-raw/src/rows.rs
new file mode 100644
index 000000000..fbbb478c2
--- /dev/null
+++ b/crates/core/datasets-raw/src/rows.rs
@@ -0,0 +1,169 @@
+//! Raw table row types for dataset extraction.
+
+use arrow::{
+    array::{ArrayRef, AsArray as _, RecordBatch},
+    datatypes::UInt64Type,
+};
+use datasets_common::{BlockNum, SPECIAL_BLOCK_NUM, block_range::BlockRange, dataset::Table};
+
+pub struct Rows(Vec<TableRows>);
+
+impl Rows {
+    pub fn new(rows: Vec<TableRows>) -> Self {
+        assert!(!rows.is_empty());
+        assert!(rows.iter().skip(1).all(|r| r.range == rows[0].range));
+        Self(rows)
+    }
+
+    pub fn block_num(&self) -> BlockNum {
+        self.0[0].block_num()
+    }
+}
+
+impl IntoIterator for Rows {
+    type Item = TableRows;
+    type IntoIter = std::vec::IntoIter<TableRows>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
+}
+
+/// A record batch associated with a single block of chain data, for populating raw datasets.
+pub struct TableRows {
+    pub table: Table,
+    pub rows: RecordBatch,
+    pub range: BlockRange,
+}
+
+impl TableRows {
+    pub fn new(
+        table: Table,
+        range: BlockRange,
+        columns: Vec<ArrayRef>,
+    ) -> Result<Self, TableRowError> {
+        let table_name = table.name().to_string();
+        let schema = table.schema().clone();
+        let rows = RecordBatch::try_new(schema, columns)?;
+        Self::check_invariants(&range, &rows).map_err(|source| TableRowError::Invariants {
+            table: table_name,
+            source,
+        })?;
+        Ok(TableRows { table, rows, range })
+    }
+
+    pub fn block_num(&self) -> BlockNum {
+        self.range.start()
+    }
+
+    fn check_invariants(
+        range: &BlockRange,
+        rows: &RecordBatch,
+    ) -> Result<(), CheckInvariantsError> {
+        if range.start() != range.end() {
+            return Err(CheckInvariantsError::InvalidBlockRange);
+        }
+        if rows.num_rows() == 0 {
+            return Ok(());
+        }
+
+        let block_nums = rows
+            .column_by_name(SPECIAL_BLOCK_NUM)
+            .ok_or(CheckInvariantsError::MissingBlockNumColumn)?;
+        let block_nums = block_nums
+            .as_primitive_opt::<UInt64Type>()
+            .ok_or(CheckInvariantsError::InvalidBlockNumColumnType)?;
+
+        // Unwrap: `rows` is not empty.
+        let start = arrow::compute::kernels::aggregate::min(block_nums).unwrap();
+        let end = arrow::compute::kernels::aggregate::max(block_nums).unwrap();
+        if start != range.start() {
+            return Err(CheckInvariantsError::UnexpectedBlockNum(start));
+        };
+        if end != range.start() {
+            return Err(CheckInvariantsError::UnexpectedBlockNum(end));
+        };
+
+        Ok(())
+    }
+}
+
+/// Errors that occur when validating table row invariants.
+///
+/// These errors represent violations of structural requirements for raw dataset tables,
+/// such as block range consistency and required column presence/types.
+#[derive(Debug, thiserror::Error)]
+pub enum CheckInvariantsError {
+    /// Block range does not contain exactly one block number
+    ///
+    /// This occurs when the block range start and end differ, violating the requirement
+    /// that TableRows must represent data from a single block.
+    ///
+    /// Raw dataset tables are organized by individual blocks, and each TableRows instance
+    /// must contain data from exactly one block number.
+    #[error("block range must contain a single block number")]
+    InvalidBlockRange,
+
+    /// Required `_block_num` column is missing from the record batch
+    ///
+    /// This occurs when the Arrow RecordBatch does not contain the special `_block_num`
+    /// column that tracks which block each row belongs to.
+ /// + /// All raw dataset tables require the `_block_num` column for block-level partitioning + /// and filtering operations. + #[error("missing _block_num column")] + MissingBlockNumColumn, + + /// The `_block_num` column has incorrect data type + /// + /// This occurs when the `_block_num` column exists but is not of type UInt64. + /// + /// The `_block_num` column must be UInt64 to properly represent blockchain block numbers. + #[error("_block_num column is not uint64")] + InvalidBlockNumColumnType, + + /// Row contains block number that doesn't match the expected range + /// + /// This occurs when one or more rows have a `_block_num` value that differs from + /// the block number specified in the range. All rows must have the same block number + /// matching the range's single block. + /// + /// This check validates data consistency between the block range metadata and the + /// actual block numbers in the row data. + #[error("contains unexpected block_num: {0}")] + UnexpectedBlockNum(BlockNum), +} + +/// Errors that occur when creating TableRows instances. +/// +/// This error type is used by `TableRows::new()` and represents failures during +/// record batch construction and validation. +#[derive(Debug, thiserror::Error)] +pub enum TableRowError { + /// Failed to construct Arrow RecordBatch from columns + /// + /// This occurs when Arrow cannot create a valid RecordBatch from the provided + /// columns and schema. The underlying Arrow error provides specific details. + /// + /// Common causes: + /// - Column count doesn't match schema field count + /// - Column types don't match schema types + /// - Column lengths are inconsistent + /// - Invalid array data or corrupted memory buffers + #[error(transparent)] + Arrow(#[from] arrow::error::ArrowError), + + /// Table rows violate structural invariants + /// + /// This occurs when the constructed RecordBatch violates requirements for raw + /// dataset tables, such as missing required columns, incorrect block numbers, + /// or invalid block ranges. + /// + /// The source error provides specific details about which invariant was violated. 
+ #[error("malformed table {table}: {source}")] + Invariants { + table: String, + #[source] + source: CheckInvariantsError, + }, +} diff --git a/crates/core/dump/Cargo.toml b/crates/core/dump/Cargo.toml index 6327417c0..871b19065 100644 --- a/crates/core/dump/Cargo.toml +++ b/crates/core/dump/Cargo.toml @@ -13,6 +13,7 @@ common = { path = "../common" } datafusion.workspace = true datasets-common = { path = "../datasets-common" } datasets-derived = { path = "../datasets-derived" } +datasets-raw = { path = "../datasets-raw" } futures.workspace = true metadata-db = { path = "../metadata-db" } monitoring = { path = "../monitoring" } diff --git a/crates/core/dump/src/compaction/compactor.rs b/crates/core/dump/src/compaction/compactor.rs index 2e1ea4925..daa3333e4 100644 --- a/crates/core/dump/src/compaction/compactor.rs +++ b/crates/core/dump/src/compaction/compactor.rs @@ -9,11 +9,7 @@ use std::{ }; use amp_data_store::{DataStore, file_name::FileName}; -use common::{ - BlockNum, - catalog::physical::PhysicalTable, - metadata::{SegmentSize, segments::BlockRange}, -}; +use common::{BlockNum, BlockRange, catalog::physical::PhysicalTable, metadata::SegmentSize}; use futures::{StreamExt, TryStreamExt, stream}; use metadata_db::MetadataDb; use monitoring::logging; diff --git a/crates/core/dump/src/compaction/plan.rs b/crates/core/dump/src/compaction/plan.rs index 7db4d7414..a57adbb88 100644 --- a/crates/core/dump/src/compaction/plan.rs +++ b/crates/core/dump/src/compaction/plan.rs @@ -8,11 +8,9 @@ use std::{ use amp_data_store::DataStore; use common::{ + BlockRange, catalog::physical::{PhysicalTable, TableSnapshot, reader::AmpReaderFactory}, - metadata::{ - SegmentSize, - segments::{BlockRange, Segment}, - }, + metadata::{SegmentSize, segments::Segment}, parquet::arrow::{ ParquetRecordBatchStreamBuilder, arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}, diff --git a/crates/core/dump/src/derived_dataset.rs b/crates/core/dump/src/derived_dataset.rs index 773637aac..6d4573ae7 100644 --- a/crates/core/dump/src/derived_dataset.rs +++ b/crates/core/dump/src/derived_dataset.rs @@ -98,12 +98,12 @@ use std::{collections::BTreeMap, sync::Arc, time::Instant}; use amp_data_store::file_name::FileName; use common::{ - BlockNum, BoxError, DetachedLogicalPlan, PlanningContext, QueryContext, + BlockNum, BoxError, DetachedLogicalPlan, PlanningContext, QueryContext, ResumeWatermark, catalog::{ logical::for_dump as logical_catalog, physical::{Catalog, PhysicalTable, for_dump as physical_catalog}, }, - metadata::{Generation, segments::ResumeWatermark}, + metadata::Generation, query_context::QueryEnv, sql::{resolve_function_references, resolve_table_references}, }; diff --git a/crates/core/dump/src/parquet_writer.rs b/crates/core/dump/src/parquet_writer.rs index fdc168b7f..bf05f774c 100644 --- a/crates/core/dump/src/parquet_writer.rs +++ b/crates/core/dump/src/parquet_writer.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use amp_data_store::{DataStore, file_name::FileName}; use common::{ - BoxError, Timestamp, + BlockRange, BoxError, Timestamp, arrow::array::RecordBatch, catalog::physical::PhysicalTable, metadata::{ @@ -10,7 +10,6 @@ use common::{ parquet::{ GENERATION_METADATA_KEY, PARENT_FILE_ID_METADATA_KEY, PARQUET_METADATA_KEY, ParquetMeta, }, - segments::BlockRange, }, parquet::{ arrow::AsyncArrowWriter, errors::ParquetError, diff --git a/crates/core/dump/src/raw_dataset.rs b/crates/core/dump/src/raw_dataset.rs index df57498eb..1eaf15a9c 100644 --- a/crates/core/dump/src/raw_dataset.rs +++ 
b/crates/core/dump/src/raw_dataset.rs @@ -91,11 +91,12 @@ use std::{ use amp_data_store::DataStore; use common::{ - BlockNum, BlockStreamer, BoxError, LogicalCatalog, LogicalTable, + BlockNum, BoxError, LogicalCatalog, LogicalTable, catalog::physical::{Catalog, MissingRangesError, PhysicalTable}, metadata::segments::merge_ranges, }; use datasets_common::{hash_reference::HashReference, table_name::TableName}; +use datasets_raw::client::BlockStreamer; use futures::TryStreamExt as _; use metadata_db::MetadataDb; use monitoring::logging; diff --git a/crates/core/dump/src/raw_dataset_writer.rs b/crates/core/dump/src/raw_dataset_writer.rs index b11d93896..8d4e7df93 100644 --- a/crates/core/dump/src/raw_dataset_writer.rs +++ b/crates/core/dump/src/raw_dataset_writer.rs @@ -2,12 +2,13 @@ use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc}; use amp_data_store::{DataStore, file_name::FileName}; use common::{ - BlockNum, BoxError, RawTableRows, + BlockNum, BlockRange, BoxError, arrow::array::RecordBatch, catalog::physical::{Catalog, PhysicalTable}, - metadata::{Generation, segments::BlockRange}, + metadata::Generation, }; use datasets_common::table_name::TableName; +use datasets_raw::rows::TableRows; use metadata_db::MetadataDb; use crate::{ @@ -59,7 +60,7 @@ impl RawDatasetWriter { }) } - pub async fn write(&mut self, table_rows: RawTableRows) -> Result<(), BoxError> { + pub async fn write(&mut self, table_rows: TableRows) -> Result<(), BoxError> { let table_name = table_rows.table.name(); let writer = self.writers.get_mut(table_name).unwrap(); if let Some(ParquetFileWriterOutput { @@ -171,7 +172,7 @@ impl RawTableWriter { pub async fn write( &mut self, - table_rows: RawTableRows, + table_rows: TableRows, ) -> Result, BoxError> { assert_eq!(table_rows.table.name(), self.table.table_name()); diff --git a/crates/core/dump/src/streaming_query.rs b/crates/core/dump/src/streaming_query.rs index 11c6014af..5f10374d2 100644 --- a/crates/core/dump/src/streaming_query.rs +++ b/crates/core/dump/src/streaming_query.rs @@ -10,12 +10,12 @@ use alloy::{hex::ToHexExt as _, primitives::BlockHash}; use amp_data_store::DataStore; use amp_dataset_store::DatasetStore; use common::{ - BlockNum, BoxError, DetachedLogicalPlan, LogicalCatalog, LogicalTable, PlanningContext, - QueryContext, SPECIAL_BLOCK_NUM, + BlockNum, BlockRange, BoxError, DetachedLogicalPlan, LogicalCatalog, LogicalTable, + PlanningContext, QueryContext, ResumeWatermark, SPECIAL_BLOCK_NUM, Watermark, arrow::{array::RecordBatch, datatypes::SchemaRef}, catalog::physical::{Catalog, PhysicalTable}, incrementalizer::incrementalize_plan, - metadata::segments::{BlockRange, ResumeWatermark, Segment, Watermark}, + metadata::segments::Segment, plan_visitors::{order_by_block_num, unproject_special_block_num_column}, query_context::QueryEnv, sql_str::SqlStr, @@ -469,7 +469,7 @@ impl StreamingQuery { // If we reached the end block, we are done return Ok(()); } - self.prev_watermark = Some(range.watermark()); + self.prev_watermark = Some((&range).into()); } } @@ -615,7 +615,7 @@ impl StreamingQuery { let mut latest_src_watermarks: Vec = Default::default(); 'chain_loop: for chain in chains { for segment in chain.iter().rev() { - let watermark = segment.range.watermark(); + let watermark = (&segment.range).into(); if self.blocks_table_contains(ctx, &watermark).await? 
{ latest_src_watermarks.push(watermark); continue 'chain_loop; diff --git a/crates/core/dump/src/streaming_query/message_stream_with_block_complete.rs b/crates/core/dump/src/streaming_query/message_stream_with_block_complete.rs index dfd0ff24a..9af7b74d8 100644 --- a/crates/core/dump/src/streaming_query/message_stream_with_block_complete.rs +++ b/crates/core/dump/src/streaming_query/message_stream_with_block_complete.rs @@ -272,11 +272,11 @@ mod tests { use std::sync::Arc; use common::{ + BlockRange, arrow::{ array::UInt64Array, datatypes::{DataType, Field, Schema}, }, - metadata::segments::BlockRange, }; use futures::{StreamExt, stream}; diff --git a/crates/extractors/evm-rpc/Cargo.toml b/crates/extractors/evm-rpc/Cargo.toml index 136c8b739..637f7f267 100644 --- a/crates/extractors/evm-rpc/Cargo.toml +++ b/crates/extractors/evm-rpc/Cargo.toml @@ -12,6 +12,7 @@ alloy.workspace = true async-stream.workspace = true common = { path = "../../core/common" } datasets-common = { path = "../../core/datasets-common" } +datasets-raw = { path = "../../core/datasets-raw" } futures.workspace = true monitoring = { path = "../../core/monitoring" } schemars = { workspace = true, optional = true } diff --git a/crates/extractors/evm-rpc/src/client.rs b/crates/extractors/evm-rpc/src/client.rs index 3d1f0fd57..6dc757095 100644 --- a/crates/extractors/evm-rpc/src/client.rs +++ b/crates/extractors/evm-rpc/src/client.rs @@ -23,7 +23,7 @@ use alloy::{ }; use async_stream::stream; use common::{ - BlockNum, BlockStreamer, BoxError, BoxResult, EvmCurrency, RawDatasetRows, Timestamp, + BlockNum, BlockRange, BoxError, BoxResult, EvmCurrency, Timestamp, evm::{ self, tables::{ @@ -31,8 +31,8 @@ use common::{ logs::{self, LogRowsBuilder}, }, }, - metadata::segments::BlockRange, }; +use datasets_raw::{client::BlockStreamer, rows::Rows}; use futures::{Stream, future::try_join_all}; use thiserror::Error; use tracing::{instrument, warn}; @@ -197,7 +197,7 @@ impl JsonRpcClient { self, start_block: u64, end_block: u64, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { assert!(end_block >= start_block); let total_blocks_to_stream = end_block - start_block + 1; @@ -309,7 +309,7 @@ impl JsonRpcClient { self, start_block: u64, end_block: u64, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { tracing::info!("Fetching blocks (batched) {} to {}", start_block, end_block); let batching_client = BatchingRpcWrapper::new(self.client.clone(), self.batch_size, self.limiter.clone()); @@ -435,7 +435,7 @@ impl BlockStreamer for JsonRpcClient { self, start: BlockNum, end: BlockNum, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { // Each function returns a different concrete stream type, so we // use `stream!` to unify them into a wrapper stream stream! { @@ -586,7 +586,7 @@ fn rpc_to_rows( block: AnyRpcBlock, receipts: Vec, network: &str, -) -> Result { +) -> Result { if block.transactions.len() != receipts.len() { let err = format!( "mismatched tx and receipt count for block {}: {} txs, {} receipts", @@ -652,11 +652,7 @@ fn rpc_to_rows( builder.build(block.clone())? 
}; - Ok(RawDatasetRows::new(vec![ - header_row, - logs_row, - transactions_row, - ])) + Ok(Rows::new(vec![header_row, logs_row, transactions_row])) } fn rpc_header_to_row(header: Header) -> Result { diff --git a/crates/extractors/evm-rpc/src/tables/transactions.rs b/crates/extractors/evm-rpc/src/tables/transactions.rs index 2569e4ae6..49dfe2530 100644 --- a/crates/extractors/evm-rpc/src/tables/transactions.rs +++ b/crates/extractors/evm-rpc/src/tables/transactions.rs @@ -1,9 +1,10 @@ use std::sync::{Arc, LazyLock}; use common::{ - BYTES32_TYPE, BoxError, Bytes32, Bytes32ArrayBuilder, EVM_ADDRESS_TYPE as ADDRESS_TYPE, - EVM_CURRENCY_TYPE, EvmAddress as Address, EvmAddressArrayBuilder, EvmCurrency, - EvmCurrencyArrayBuilder, RawTableRows, SPECIAL_BLOCK_NUM, Timestamp, TimestampArrayBuilder, + BYTES32_TYPE, BlockRange, BoxError, Bytes32, Bytes32ArrayBuilder, + EVM_ADDRESS_TYPE as ADDRESS_TYPE, EVM_CURRENCY_TYPE, EvmAddress as Address, + EvmAddressArrayBuilder, EvmCurrency, EvmCurrencyArrayBuilder, SPECIAL_BLOCK_NUM, Timestamp, + TimestampArrayBuilder, arrow::{ array::{ ArrayRef, BinaryBuilder, BooleanBuilder, FixedSizeBinaryBuilder, Int32Builder, @@ -11,9 +12,9 @@ use common::{ }, datatypes::{DataType, Field, Fields, Schema, SchemaRef}, }, - metadata::segments::BlockRange, }; use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; static SCHEMA: LazyLock = LazyLock::new(|| Arc::new(schema())); @@ -452,7 +453,7 @@ impl TransactionRowsBuilder { } } - pub(crate) fn build(self, range: BlockRange) -> Result { + pub(crate) fn build(self, range: BlockRange) -> Result { let Self { mut special_block_num, block_hash, @@ -513,7 +514,7 @@ impl TransactionRowsBuilder { Arc::new(authorization_list.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/extractors/firehose/Cargo.toml b/crates/extractors/firehose/Cargo.toml index 75488482b..d612dfa66 100644 --- a/crates/extractors/firehose/Cargo.toml +++ b/crates/extractors/firehose/Cargo.toml @@ -12,6 +12,7 @@ alloy.workspace = true async-stream.workspace = true common = { path = "../../core/common" } datasets-common = { path = "../../core/datasets-common" } +datasets-raw = { path = "../../core/datasets-raw" } futures.workspace = true hex.workspace = true monitoring = { path = "../../core/monitoring" } diff --git a/crates/extractors/firehose/src/client.rs b/crates/extractors/firehose/src/client.rs index 7e218b8b1..cc20b4181 100644 --- a/crates/extractors/firehose/src/client.rs +++ b/crates/extractors/firehose/src/client.rs @@ -4,7 +4,8 @@ use std::{ }; use async_stream::stream; -use common::{BlockNum, BlockStreamer, BoxError, RawDatasetRows}; +use common::{BlockNum, BoxError}; +use datasets_raw::{client::BlockStreamer, rows::Rows}; use futures::{Stream, StreamExt as _, TryStreamExt as _}; use monitoring::telemetry; use pbfirehose::{Response as StreamResponse, stream_client::StreamClient}; @@ -176,7 +177,7 @@ impl BlockStreamer for Client { mut self, start_block: u64, end_block: u64, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { const RETRY_BACKOFF: Duration = Duration::from_secs(5); stream! 
{ diff --git a/crates/extractors/firehose/src/evm/pb_to_rows.rs b/crates/extractors/firehose/src/evm/pb_to_rows.rs index 37541335c..03b4de4b4 100644 --- a/crates/extractors/firehose/src/evm/pb_to_rows.rs +++ b/crates/extractors/firehose/src/evm/pb_to_rows.rs @@ -2,13 +2,13 @@ use std::time::Duration; use alloy::primitives::U256; use common::{ - BoxError, Bytes32, EvmCurrency, RawDatasetRows, Timestamp, + BlockRange, BoxError, Bytes32, EvmCurrency, Timestamp, evm::tables::{ blocks::{Block, BlockRowsBuilder}, logs::{Log, LogRowsBuilder}, }, - metadata::segments::BlockRange, }; +use datasets_raw::rows::Rows; use thiserror::Error; use super::{ @@ -34,7 +34,7 @@ pub enum ProtobufToRowError { pub fn protobufs_to_rows( block: pbethereum::Block, network: &str, -) -> Result { +) -> Result { use ProtobufToRowError::*; fn transactions_iter( @@ -242,7 +242,7 @@ pub fn protobufs_to_rows( let calls_rows = calls.build(block.clone()).map_err(ArrowError)?; let logs_rows = logs.build(block.clone()).map_err(ArrowError)?; - Ok(RawDatasetRows::new(vec![ + Ok(Rows::new(vec![ header_row, transactions_rows, calls_rows, diff --git a/crates/extractors/firehose/src/evm/tables/calls.rs b/crates/extractors/firehose/src/evm/tables/calls.rs index 12b39f16a..eba2a13c2 100644 --- a/crates/extractors/firehose/src/evm/tables/calls.rs +++ b/crates/extractors/firehose/src/evm/tables/calls.rs @@ -1,18 +1,19 @@ use std::sync::{Arc, LazyLock}; use common::{ - BYTES32_TYPE, BoxError, Bytes32, Bytes32ArrayBuilder, EVM_ADDRESS_TYPE as ADDRESS_TYPE, - EVM_CURRENCY_TYPE, EvmAddress as Address, EvmAddressArrayBuilder, EvmCurrency, - EvmCurrencyArrayBuilder, RawTableRows, SPECIAL_BLOCK_NUM, Timestamp, TimestampArrayBuilder, + BYTES32_TYPE, BlockRange, BoxError, Bytes32, Bytes32ArrayBuilder, + EVM_ADDRESS_TYPE as ADDRESS_TYPE, EVM_CURRENCY_TYPE, EvmAddress as Address, + EvmAddressArrayBuilder, EvmCurrency, EvmCurrencyArrayBuilder, SPECIAL_BLOCK_NUM, Timestamp, + TimestampArrayBuilder, arrow::{ array::{ ArrayRef, BinaryBuilder, BooleanBuilder, Int32Builder, UInt32Builder, UInt64Builder, }, datatypes::{DataType, Field, Schema, SchemaRef}, }, - metadata::segments::BlockRange, }; use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; static SCHEMA: LazyLock = LazyLock::new(|| Arc::new(schema())); @@ -211,7 +212,7 @@ impl CallRowsBuilder { self.end_ordinal.append_value(*end_ordinal); } - pub(crate) fn build(self, range: BlockRange) -> Result { + pub(crate) fn build(self, range: BlockRange) -> Result { let Self { mut special_block_num, block_hash, @@ -260,7 +261,7 @@ impl CallRowsBuilder { Arc::new(end_ordinal.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/extractors/firehose/src/evm/tables/transactions.rs b/crates/extractors/firehose/src/evm/tables/transactions.rs index 01e597fae..d4f9d8bb5 100644 --- a/crates/extractors/firehose/src/evm/tables/transactions.rs +++ b/crates/extractors/firehose/src/evm/tables/transactions.rs @@ -1,18 +1,19 @@ use std::sync::{Arc, LazyLock}; use common::{ - BYTES32_TYPE, BoxError, Bytes32, Bytes32ArrayBuilder, EVM_ADDRESS_TYPE as ADDRESS_TYPE, - EVM_CURRENCY_TYPE, EvmAddress as Address, EvmAddressArrayBuilder, EvmCurrency, - EvmCurrencyArrayBuilder, RawTableRows, SPECIAL_BLOCK_NUM, Timestamp, TimestampArrayBuilder, + BYTES32_TYPE, BlockRange, BoxError, Bytes32, Bytes32ArrayBuilder, + EVM_ADDRESS_TYPE as ADDRESS_TYPE, EVM_CURRENCY_TYPE, EvmAddress as 
Address, + EvmAddressArrayBuilder, EvmCurrency, EvmCurrencyArrayBuilder, SPECIAL_BLOCK_NUM, Timestamp, + TimestampArrayBuilder, arrow::{ array::{ ArrayRef, BinaryBuilder, Int32Builder, StringBuilder, UInt32Builder, UInt64Builder, }, datatypes::{DataType, Field, Schema, SchemaRef}, }, - metadata::segments::BlockRange, }; use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; static SCHEMA: LazyLock = LazyLock::new(|| Arc::new(schema())); @@ -252,7 +253,7 @@ impl TransactionRowsBuilder { self.end_ordinal.append_value(*end_ordinal); } - pub(crate) fn build(self, range: BlockRange) -> Result { + pub(crate) fn build(self, range: BlockRange) -> Result { let Self { mut special_block_num, block_hash, @@ -309,7 +310,7 @@ impl TransactionRowsBuilder { Arc::new(end_ordinal.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/extractors/solana/Cargo.toml b/crates/extractors/solana/Cargo.toml index 8894c51ab..12999103f 100644 --- a/crates/extractors/solana/Cargo.toml +++ b/crates/extractors/solana/Cargo.toml @@ -14,6 +14,7 @@ bs58 = "0.5.1" base64 = "0.22.1" common = { path = "../../core/common" } datasets-common = { path = "../../core/datasets-common" } +datasets-raw = { path = "../../core/datasets-raw" } futures.workspace = true fs-err.workspace = true governor.workspace = true diff --git a/crates/extractors/solana/src/extractor.rs b/crates/extractors/solana/src/extractor.rs index 1383a042d..1f0b8dfae 100644 --- a/crates/extractors/solana/src/extractor.rs +++ b/crates/extractors/solana/src/extractor.rs @@ -15,7 +15,8 @@ use std::{ sync::{Arc, Mutex}, }; -use common::{BlockNum, BlockStreamer, BoxError, BoxResult, RawDatasetRows}; +use common::{BlockNum, BoxError, BoxResult}; +use datasets_raw::{client::BlockStreamer, rows::Rows}; use futures::{Stream, StreamExt}; use url::Url; @@ -86,7 +87,7 @@ impl SolanaExtractor { end: BlockNum, historical_block_stream: T, get_block_config: rpc_client::rpc_config::RpcBlockConfig, - ) -> impl Stream> + ) -> impl Stream> where T: Stream>, { @@ -184,7 +185,7 @@ impl BlockStreamer for SolanaExtractor { self, start: BlockNum, end: BlockNum, - ) -> impl Stream> { + ) -> impl Stream> { let get_block_config = rpc_client::rpc_config::RpcBlockConfig { encoding: Some(rpc_client::rpc_config::UiTransactionEncoding::Json), transaction_details: Some(rpc_client::rpc_config::TransactionDetails::Full), diff --git a/crates/extractors/solana/src/tables/mod.rs b/crates/extractors/solana/src/tables.rs similarity index 96% rename from crates/extractors/solana/src/tables/mod.rs rename to crates/extractors/solana/src/tables.rs index b463a7f12..e3f3b74c2 100644 --- a/crates/extractors/solana/src/tables/mod.rs +++ b/crates/extractors/solana/src/tables.rs @@ -1,4 +1,5 @@ -use common::{BoxResult, RawDatasetRows, metadata::segments::BlockRange}; +use common::{BlockRange, BoxResult}; +use datasets_raw::rows::Rows; use solana_clock::Slot; use crate::rpc_client::{EncodedTransaction, UiConfirmedBlock, UiMessage}; @@ -23,7 +24,7 @@ pub fn all(network: &str) -> Vec { pub(crate) fn convert_of_data_to_db_rows( mut block: crate::of1_client::DecodedBlock, network: &str, -) -> BoxResult { +) -> BoxResult { let of_transactions = std::mem::take(&mut block.transactions); let of_transactions_meta = std::mem::take(&mut block.transaction_metas); let mut db_transactions = Vec::new(); @@ -96,7 +97,7 @@ pub(crate) fn convert_of_data_to_db_rows( 
builder.build(range)? }; - Ok(RawDatasetRows::new(vec![ + Ok(Rows::new(vec![ block_headers_row, transactions_row, messages_row, @@ -108,7 +109,7 @@ pub(crate) fn convert_rpc_block_to_db_rows( slot: Slot, mut block: UiConfirmedBlock, network: &str, -) -> BoxResult { +) -> BoxResult { let rpc_transactions = std::mem::take(&mut block.transactions).unwrap_or_default(); let mut db_transactions = Vec::new(); let mut db_messages = Vec::new(); @@ -196,14 +197,14 @@ pub(crate) fn convert_rpc_block_to_db_rows( builder.build(range)? }; - Ok(RawDatasetRows::new(vec![ + Ok(Rows::new(vec![ block_headers_row, transactions_row, messages_row, ])) } -pub(crate) fn empty_db_rows(slot: Slot, network: &str) -> BoxResult { +pub(crate) fn empty_db_rows(slot: Slot, network: &str) -> BoxResult { let range = BlockRange { // Using the slot as a block number since we don't skip empty slots. numbers: slot..=slot, @@ -236,7 +237,7 @@ pub(crate) fn empty_db_rows(slot: Slot, network: &str) -> BoxResult BoxResult { + /// Builds the [TableRows] from the appended data. + pub(crate) fn build(self, range: BlockRange) -> BoxResult { let Self { mut special_block_num, mut slot, @@ -148,6 +148,6 @@ impl BlockHeaderRowsBuilder { Arc::new(block_time.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/extractors/solana/src/tables/instructions.rs b/crates/extractors/solana/src/tables/instructions.rs index 5350e1d44..394af6cd7 100644 --- a/crates/extractors/solana/src/tables/instructions.rs +++ b/crates/extractors/solana/src/tables/instructions.rs @@ -1,14 +1,14 @@ use std::sync::{Arc, LazyLock}; use common::{ - BoxResult, RawTableRows, SPECIAL_BLOCK_NUM, + BlockRange, BoxResult, SPECIAL_BLOCK_NUM, arrow::{ array::{ArrayRef, ListBuilder, UInt8Builder, UInt32Builder, UInt64Builder}, datatypes::{DataType, Field, Schema, SchemaRef}, }, - metadata::segments::BlockRange, }; use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; use serde::Deserialize; use solana_clock::Slot; @@ -114,7 +114,7 @@ impl InstructionRowsBuilder { self.inner_stack_height.append_option(*inner_stack_height); } - pub(crate) fn build(self, range: BlockRange) -> BoxResult { + pub(crate) fn build(self, range: BlockRange) -> BoxResult { let Self { mut special_block_num, mut slot, @@ -137,6 +137,6 @@ impl InstructionRowsBuilder { Arc::new(inner_stack_height.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/extractors/solana/src/tables/messages.rs b/crates/extractors/solana/src/tables/messages.rs index f9ea7f844..2aa1e16bd 100644 --- a/crates/extractors/solana/src/tables/messages.rs +++ b/crates/extractors/solana/src/tables/messages.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, LazyLock}; use common::{ - BoxResult, RawTableRows, SPECIAL_BLOCK_NUM, + BlockRange, BoxResult, SPECIAL_BLOCK_NUM, arrow::{ array::{ ArrayRef, ListBuilder, StringBuilder, StructBuilder, UInt8Builder, UInt32Builder, @@ -9,9 +9,9 @@ use common::{ }, datatypes::{DataType, Field, Fields, Schema, SchemaRef}, }, - metadata::segments::BlockRange, }; use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; use solana_clock::Slot; use crate::{rpc_client::UiRawMessage, tables::BASE58_ENCODED_HASH_LEN}; @@ -191,7 +191,7 @@ pub(crate) struct AddressTableLookup { pub(crate) readonly_indexes: Vec, } -/// A builder for 
converting [Message]s into [RawTableRows]. +/// A builder for converting [Message]s into [TableRows]. pub(crate) struct MessageRowsBuilder { special_block_num: UInt64Builder, slot: UInt64Builder, @@ -326,8 +326,8 @@ impl MessageRowsBuilder { self.recent_block_hash.append_value(recent_block_hash); } - /// Builds the [RawTableRows] from the appended data. - pub(crate) fn build(self, range: BlockRange) -> BoxResult { + /// Builds the [TableRows] from the appended data. + pub(crate) fn build(self, range: BlockRange) -> BoxResult { let Self { mut special_block_num, mut slot, @@ -348,6 +348,6 @@ impl MessageRowsBuilder { Arc::new(recent_block_hash.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/extractors/solana/src/tables/transactions.rs b/crates/extractors/solana/src/tables/transactions.rs index 978fd9791..3318853f9 100644 --- a/crates/extractors/solana/src/tables/transactions.rs +++ b/crates/extractors/solana/src/tables/transactions.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, LazyLock}; use base64::Engine; use common::{ - BoxResult, RawTableRows, SPECIAL_BLOCK_NUM, + BlockRange, BoxResult, SPECIAL_BLOCK_NUM, arrow::{ array::{ ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, ListBuilder, StringBuilder, @@ -10,9 +10,9 @@ use common::{ }, datatypes::{DataType, Field, Fields, Schema, SchemaRef}, }, - metadata::segments::BlockRange, }; use datasets_common::dataset::Table; +use datasets_raw::rows::TableRows; use serde::Deserialize; use solana_clock::Slot; @@ -935,7 +935,7 @@ impl TransactionRowsBuilder { self.slot.append_value(*slot); } - pub(crate) fn build(self, range: BlockRange) -> BoxResult { + pub(crate) fn build(self, range: BlockRange) -> BoxResult { let Self { mut special_block_num, mut tx_index, @@ -978,6 +978,6 @@ impl TransactionRowsBuilder { Arc::new(cost_units.finish()), ]; - RawTableRows::new(table(range.network.clone()), range, columns) + TableRows::new(table(range.network.clone()), range, columns).map_err(Into::into) } } diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 0b28f6e1c..706780f58 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -21,7 +21,7 @@ use async_stream::stream; use axum::{Router, http::StatusCode, response::IntoResponse}; use bytes::{BufMut, Bytes, BytesMut}; use common::{ - BlockNum, DetachedLogicalPlan, PlanningContext, QueryContext, + BlockNum, BlockRange, DetachedLogicalPlan, PlanningContext, QueryContext, ResumeWatermark, arrow::{ self, array::RecordBatch, @@ -42,7 +42,6 @@ use common::{ }, }, }, - metadata::segments::{BlockRange, ResumeWatermark}, query_context::{Error as CoreError, QueryEnv}, sql::{ ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references, diff --git a/tests/src/testlib/helpers.rs b/tests/src/testlib/helpers.rs index d9f84df59..57ae545b6 100644 --- a/tests/src/testlib/helpers.rs +++ b/tests/src/testlib/helpers.rs @@ -12,10 +12,9 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use amp_data_store::DataStore; use amp_dataset_store::DatasetStore; use common::{ - BoxError, LogicalCatalog, LogicalTable, + BlockRange, BoxError, LogicalCatalog, LogicalTable, arrow::array::RecordBatch, catalog::physical::{Catalog, PhysicalTable}, - metadata::segments::BlockRange, sql, sql_str::SqlStr, }; diff --git a/tests/src/tests/it_multi_network_batch.rs 
b/tests/src/tests/it_multi_network_batch.rs
index 6da5c64f6..dbc24be42 100644
--- a/tests/src/tests/it_multi_network_batch.rs
+++ b/tests/src/tests/it_multi_network_batch.rs
@@ -1,5 +1,5 @@
 use arrow_flight::FlightData;
-use common::metadata::segments::BlockRange;
+use common::BlockRange;
 use monitoring::logging;
 use serde::Deserialize;
 use tokio::sync::mpsc;
diff --git a/tests/src/tests/it_reorg.rs b/tests/src/tests/it_reorg.rs
index e7f61888d..6dc46ad32 100644
--- a/tests/src/tests/it_reorg.rs
+++ b/tests/src/tests/it_reorg.rs
@@ -3,9 +3,8 @@ use std::{collections::BTreeMap, ops::RangeInclusive, time::Duration};
 use alloy::primitives::BlockHash;
 use arrow_flight::FlightData;
 use common::{
-    BlockNum,
+    BlockNum, BlockRange,
     catalog::{logical::for_query as logical_catalog, physical::for_query as physical_catalog},
-    metadata::segments::BlockRange,
     sql::{self, resolve_function_references, resolve_table_references},
     sql_str::SqlStr,
 };
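
A minimal usage sketch, not part of the patch above: how a caller might drive the relocated `datasets_raw` API (the `BlockStreamer` trait, its retry wrapper, and the `Rows`/`TableRows` types) after this refactor. Generic parameters stripped by this diff rendering are inferred from context, and `dump_blocks` and its wiring are hypothetical.

// Hypothetical caller of the relocated extraction API (sketch only).
use datasets_common::BlockNum;
use datasets_raw::client::{BlockStreamer, BlockStreamerExt as _};
use futures::{StreamExt as _, pin_mut};

async fn dump_blocks<S>(streamer: S, start: BlockNum, end: BlockNum)
where
    // The retry wrapper's BlockStreamer impl is bounded by Send + Sync in this patch.
    S: BlockStreamer + Send + Sync,
{
    // `with_retry` wraps the streamer so transient stream errors are retried with
    // progressively louder logging and longer delays (see BlockStreamerWithRetry).
    let streamer = streamer.with_retry();

    let stream = streamer.clone().block_stream(start, end).await;
    pin_mut!(stream);
    while let Some(next) = stream.next().await {
        match next {
            Ok(rows) => {
                // `Rows` groups the per-table `TableRows` extracted from a single block.
                tracing::info!(block = rows.block_num(), "extracted block");
                for table_rows in rows {
                    // Each `TableRows` carries its `Table`, `RecordBatch`, and `BlockRange`.
                    let _ = (table_rows.table.name(), table_rows.rows.num_rows());
                }
            }
            Err(e) => {
                // The retry wrapper retries internally, but the item type is still a Result.
                tracing::error!(error = %e, "block stream failed");
                break;
            }
        }
    }

    // Let the extractor stop background tasks and release connections.
    let _ = streamer.wait_for_cleanup().await;
}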