From a51a0429a8c3e183e3959af8e275e4889f912969 Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Thu, 19 Mar 2026 20:19:53 +0530 Subject: [PATCH] feat(providers-tempo): add Tempo RPC provider crate --- Cargo.lock | 622 +++++++++++++++++- Cargo.toml | 2 + crates/core/monitoring/src/logging.rs | 1 + crates/core/providers-registry/Cargo.toml | 1 + .../src/client/block_stream.rs | 36 + crates/core/providers-tempo/Cargo.toml | 31 + crates/core/providers-tempo/gen/Cargo.toml | 21 + crates/core/providers-tempo/gen/README.md | 24 + crates/core/providers-tempo/gen/build.rs | 25 + crates/core/providers-tempo/gen/src/lib.rs | 2 + crates/core/providers-tempo/src/client.rs | 605 +++++++++++++++++ crates/core/providers-tempo/src/config.rs | 109 +++ crates/core/providers-tempo/src/convert.rs | 485 ++++++++++++++ crates/core/providers-tempo/src/error.rs | 143 ++++ crates/core/providers-tempo/src/kind.rs | 132 ++++ crates/core/providers-tempo/src/lib.rs | 83 +++ crates/core/providers-tempo/src/metrics.rs | 90 +++ crates/core/providers-tempo/src/provider.rs | 217 ++++++ docs/schemas/providers/tempo.spec.json | 86 +++ justfile | 14 +- tests/config/manifests/tempo.json | 2 +- tests/src/tests/it_ampctl_gen_manifest.rs | 10 +- 22 files changed, 2732 insertions(+), 9 deletions(-) create mode 100644 crates/core/providers-tempo/Cargo.toml create mode 100644 crates/core/providers-tempo/gen/Cargo.toml create mode 100644 crates/core/providers-tempo/gen/README.md create mode 100644 crates/core/providers-tempo/gen/build.rs create mode 100644 crates/core/providers-tempo/gen/src/lib.rs create mode 100644 crates/core/providers-tempo/src/client.rs create mode 100644 crates/core/providers-tempo/src/config.rs create mode 100644 crates/core/providers-tempo/src/convert.rs create mode 100644 crates/core/providers-tempo/src/error.rs create mode 100644 crates/core/providers-tempo/src/kind.rs create mode 100644 crates/core/providers-tempo/src/lib.rs create mode 100644 
crates/core/providers-tempo/src/metrics.rs create mode 100644 crates/core/providers-tempo/src/provider.rs create mode 100644 docs/schemas/providers/tempo.spec.json diff --git a/Cargo.lock b/Cargo.lock index 11ef46e25..8b1ba2bef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -350,6 +350,7 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "borsh", + "k256", "serde", "thiserror 2.0.18", ] @@ -400,7 +401,9 @@ dependencies = [ "alloy-primitives", "alloy-serde", "alloy-trie", + "borsh", "serde", + "serde_with", ] [[package]] @@ -516,6 +519,7 @@ dependencies = [ "const-hex", "derive_more", "foldhash 0.2.0", + "getrandom 0.4.2", "hashbrown 0.16.1", "indexmap 2.13.0", "itoa", @@ -1161,6 +1165,7 @@ dependencies = [ "amp-providers-evm-rpc", "amp-providers-firehose", "amp-providers-solana", + "amp-providers-tempo", "async-stream", "datasets-common", "datasets-raw", @@ -1252,6 +1257,41 @@ dependencies = [ "serde_json", ] +[[package]] +name = "amp-providers-tempo" +version = "0.1.0" +dependencies = [ + "alloy", + "amp-providers-common", + "anyhow", + "async-stream", + "datasets-common", + "datasets-raw", + "futures", + "governor 0.10.4", + "headers", + "monitoring", + "schemars 1.2.1", + "serde", + "serde_json", + "serde_with", + "tempo-alloy", + "thiserror 2.0.18", + "tokio", + "tower", + "tracing", + "url", +] + +[[package]] +name = "amp-providers-tempo-gen" +version = "0.1.0" +dependencies = [ + "amp-providers-tempo", + "schemars 1.2.1", + "serde_json", +] + [[package]] name = "amp-worker-core" version = "0.1.0" @@ -1597,6 +1637,50 @@ dependencies = [ "rustversion", ] +[[package]] +name = "ark-bls12-381" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df4dcc01ff89867cd86b0da835f23c3f02738353aaee7dde7495af71363b8d5" +dependencies = [ + "ark-ec", + "ark-ff 0.5.0", + "ark-serialize 0.5.0", + "ark-std 0.5.0", +] + +[[package]] +name = "ark-bn254" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "d69eab57e8d2663efa5c63135b2af4f396d66424f88954c21104125ab6b3e6bc" +dependencies = [ + "ark-ec", + "ark-ff 0.5.0", + "ark-std 0.5.0", +] + +[[package]] +name = "ark-ec" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d68f2d516162846c1238e755a7c4d131b892b70cc70c471a8e3ca3ed818fce" +dependencies = [ + "ahash 0.8.12", + "ark-ff 0.5.0", + "ark-poly", + "ark-serialize 0.5.0", + "ark-std 0.5.0", + "educe", + "fnv", + "hashbrown 0.15.5", + "itertools 0.13.0", + "num-bigint 0.4.6", + "num-integer", + "num-traits", + "zeroize", +] + [[package]] name = "ark-ff" version = "0.3.0" @@ -1723,6 +1807,21 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "ark-poly" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579305839da207f02b89cd1679e50e67b4331e2f9294a57693e5051b7703fe27" +dependencies = [ + "ahash 0.8.12", + "ark-ff 0.5.0", + "ark-serialize 0.5.0", + "ark-std 0.5.0", + "educe", + "fnv", + "hashbrown 0.15.5", +] + [[package]] name = "ark-serialize" version = "0.3.0" @@ -1750,12 +1849,24 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f4d068aaf107ebcd7dfb52bc748f8030e0fc930ac8e360146ca54c1203088f7" dependencies = [ + "ark-serialize-derive", "ark-std 0.5.0", "arrayvec", "digest 0.10.7", "num-bigint 0.4.6", ] +[[package]] +name = "ark-serialize-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213888f660fddcca0d257e88e54ac05bca01885f258ccdf695bafd77031bb69d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "ark-std" version = "0.3.0" @@ -2200,6 +2311,16 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "aurora-engine-modexp" +version = "1.2.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "518bc5745a6264b5fd7b09dffb9667e400ee9e2bbe18555fac75e1fe9afa0df9" +dependencies = [ + "hex", + "num 0.4.3", +] + [[package]] name = "auto_impl" version = "1.3.0" @@ -2494,6 +2615,7 @@ checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" dependencies = [ "funty", "radium", + "serde", "tap", "wyz", ] @@ -3393,6 +3515,12 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -3707,6 +3835,7 @@ dependencies = [ "lock_api", "once_cell", "parking_lot_core", + "serde", ] [[package]] @@ -4593,6 +4722,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive-where" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d08b3a0bcc0d079199cd476b2cae8435016ec11d1c0986c6901c5ac223041534" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "derive_builder" version = "0.20.2" @@ -7051,6 +7191,27 @@ dependencies = [ "parking_lot", ] +[[package]] +name = "modular-bitfield" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2956e537fc68236d2aa048f55704f231cc93f1c4de42fe1ecb5bd7938061fc4a" +dependencies = [ + "modular-bitfield-impl", + "static_assertions", +] + +[[package]] +name = "modular-bitfield-impl" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b43b4fd69e3437618106f7754f34021b831a514f9e1a98ae863cabcd8d8dad" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "monitoring" version = "0.1.0" @@ -7180,7 +7341,21 @@ dependencies = [ "num-complex 0.2.4", "num-integer", "num-iter", - "num-rational", + "num-rational 
0.2.4", + "num-traits", +] + +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint 0.4.6", + "num-complex 0.4.6", + "num-integer", + "num-iter", + "num-rational 0.4.2", "num-traits", ] @@ -7299,6 +7474,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint 0.4.6", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -7498,6 +7684,10 @@ name = "once_cell" version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +dependencies = [ + "critical-section", + "portable-atomic", +] [[package]] name = "once_cell_polyfill" @@ -7511,6 +7701,24 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "op-alloy-consensus" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fadcb964b0aa645d12e952d470c7585d23286d8bcf1ac41589410b6724216b68" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rlp", + "alloy-rpc-types-eth", + "alloy-serde", + "derive_more", + "serde", + "serde_with", + "thiserror 2.0.18", +] + [[package]] name = "opaque-debug" version = "0.3.1" @@ -7657,6 +7865,19 @@ dependencies = [ "num-traits", ] +[[package]] +name = "p256" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" +dependencies = [ + "ecdsa", + "elliptic-curve", + 
"primeorder", + "serdect", + "sha2", +] + [[package]] name = "page_size" version = "0.6.0" @@ -7926,7 +8147,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd23b938276f14057220b707937bcb42fa76dda7560e57a2da30cb52d557937" dependencies = [ - "num", + "num 0.2.1", ] [[package]] @@ -8021,7 +8242,7 @@ version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" dependencies = [ - "phf_macros", + "phf_macros 0.11.3", "phf_shared 0.11.3", ] @@ -8034,13 +8255,24 @@ dependencies = [ "phf_shared 0.12.1", ] +[[package]] +name = "phf" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1562dc717473dbaa4c1f85a36410e03c047b2e7df7f45ee938fbef64ae7fadf" +dependencies = [ + "phf_macros 0.13.1", + "phf_shared 0.13.1", + "serde", +] + [[package]] name = "phf_codegen" version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a" dependencies = [ - "phf_generator", + "phf_generator 0.11.3", "phf_shared 0.11.3", ] @@ -8054,19 +8286,42 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "phf_generator" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135ace3a761e564ec88c03a77317a7c6b80bb7f7135ef2544dbe054243b89737" +dependencies = [ + "fastrand", + "phf_shared 0.13.1", +] + [[package]] name = "phf_macros" version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216" dependencies = [ - "phf_generator", + "phf_generator 0.11.3", "phf_shared 0.11.3", "proc-macro2", "quote", "syn 2.0.117", ] +[[package]] +name = "phf_macros" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"812f032b54b1e759ccd5f8b6677695d5268c588701effba24601f6932f8269ef" +dependencies = [ + "phf_generator 0.13.1", + "phf_shared 0.13.1", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "phf_shared" version = "0.11.3" @@ -8085,6 +8340,15 @@ dependencies = [ "siphasher 1.0.2", ] +[[package]] +name = "phf_shared" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e57fef6bc5981e38c2ce2d63bfa546861309f875b8a75f092d1d54ae2d64f266" +dependencies = [ + "siphasher 1.0.2", +] + [[package]] name = "pin-project" version = "1.1.11" @@ -8253,6 +8517,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "primeorder" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" +dependencies = [ + "elliptic-curve", + "serdect", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -8660,6 +8934,7 @@ version = "4.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e48930979c155e2f33aa36ab3119b5ee81332beb6482199a8ecd6029b80b59" dependencies = [ + "rand 0.9.2", "rustversion", ] @@ -8996,6 +9271,270 @@ dependencies = [ "tower-service", ] +[[package]] +name = "reth-codecs" +version = "1.11.3" +source = "git+https://github.com/paradigmxyz/reth?rev=9060c50#9060c5059e0ca15813ab97a72e33d04cd2e7d998" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-genesis", + "alloy-primitives", + "alloy-trie", + "bytes", + "modular-bitfield", + "op-alloy-consensus", + "reth-codecs-derive", + "reth-zstd-compressors", + "serde", +] + +[[package]] +name = "reth-codecs-derive" +version = "1.11.3" +source = "git+https://github.com/paradigmxyz/reth?rev=9060c50#9060c5059e0ca15813ab97a72e33d04cd2e7d998" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "reth-ethereum-primitives" +version = "1.11.3" +source = 
"git+https://github.com/paradigmxyz/reth?rev=9060c50#9060c5059e0ca15813ab97a72e33d04cd2e7d998" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rpc-types-eth", + "reth-codecs", + "reth-primitives-traits", + "serde", +] + +[[package]] +name = "reth-primitives-traits" +version = "1.11.3" +source = "git+https://github.com/paradigmxyz/reth?rev=9060c50#9060c5059e0ca15813ab97a72e33d04cd2e7d998" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-genesis", + "alloy-primitives", + "alloy-rlp", + "alloy-rpc-types-eth", + "alloy-trie", + "auto_impl", + "bytes", + "dashmap 6.1.0", + "derive_more", + "once_cell", + "op-alloy-consensus", + "reth-codecs", + "revm-bytecode", + "revm-primitives", + "revm-state", + "secp256k1", + "serde", + "thiserror 2.0.18", +] + +[[package]] +name = "reth-zstd-compressors" +version = "1.11.3" +source = "git+https://github.com/paradigmxyz/reth?rev=9060c50#9060c5059e0ca15813ab97a72e33d04cd2e7d998" +dependencies = [ + "zstd", +] + +[[package]] +name = "revm" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0abc15d09cd211e9e73410ada10134069c794d4bcdb787dfc16a1bf0939849c" +dependencies = [ + "revm-bytecode", + "revm-context", + "revm-context-interface", + "revm-database", + "revm-database-interface", + "revm-handler", + "revm-inspector", + "revm-interpreter", + "revm-precompile", + "revm-primitives", + "revm-state", +] + +[[package]] +name = "revm-bytecode" +version = "9.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e86e468df3cf5cf59fa7ef71a3e9ccabb76bb336401ea2c0674f563104cf3c5e" +dependencies = [ + "bitvec", + "phf 0.13.1", + "revm-primitives", + "serde", +] + +[[package]] +name = "revm-context" +version = "15.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eb1f0a76b14d684a444fc52f7bf6b7564bf882599d91ee62e76d602e7a187c7" +dependencies = [ + "bitvec", + "cfg-if", + 
"derive-where", + "revm-bytecode", + "revm-context-interface", + "revm-database-interface", + "revm-primitives", + "revm-state", + "serde", +] + +[[package]] +name = "revm-context-interface" +version = "16.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc256b27743e2912ca16899568e6652a372eb5d1d573e6edb16c7836b16cf487" +dependencies = [ + "alloy-eip2930", + "alloy-eip7702", + "auto_impl", + "either", + "revm-database-interface", + "revm-primitives", + "revm-state", + "serde", +] + +[[package]] +name = "revm-database" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0a7d6da41061f2c50f99a2632571026b23684b5449ff319914151f4449b6c8" +dependencies = [ + "alloy-eips", + "revm-bytecode", + "revm-database-interface", + "revm-primitives", + "revm-state", + "serde", +] + +[[package]] +name = "revm-database-interface" +version = "10.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd497a38a79057b94a049552cb1f925ad15078bc1a479c132aeeebd1d2ccc768" +dependencies = [ + "auto_impl", + "either", + "revm-primitives", + "revm-state", + "serde", + "thiserror 2.0.18", +] + +[[package]] +name = "revm-handler" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1eed729ca9b228ae98688f352235871e9b8be3d568d488e4070f64c56e9d3d" +dependencies = [ + "auto_impl", + "derive-where", + "revm-bytecode", + "revm-context", + "revm-context-interface", + "revm-database-interface", + "revm-interpreter", + "revm-precompile", + "revm-primitives", + "revm-state", + "serde", +] + +[[package]] +name = "revm-inspector" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf5102391706513689f91cb3cb3d97b5f13a02e8647e6e9cb7620877ef84847" +dependencies = [ + "auto_impl", + "either", + "revm-context", + "revm-database-interface", + "revm-handler", + "revm-interpreter", + "revm-primitives", + 
"revm-state", + "serde", +] + +[[package]] +name = "revm-interpreter" +version = "34.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf22f80612bb8f58fd1f578750281f2afadb6c93835b14ae6a4d6b75ca26f445" +dependencies = [ + "revm-bytecode", + "revm-context-interface", + "revm-primitives", + "revm-state", + "serde", +] + +[[package]] +name = "revm-precompile" +version = "32.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2ec11f45deec71e4945e1809736bb20d454285f9167ab53c5159dae1deb603f" +dependencies = [ + "ark-bls12-381", + "ark-bn254", + "ark-ec", + "ark-ff 0.5.0", + "ark-serialize 0.5.0", + "arrayref", + "aurora-engine-modexp", + "cfg-if", + "k256", + "p256", + "revm-primitives", + "ripemd", + "sha2", +] + +[[package]] +name = "revm-primitives" +version = "22.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfb5ce6cf18b118932bcdb7da05cd9c250f2cb9f64131396b55f3fe3537c35" +dependencies = [ + "alloy-primitives", + "num_enum", + "once_cell", + "serde", +] + +[[package]] +name = "revm-state" +version = "10.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29404707763da607e5d6e4771cb203998c28159279c2f64cc32de08d2814651" +dependencies = [ + "alloy-eip7928", + "bitflags 2.11.0", + "revm-bytecode", + "revm-primitives", + "serde", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -9029,6 +9568,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ripemd" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd124222d17ad93a644ed9d011a40f4fb64aa54275c08cc216524a9ea82fb09f" +dependencies = [ + "digest 0.10.7", +] + [[package]] name = "rkyv" version = "0.7.46" @@ -9222,6 +9770,9 @@ name = "rustc-hash" version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +dependencies = [ 
+ "rand 0.8.5", +] [[package]] name = "rustc-hex" @@ -12311,6 +12862,43 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tempo-alloy" +version = "1.4.2" +source = "git+https://github.com/tempoxyz/tempo#7262978724b8bf3f1d89d14dba62244efe1e9848" +dependencies = [ + "alloy-consensus", + "alloy-contract", + "alloy-eips", + "alloy-network", + "alloy-primitives", + "alloy-provider", + "alloy-rpc-types-eth", + "alloy-serde", + "alloy-signer", + "alloy-signer-local", + "alloy-transport", + "async-trait", + "dashmap 6.1.0", + "derive_more", + "futures", + "serde", + "tempo-contracts", + "tempo-primitives", + "tracing", +] + +[[package]] +name = "tempo-contracts" +version = "1.4.2" +source = "git+https://github.com/tempoxyz/tempo#7262978724b8bf3f1d89d14dba62244efe1e9848" +dependencies = [ + "alloy-contract", + "alloy-primitives", + "alloy-sol-types", + "serde", +] + [[package]] name = "tempo-datasets" version = "0.1.0" @@ -12319,6 +12907,30 @@ dependencies = [ "datasets-raw", ] +[[package]] +name = "tempo-primitives" +version = "1.4.2" +source = "git+https://github.com/tempoxyz/tempo#7262978724b8bf3f1d89d14dba62244efe1e9848" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rlp", + "alloy-serde", + "base64 0.22.1", + "derive_more", + "once_cell", + "p256", + "reth-codecs", + "reth-ethereum-primitives", + "reth-primitives-traits", + "revm", + "serde", + "serde_json", + "sha2", + "tempo-contracts", +] + [[package]] name = "termcolor" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 6599b53ca..dc649bce6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ members = [ "crates/core/providers-solana/gen", "crates/core/providers-static", "crates/core/providers-static/gen", + "crates/core/providers-tempo", + "crates/core/providers-tempo/gen", "crates/core/verification", "crates/core/worker-core", "crates/core/worker-datasets-derived", diff --git a/crates/core/monitoring/src/logging.rs 
b/crates/core/monitoring/src/logging.rs index 35432863a..565373a8a 100644 --- a/crates/core/monitoring/src/logging.rs +++ b/crates/core/monitoring/src/logging.rs @@ -78,6 +78,7 @@ const AMP_CRATES: &[&str] = &[ "amp_providers_registry", "amp_providers_solana", "amp_providers_static", + "amp_providers_tempo", "amp_worker_core", "amp_worker_datasets_derived", "amp_worker_datasets_raw", diff --git a/crates/core/providers-registry/Cargo.toml b/crates/core/providers-registry/Cargo.toml index 3c295e3ed..ea20a5f48 100644 --- a/crates/core/providers-registry/Cargo.toml +++ b/crates/core/providers-registry/Cargo.toml @@ -9,6 +9,7 @@ amp-providers-common = { path = "../providers-common" } amp-providers-evm-rpc = { path = "../providers-evm-rpc" } amp-providers-firehose = { path = "../providers-firehose" } amp-providers-solana = { path = "../providers-solana" } +amp-providers-tempo = { path = "../providers-tempo" } async-stream.workspace = true datasets-common = { path = "../datasets-common" } datasets-raw = { path = "../datasets-raw" } diff --git a/crates/core/providers-registry/src/client/block_stream.rs b/crates/core/providers-registry/src/client/block_stream.rs index f7fb60a60..e9fedd798 100644 --- a/crates/core/providers-registry/src/client/block_stream.rs +++ b/crates/core/providers-registry/src/client/block_stream.rs @@ -7,6 +7,7 @@ use amp_providers_common::{ use amp_providers_evm_rpc::kind::EvmRpcProviderKind; use amp_providers_firehose::kind::FirehoseProviderKind; use amp_providers_solana::kind::SolanaProviderKind; +use amp_providers_tempo::kind::TempoProviderKind; use async_stream::stream; use datasets_common::{block_num::BlockNum, network_id::NetworkId}; use datasets_raw::{ @@ -75,6 +76,21 @@ pub async fn create( name, source: ProviderClientError::Firehose(err), }) + } else if header.kind == TempoProviderKind { + let typed_config = + config + .try_into_config() + .map_err(|err| CreateClientError::ConfigParse { + name: name.clone(), + source: err, + })?; + 
amp_providers_tempo::client(name.clone(), typed_config, meter) + .await + .map(BlockStreamClient::Tempo) + .map_err(|err| CreateClientError::ProviderClient { + name, + source: ProviderClientError::Tempo(err), + }) } else { Err(CreateClientError::UnsupportedKind { kind: header.kind.to_string(), @@ -88,6 +104,7 @@ pub enum BlockStreamClient { EvmRpc(amp_providers_evm_rpc::Client), Solana(amp_providers_solana::Client), Firehose(Box), + Tempo(amp_providers_tempo::Client), } impl BlockStreamer for BlockStreamClient { @@ -118,6 +135,12 @@ impl BlockStreamer for BlockStreamClient { yield item; } } + Self::Tempo(client) => { + let stream = client.block_stream(start_block, end_block).await; + for await item in stream { + yield item; + } + } } } } @@ -130,6 +153,7 @@ impl BlockStreamer for BlockStreamClient { Self::EvmRpc(client) => client.latest_block(finalized).await, Self::Solana(client) => client.latest_block(finalized).await, Self::Firehose(client) => client.latest_block(finalized).await, + Self::Tempo(client) => client.latest_block(finalized).await, } } @@ -138,6 +162,7 @@ impl BlockStreamer for BlockStreamClient { Self::EvmRpc(client) => client.bucket_size(), Self::Solana(client) => client.bucket_size(), Self::Firehose(client) => client.bucket_size(), + Self::Tempo(client) => client.bucket_size(), } } @@ -146,6 +171,7 @@ impl BlockStreamer for BlockStreamClient { Self::EvmRpc(client) => client.provider_name(), Self::Solana(client) => client.provider_name(), Self::Firehose(client) => client.provider_name(), + Self::Tempo(client) => client.provider_name(), } } } @@ -228,6 +254,13 @@ pub enum ProviderClientError { /// invalid gRPC endpoints, connection issues, or authentication failures. #[error("failed to create Firehose client")] Firehose(#[source] amp_providers_firehose::error::ClientError), + + /// Failed to create Tempo RPC client. 
+ /// + /// This occurs during initialization of the Tempo RPC client, which may fail due to + /// invalid RPC URLs, connection issues, or authentication failures. + #[error("failed to create Tempo RPC client")] + Tempo(#[source] amp_providers_tempo::error::ClientError), } impl crate::retryable::RetryableErrorExt for ProviderClientError { @@ -237,6 +270,9 @@ impl crate::retryable::RetryableErrorExt for ProviderClientError { Self::EvmRpc(_) => true, Self::Solana(err) => err.is_retryable(), Self::Firehose(err) => err.is_retryable(), + Self::Tempo(err) => { + matches!(err, amp_providers_tempo::error::ClientError::Transport(_)) + } } } } diff --git a/crates/core/providers-tempo/Cargo.toml b/crates/core/providers-tempo/Cargo.toml new file mode 100644 index 000000000..5ba4ab2fa --- /dev/null +++ b/crates/core/providers-tempo/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "amp-providers-tempo" +edition.workspace = true +version.workspace = true +license-file.workspace = true + +[features] +# JSON Schema generation support for provider configuration validation. 
+schemars = ["dep:schemars", "amp-providers-common/schemars"] + +[dependencies] +alloy.workspace = true +amp-providers-common = { path = "../providers-common" } +anyhow.workspace = true +async-stream.workspace = true +datasets-common = { path = "../datasets-common" } +datasets-raw = { path = "../datasets-raw" } +futures.workspace = true +governor.workspace = true +headers.workspace = true +monitoring = { path = "../monitoring" } +schemars = { workspace = true, optional = true } +serde.workspace = true +serde_json.workspace = true +serde_with.workspace = true +tempo-alloy = { git = "https://github.com/tempoxyz/tempo" } +thiserror.workspace = true +tokio.workspace = true +tower.workspace = true +tracing.workspace = true +url.workspace = true diff --git a/crates/core/providers-tempo/gen/Cargo.toml b/crates/core/providers-tempo/gen/Cargo.toml new file mode 100644 index 000000000..820fabddf --- /dev/null +++ b/crates/core/providers-tempo/gen/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "amp-providers-tempo-gen" +edition.workspace = true +version.workspace = true +license-file.workspace = true +publish = false + +# The parent crate is always available as a build dependency +[build-dependencies] +amp-providers-tempo = { path = ".." 
} + +# Provider config schema generation dependencies +# These dependencies are only included when the gen_schema_provider cfg flag is enabled +[target.'cfg(gen_schema_provider)'.build-dependencies] +amp-providers-tempo = { path = "..", features = ["schemars"] } +schemars = { workspace = true } +serde_json = { workspace = true } + +[lints.rust] +# Allow the gen_schema_provider cfg flag used for conditional schema generation +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(gen_schema_provider)'] } diff --git a/crates/core/providers-tempo/gen/README.md b/crates/core/providers-tempo/gen/README.md new file mode 100644 index 000000000..4d8f1cb55 --- /dev/null +++ b/crates/core/providers-tempo/gen/README.md @@ -0,0 +1,24 @@ +# JSON Schema Generation + +A generation crate for Tempo provider configuration JSON schemas. This crate generates JSON schemas for Tempo provider configurations using the schemars library for external validation and documentation purposes. + +## JSON Schema Generation + +The library uses a build configuration flag `gen_schema_provider` that enables JSON schema generation during the build process. When enabled, the build script will generate JSON schemas from Rust structs using schemars for Tempo provider configuration validation. + +To generate JSON schema bindings, run: + +```bash +just gen-tempo-provider-schema +``` + +Or using the full `cargo` command: + +```bash +RUSTFLAGS="--cfg gen_schema_provider" cargo check -p amp-providers-tempo-gen + +mkdir -p docs/providers +cp target/debug/build/amp-providers-tempo-gen-*/out/schema.json docs/schemas/providers/tempo.spec.json +``` + +This will generate JSON schemas from the Tempo provider configurations and copy them to `docs/schemas/providers/tempo.spec.json`. 
diff --git a/crates/core/providers-tempo/gen/build.rs b/crates/core/providers-tempo/gen/build.rs new file mode 100644 index 000000000..8556902ab --- /dev/null +++ b/crates/core/providers-tempo/gen/build.rs @@ -0,0 +1,25 @@ +fn main() -> Result<(), Box> { + #[cfg(gen_schema_provider)] + { + println!( + "cargo:warning=Config 'gen_schema_provider' enabled: Running JSON schema generation" + ); + let out_dir = std::env::var("OUT_DIR")?; + let schema = schemars::schema_for!(amp_providers_tempo::config::TempoProviderConfig); + let schema_json = serde_json::to_string_pretty(&schema)?; + let schema_path = format!("{out_dir}/schema.json"); + std::fs::write(&schema_path, schema_json)?; + println!( + "cargo:warning=Generated Tempo provider config schema file: {}", + schema_path + ); + } + #[cfg(not(gen_schema_provider))] + { + println!( + "cargo:debug=Config 'gen_schema_provider' not enabled: Skipping JSON schema generation" + ); + } + + Ok(()) +} diff --git a/crates/core/providers-tempo/gen/src/lib.rs b/crates/core/providers-tempo/gen/src/lib.rs new file mode 100644 index 000000000..9b05d9399 --- /dev/null +++ b/crates/core/providers-tempo/gen/src/lib.rs @@ -0,0 +1,2 @@ +// Codegen auxiliary crate - contains no runtime logic +#![doc = include_str!("../README.md")] diff --git a/crates/core/providers-tempo/src/client.rs b/crates/core/providers-tempo/src/client.rs new file mode 100644 index 000000000..0391cac40 --- /dev/null +++ b/crates/core/providers-tempo/src/client.rs @@ -0,0 +1,605 @@ +use std::{ + num::{NonZeroU32, NonZeroU64}, + sync::Arc, + time::{Duration, Instant}, +}; + +use alloy::{ + eips::{BlockId, BlockNumberOrTag}, + hex, + providers::Provider as _, + rpc::{ + client::BatchRequest, + json_rpc::{RpcRecv, RpcSend}, + }, + transports::http::reqwest::Url, +}; +use amp_providers_common::{network_id::NetworkId, provider_name::ProviderName}; +use anyhow::Context; +use async_stream::stream; +use datasets_common::block_num::BlockNum; +use datasets_raw::{ + 
client::{BlockStreamError, BlockStreamResultExt, BlockStreamer, LatestBlockError}, + rows::Rows, +}; +use futures::{Stream, future::try_join_all}; +use tempo_alloy::{TempoNetwork, rpc::TempoTransactionReceipt}; + +use crate::{ + convert::{TempoRpcBlock, rpc_to_rows}, + error::{BatchRequestError, BatchingError, ClientError}, + provider::Auth, +}; + +struct BatchingRpcWrapper { + client: RootProviderWithMetrics, + batch_size: usize, + limiter: Arc, +} + +impl BatchingRpcWrapper { + fn new( + client: RootProviderWithMetrics, + batch_size: usize, + limiter: Arc, + ) -> Self { + assert!(batch_size > 0, "batch_size must be > 0"); + Self { + client, + batch_size, + limiter, + } + } + + async fn execute( + &self, + calls: Vec<(&'static str, Params)>, + ) -> Result, BatchingError> { + if calls.is_empty() { + tracing::debug!("skipped batch execution, no calls provided"); + return Ok(Vec::new()); + } + let mut results = Vec::new(); + let mut remaining_calls = calls; + + while !remaining_calls.is_empty() { + let chunk: Vec<_> = remaining_calls + .drain(..self.batch_size.min(remaining_calls.len())) + .collect(); + + let _permit = self + .limiter + .acquire() + .await + .map_err(BatchingError::RateLimitAcquire)?; + + let batch_responses = self + .client + .batch_request(&chunk) + .await + .map_err(BatchingError::Request)?; + results.extend(batch_responses); + } + Ok(results) + } +} + +/// Tempo RPC block-streaming client. +/// +/// Connects to a Tempo-compatible JSON-RPC endpoint and streams blocks with +/// their transactions, receipts, and logs as dataset rows. +#[derive(Clone)] +pub struct Client { + client: RootProviderWithMetrics, + network: NetworkId, + provider_name: ProviderName, + limiter: Arc, + batch_size: usize, + fetch_receipts_per_tx: bool, +} + +impl Client { + /// Create a new HTTP/HTTPS Tempo RPC client. + /// + /// # Panics + /// + /// Panics if `request_limit` is zero. 
+ #[expect(clippy::too_many_arguments)] + pub fn new( + url: Url, + network: NetworkId, + provider_name: ProviderName, + request_limit: u16, + batch_size: usize, + rate_limit: Option, + fetch_receipts_per_tx: bool, + timeout: Duration, + auth: Option, + meter: Option<&monitoring::telemetry::metrics::Meter>, + ) -> Result { + assert!(request_limit >= 1); + let client = crate::provider::new_http(url, auth, rate_limit, timeout)?; + let client = + RootProviderWithMetrics::new(client, meter, provider_name.to_string(), network.clone()); + let limiter = tokio::sync::Semaphore::new(request_limit as usize).into(); + Ok(Self { + client, + network, + provider_name, + limiter, + batch_size, + fetch_receipts_per_tx, + }) + } + + /// Create a new IPC Tempo RPC client. + /// + /// # Panics + /// + /// Panics if `request_limit` is zero. + #[expect(clippy::too_many_arguments)] + pub async fn new_ipc( + path: std::path::PathBuf, + network: NetworkId, + provider_name: ProviderName, + request_limit: u16, + batch_size: usize, + rate_limit: Option, + fetch_receipts_per_tx: bool, + meter: Option<&monitoring::telemetry::metrics::Meter>, + ) -> Result { + assert!(request_limit >= 1); + let client = crate::provider::new_ipc(path, rate_limit) + .await + .map_err(ClientError::Transport) + .map(|c| { + RootProviderWithMetrics::new(c, meter, provider_name.to_string(), network.clone()) + })?; + let limiter = tokio::sync::Semaphore::new(request_limit as usize).into(); + Ok(Self { + client, + network, + provider_name, + limiter, + batch_size, + fetch_receipts_per_tx, + }) + } + + /// Create a new WebSocket Tempo RPC client. + /// + /// # Panics + /// + /// Panics if `request_limit` is zero. 
+ #[expect(clippy::too_many_arguments)] + pub async fn new_ws( + url: Url, + network: NetworkId, + provider_name: ProviderName, + request_limit: u16, + batch_size: usize, + rate_limit: Option, + fetch_receipts_per_tx: bool, + auth: Option, + meter: Option<&monitoring::telemetry::metrics::Meter>, + ) -> Result { + assert!(request_limit >= 1); + let client = crate::provider::new_ws(url, auth, rate_limit) + .await + .map_err(ClientError::Transport) + .map(|c| { + RootProviderWithMetrics::new(c, meter, provider_name.to_string(), network.clone()) + })?; + let limiter = tokio::sync::Semaphore::new(request_limit as usize).into(); + Ok(Self { + client, + network, + provider_name, + limiter, + batch_size, + fetch_receipts_per_tx, + }) + } + + /// Returns the configured provider name. + pub fn provider_name(&self) -> &str { + &self.provider_name + } + + /// Create a stream that fetches blocks one at a time. + fn unbatched_block_stream( + self, + start_block: u64, + end_block: u64, + ) -> impl Stream> + Send { + assert!(end_block >= start_block); + let total_blocks_to_stream = end_block - start_block + 1; + + tracing::info!( + %start_block, + %end_block, + total = %total_blocks_to_stream, + "started unbatched block fetch" + ); + + let mut last_progress_report = Instant::now(); + + stream! 
{ + 'outer: for block_num in start_block..=end_block { + let elapsed = last_progress_report.elapsed(); + if elapsed >= Duration::from_secs(15) { + let blocks_streamed = block_num - start_block; + let percentage_streamed = (blocks_streamed as f32 / total_blocks_to_stream as f32) * 100.0; + tracing::info!( + current = %block_num, + start = %start_block, + end = %end_block, + completed = blocks_streamed, + total = total_blocks_to_stream, + percent = format_args!("{:.2}", percentage_streamed), + "block fetch progress" + ); + last_progress_report = Instant::now(); + } + + let Ok(_permit) = self.limiter.acquire().await else { + yield Err("rate limiter semaphore closed").recoverable(); + return; + }; + + let block_num = BlockNumberOrTag::Number(block_num); + let block = self.client.with_metrics("eth_getBlockByNumber", async |c| { + c.get_block_by_number(block_num).full().await + }).await; + let block = match block { + Ok(Some(block)) => block, + Ok(None) => { + yield Err(format!("block {} not found", block_num)).recoverable(); + continue; + } + Err(err) => { + yield Err(err).recoverable(); + continue; + } + }; + + if block.transactions.is_empty() { + yield rpc_to_rows(block, Vec::new(), &self.network).recoverable(); + continue; + } + + let receipts = if self.fetch_receipts_per_tx { + let calls = block + .transactions + .hashes() + .map(|hash| { + let client = &self.client; + async move { + client.with_metrics("eth_getTransactionReceipt", |c| async move { + c.get_transaction_receipt(hash).await.map(|r| (hash, r)) + }).await + } + }); + let Ok(receipts) = try_join_all(calls).await else { + yield Err(format!("error fetching receipts for block {}", block.number())).recoverable(); + continue; + }; + let mut received_receipts = Vec::new(); + for (hash, receipt) in receipts { + match receipt { + Some(receipt) => received_receipts.push(receipt), + None => { + yield Err(format!("missing receipt for transaction: {}", hex::encode(hash))).recoverable(); + continue 'outer; + } + } + } 
+ received_receipts + } else { + let receipts_result = self.client + .with_metrics("eth_getBlockReceipts", async |c| { + c.get_block_receipts(BlockId::Number(block_num)).await + }) + .await + .with_context(|| format!("error fetching receipts for block {}", block.number())) + .and_then(|receipts| { + let mut receipts = receipts.context("no receipts returned for block")?; + receipts.sort_by(|r1, r2| r1.transaction_index.cmp(&r2.transaction_index)); + Ok(receipts) + }); + + match receipts_result { + Ok(receipts) => receipts, + Err(err) => { + yield Err(err).recoverable(); + continue; + } + } + }; + + yield rpc_to_rows(block, receipts, &self.network).recoverable(); + } + } + } + + /// Create a stream that fetches blocks in batches. + fn batched_block_stream( + self, + start_block: u64, + end_block: u64, + ) -> impl Stream> + Send { + tracing::info!( + %start_block, + %end_block, + "started batched block fetch" + ); + let batching_client = + BatchingRpcWrapper::new(self.client.clone(), self.batch_size, self.limiter.clone()); + + let mut blocks_completed = 0; + let mut txns_completed = 0; + + stream! 
{ + let stream_start = Instant::now(); + let block_calls: Vec<_> = (start_block..=end_block) + .map(|block_num| ( + "eth_getBlockByNumber", + (BlockNumberOrTag::Number(block_num), true), + )) + .collect::>() + .chunks(self.batch_size * 10) + .map(<[_]>::to_vec) + .collect(); + + for batch_calls in block_calls { + let start = Instant::now(); + let blocks_result: Result, BatchingError> = batching_client.execute(batch_calls).await; + let blocks = match blocks_result { + Ok(blocks) => blocks, + Err(err) => { + yield Err(err).recoverable(); + return; + } + }; + + let total_tx_count: usize = blocks.iter().map(|b| b.transactions.len()).sum(); + + if total_tx_count == 0 { + for block in blocks.into_iter() { + blocks_completed += 1; + yield rpc_to_rows(block, Vec::new(), &self.network).recoverable(); + } + } else { + let all_receipts_result: Result, BatchingError> = if self.fetch_receipts_per_tx { + let receipt_calls: Vec<_> = blocks + .iter() + .flat_map(|block| { + block.transactions.hashes().map(|tx_hash| { + let tx_hash = format!("0x{}",hex::encode(tx_hash)); + ("eth_getTransactionReceipt", [tx_hash]) + }) + }) + .collect(); + batching_client.execute(receipt_calls).await + } else { + let receipt_calls: Vec<_> = blocks + .iter() + .map(|block| ( + "eth_getBlockReceipts", + [BlockNumberOrTag::Number(block.number())] + )) + .collect(); + let receipts_result: Result>, BatchingError> = + batching_client.execute(receipt_calls).await; + receipts_result.map(|receipts| receipts.into_iter().flatten().collect()) + }; + + let all_receipts = match all_receipts_result { + Ok(receipts) => receipts, + Err(err) => { + yield Err(err).recoverable(); + return; + } + }; + + if total_tx_count != all_receipts.len() { + let err = format!( + "mismatched tx and receipt count in batch: {} txs, {} receipts", + total_tx_count, + all_receipts.len() + ); + yield Err(err).recoverable(); + return; + } + + let mut all_receipts = all_receipts.into_iter(); + + for block in blocks { + let mut 
block_receipts: Vec<_> = + all_receipts.by_ref().take(block.transactions.len()).collect(); + block_receipts.sort_by(|r1, r2| r1.transaction_index.cmp(&r2.transaction_index)); + blocks_completed += 1; + txns_completed += block.transactions.len(); + yield rpc_to_rows(block, block_receipts, &self.network).recoverable(); + } + } + + let total_blocks_to_stream = end_block - start_block + 1; + tracing::info!( + completed = blocks_completed, + total = total_blocks_to_stream, + percent = format_args!("{:.2}", (blocks_completed as f32 / total_blocks_to_stream as f32) * 100.0), + txns = txns_completed, + elapsed_ms = start.elapsed().as_millis() as u64, + "batch fetch progress" + ); + } + tracing::info!( + %start_block, + %end_block, + elapsed_ms = stream_start.elapsed().as_millis() as u64, + blocks = blocks_completed, + txns = txns_completed, + "completed batched block fetch" + ); + } + } +} + +impl AsRef> for Client { + fn as_ref(&self) -> &alloy::providers::RootProvider { + &self.client.inner + } +} + +impl BlockStreamer for Client { + async fn block_stream( + self, + start: BlockNum, + end: BlockNum, + ) -> impl Stream> + Send { + stream! 
{ + if self.batch_size > 1 { + for await item in self.batched_block_stream(start, end) { + yield item; + } + } else { + for await item in self.unbatched_block_stream(start, end) { + yield item; + } + } + } + } + + #[tracing::instrument(skip(self), err)] + async fn latest_block( + &mut self, + finalized: bool, + ) -> Result, LatestBlockError> { + let number = match finalized { + true => BlockNumberOrTag::Finalized, + false => BlockNumberOrTag::Latest, + }; + let _permit = self.limiter.acquire().await.map_err(|_| { + LatestBlockError::from(anyhow::anyhow!("rate limiter semaphore closed")) + })?; + let block = self + .client + .with_metrics("eth_getBlockByNumber", async |c| { + c.get_block_by_number(number).await + }) + .await?; + Ok(block.map(|b| b.number())) + } + + fn bucket_size(&self) -> Option { + None + } + + fn provider_name(&self) -> &str { + &self.provider_name + } +} + +#[derive(Debug, Clone)] +struct RootProviderWithMetrics { + inner: alloy::providers::RootProvider, + metrics: Option, + provider: String, + network: NetworkId, +} + +impl RootProviderWithMetrics { + fn new( + inner: alloy::providers::RootProvider, + meter: Option<&monitoring::telemetry::metrics::Meter>, + provider: String, + network: NetworkId, + ) -> Self { + let metrics = meter.map(crate::metrics::MetricsRegistry::new); + Self { + inner, + metrics, + provider, + network, + } + } + + async fn with_metrics(&self, method: &str, func: F) -> Fut::Output + where + F: FnOnce(alloy::providers::RootProvider) -> Fut, + Fut: Future>, + { + let Some(metrics) = self.metrics.as_ref() else { + return func(self.inner.clone()).await; + }; + + let start = Instant::now(); + let resp = func(self.inner.clone()).await; + let duration = start.elapsed().as_millis() as f64; + metrics.record_single_request(duration, &self.provider, &self.network, method); + if resp.is_err() { + metrics.record_error(&self.provider, &self.network); + } + resp + } + + async fn batch_request( + &self, + requests: &[(&'static str, 
Params)], + ) -> Result, BatchRequestError> + where + Params: RpcSend, + Resp: RpcRecv, + { + let mut batch = BatchRequest::new(self.inner.client()); + let mut waiters = Vec::new(); + + for (method, params) in requests.iter() { + waiters.push( + batch + .add_call(*method, ¶ms) + .map_err(BatchRequestError)?, + ); + } + + let Some(metrics) = self.metrics.as_ref() else { + batch.send().await.map_err(BatchRequestError)?; + let resp = try_join_all(waiters).await.map_err(BatchRequestError)?; + return Ok(resp); + }; + + let start = Instant::now(); + + batch + .send() + .await + .inspect_err(|_| { + let duration = start.elapsed().as_millis() as f64; + metrics.record_batch_request( + duration, + requests.len() as u64, + &self.provider, + &self.network, + ); + metrics.record_error(&self.provider, &self.network); + }) + .map_err(BatchRequestError)?; + + let resp = try_join_all(waiters).await; + let duration = start.elapsed().as_millis() as f64; + metrics.record_batch_request( + duration, + requests.len() as u64, + &self.provider, + &self.network, + ); + + if resp.is_err() { + metrics.record_error(&self.provider, &self.network); + } + + let resp = resp.map_err(BatchRequestError)?; + Ok(resp) + } +} diff --git a/crates/core/providers-tempo/src/config.rs b/crates/core/providers-tempo/src/config.rs new file mode 100644 index 000000000..7b0644b30 --- /dev/null +++ b/crates/core/providers-tempo/src/config.rs @@ -0,0 +1,109 @@ +use std::{num::NonZeroU32, time::Duration}; + +use amp_providers_common::{network_id::NetworkId, redacted::Redacted}; +use headers::{HeaderName, HeaderValue}; +use serde_with::{DurationSeconds, serde_as}; +use url::Url; + +use crate::kind::TempoProviderKind; + +/// Tempo RPC provider configuration for parsing TOML config. +#[serde_as] +#[derive(Debug, Clone, serde::Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct TempoProviderConfig { + /// The provider kind, must be `"tempo"`. 
+ pub kind: TempoProviderKind, + + /// The network this provider serves. + pub network: NetworkId, + + /// The URL of the Tempo RPC endpoint (HTTP, HTTPS, WebSocket, or IPC). + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub url: Redacted, + + /// Custom header name for authentication. + #[serde(default)] + #[cfg_attr(feature = "schemars", schemars(with = "Option"))] + pub auth_header: Option, + + /// Authentication token for RPC requests. + #[serde(default)] + #[cfg_attr(feature = "schemars", schemars(with = "Option"))] + pub auth_token: Option, + + /// Optional rate limit for requests per minute. + #[cfg_attr(feature = "schemars", schemars(default))] + pub rate_limit_per_minute: Option, + + /// Optional limit on the number of concurrent requests. + #[cfg_attr( + feature = "schemars", + schemars(default = "default_concurrent_request_limit") + )] + pub concurrent_request_limit: Option, + + /// Maximum number of JSON-RPC requests to batch together. + /// + /// Set to 0 to disable batching. + #[serde(default)] + pub rpc_batch_size: usize, + + /// Whether to use `eth_getTransactionReceipt` to fetch receipts for each transaction + /// or `eth_getBlockReceipts` to fetch all receipts for a block in one call. + #[serde(default)] + pub fetch_receipts_per_tx: bool, + + /// Request timeout in seconds. + #[serde_as(as = "DurationSeconds")] + #[serde(default = "default_timeout", rename = "timeout_secs")] + #[cfg_attr(feature = "schemars", schemars(with = "u64"))] + pub timeout: Duration, +} + +/// Validated HTTP header name for custom authentication. +#[derive(Debug, Clone)] +pub struct AuthHeaderName(String); + +impl AuthHeaderName { + /// Consumes the wrapper and returns the inner string. 
+ pub fn into_inner(self) -> String { + self.0 + } +} + +impl<'de> serde::Deserialize<'de> for AuthHeaderName { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + HeaderName::try_from(&s).map_err(serde::de::Error::custom)?; + Ok(AuthHeaderName(s)) + } +} + +/// Validated authentication token for RPC requests. +#[derive(Debug, Clone)] +pub struct AuthToken(Redacted); + +impl AuthToken { + /// Consumes the wrapper and returns the inner string. + pub fn into_inner(self) -> String { + self.0.into_inner() + } +} + +impl<'de> serde::Deserialize<'de> for AuthToken { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + HeaderValue::try_from(&s).map_err(serde::de::Error::custom)?; + Ok(AuthToken(Redacted::from(s))) + } +} + +fn default_timeout() -> Duration { + Duration::from_secs(30) +} + +#[cfg(feature = "schemars")] +fn default_concurrent_request_limit() -> Option { + Some(1024) +} diff --git a/crates/core/providers-tempo/src/convert.rs b/crates/core/providers-tempo/src/convert.rs new file mode 100644 index 000000000..a94cf1a7d --- /dev/null +++ b/crates/core/providers-tempo/src/convert.rs @@ -0,0 +1,485 @@ +//! Pure transformation functions for converting Tempo RPC responses to dataset rows. 
+ +use std::time::Duration; + +use alloy::{ + consensus::{BlockHeader as _, Transaction as _}, + eips::Typed2718, + hex::ToHexExt, + network::{BlockResponse, Network, ReceiptResponse as _, TransactionResponse as _}, + rpc::types::Log as RpcLog, +}; +use amp_providers_common::network_id::NetworkId; +use datasets_common::{block_range::BlockRange, network_id::NetworkId as DatasetNetworkId}; +use datasets_raw::{ + Timestamp, + evm::{ + EvmCurrency, + tables::logs::{Log, LogRowsBuilder}, + }, + rows::Rows, + tempo::tables::{ + blocks::{Block, BlockRowsBuilder}, + transactions::{ + AAAuthorizationRow, Call, FeePayerSignature, KeyAuthorizationRow, KeyType, + SignatureType as DsSignatureType, TempoSignatureRow, TokenLimitRow, Transaction, + TransactionRowsBuilder, + }, + }, +}; +use tempo_alloy::{ + TempoNetwork, + primitives::{ + TempoTxEnvelope, + transaction::{PrimitiveSignature, SignatureType, TempoSignature}, + }, + rpc::TempoTransactionReceipt, +}; + +use crate::error::{OverflowSource, RpcToRowsError, ToRowError}; + +pub(crate) type TempoRpcBlock = ::BlockResponse; +pub(crate) type TempoRpcTransaction = ::TransactionResponse; + +pub(crate) fn rpc_to_rows( + block: TempoRpcBlock, + receipts: Vec, + network: &NetworkId, +) -> Result { + if block.transactions.len() != receipts.len() { + return Err(RpcToRowsError::TxReceiptCountMismatch { + block_num: block.number(), + tx_count: block.transactions.len(), + receipt_count: receipts.len(), + }); + } + let tx_receipt_pairs = block.transactions.clone().into_transactions().zip(receipts); + + let header = rpc_header_to_row(block.header()).map_err(RpcToRowsError::ToRow)?; + let mut logs = Vec::new(); + let mut transactions = Vec::new(); + + for (idx, (tx, mut receipt)) in tx_receipt_pairs.enumerate() { + if tx.tx_hash() != receipt.transaction_hash() { + return Err(RpcToRowsError::TxReceiptHashMismatch { + block_num: header.block_num, + tx_hash: tx.tx_hash().encode_hex(), + receipt_hash: receipt.transaction_hash().encode_hex(), + 
}); + } + // Move the logs out of the nested structure. + let receipt_logs = std::mem::take(&mut receipt.inner.inner.receipt.logs); + for log in receipt_logs { + logs.push(rpc_log_to_row(log, header.timestamp).map_err(RpcToRowsError::ToRow)?); + } + transactions.push( + rpc_transaction_to_row(&header, tx, receipt, idx).map_err(RpcToRowsError::ToRow)?, + ); + } + + let dataset_network_id = DatasetNetworkId::new_unchecked(network.to_string()); + + let block_range = BlockRange { + numbers: header.block_num..=header.block_num, + network: dataset_network_id, + hash: header.hash.into(), + prev_hash: header.parent_hash.into(), + timestamp: Some(header.timestamp.0.as_secs()), + }; + + let header_row = { + let mut builder = BlockRowsBuilder::with_capacity_for(&header); + builder.append(&header); + builder + .build(block_range.clone()) + .map_err(RpcToRowsError::TableRow)? + }; + + let logs_row = { + let total_data_size = logs.iter().map(|log| log.data.len()).sum(); + let mut builder = LogRowsBuilder::with_capacity(logs.len(), total_data_size); + for log in &logs { + builder.append(log); + } + builder + .build(block_range.clone()) + .map_err(RpcToRowsError::TableRow)? + }; + + let transactions_row = { + let total_input_size: usize = transactions + .iter() + .map(|t| t.input.as_ref().map_or(0, |i| i.len())) + .sum(); + let mut builder = + TransactionRowsBuilder::with_capacity(transactions.len(), total_input_size); + for tx in &transactions { + builder.append(tx); + } + builder + .build(block_range.clone()) + .map_err(RpcToRowsError::TableRow)? 
+ }; + + Ok(Rows::new(vec![header_row, logs_row, transactions_row])) +} + +fn rpc_header_to_row(header: &tempo_alloy::rpc::TempoHeaderResponse) -> Result { + use alloy::network::primitives::HeaderResponse as _; + + Ok(Block { + block_num: header.number(), + timestamp: Timestamp(Duration::from_secs(header.timestamp())), + hash: header.hash().into(), + parent_hash: header.parent_hash().into(), + ommers_hash: header.ommers_hash().into(), + miner: header.beneficiary().into(), + state_root: header.state_root().into(), + transactions_root: header.transactions_root().into(), + receipt_root: header.receipts_root().into(), + logs_bloom: <[u8; 256]>::from(header.logs_bloom()).into(), + difficulty: EvmCurrency::try_from(header.difficulty()) + .map_err(|err| ToRowError::Overflow("difficulty", OverflowSource::BigInt(err)))?, + total_difficulty: header + .total_difficulty + .map(EvmCurrency::try_from) + .transpose() + .map_err(|err| ToRowError::Overflow("total_difficulty", OverflowSource::BigInt(err)))?, + gas_limit: header.gas_limit(), + gas_used: header.gas_used(), + extra_data: header.extra_data().to_vec(), + mix_hash: *header.mix_hash().unwrap_or_default(), + nonce: u64::from(header.nonce().unwrap_or_default()), + base_fee_per_gas: header.base_fee_per_gas().map(EvmCurrency::from), + withdrawals_root: header.withdrawals_root().map(Into::into), + blob_gas_used: header.blob_gas_used(), + excess_blob_gas: header.excess_blob_gas(), + parent_beacon_root: header.parent_beacon_block_root().map(Into::into), + requests_hash: header.requests_hash().map(Into::into), + // Tempo-specific Fields + general_gas_limit: header.general_gas_limit, + shared_gas_limit: header.shared_gas_limit, + timestamp_millis_part: header.timestamp_millis_part, + }) +} + +fn rpc_log_to_row(log: RpcLog, timestamp: Timestamp) -> Result { + Ok(Log { + block_hash: log + .block_hash + .ok_or(ToRowError::Missing("block_hash"))? 
+ .into(), + block_num: log + .block_number + .ok_or(ToRowError::Missing("block_number"))?, + timestamp, + tx_index: u32::try_from( + log.transaction_index + .ok_or(ToRowError::Missing("transaction_index"))?, + ) + .map_err(|err| ToRowError::Overflow("transaction_index", OverflowSource::Int(err)))?, + tx_hash: log + .transaction_hash + .ok_or(ToRowError::Missing("transaction_hash"))? + .into(), + log_index: u32::try_from(log.log_index.ok_or(ToRowError::Missing("log_index"))?) + .map_err(|err| ToRowError::Overflow("log_index", OverflowSource::Int(err)))?, + address: log.address().into(), + topic0: log.topics().first().cloned().map(Into::into), + topic1: log.topics().get(1).cloned().map(Into::into), + topic2: log.topics().get(2).cloned().map(Into::into), + topic3: log.topics().get(3).cloned().map(Into::into), + data: log.data().data.to_vec(), + }) +} + +fn rpc_transaction_to_row( + block: &Block, + tx: TempoRpcTransaction, + receipt: TempoTransactionReceipt, + tx_index: usize, +) -> Result { + let envelope: &TempoTxEnvelope = tx.inner.inner(); + + // For standard EVM txs: r/s/v_parity are Some, signature is None. + // For Tempo AA txs: r/s/v_parity are None, signature is Some (multi-type). 
+ let (r, s, v_parity, tx_signature, chain_id) = match envelope { + TempoTxEnvelope::Legacy(signed) => { + let sig = signed.signature(); + ( + Some(sig.r().to_be_bytes::<32>()), + Some(sig.s().to_be_bytes::<32>()), + Some(sig.v()), + None, + signed.tx().chain_id(), + ) + } + TempoTxEnvelope::Eip2930(signed) => { + let sig = signed.signature(); + ( + Some(sig.r().to_be_bytes::<32>()), + Some(sig.s().to_be_bytes::<32>()), + Some(sig.v()), + None, + signed.tx().chain_id(), + ) + } + TempoTxEnvelope::Eip1559(signed) => { + let sig = signed.signature(); + ( + Some(sig.r().to_be_bytes::<32>()), + Some(sig.s().to_be_bytes::<32>()), + Some(sig.v()), + None, + signed.tx().chain_id(), + ) + } + TempoTxEnvelope::Eip7702(signed) => { + let sig = signed.signature(); + ( + Some(sig.r().to_be_bytes::<32>()), + Some(sig.s().to_be_bytes::<32>()), + Some(sig.v()), + None, + signed.tx().chain_id(), + ) + } + TempoTxEnvelope::AA(aa_signed) => ( + None, + None, + None, + Some(tempo_sig_to_row( + aa_signed.signature(), + &aa_signed.signature_hash(), + )), + Some(aa_signed.tx().chain_id), + ), + }; + + let tx_type = tx.ty(); + let is_tempo_aa = matches!(envelope, TempoTxEnvelope::AA(_)); + + let ( + fee_token, + nonce_key, + calls, + fee_payer_signature, + key_authorization, + aa_authorization_list, + valid_before, + valid_after, + ) = if let TempoTxEnvelope::AA(aa_signed) = envelope { + let tt = aa_signed.tx(); + ( + tt.fee_token.map(|a| a.0.0), + Some(tt.nonce_key.to_be_bytes::<32>()), + Some( + tt.calls + .iter() + .map(|c| Call { + to: c.to.to().map(|a| a.0.0), + value: c.value.to_string(), + input: c.input.to_vec(), + }) + .collect::>(), + ), + tt.fee_payer_signature.map(|sig| FeePayerSignature { + r: sig.r().to_be_bytes::<32>(), + s: sig.s().to_be_bytes::<32>(), + y_parity: sig.v(), + }), + tt.key_authorization.as_ref().map(|ka| KeyAuthorizationRow { + chain_id: ka.authorization.chain_id, + key_type: match ka.authorization.key_type { + SignatureType::Secp256k1 => 
KeyType::Secp256k1, + SignatureType::P256 => KeyType::P256, + SignatureType::WebAuthn => KeyType::WebAuthn, + }, + key_id: ka.authorization.key_id.0.0, + expiry: ka.authorization.expiry, + limits: ka.authorization.limits.as_ref().map(|limits| { + limits + .iter() + .map(|l| TokenLimitRow { + token: l.token.0.0, + limit: l.limit.to_string(), + }) + .collect() + }), + signature: primitive_sig_to_row(&ka.signature), + }), + if tt.tempo_authorization_list.is_empty() { + Some(vec![]) + } else { + Some( + tt.tempo_authorization_list + .iter() + .map(|auth| { + let inner = auth.inner(); + AAAuthorizationRow { + chain_id: inner.chain_id.to::(), + address: inner.address.0.0, + nonce: inner.nonce, + signature: tempo_sig_to_row( + auth.signature(), + &auth.signature_hash(), + ), + } + }) + .collect(), + ) + }, + tt.valid_before, + tt.valid_after, + ) + } else { + (None, None, None, None, None, None, None, None) + }; + + Ok(Transaction { + block_hash: block.hash, + block_num: block.block_num, + timestamp: block.timestamp, + tx_index: u32::try_from(tx_index) + .map_err(|err| ToRowError::Overflow("tx_index", OverflowSource::Int(err)))?, + tx_hash: tx.tx_hash().0, + to: if is_tempo_aa { + None + } else { + tx.to().map(|addr| addr.0.0) + }, + from: tx.as_recovered().signer().into(), + nonce: tx.nonce(), + chain_id, + gas_limit: tx.gas_limit(), + gas_used: receipt.gas_used(), + receipt_cumulative_gas_used: receipt.cumulative_gas_used(), + r#type: tx_type.into(), + max_fee_per_gas: i128::try_from(envelope.max_fee_per_gas()) + .map_err(|err| ToRowError::Overflow("max_fee_per_gas", OverflowSource::Int(err)))?, + max_priority_fee_per_gas: tx + .max_priority_fee_per_gas() + .map(i128::try_from) + .transpose() + .map_err(|err| { + ToRowError::Overflow("max_priority_fee_per_gas", OverflowSource::Int(err)) + })?, + gas_price: alloy::network::TransactionResponse::gas_price(&tx) + .map(i128::try_from) + .transpose() + .map_err(|err| ToRowError::Overflow("gas_price", 
OverflowSource::Int(err)))?, + status: receipt.status(), + state_root: receipt.state_root().map(|root| root.0), + value: if is_tempo_aa { + None + } else { + Some(tx.value().to_string()) + }, + input: if is_tempo_aa { + None + } else { + Some(tx.input().to_vec()) + }, + r, + s, + v_parity, + signature: tx_signature, + // Tempo-specific + fee_token, + nonce_key, + calls, + fee_payer_signature, + key_authorization, + aa_authorization_list, + valid_before, + valid_after, + access_list: match envelope { + TempoTxEnvelope::Legacy(_) => None, + TempoTxEnvelope::Eip2930(signed) => Some(convert_access_list(&signed.tx().access_list)), + TempoTxEnvelope::Eip1559(signed) => Some(convert_access_list(&signed.tx().access_list)), + TempoTxEnvelope::Eip7702(signed) => Some(convert_access_list(&signed.tx().access_list)), + TempoTxEnvelope::AA(aa_signed) => { + Some(convert_access_list(&aa_signed.tx().access_list)) + } + }, + }) +} + +fn convert_access_list( + access_list: &alloy::eips::eip2930::AccessList, +) -> Vec<([u8; 20], Vec<[u8; 32]>)> { + access_list + .0 + .iter() + .map(|item| { + let address: [u8; 20] = item.address.0.0; + let storage_keys: Vec<[u8; 32]> = item.storage_keys.iter().map(|key| key.0).collect(); + (address, storage_keys) + }) + .collect() +} + +/// Convert a `PrimitiveSignature` to a `TempoSignatureRow` with full type-specific fields. 
+fn primitive_sig_to_row(sig: &PrimitiveSignature) -> TempoSignatureRow { + match sig { + PrimitiveSignature::Secp256k1(s) => TempoSignatureRow { + r#type: DsSignatureType::Secp256k1, + r: s.r().to_be_bytes::<32>(), + s: s.s().to_be_bytes::<32>(), + y_parity: Some(s.v()), + ..Default::default() + }, + PrimitiveSignature::P256(p) => TempoSignatureRow { + r#type: DsSignatureType::P256, + r: p.r.0, + s: p.s.0, + pub_key_x: Some(p.pub_key_x.0), + pub_key_y: Some(p.pub_key_y.0), + pre_hash: Some(p.pre_hash), + ..Default::default() + }, + PrimitiveSignature::WebAuthn(w) => TempoSignatureRow { + r#type: DsSignatureType::WebAuthn, + r: w.r.0, + s: w.s.0, + pub_key_x: Some(w.pub_key_x.0), + pub_key_y: Some(w.pub_key_y.0), + webauthn_data: Some(w.webauthn_data.to_vec()), + ..Default::default() + }, + } +} + +/// Convert a `TempoSignature` (Primitive or Keychain) to a `TempoSignatureRow`. +/// +/// `sig_hash` is needed to recover the `key_id` for Keychain signatures with secp256k1 inner keys. +/// For P256/WebAuthn, `key_id` is derived from the public key without needing the hash. +fn tempo_sig_to_row(sig: &TempoSignature, sig_hash: &alloy::primitives::B256) -> TempoSignatureRow { + match sig { + TempoSignature::Primitive(p) => primitive_sig_to_row(p), + TempoSignature::Keychain(k) => { + let mut row = primitive_sig_to_row(&k.signature); + // Derive key_id: for P256/WebAuthn from pub keys, for secp256k1 via sig recovery. 
+ row.key_id = match &k.signature { + PrimitiveSignature::P256(p) => Some( + tempo_alloy::primitives::transaction::derive_p256_address( + &p.pub_key_x, + &p.pub_key_y, + ) + .0 + .0, + ), + PrimitiveSignature::WebAuthn(w) => Some( + tempo_alloy::primitives::transaction::derive_p256_address( + &w.pub_key_x, + &w.pub_key_y, + ) + .0 + .0, + ), + PrimitiveSignature::Secp256k1(_) => k.key_id(sig_hash).ok().map(|a| a.0.0), + }; + row + } + } +} diff --git a/crates/core/providers-tempo/src/error.rs b/crates/core/providers-tempo/src/error.rs new file mode 100644 index 000000000..fe17059b8 --- /dev/null +++ b/crates/core/providers-tempo/src/error.rs @@ -0,0 +1,143 @@ +use alloy::primitives::ruint::FromUintError; +use datasets_raw::rows::TableRowError; +use tokio::sync::AcquireError; + +/// Errors that occur during batched RPC request execution. +/// +/// Returned when a batch of JSON-RPC calls fails either due to a transport error +/// or because the rate-limiting semaphore was closed. +#[derive(Debug, thiserror::Error)] +pub enum BatchingError { + /// The RPC batch request failed. + /// + /// Occurs when the underlying HTTP/WS/IPC transport returns an error for a + /// batched JSON-RPC request. + #[error("RPC batch request failed: {0}")] + Request(#[source] BatchRequestError), + + /// Failed to acquire a permit from the rate limiter. + /// + /// Occurs when the concurrency-limiting semaphore has been closed, typically + /// during shutdown. + #[error("rate limiter semaphore closed: {0}")] + RateLimitAcquire(#[source] AcquireError), +} + +/// Error wrapper for RPC transport failures. +/// +/// Wraps an [`alloy::transports::TransportError`] that occurred while sending a +/// batch request or awaiting individual responses within a batch. +#[derive(Debug, thiserror::Error)] +#[error("RPC client error")] +pub struct BatchRequestError(#[source] pub alloy::transports::TransportError); + +/// Errors that occur when converting RPC responses to table rows. 
+/// +/// Returned by the row-conversion pipeline when block, transaction, or receipt +/// data from the RPC cannot be mapped to the dataset schema. +#[derive(Debug, thiserror::Error)] +pub enum RpcToRowsError { + /// Transaction and receipt counts don't match for a block. + /// + /// Occurs when the number of transactions in a block does not equal the + /// number of receipts returned by the RPC, indicating an inconsistent + /// response. + #[error( + "mismatched tx and receipt count for block {block_num}: {tx_count} txs, {receipt_count} receipts" + )] + TxReceiptCountMismatch { + block_num: u64, + tx_count: usize, + receipt_count: usize, + }, + + /// Transaction and receipt hashes don't match. + /// + /// Occurs when a transaction hash does not match its corresponding receipt + /// hash, indicating the receipts are out of order or belong to a different + /// block. + #[error( + "mismatched tx and receipt hash for block {block_num}: tx {tx_hash}, receipt {receipt_hash}" + )] + TxReceiptHashMismatch { + block_num: u64, + tx_hash: String, + receipt_hash: String, + }, + + /// Failed to convert RPC data to row format. + /// + /// Occurs when an individual field in a transaction, block header, or log + /// cannot be converted to the expected dataset column type. + #[error("row conversion failed")] + ToRow(#[source] ToRowError), + + /// Failed to build the final table rows. + /// + /// Occurs when the row builder cannot produce a valid Arrow record batch + /// from the converted row data. + #[error("table build failed")] + TableRow(#[source] TableRowError), +} + +/// Errors during individual field conversion to row format. +/// +/// Returned when a single field from an RPC response cannot be mapped to the +/// corresponding dataset column. +#[derive(Debug, thiserror::Error)] +pub enum ToRowError { + /// A required field is missing from the RPC response. 
+ ///
+ /// Occurs when a field that is mandatory in the dataset schema (e.g.,
+ /// `block_hash`, `transaction_index`) is `None` in the RPC response.
+ #[error("missing field: {0}")]
+ Missing(&'static str),
+
+ /// A numeric field overflowed during type conversion.
+ ///
+ /// Occurs when an RPC numeric value (e.g., `U256`, `u64`) does not fit
+ /// into the target column type (e.g., `i128`, `u32`).
+ #[error("overflow in field {0}: {1}")]
+ Overflow(&'static str, #[source] OverflowSource),
+}
+
+/// Source of numeric overflow errors during field conversion.
+///
+/// Distinguishes between standard integer conversion failures and big integer
+/// (U256) conversion failures for better diagnostics.
+#[derive(Debug, thiserror::Error)]
+pub enum OverflowSource {
+ /// Overflow from standard integer type conversion.
+ ///
+ /// Occurs when a `TryFrom` conversion between standard integer types fails
+ /// (e.g., `u64` to `u32`).
+ #[error("{0}")]
+ Int(#[source] std::num::TryFromIntError),
+
+ /// Overflow from big integer (U256) conversion.
+ ///
+ /// Occurs when a `U256` value does not fit into the target `i128` column
+ /// type.
+ #[error("{0}")]
+ BigInt(#[source] FromUintError<i128>),
+}
+
+/// Error connecting to a Tempo RPC provider.
+///
+/// Returned when the RPC transport (HTTP, WebSocket, or IPC) cannot be
+/// established during client initialization.
+#[derive(Debug, thiserror::Error)]
+pub enum ClientError {
+ /// Transport-level error (WebSocket/IPC connection failure).
+ ///
+ /// Occurs when the underlying transport cannot be established.
+ #[error("provider error: {0}")]
+ Transport(#[source] alloy::transports::TransportError),
+
+ /// HTTP client build failure.
+ ///
+ /// Occurs when the reqwest HTTP client cannot be constructed (e.g., TLS
+ /// backend initialization failure).
+ #[error("HTTP client build error: {0}")] + HttpBuild(#[from] crate::provider::HttpBuildError), +} diff --git a/crates/core/providers-tempo/src/kind.rs b/crates/core/providers-tempo/src/kind.rs new file mode 100644 index 000000000..cad41ac5d --- /dev/null +++ b/crates/core/providers-tempo/src/kind.rs @@ -0,0 +1,132 @@ +//! Tempo provider kind type and parsing utilities. + +use amp_providers_common::kind::ProviderKindStr; + +/// The canonical string identifier for Tempo providers. +const PROVIDER_KIND: &str = "tempo"; + +/// Type-safe representation of the Tempo provider kind. +/// +/// This zero-sized type represents the "tempo" provider kind, which interacts +/// with Tempo-compatible JSON-RPC endpoints for blockchain data extraction. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(inline))] +#[cfg_attr( + feature = "schemars", + schemars(schema_with = "tempo_provider_kind_schema") +)] +pub struct TempoProviderKind; + +impl TempoProviderKind { + /// Returns the canonical string identifier for this provider kind. 
+ #[inline]
+ pub const fn as_str(self) -> &'static str {
+ PROVIDER_KIND
+ }
+}
+
+impl AsRef<str> for TempoProviderKind {
+ fn as_ref(&self) -> &str {
+ PROVIDER_KIND
+ }
+}
+
+impl From<TempoProviderKind> for ProviderKindStr {
+ fn from(value: TempoProviderKind) -> Self {
+ // SAFETY: The constant PROVIDER_KIND is "tempo", which is non-empty
+ ProviderKindStr::new_unchecked(value.to_string())
+ }
+}
+
+#[cfg(feature = "schemars")]
+fn tempo_provider_kind_schema(_gen: &mut schemars::SchemaGenerator) -> schemars::Schema {
+ let schema_obj = serde_json::json!({
+ "const": PROVIDER_KIND
+ });
+ serde_json::from_value(schema_obj).unwrap()
+}
+
+impl std::str::FromStr for TempoProviderKind {
+ type Err = TempoProviderKindError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ if s != PROVIDER_KIND {
+ return Err(TempoProviderKindError(s.to_string()));
+ }
+
+ Ok(TempoProviderKind)
+ }
+}
+
+impl std::fmt::Display for TempoProviderKind {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ PROVIDER_KIND.fmt(f)
+ }
+}
+
+impl serde::Serialize for TempoProviderKind {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ serializer.serialize_str(PROVIDER_KIND)
+ }
+}
+
+impl<'de> serde::Deserialize<'de> for TempoProviderKind {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ let s = String::deserialize(deserializer)?;
+ s.parse().map_err(serde::de::Error::custom)
+ }
+}
+
+impl PartialEq<str> for TempoProviderKind {
+ fn eq(&self, other: &str) -> bool {
+ PROVIDER_KIND == other
+ }
+}
+
+impl PartialEq<TempoProviderKind> for str {
+ fn eq(&self, _other: &TempoProviderKind) -> bool {
+ self == PROVIDER_KIND
+ }
+}
+
+impl PartialEq<&str> for TempoProviderKind {
+ fn eq(&self, other: &&str) -> bool {
+ PROVIDER_KIND == *other
+ }
+}
+
+impl PartialEq<TempoProviderKind> for &str {
+ fn eq(&self, _other: &TempoProviderKind) -> bool {
+ *self == PROVIDER_KIND
+ }
+}
+
+impl PartialEq<String> for TempoProviderKind {
+ fn eq(&self, other: &String) -> bool {
+ PROVIDER_KIND == other.as_str()
+ }
+}
+
+impl PartialEq<TempoProviderKind> for String {
+ fn eq(&self, _other: &TempoProviderKind) -> bool {
+ self.as_str() == PROVIDER_KIND
+ }
+}
+
+impl PartialEq<ProviderKindStr> for TempoProviderKind {
+ fn eq(&self, other: &ProviderKindStr) -> bool {
+ PROVIDER_KIND == other.as_str()
+ }
+}
+
+/// Error returned when parsing an invalid Tempo RPC provider kind string.
+#[derive(Debug, thiserror::Error)]
+#[error("invalid provider kind: {}, expected: {}", .0, PROVIDER_KIND)]
+pub struct TempoProviderKindError(String);
diff --git a/crates/core/providers-tempo/src/lib.rs b/crates/core/providers-tempo/src/lib.rs
new file mode 100644
index 000000000..dd9ae11b5
--- /dev/null
+++ b/crates/core/providers-tempo/src/lib.rs
@@ -0,0 +1,83 @@
+//! Tempo RPC provider types, configuration, and extraction client.
+//!
+//! This crate provides the Tempo RPC source implementation: the provider
+//! kind identifier, configuration, HTTP/WS/IPC transport construction, and the
+//! block-streaming extraction client.
+
+use std::path::PathBuf;
+
+use amp_providers_common::provider_name::ProviderName;
+
+use crate::{config::TempoProviderConfig, error::ClientError, provider::Auth};
+
+pub mod client;
+pub mod config;
+mod convert;
+pub mod error;
+pub mod kind;
+pub mod metrics;
+pub mod provider;
+
+pub use self::client::Client;
+
+/// Create a Tempo RPC block-streaming client from provider configuration.
+pub async fn client(
+ name: ProviderName,
+ config: TempoProviderConfig,
+ meter: Option<&monitoring::telemetry::metrics::Meter>,
+) -> Result<Client, ClientError> {
+ let url = config.url.into_inner();
+ let auth = config.auth_token.map(|token| match config.auth_header {
+ Some(header) => Auth::CustomHeader {
+ name: header,
+ value: token,
+ },
+ None => Auth::Bearer(token),
+ });
+
+ let request_limit = u16::max(1, config.concurrent_request_limit.unwrap_or(1024));
+ let client = match url.scheme() {
+ "ipc" => {
+ let path = url.path();
+ Client::new_ipc(
+ PathBuf::from(path),
+ config.network,
+ name,
+ request_limit,
+ config.rpc_batch_size,
+ config.rate_limit_per_minute,
+ config.fetch_receipts_per_tx,
+ meter,
+ )
+ .await?
+ }
+ "ws" | "wss" => {
+ Client::new_ws(
+ url,
+ config.network,
+ name,
+ request_limit,
+ config.rpc_batch_size,
+ config.rate_limit_per_minute,
+ config.fetch_receipts_per_tx,
+ auth,
+ meter,
+ )
+ .await?
+ }
+ _ => Client::new(
+ url,
+ config.network,
+ name,
+ request_limit,
+ config.rpc_batch_size,
+ config.rate_limit_per_minute,
+ config.fetch_receipts_per_tx,
+ config.timeout,
+ auth,
+ meter,
+ )?,
+ };
+
+ Ok(client)
+}
diff --git a/crates/core/providers-tempo/src/metrics.rs b/crates/core/providers-tempo/src/metrics.rs
new file mode 100644
index 000000000..11fa670eb
--- /dev/null
+++ b/crates/core/providers-tempo/src/metrics.rs
@@ -0,0 +1,90 @@
+use amp_providers_common::network_id::NetworkId;
+use monitoring::telemetry;
+
+/// Metrics registry for Tempo RPC provider operations.
+///
+/// Tracks request counts, durations, batch sizes, and error rates for Tempo
+/// RPC calls, broken down by provider name, network, and method.
+#[derive(Debug, Clone)]
+pub struct MetricsRegistry {
+ /// Total number of Tempo RPC requests made.
+ pub rpc_requests: telemetry::metrics::Counter,
+ /// Duration of individual Tempo RPC requests in milliseconds.
+ pub rpc_request_duration: telemetry::metrics::Histogram, + /// Total number of Tempo RPC errors encountered. + pub rpc_errors: telemetry::metrics::Counter, + /// Number of requests per RPC batch. + pub rpc_batch_size: telemetry::metrics::Histogram, +} + +impl MetricsRegistry { + /// Create a new metrics registry from an OpenTelemetry meter. + pub fn new(meter: &telemetry::metrics::Meter) -> Self { + Self { + rpc_requests: telemetry::metrics::Counter::new( + meter, + "tempo_rpc_requests_total", + "Total number of Tempo RPC requests", + ), + rpc_request_duration: telemetry::metrics::Histogram::new_f64( + meter, + "tempo_rpc_request_duration", + "Duration of Tempo RPC requests", + "milliseconds", + ), + rpc_errors: telemetry::metrics::Counter::new( + meter, + "tempo_rpc_errors_total", + "Total number of Tempo RPC errors", + ), + rpc_batch_size: telemetry::metrics::Histogram::new_u64( + meter, + "tempo_rpc_batch_size_requests", + "Number of requests per RPC batch", + "requests", + ), + } + } + + pub(crate) fn record_single_request( + &self, + duration_millis: f64, + provider: &str, + network: &NetworkId, + method: &str, + ) { + let kv_pairs = [ + telemetry::metrics::KeyValue::new("provider", provider.to_string()), + telemetry::metrics::KeyValue::new("network", network.to_string()), + telemetry::metrics::KeyValue::new("method", method.to_string()), + ]; + self.rpc_requests.inc_with_kvs(&kv_pairs); + self.rpc_request_duration + .record_with_kvs(duration_millis, &kv_pairs); + } + + pub(crate) fn record_batch_request( + &self, + duration_millis: f64, + batch_size: u64, + provider: &str, + network: &NetworkId, + ) { + let kv_pairs = [ + telemetry::metrics::KeyValue::new("provider", provider.to_string()), + telemetry::metrics::KeyValue::new("network", network.to_string()), + ]; + self.rpc_requests.inc_with_kvs(&kv_pairs); + self.rpc_request_duration + .record_with_kvs(duration_millis, &kv_pairs); + self.rpc_batch_size.record_with_kvs(batch_size, &kv_pairs); + } + + 
pub(crate) fn record_error(&self, provider: &str, network: &NetworkId) {
+ let kv_pairs = [
+ telemetry::metrics::KeyValue::new("provider", provider.to_string()),
+ telemetry::metrics::KeyValue::new("network", network.to_string()),
+ ];
+ self.rpc_errors.inc_with_kvs(&kv_pairs);
+ }
+}
diff --git a/crates/core/providers-tempo/src/provider.rs b/crates/core/providers-tempo/src/provider.rs
new file mode 100644
index 000000000..cea1466d9
--- /dev/null
+++ b/crates/core/providers-tempo/src/provider.rs
@@ -0,0 +1,217 @@
+use std::{
+ future::Future,
+ num::NonZeroU32,
+ path::Path,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+ time::Duration,
+};
+
+use alloy::{
+ providers::{
+ ProviderBuilder as AlloyProviderBuilder, RootProvider as AlloyRootProvider, WsConnect,
+ },
+ rpc::client::ClientBuilder,
+ transports::{Authorization as TransportAuthorization, TransportError, http::reqwest::Client},
+};
+use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
+use headers::{Authorization, HeaderMap, HeaderMapExt, HeaderName, HeaderValue};
+use tempo_alloy::TempoNetwork;
+use tower::{Layer, Service};
+use url::Url;
+
+use crate::config::{AuthHeaderName, AuthToken};
+
+/// Type alias for a Tempo RPC provider using the Tempo network type.
+pub type TempoRpcAlloyProvider = AlloyRootProvider<TempoNetwork>;
+
+/// Authentication configuration for Tempo RPC providers.
+pub enum Auth {
+ /// Standard `Authorization: Bearer <token>` header.
+ Bearer(AuthToken),
+ /// Custom header: `<name>: <value>`.
+ CustomHeader {
+ name: AuthHeaderName,
+ value: AuthToken,
+ },
+}
+
+/// Create an HTTP/HTTPS Tempo RPC provider.
+///
+/// # Errors
+///
+/// Returns an error if the HTTP client cannot be built (e.g., TLS backend
+/// initialization failure).
+pub fn new_http(
+ url: Url,
+ auth: Option<Auth>,
+ rate_limit: Option<NonZeroU32>,
+ timeout: Duration,
+) -> Result<TempoRpcAlloyProvider, HttpBuildError> {
+ let mut http_client_builder = Client::builder();
+ http_client_builder = if let Some(auth) = auth {
+ let mut headers = HeaderMap::new();
+ match auth {
+ // SAFETY: AuthToken and AuthHeaderName are validated at deserialization
+ // time via HeaderValue::try_from and HeaderName::try_from checks.
+ Auth::Bearer(token) => {
+ headers.typed_insert(
+ Authorization::bearer(&token.into_inner())
+ .expect("validated at config parse time"),
+ );
+ }
+ Auth::CustomHeader { name, value: token } => {
+ let name = HeaderName::try_from(name.into_inner())
+ .expect("validated at config parse time");
+ let mut value = HeaderValue::try_from(token.into_inner())
+ .expect("validated at config parse time");
+ value.set_sensitive(true);
+ headers.insert(name, value);
+ }
+ }
+ http_client_builder.default_headers(headers)
+ } else {
+ http_client_builder
+ };
+ let http_client = http_client_builder
+ .timeout(timeout)
+ .build()
+ .map_err(HttpBuildError)?;
+
+ let client_builder = ClientBuilder::default();
+ let client = if let Some(rl) = rate_limit {
+ client_builder
+ .layer(RateLimitLayer::new(rl))
+ .http_with_client(http_client, url)
+ } else {
+ client_builder.http_with_client(http_client, url)
+ };
+
+ Ok(AlloyProviderBuilder::new()
+ .disable_recommended_fillers()
+ .network::<TempoNetwork>()
+ .connect_client(client))
+}
+
+/// Error building the HTTP client for a Tempo RPC provider.
+///
+/// Occurs when the reqwest HTTP client cannot be constructed, typically due to
+/// TLS backend initialization failure.
+#[derive(Debug, thiserror::Error)]
+#[error("failed to build HTTP client")]
+pub struct HttpBuildError(#[source] alloy::transports::http::reqwest::Error);
+
+/// Create an IPC Tempo RPC provider.
+pub async fn new_ipc<P: AsRef<Path>>(
+ path: P,
+ rate_limit: Option<NonZeroU32>,
+) -> Result<TempoRpcAlloyProvider, TransportError> {
+ let client_builder = ClientBuilder::default();
+ let client = if let Some(rl) = rate_limit {
+ client_builder
+ .layer(RateLimitLayer::new(rl))
+ .ipc(path.as_ref().to_path_buf().into())
+ .await?
+ } else {
+ client_builder
+ .ipc(path.as_ref().to_path_buf().into())
+ .await?
+ };
+
+ Ok(AlloyProviderBuilder::new()
+ .disable_recommended_fillers()
+ .network::<TempoNetwork>()
+ .connect_client(client))
+}
+
+/// Create a WebSocket Tempo RPC provider.
+pub async fn new_ws(
+ url: Url,
+ auth: Option<Auth>,
+ rate_limit: Option<NonZeroU32>,
+) -> Result<TempoRpcAlloyProvider, TransportError> {
+ let mut ws_connect = WsConnect::new(url);
+ ws_connect = if let Some(a) = auth {
+ let token = match a {
+ Auth::Bearer(token) => token,
+ Auth::CustomHeader { value: token, .. } => {
+ tracing::warn!(
+ "custom auth header unsupported for WebSocket, fell back to Authorization header"
+ );
+ token
+ }
+ };
+ ws_connect.with_auth(TransportAuthorization::raw(token.into_inner()))
+ } else {
+ ws_connect
+ };
+
+ let client_builder = ClientBuilder::default();
+ let client = if let Some(rl) = rate_limit {
+ client_builder
+ .layer(RateLimitLayer::new(rl))
+ .ws(ws_connect)
+ .await?
+ } else {
+ client_builder.ws(ws_connect).await?
+ };
+
+ Ok(AlloyProviderBuilder::new()
+ .disable_recommended_fillers()
+ .network::<TempoNetwork>()
+ .connect_client(client))
+}
+
+struct RateLimitLayer {
+ limiter: Arc<DefaultDirectRateLimiter>,
+}
+
+impl RateLimitLayer {
+ fn new(rate_limit: NonZeroU32) -> Self {
+ let quota = Quota::per_minute(rate_limit);
+ let limiter = Arc::new(RateLimiter::direct(quota));
+ RateLimitLayer { limiter }
+ }
+}
+
+impl<S> Layer<S> for RateLimitLayer {
+ type Service = RateLimitService<S>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ RateLimitService {
+ inner,
+ limiter: Arc::clone(&self.limiter),
+ }
+ }
+}
+
+#[derive(Clone)]
+struct RateLimitService<S> {
+ inner: S,
+ limiter: Arc<DefaultDirectRateLimiter>,
+}
+
+impl<S, Request> Service<Request> for RateLimitService<S>
+where
+ S: Service<Request> + Send + 'static,
+ S::Future: Send + 'static,
+{
+ type Response = S::Response;
+ type Error = S::Error;
+ type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>> + Send>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx)
+ }
+
+ fn call(&mut self, req: Request) -> Self::Future {
+ let inner_fut = self.inner.call(req);
+ let limiter = Arc::clone(&self.limiter);
+
+ Box::pin(async move {
+ limiter.until_ready().await;
+ inner_fut.await
+ })
+ }
+}
diff --git a/docs/schemas/providers/tempo.spec.json b/docs/schemas/providers/tempo.spec.json
new file mode 100644
index 000000000..98048be55
--- /dev/null
+++ b/docs/schemas/providers/tempo.spec.json
@@ -0,0 +1,86 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "title": "TempoProviderConfig",
+ "description": "Tempo RPC provider configuration for parsing TOML config.",
+ "type": "object",
+ "properties": {
+ "auth_header": {
+ "description": "Custom header name for authentication.",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ "auth_token": {
+ "description": "Authentication token for RPC requests.",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ "concurrent_request_limit": {
+ "description": "Optional limit on the number of concurrent requests.",
+ "type": [
+ "integer",
+ "null"
+ ],
+ "format": "uint16",
"default": 1024, + "maximum": 65535, + "minimum": 0 + }, + "fetch_receipts_per_tx": { + "description": "Whether to use `eth_getTransactionReceipt` to fetch receipts for each transaction\nor `eth_getBlockReceipts` to fetch all receipts for a block in one call.", + "type": "boolean", + "default": false + }, + "kind": { + "description": "The provider kind, must be `\"tempo\"`.", + "const": "tempo" + }, + "network": { + "description": "The network this provider serves.", + "$ref": "#/$defs/NetworkId" + }, + "rate_limit_per_minute": { + "description": "Optional rate limit for requests per minute.", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "default": null, + "minimum": 1 + }, + "rpc_batch_size": { + "description": "Maximum number of JSON-RPC requests to batch together.\n\nSet to 0 to disable batching.", + "type": "integer", + "format": "uint", + "default": 0, + "minimum": 0 + }, + "timeout_secs": { + "description": "Request timeout in seconds.", + "type": "integer", + "format": "uint64", + "default": 30, + "minimum": 0 + }, + "url": { + "description": "The URL of the Tempo RPC endpoint (HTTP, HTTPS, WebSocket, or IPC).", + "type": "string" + } + }, + "required": [ + "kind", + "network", + "url" + ], + "$defs": { + "NetworkId": { + "description": "A validated network identifier that enforces basic constraints.\n\nNetwork identifiers are used to distinguish between different blockchain networks\n(e.g., \"mainnet\", \"base\", \"arbitrum-one\"). 
This type provides compile-time guarantees\nthat all instances contain valid network identifiers.\n\nNetwork IDs should follow the values defined in The Graph's networks registry:\n\n\n## Format Requirements\n\nA valid network identifier must:\n- **Not be empty** (minimum length of 1 character)", + "type": "string", + "minLength": 1 + } + } +} \ No newline at end of file diff --git a/justfile b/justfile index 785a030a8..1f88e13dc 100644 --- a/justfile +++ b/justfile @@ -250,7 +250,9 @@ gen: cp -f $(ls -t target/debug/build/datasets-solana-gen-*/out/tables.md | head -1) {{GEN_TABLE_SCHEMAS_OUTDIR}}/solana.md echo " {{GEN_TABLE_SCHEMAS_OUTDIR}}/solana.md" - # Tempo (table schema only) + # Tempo (provider + table schema) + cp -f $(ls -t target/debug/build/amp-providers-tempo-gen-*/out/schema.json | head -1) {{GEN_PROVIDER_SCHEMAS_OUTDIR}}/tempo.spec.json + echo " {{GEN_PROVIDER_SCHEMAS_OUTDIR}}/tempo.spec.json" cp -f $(ls -t target/debug/build/datasets-tempo-gen-*/out/tables.md | head -1) {{GEN_TABLE_SCHEMAS_OUTDIR}}/tempo.md echo " {{GEN_TABLE_SCHEMAS_OUTDIR}}/tempo.md" @@ -383,6 +385,16 @@ update-solana-storage-proto BRANCH="master": gen-solana-storage-proto: RUSTFLAGS="--cfg gen_proto" cargo check -p solana-storage-proto +### Tempo + +# Generate Tempo provider config JSON schema +[group: 'codegen'] +gen-tempo-provider-schema DEST_DIR=GEN_PROVIDER_SCHEMAS_OUTDIR: + RUSTFLAGS="--cfg gen_schema_provider" cargo check -p amp-providers-tempo-gen + @mkdir -p {{DEST_DIR}} + @cp -f $(ls -t target/debug/build/amp-providers-tempo-gen-*/out/schema.json | head -1) {{DEST_DIR}}/tempo.spec.json + @echo "Schema generated and copied to {{DEST_DIR}}/tempo.spec.json" + ### Static (provider only) # Generate static provider config JSON schema diff --git a/tests/config/manifests/tempo.json b/tests/config/manifests/tempo.json index 28f5e288a..13c764d95 100644 --- a/tests/config/manifests/tempo.json +++ b/tests/config/manifests/tempo.json @@ -1,7 +1,7 @@ { "kind": "tempo", "network": 
"testnet", - "start_block": 0, + "start_block": 9069931, "finalized_blocks_only": false, "tables": { "blocks": { diff --git a/tests/src/tests/it_ampctl_gen_manifest.rs b/tests/src/tests/it_ampctl_gen_manifest.rs index d0de7a3ec..32616b8b5 100644 --- a/tests/src/tests/it_ampctl_gen_manifest.rs +++ b/tests/src/tests/it_ampctl_gen_manifest.rs @@ -316,8 +316,14 @@ async fn gen_manifest_produces_expected_tempo_json() { //* When let mut out = Vec::new(); - let result = - generate::generate_manifest(&kind, network.parse().unwrap(), None, false, &mut out).await; + let result = generate::generate_manifest( + &kind, + network.parse().unwrap(), + Some(9069931), + false, + &mut out, + ) + .await; //* Then assert!(result.is_ok(), "manifest generation should succeed");