Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
622 changes: 617 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ members = [
"crates/core/providers-solana/gen",
"crates/core/providers-static",
"crates/core/providers-static/gen",
"crates/core/providers-tempo",
"crates/core/providers-tempo/gen",
"crates/core/verification",
"crates/core/worker-core",
"crates/core/worker-datasets-derived",
Expand Down
1 change: 1 addition & 0 deletions crates/core/monitoring/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const AMP_CRATES: &[&str] = &[
"amp_providers_registry",
"amp_providers_solana",
"amp_providers_static",
"amp_providers_tempo",
"amp_worker_core",
"amp_worker_datasets_derived",
"amp_worker_datasets_raw",
Expand Down
1 change: 1 addition & 0 deletions crates/core/providers-registry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ amp-providers-common = { path = "../providers-common" }
amp-providers-evm-rpc = { path = "../providers-evm-rpc" }
amp-providers-firehose = { path = "../providers-firehose" }
amp-providers-solana = { path = "../providers-solana" }
amp-providers-tempo = { path = "../providers-tempo" }
async-stream.workspace = true
datasets-common = { path = "../datasets-common" }
datasets-raw = { path = "../datasets-raw" }
Expand Down
36 changes: 36 additions & 0 deletions crates/core/providers-registry/src/client/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use amp_providers_common::{
use amp_providers_evm_rpc::kind::EvmRpcProviderKind;
use amp_providers_firehose::kind::FirehoseProviderKind;
use amp_providers_solana::kind::SolanaProviderKind;
use amp_providers_tempo::kind::TempoProviderKind;
use async_stream::stream;
use datasets_common::{block_num::BlockNum, network_id::NetworkId};
use datasets_raw::{
Expand Down Expand Up @@ -75,6 +76,21 @@ pub async fn create(
name,
source: ProviderClientError::Firehose(err),
})
} else if header.kind == TempoProviderKind {
let typed_config =
config
.try_into_config()
.map_err(|err| CreateClientError::ConfigParse {
name: name.clone(),
source: err,
})?;
amp_providers_tempo::client(name.clone(), typed_config, meter)
.await
.map(BlockStreamClient::Tempo)
.map_err(|err| CreateClientError::ProviderClient {
name,
source: ProviderClientError::Tempo(err),
})
} else {
Err(CreateClientError::UnsupportedKind {
kind: header.kind.to_string(),
Expand All @@ -88,6 +104,7 @@ pub enum BlockStreamClient {
EvmRpc(amp_providers_evm_rpc::Client),
Solana(amp_providers_solana::Client),
Firehose(Box<amp_providers_firehose::Client>),
Tempo(amp_providers_tempo::Client),
}

impl BlockStreamer for BlockStreamClient {
Expand Down Expand Up @@ -118,6 +135,12 @@ impl BlockStreamer for BlockStreamClient {
yield item;
}
}
Self::Tempo(client) => {
let stream = client.block_stream(start_block, end_block).await;
for await item in stream {
yield item;
}
}
}
}
}
Expand All @@ -130,6 +153,7 @@ impl BlockStreamer for BlockStreamClient {
Self::EvmRpc(client) => client.latest_block(finalized).await,
Self::Solana(client) => client.latest_block(finalized).await,
Self::Firehose(client) => client.latest_block(finalized).await,
Self::Tempo(client) => client.latest_block(finalized).await,
}
}

Expand All @@ -138,6 +162,7 @@ impl BlockStreamer for BlockStreamClient {
Self::EvmRpc(client) => client.bucket_size(),
Self::Solana(client) => client.bucket_size(),
Self::Firehose(client) => client.bucket_size(),
Self::Tempo(client) => client.bucket_size(),
}
}

Expand All @@ -146,6 +171,7 @@ impl BlockStreamer for BlockStreamClient {
Self::EvmRpc(client) => client.provider_name(),
Self::Solana(client) => client.provider_name(),
Self::Firehose(client) => client.provider_name(),
Self::Tempo(client) => client.provider_name(),
}
}
}
Expand Down Expand Up @@ -228,6 +254,13 @@ pub enum ProviderClientError {
/// invalid gRPC endpoints, connection issues, or authentication failures.
#[error("failed to create Firehose client")]
Firehose(#[source] amp_providers_firehose::error::ClientError),

/// Failed to create Tempo RPC client.
///
/// This occurs during initialization of the Tempo RPC client, which may fail due to
/// invalid RPC URLs, connection issues, or authentication failures.
#[error("failed to create Tempo RPC client")]
Tempo(#[source] amp_providers_tempo::error::ClientError),
}

impl crate::retryable::RetryableErrorExt for ProviderClientError {
Expand All @@ -237,6 +270,9 @@ impl crate::retryable::RetryableErrorExt for ProviderClientError {
Self::EvmRpc(_) => true,
Self::Solana(err) => err.is_retryable(),
Self::Firehose(err) => err.is_retryable(),
Self::Tempo(err) => {
matches!(err, amp_providers_tempo::error::ClientError::Transport(_))
}
}
}
}
31 changes: 31 additions & 0 deletions crates/core/providers-tempo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "amp-providers-tempo"
edition.workspace = true
version.workspace = true
license-file.workspace = true

[features]
# JSON Schema generation support for provider configuration validation.
schemars = ["dep:schemars", "amp-providers-common/schemars"]

[dependencies]
alloy.workspace = true
amp-providers-common = { path = "../providers-common" }
anyhow.workspace = true
async-stream.workspace = true
datasets-common = { path = "../datasets-common" }
datasets-raw = { path = "../datasets-raw" }
futures.workspace = true
governor.workspace = true
headers.workspace = true
monitoring = { path = "../monitoring" }
schemars = { workspace = true, optional = true }
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
tempo-alloy = { git = "https://github.com/tempoxyz/tempo" }
thiserror.workspace = true
tokio.workspace = true
tower.workspace = true
tracing.workspace = true
url.workspace = true
21 changes: 21 additions & 0 deletions crates/core/providers-tempo/gen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "amp-providers-tempo-gen"
edition.workspace = true
version.workspace = true
license-file.workspace = true
publish = false

# The parent crate is always available as a build dependency
[build-dependencies]
amp-providers-tempo = { path = ".." }

# Provider config schema generation dependencies
# These dependencies are only included when the gen_schema_provider cfg flag is enabled
[target.'cfg(gen_schema_provider)'.build-dependencies]
amp-providers-tempo = { path = "..", features = ["schemars"] }
schemars = { workspace = true }
serde_json = { workspace = true }

[lints.rust]
# Allow the gen_schema_provider cfg flag used for conditional schema generation
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(gen_schema_provider)'] }
24 changes: 24 additions & 0 deletions crates/core/providers-tempo/gen/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# JSON Schema Generation

A generation crate for Tempo provider configuration JSON schemas. This crate generates JSON schemas for Tempo provider configurations using the schemars library for external validation and documentation purposes.

## JSON Schema Generation

The library uses a build configuration flag `gen_schema_provider` that enables JSON schema generation during the build process. When enabled, the build script will generate JSON schemas from Rust structs using schemars for Tempo provider configuration validation.

To generate JSON schema bindings, run:

```bash
just gen-tempo-provider-schema
```

Or using the full `cargo` command:

```bash
RUSTFLAGS="--cfg gen_schema_provider" cargo check -p amp-providers-tempo-gen

mkdir -p docs/providers
cp target/debug/build/amp-providers-tempo-gen-*/out/schema.json docs/schemas/providers/tempo.spec.json
```

This will generate JSON schemas from the Tempo provider configurations and copy them to `docs/schemas/providers/tempo.spec.json`.
25 changes: 25 additions & 0 deletions crates/core/providers-tempo/gen/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(gen_schema_provider)]
{
println!(
"cargo:warning=Config 'gen_schema_provider' enabled: Running JSON schema generation"
);
let out_dir = std::env::var("OUT_DIR")?;
let schema = schemars::schema_for!(amp_providers_tempo::config::TempoProviderConfig);
let schema_json = serde_json::to_string_pretty(&schema)?;
let schema_path = format!("{out_dir}/schema.json");
std::fs::write(&schema_path, schema_json)?;
println!(
"cargo:warning=Generated Tempo provider config schema file: {}",
schema_path
);
}
#[cfg(not(gen_schema_provider))]
{
println!(
"cargo:debug=Config 'gen_schema_provider' not enabled: Skipping JSON schema generation"
);
}

Ok(())
}
2 changes: 2 additions & 0 deletions crates/core/providers-tempo/gen/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Codegen auxiliary crate - contains no runtime logic
#![doc = include_str!("../README.md")]
Loading
Loading