From b098733906669fca7fb009ec2119ef9fb13ae2fb Mon Sep 17 00:00:00 2001 From: isum <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:33:15 +0000 Subject: [PATCH 01/15] node: add per-chain Amp Flight service configuration --- node/resources/tests/full_config.toml | 8 +- node/src/config.rs | 202 +++++++++++++++++++++++--- 2 files changed, 186 insertions(+), 24 deletions(-) diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 88ba990c1e6..9c5c5bdb4d4 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -45,7 +45,6 @@ ingestor = "index_0" [chains.mainnet] shard = "primary" -amp = "ethereum-mainnet" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, @@ -53,6 +52,13 @@ provider = [ { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, ] +[chains.mainnet.amp] +address = "http://localhost:50051" +token = "secret-token" +context_dataset = "eth" +context_table = "blocks" +network = "ethereum-mainnet" + [chains.ropsten] shard = "primary" provider = [ diff --git a/node/src/config.rs b/node/src/config.rs index 69c174e5a76..f8d1aaeb1ff 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -121,8 +121,12 @@ impl Config { .chains .iter() .map(|(chain_name, chain)| { - let amp_name: ChainName = - chain.amp.as_deref().unwrap_or(chain_name.as_str()).into(); + let amp_name: ChainName = chain + .amp + .as_ref() + .and_then(|c| c.network.as_deref()) + .unwrap_or(chain_name.as_str()) + .into(); let internal_name: ChainName = chain_name.as_str().into(); (amp_name, internal_name) }) @@ -449,29 +453,38 @@ impl ChainSection { chain.validate()? 
} - // Validate that effective AMP names are unique and don't collide + // Validate that effective Amp names are unique and don't collide // with other chain names. let mut amp_names: BTreeMap = BTreeMap::new(); for (chain_name, chain) in &self.chains { - let effective = chain.amp.as_deref().unwrap_or(chain_name.as_str()); + let effective = chain + .amp + .as_ref() + .and_then(|c| c.network.as_deref()) + .unwrap_or(chain_name.as_str()); if let Some(prev_chain) = amp_names.get(effective) { return Err(anyhow!( - "duplicate AMP name `{}`: used by chains `{}` and `{}`", + "duplicate Amp name `{}`: used by chains `{}` and `{}`", effective, prev_chain, chain_name )); } - // Check that an explicit amp alias doesn't collide with + // Check that an explicit amp network alias doesn't collide with // another chain's own name (which would be ambiguous). - if chain.amp.is_some() { + if chain + .amp + .as_ref() + .and_then(|c| c.network.as_deref()) + .is_some() + { if let Some(other) = self.chains.get(effective) { // Only a collision if the other chain doesn't also // set the same amp alias (which is covered by the // duplicate check above). - if other.amp.as_deref() != Some(effective) { + if other.amp.as_ref().and_then(|c| c.network.as_deref()) != Some(effective) { return Err(anyhow!( - "AMP alias `{}` on chain `{}` collides with chain `{}`", + "Amp alias `{}` on chain `{}` collides with chain `{}`", effective, chain_name, effective, @@ -584,6 +597,32 @@ impl ChainSection { } } +/// Per-chain Amp Flight service configuration. +/// +/// Parsed from a `[chains..amp]` TOML table. When the `amp` key is +/// absent, Amp is disabled for that chain. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct AmpConfig { + /// Amp Flight service address (e.g. `http://localhost:50051`). + pub address: String, + /// Optional authentication token for the Amp Flight service. + pub token: Option, + /// The dataset in the Amp Flight service that contains the context table. 
+ /// + /// This identifies the logical grouping (dataset) within the Flight + /// service where the block-level context table resides. + pub context_dataset: String, + /// The table providing block-level context: block hash, block number, + /// and timestamp. + /// + /// This should typically point to the blocks table (not transactions or + /// logs), since it reliably contains one row per block with the block hash. + pub context_table: String, + /// Optional Amp network name, used when the Amp network name differs + /// from the graph-node chain name. + pub network: Option, +} + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub struct Chain { pub shard: String, @@ -596,10 +635,11 @@ pub struct Chain { pub polling_interval: Duration, #[serde(rename = "provider")] pub providers: Vec, - /// AMP network name alias. When set, AMP manifests using this name will - /// resolve to this chain. Defaults to the chain name. + /// Amp configuration table. When present, Amp is enabled for this chain + /// using the specified Flight service address, context dataset/table, + /// and optional auth token and network name override. 
#[serde(default)] - pub amp: Option, + pub amp: Option, } fn default_blockchain_kind() -> BlockchainKind { @@ -1289,7 +1329,8 @@ mod tests { use crate::config::{default_polling_interval, ChainSection, Web3Rule}; use super::{ - Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider, + AmpConfig, Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, + Web3Provider, }; use graph::blockchain::BlockchainKind; use graph::firehose::SubgraphLimit; @@ -1962,18 +2003,117 @@ fdw_pool_size = [ assert_eq!(shard.fdw_pool_size.size_for(&other, "ashard").unwrap(), 5); } + #[test] + fn amp_config_toml_parses_full_table() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + [amp] + address = "http://localhost:50051" + token = "my-secret-token" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" + "#, + ) + .unwrap(); + + assert_eq!( + actual.amp, + Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: Some("my-secret-token".to_string()), + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: Some("ethereum-mainnet".to_string()), + }) + ); + } + + #[test] + fn amp_config_toml_parses_minimal_table() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + [amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + "#, + ) + .unwrap(); + + assert_eq!( + actual.amp, + Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }) + ); + } + + #[test] + fn amp_config_toml_parses_without_amp() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + "#, + ) + .unwrap(); + + assert_eq!(actual.amp, None); + } + + #[test] + fn amp_config_toml_rejects_missing_required_field() { + // The `address` field is 
required; omitting it should cause a deserialization error. + let result = toml::from_str::( + r#" + shard = "primary" + provider = [] + [amp] + context_dataset = "eth" + context_table = "blocks" + "#, + ); + + assert!( + result.is_err(), + "expected deserialization error when address is missing" + ); + } + #[test] fn amp_chain_names_parsed_from_toml() { let actual: Chain = toml::from_str( r#" shard = "primary" provider = [] - amp = "ethereum-mainnet" + [amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" "#, ) .unwrap(); - assert_eq!(actual.amp, Some("ethereum-mainnet".to_string())); + assert_eq!( + actual.amp, + Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: Some("ethereum-mainnet".to_string()), + }) + ); } #[test] @@ -1997,11 +2137,19 @@ fdw_pool_size = [ [mainnet] shard = "primary" provider = [] - amp = "eth" + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "eth" [sepolia] shard = "primary" provider = [] - amp = "eth" + [sepolia.amp] + address = "http://localhost:50052" + context_dataset = "eth" + context_table = "blocks" + network = "eth" "#, ) .unwrap(); @@ -2009,8 +2157,8 @@ fdw_pool_size = [ let err = section.validate(); assert!(err.is_err()); assert!( - err.unwrap_err().to_string().contains("duplicate AMP name"), - "expected duplicate AMP name error" + err.unwrap_err().to_string().contains("duplicate Amp name"), + "expected duplicate Amp name error" ); } @@ -2025,7 +2173,11 @@ fdw_pool_size = [ [sepolia] shard = "primary" provider = [] - amp = "mainnet" + [sepolia.amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "mainnet" "#, ) .unwrap(); @@ -2034,9 +2186,9 @@ fdw_pool_size = [ assert!(err.is_err()); let msg = err.unwrap_err().to_string(); 
// The alias "mainnet" on sepolia collides with the chain named - // "mainnet" whose effective AMP name is also "mainnet". + // "mainnet" whose effective Amp name is also "mainnet". assert!( - msg.contains("duplicate AMP name") || msg.contains("collides with chain"), + msg.contains("duplicate Amp name") || msg.contains("collides with chain"), "expected collision/duplicate error, got: {msg}" ); } @@ -2049,7 +2201,11 @@ fdw_pool_size = [ [mainnet] shard = "primary" provider = [] - amp = "ethereum-mainnet" + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" [sepolia] shard = "primary" provider = [] From 5e0c30940b96cdf54d7f690ce8ecfe0a24e8124e Mon Sep 17 00:00:00 2001 From: isum <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:38:30 +0000 Subject: [PATCH 02/15] node: add URI validation for AMP config address field --- node/src/config.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/node/src/config.rs b/node/src/config.rs index f8d1aaeb1ff..15e075c9531 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -453,6 +453,20 @@ impl ChainSection { chain.validate()? } + // Validate Amp address URIs. + for (chain_name, chain) in &self.chains { + if let Some(amp_config) = &chain.amp { + amp_config.address.parse::().map_err(|e| { + anyhow!( + "invalid Amp address URI '{}' for chain '{}': {}", + amp_config.address, + chain_name, + e + ) + })?; + } + } + // Validate that effective Amp names are unique and don't collide // with other chain names. 
let mut amp_names: BTreeMap = BTreeMap::new(); @@ -2246,4 +2260,74 @@ fdw_pool_size = [ graph::components::network_provider::ChainName::from("unknown") ); } + + #[test] + fn amp_config_validation_rejects_invalid_address() { + let mut section = ChainSection { + ingestor: "default".to_string(), + chains: { + let mut chains = std::collections::BTreeMap::new(); + chains.insert( + "mainnet".to_string(), + Chain { + shard: "primary".to_string(), + protocol: BlockchainKind::Ethereum, + polling_interval: default_polling_interval(), + providers: vec![], + amp: Some(AmpConfig { + address: "not a valid uri!@#".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }), + }, + ); + chains + }, + }; + + let err = section.validate(); + assert!(err.is_err(), "expected validation error for invalid URI"); + let msg = err.unwrap_err().to_string(); + assert!( + msg.contains("invalid Amp address URI"), + "expected 'invalid Amp address URI' in error, got: {msg}" + ); + assert!( + msg.contains("mainnet"), + "expected chain name 'mainnet' in error, got: {msg}" + ); + } + + #[test] + fn amp_config_validation_accepts_valid_address() { + let mut section = ChainSection { + ingestor: "default".to_string(), + chains: { + let mut chains = std::collections::BTreeMap::new(); + chains.insert( + "mainnet".to_string(), + Chain { + shard: "primary".to_string(), + protocol: BlockchainKind::Ethereum, + polling_interval: default_polling_interval(), + providers: vec![], + amp: Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }), + }, + ); + chains + }, + }; + + section + .validate() + .expect("validation should pass for a valid URI"); + } } From 3cb717f536a8b33b7a2e574a345a3ea034aef1c1 Mon Sep 17 00:00:00 2001 From: isum <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:42:09 +0000 Subject: 
[PATCH 03/15] node: define AmpChainConfig struct and Config method Add AmpChainConfig runtime struct in graph/src/components/network_provider/mod.rs with address as parsed Uri, plus token, context_dataset, context_table, and network fields. Add Config::amp_chain_configs() method that produces a HashMap for all chains with an [amp] TOML section. URI parsing is defensive (returns error, not panic) even though ChainSection::validate already rejects invalid URIs. Three unit tests cover constructability, mixed configs, and error handling for invalid addresses. --- graph/src/components/network_provider/mod.rs | 21 ++- node/src/config.rs | 162 ++++++++++++++++++- 2 files changed, 179 insertions(+), 4 deletions(-) diff --git a/graph/src/components/network_provider/mod.rs b/graph/src/components/network_provider/mod.rs index 3677ed6447f..ae7afa79d9f 100644 --- a/graph/src/components/network_provider/mod.rs +++ b/graph/src/components/network_provider/mod.rs @@ -16,6 +16,7 @@ pub use self::provider_check::ProviderCheckStatus; pub use self::provider_manager::ProviderCheckStrategy; pub use self::provider_manager::ProviderManager; +use crate::http::Uri; use std::collections::HashMap; // Used to increase memory efficiency. @@ -26,10 +27,24 @@ pub type ChainName = crate::data::value::Word; // Currently, there is no need to create a separate type for this. pub type ProviderName = crate::data::value::Word; -/// Maps AMP network names to internal graph-node chain names. +/// Resolved per-chain Amp configuration, with the address parsed as a `Uri`. /// -/// AMP-powered subgraphs may use different network names than graph-node -/// (e.g., AMP uses `"ethereum-mainnet"` while graph-node uses `"mainnet"`). +/// This struct is the *runtime* counterpart of the TOML-level `AmpConfig` +/// (which stores the address as a plain `String`). The `Config::amp_chain_configs()` +/// method bridges the two by parsing each address string into a `Uri`. 
+#[derive(Clone, Debug)] +pub struct AmpChainConfig { + pub address: Uri, + pub token: Option, + pub context_dataset: String, + pub context_table: String, + pub network: Option, +} + +/// Maps Amp network names to internal graph-node chain names. +/// +/// Amp-powered subgraphs may use different network names than graph-node +/// (e.g., Amp uses `"ethereum-mainnet"` while graph-node uses `"mainnet"`). /// This type provides a config-driven translation layer. #[derive(Clone, Debug, Default)] pub struct AmpChainNames(HashMap); diff --git a/node/src/config.rs b/node/src/config.rs index 15e075c9531..9c04cee5b6a 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -1,7 +1,7 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, - components::network_provider::{AmpChainNames, ChainName}, + components::network_provider::{AmpChainConfig, AmpChainNames, ChainName}, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -134,6 +134,37 @@ impl Config { AmpChainNames::new(mapping) } + /// Build a map from chain name to [`AmpChainConfig`] for every chain + /// that has an `[amp]` section. The `AmpConfig.address` string is parsed + /// into a `Uri`; this is expected to always succeed because + /// `ChainSection::validate` already rejects invalid URIs. + pub fn amp_chain_configs(&self) -> Result> { + let mut map = HashMap::new(); + for (chain_name, chain) in &self.chains.chains { + if let Some(amp) = &chain.amp { + let uri = amp.address.parse::().map_err(|e| { + anyhow!( + "invalid Amp address URI '{}' for chain '{}': {}", + amp.address, + chain_name, + e + ) + })?; + map.insert( + chain_name.clone(), + AmpChainConfig { + address: uri, + token: amp.token.clone(), + context_dataset: amp.context_dataset.clone(), + context_table: amp.context_table.clone(), + network: amp.network.clone(), + }, + ); + } + } + Ok(map) + } + /// Check that the config is valid. 
fn validate(&mut self) -> Result<()> { if !self.stores.contains_key(PRIMARY_SHARD.as_str()) { @@ -2330,4 +2361,133 @@ fdw_pool_size = [ .validate() .expect("validation should pass for a valid URI"); } + + #[test] + fn amp_chain_config_constructable() { + use graph::components::network_provider::AmpChainConfig; + use graph::http::Uri; + + let uri: Uri = "http://localhost:50051".parse().unwrap(); + let cfg = AmpChainConfig { + address: uri, + token: Some("secret".to_string()), + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: Some("ethereum-mainnet".to_string()), + }; + + assert_eq!(cfg.address.to_string(), "http://localhost:50051/"); + assert_eq!(cfg.token.as_deref(), Some("secret")); + assert_eq!(cfg.context_dataset, "eth"); + assert_eq!(cfg.context_table, "blocks"); + assert_eq!(cfg.network.as_deref(), Some("ethereum-mainnet")); + } + + #[test] + fn amp_chain_configs_from_mixed() { + let section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + [mainnet.amp] + address = "http://localhost:50051" + token = "my-token" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" + [sepolia] + shard = "primary" + provider = [] + "#, + ) + .unwrap(); + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let map = config.amp_chain_configs().unwrap(); + + // Only mainnet (with amp) should be in the map + assert_eq!(map.len(), 1); + assert!(!map.contains_key("sepolia")); + + let mainnet = map.get("mainnet").expect("mainnet should be in map"); + assert_eq!(mainnet.address.to_string(), "http://localhost:50051/"); + 
assert_eq!(mainnet.token.as_deref(), Some("my-token")); + assert_eq!(mainnet.context_dataset, "eth"); + assert_eq!(mainnet.context_table, "blocks"); + assert_eq!(mainnet.network.as_deref(), Some("ethereum-mainnet")); + } + + #[test] + fn amp_chain_config_invalid_address_returns_error() { + // Build a Config with an invalid address that bypasses validation + // (constructed directly, not via from_str which calls validate). + let section = ChainSection { + ingestor: "default".to_string(), + chains: { + let mut chains = std::collections::BTreeMap::new(); + chains.insert( + "mainnet".to_string(), + Chain { + shard: "primary".to_string(), + protocol: BlockchainKind::Ethereum, + polling_interval: default_polling_interval(), + providers: vec![], + amp: Some(AmpConfig { + address: "not a valid uri!@#".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }), + }, + ); + chains + }, + }; + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let result = config.amp_chain_configs(); + assert!(result.is_err(), "expected error for invalid URI"); + let msg = result.unwrap_err().to_string(); + assert!( + msg.contains("invalid Amp address URI"), + "expected 'invalid Amp address URI' in error, got: {msg}" + ); + assert!( + msg.contains("mainnet"), + "expected chain name in error, got: {msg}" + ); + } } From 8c1addbcd44d091b042c88674b841e47f0070204 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 13:53:45 +0200 Subject: [PATCH 04/15] node: refactor Amp Flight client to per-chain AmpClients wrapper Replace the single global 
Amp Flight client with per-chain instances stored in a new AmpClients wrapper type. Each chain with an [amp] section in the TOML config now gets its own FlightClient, looked up by chain name at the point of use. Key changes: - Add AmpClients in graph/src/components/network_provider/mod.rs with new(), get(), is_empty() methods - launcher.rs and run.rs: iterate amp_chain_configs() to create per-chain clients; log errors per-chain and continue startup - registrar.rs, instance_manager.rs, amp_subgraph/manager.rs: accept AmpClients instead of Option>; look up client by network name from the raw manifest - server/index-node: thread AmpClients through server, service, resolver - Add 3 unit tests for AmpClients wrapper --- core/src/amp_subgraph/manager.rs | 44 ++++++++-- core/src/subgraph/instance_manager.rs | 20 ++++- core/src/subgraph/registrar.rs | 31 +++++-- graph/src/components/network_provider/mod.rs | 92 +++++++++++++++++++- node/src/bin/manager.rs | 1 - node/src/launcher.rs | 67 +++++++------- node/src/manager/commands/run.rs | 89 +++++++++++-------- server/index-node/src/resolver.rs | 22 +++-- server/index-node/src/server.rs | 10 +-- server/index-node/src/service.rs | 10 +-- tests/src/fixture/mod.rs | 6 +- 11 files changed, 289 insertions(+), 103 deletions(-) diff --git a/core/src/amp_subgraph/manager.rs b/core/src/amp_subgraph/manager.rs index ae272830880..83688341c4e 100644 --- a/core/src/amp_subgraph/manager.rs +++ b/core/src/amp_subgraph/manager.rs @@ -8,6 +8,7 @@ use graph::{ components::{ link_resolver::{LinkResolver, LinkResolverContext}, metrics::MetricsRegistry, + network_provider::AmpClients, store::{DeploymentLocator, SubgraphStore}, subgraph::SubgraphInstanceManager, }, @@ -31,7 +32,7 @@ pub struct Manager { monitor: Monitor, subgraph_store: Arc, link_resolver: Arc, - amp_client: Arc, + amp_clients: AmpClients, } impl Manager @@ -47,7 +48,7 @@ where cancel_token: &CancellationToken, subgraph_store: Arc, link_resolver: Arc, - amp_client: Arc, + 
amp_clients: AmpClients, ) -> Self { let logger = logger_factory.component_logger("AmpSubgraphManager", None); let logger_factory = logger_factory.with_parent(logger); @@ -61,7 +62,7 @@ where monitor, subgraph_store, link_resolver, - amp_client, + amp_clients, } } } @@ -112,13 +113,44 @@ where .await .context("failed to load subgraph manifest")?; - let raw_manifest = serde_yaml::from_slice(&manifest_bytes) + let raw_manifest: serde_yaml::Mapping = serde_yaml::from_slice(&manifest_bytes) .context("failed to parse subgraph manifest")?; + // Extract the network name from the raw manifest to look + // up the per-chain Amp client. + let network_name = raw_manifest + .get(serde_yaml::Value::String("dataSources".to_owned())) + .and_then(|ds| ds.as_sequence()) + .and_then(|ds| ds.first()) + .and_then(|ds| ds.as_mapping()) + .and_then(|ds| ds.get(serde_yaml::Value::String("network".to_owned()))) + .and_then(|n| n.as_str()) + .map(|s| s.to_owned()); + + let amp_client = match &network_name { + Some(network) => match manager.amp_clients.get(network) { + Some(client) => client, + None => { + anyhow::bail!( + "Amp is not configured for chain '{}'; \ + cannot start Amp subgraph '{}'", + network, + deployment.hash + ); + } + }, + None => { + anyhow::bail!( + "no network name found in manifest for Amp subgraph '{}'", + deployment.hash + ); + } + }; + let mut manifest = amp::Manifest::resolve::( &logger, manager.link_resolver.cheap_clone(), - manager.amp_client.cheap_clone(), + amp_client.cheap_clone(), manager.env_vars.max_spec_version.cheap_clone(), deployment.hash.cheap_clone(), raw_manifest, @@ -139,7 +171,7 @@ where let runner_context = runner::Context::new( &logger, &manager.env_vars.amp, - manager.amp_client.cheap_clone(), + amp_client, store, deployment.hash.cheap_clone(), manifest, diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 5d0c89ae171..b1dade1d3c5 100644 --- a/core/src/subgraph/instance_manager.rs +++ 
b/core/src/subgraph/instance_manager.rs @@ -15,6 +15,7 @@ use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities}; use graph::components::metrics::gas::GasMetrics; use graph::components::metrics::subgraph::DeploymentStatusMetric; +use graph::components::network_provider::AmpClients; use graph::components::store::SourceableStore; use graph::components::subgraph::ProofOfIndexingVersion; use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6}; @@ -41,7 +42,7 @@ pub struct SubgraphInstanceManager { link_resolver: Arc, ipfs_service: IpfsService, arweave_service: ArweaveService, - amp_client: Option>, + amp_clients: AmpClients, static_filters: bool, env_vars: Arc, @@ -175,7 +176,7 @@ impl SubgraphInstanceManager { link_resolver: Arc, ipfs_service: IpfsService, arweave_service: ArweaveService, - amp_client: Option>, + amp_clients: AmpClients, static_filters: bool, ) -> Self { let logger = logger_factory.component_logger("SubgraphInstanceManager", None); @@ -189,7 +190,7 @@ impl SubgraphInstanceManager { instances: SubgraphKeepAlive::new(sg_metrics), link_resolver, ipfs_service, - amp_client, + amp_clients, static_filters, env_vars, arweave_service, @@ -267,6 +268,17 @@ impl SubgraphInstanceManager { let subgraph_store = self.subgraph_store.cheap_clone(); let registry = self.metrics_registry.cheap_clone(); + // Look up the per-chain Amp client based on the network from the + // raw manifest (before the manifest is moved into parse). 
+ let amp_client = raw_manifest + .get(serde_yaml::Value::String("dataSources".to_owned())) + .and_then(|ds| ds.as_sequence()) + .and_then(|ds| ds.first()) + .and_then(|ds| ds.as_mapping()) + .and_then(|ds| ds.get(serde_yaml::Value::String("network".to_owned()))) + .and_then(|n| n.as_str()) + .and_then(|network| self.amp_clients.get(network)); + let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), raw_manifest)?; @@ -300,7 +312,7 @@ impl SubgraphInstanceManager { .resolve( &deployment.hash, &link_resolver, - self.amp_client.cheap_clone(), + amp_client, &logger, ENV_VARS.max_spec_version.clone(), ) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 9b0efe21abc..c671885ab3a 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -5,7 +5,7 @@ use graph::amp; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; use graph::components::{ link_resolver::LinkResolverContext, - network_provider::AmpChainNames, + network_provider::{AmpChainNames, AmpClients}, store::{DeploymentId, DeploymentLocator, SubscriptionManager}, subgraph::Settings, }; @@ -25,7 +25,7 @@ pub struct SubgraphRegistrar { provider: Arc

, store: Arc, subscription_manager: Arc, - amp_client: Option>, + amp_clients: AmpClients, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -47,7 +47,7 @@ where provider: Arc

, store: Arc, subscription_manager: Arc, - amp_client: Option>, + amp_clients: AmpClients, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -66,7 +66,7 @@ where provider, store, subscription_manager, - amp_client, + amp_clients, chains, node_id, version_switching_mode, @@ -297,6 +297,13 @@ where SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e)) })?; + // Extract the network name from the raw manifest and resolve the + // per-chain Amp client (if any). + let amp_client = network_name_from_raw(&raw).and_then(|network| { + let resolved = self.amp_chain_names.resolve(&Word::from(network)); + self.amp_clients.get(resolved.as_str()) + }); + // Give priority to deployment specific history_blocks value. let history_blocks = history_blocks.or(self.settings.for_name(&name).map(|c| c.history_blocks)); @@ -316,7 +323,7 @@ where debug_fork, self.version_switching_mode, &resolver, - self.amp_client.cheap_clone(), + amp_client.cheap_clone(), history_blocks, &self.amp_chain_names, ) @@ -336,7 +343,7 @@ where debug_fork, self.version_switching_mode, &resolver, - self.amp_client.cheap_clone(), + amp_client, history_blocks, &self.amp_chain_names, ) @@ -451,6 +458,18 @@ async fn resolve_graft_block( }) } +/// Extracts the network name from the first data source in a raw manifest. 
+fn network_name_from_raw(raw: &serde_yaml::Mapping) -> Option { + use serde_yaml::Value; + raw.get(Value::String("dataSources".to_owned())) + .and_then(|ds| ds.as_sequence()) + .and_then(|ds| ds.first()) + .and_then(|ds| ds.as_mapping()) + .and_then(|ds| ds.get(Value::String("network".to_owned()))) + .and_then(|n| n.as_str()) + .map(|s| s.to_owned()) +} + async fn create_subgraph_version( logger: &Logger, store: Arc, diff --git a/graph/src/components/network_provider/mod.rs b/graph/src/components/network_provider/mod.rs index ae7afa79d9f..31cb199d11e 100644 --- a/graph/src/components/network_provider/mod.rs +++ b/graph/src/components/network_provider/mod.rs @@ -18,6 +18,7 @@ pub use self::provider_manager::ProviderManager; use crate::http::Uri; use std::collections::HashMap; +use std::sync::Arc; // Used to increase memory efficiency. // Currently, there is no need to create a separate type for this. @@ -41,6 +42,58 @@ pub struct AmpChainConfig { pub network: Option, } +/// Holds per-chain Amp Flight clients, keyed by chain name. +/// +/// This wrapper is used to pass per-chain Amp clients through the system +/// instead of a single global `Option>`. Use `get(chain_name)` to +/// retrieve the client for a specific chain. +pub struct AmpClients { + clients: HashMap>, +} + +impl AmpClients { + /// Creates a new `AmpClients` from a map of chain names to clients. + pub fn new(clients: HashMap>) -> Self { + Self { clients } + } + + /// Returns the Amp client for the given chain, or `None` if no client + /// is configured for that chain. + pub fn get(&self, chain_name: &str) -> Option> { + self.clients.get(chain_name).cloned() + } + + /// Returns `true` if no Amp clients are configured. + pub fn is_empty(&self) -> bool { + self.clients.is_empty() + } +} + +// Manual Clone impl: only requires `Arc: Clone` (always true), not `AC: Clone`. 
+impl Clone for AmpClients { + fn clone(&self) -> Self { + Self { + clients: self.clients.clone(), + } + } +} + +impl std::fmt::Debug for AmpClients { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AmpClients") + .field("chains", &self.clients.keys().collect::>()) + .finish() + } +} + +impl Default for AmpClients { + fn default() -> Self { + Self { + clients: HashMap::new(), + } + } +} + /// Maps Amp network names to internal graph-node chain names. /// /// Amp-powered subgraphs may use different network names than graph-node @@ -54,7 +107,7 @@ impl AmpChainNames { AmpChainNames(mapping) } - /// Returns the internal chain name for an AMP alias, or the input + /// Returns the internal chain name for an Amp alias, or the input /// unchanged if no alias matches. pub fn resolve(&self, name: &ChainName) -> ChainName { self.0.get(name).cloned().unwrap_or_else(|| name.clone()) @@ -87,4 +140,41 @@ mod tests { ChainName::from("mainnet") ); } + + #[test] + fn amp_clients_returns_client_for_configured_chain() { + let mut map = HashMap::new(); + map.insert("mainnet".to_string(), Arc::new(42u32)); + let clients = AmpClients::new(map); + let client = clients.get("mainnet"); + assert!(client.is_some()); + assert_eq!(*client.unwrap(), 42); + } + + #[test] + fn amp_clients_returns_none_for_unconfigured_chain() { + let map: HashMap> = HashMap::new(); + let clients = AmpClients::new(map); + assert!(clients.get("mainnet").is_none()); + } + + /// Simulates the error path in downstream consumers: when a subgraph + /// references a chain with no Amp client, the consumer should treat + /// `get()` returning `None` as an error. + #[test] + fn amp_clients_error_for_unconfigured_amp_chain() { + let mut map = HashMap::new(); + map.insert("mainnet".to_string(), Arc::new(1u32)); + let clients = AmpClients::new(map); + + // "matic" is not configured. 
+ let result = clients + .get("matic") + .ok_or_else(|| "Amp is not configured for chain 'matic'".to_string()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err(), + "Amp is not configured for chain 'matic'" + ); + } } diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 792df8853c9..20e1a8fd5c1 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -1348,7 +1348,6 @@ async fn main() -> anyhow::Result<()> { network_name, ipfs_url, arweave_url, - opt.amp_flight_service_address.clone(), config, metrics_ctx, node_id, diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 06f9b8cf652..a7645dfe6a7 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -18,7 +18,7 @@ use graph::url::Url; use graph::{ amp, blockchain::{Blockchain, BlockchainKind, BlockchainMap}, - components::network_provider::AmpChainNames, + components::network_provider::{AmpChainNames, AmpClients}, }; use graph_core::polling_monitor::{arweave_service, ArweaveService, IpfsService}; use graph_graphql::prelude::GraphQlRunner; @@ -275,7 +275,7 @@ fn build_subgraph_registrar( subscription_manager: Arc, arweave_service: ArweaveService, ipfs_service: IpfsService, - amp_client: Option>, + amp_clients: AmpClients, cancel_token: CancellationToken, amp_chain_names: Arc, ) -> Arc< @@ -295,7 +295,7 @@ where let mut subgraph_instance_managers = graph_core::subgraph_provider::SubgraphInstanceManagers::new(); - if let Some(amp_client) = amp_client.cheap_clone() { + if !amp_clients.is_empty() { let amp_instance_manager = graph_core::amp_subgraph::Manager::new( logger_factory, metrics_registry.cheap_clone(), @@ -303,7 +303,7 @@ where &cancel_token, network_store.subgraph_store(), link_resolver.cheap_clone(), - amp_client, + amp_clients.clone(), ); subgraph_instance_managers.add( @@ -322,7 +322,7 @@ where link_resolver.clone(), ipfs_service, arweave_service, - amp_client.cheap_clone(), + amp_clients.clone(), static_filters, ); @@ -351,7 +351,7 @@ where 
Arc::new(subgraph_provider), network_store.subgraph_store(), subscription_manager, - amp_client, + amp_clients, blockchain_map, node_id.clone(), version_switching_mode, @@ -505,35 +505,40 @@ pub async fn run( &logger_factory, ); - let amp_client = match opt.amp_flight_service_address.as_deref() { - Some(amp_flight_service_address) => { - let addr: graph::http::Uri = amp_flight_service_address - .parse() - .expect("Invalid Amp Flight service address"); - + let amp_clients = { + let amp_chain_configs = config + .amp_chain_configs() + .expect("Failed to load Amp chain configs"); + let mut clients = std::collections::HashMap::new(); + for (chain_name, amp_chain_config) in amp_chain_configs { debug!(logger, "Connecting to Amp Flight service"; - "host" => ?addr.host(), - "port" => ?addr.port() + "chain" => &chain_name, + "host" => ?amp_chain_config.address.host(), + "port" => ?amp_chain_config.address.port() ); - - let mut amp_client = amp::FlightClient::new(addr.clone()) - .await - .expect("Failed to connect to Amp Flight service"); - - if let Some(auth_token) = &env_vars.amp.flight_service_token { - amp_client.set_auth_token(auth_token); + match amp::FlightClient::new(amp_chain_config.address.clone()).await { + Ok(mut client) => { + if let Some(token) = &_chain_config.token { + client.set_auth_token(token); + } + info!(logger, "Amp Flight client connected"; + "chain" => &chain_name, + "host" => ?amp_chain_config.address.host() + ); + clients.insert(chain_name, Arc::new(client)); + } + Err(e) => { + error!(logger, "Failed to connect Amp Flight client"; + "chain" => &chain_name, + "error" => e.to_string() + ); + } } - - info!(logger, "Amp-powered subgraphs enabled"; - "amp_flight_service_host" => ?addr.host() - ); - - Some(Arc::new(amp_client)) } - None => { + if clients.is_empty() { warn!(logger, "Amp-powered subgraphs disabled"); - None } + AmpClients::new(clients) }; start_graphman_server(opt.graphman_port, graphman_server_config).await; @@ -570,7 +575,7 @@ pub async 
fn run( blockchain_map.clone(), network_store.clone(), link_resolver.clone(), - amp_client.cheap_clone(), + amp_clients.clone(), ); if !opt.disable_block_ingestor { @@ -597,7 +602,7 @@ pub async fn run( subscription_manager, arweave_service, ipfs_service, - amp_client, + amp_clients, cancel_token, amp_chain_names, ); diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 2c23eb5151d..bf9754e80d2 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -11,7 +11,7 @@ use graph::amp; use graph::anyhow::bail; use graph::cheap_clone::CheapClone; use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; -use graph::components::network_provider::chain_id_validator; +use graph::components::network_provider::{chain_id_validator, AmpClients}; use graph::components::store::DeploymentLocator; use graph::components::subgraph::{Settings, SubgraphInstanceManager as _}; use graph::endpoint::EndpointMetrics; @@ -21,7 +21,7 @@ use graph::prelude::{ SubgraphCountMetric, SubgraphName, SubgraphRegistrar, SubgraphStore, SubgraphVersionSwitchingMode, ENV_VARS, }; -use graph::slog::{debug, info, Logger}; +use graph::slog::{debug, error, info, warn, Logger}; use graph_core::polling_monitor::{arweave_service, ipfs_service}; use tokio_util::sync::CancellationToken; @@ -40,7 +40,6 @@ pub async fn run( _network_name: String, ipfs_url: Vec, arweave_url: String, - amp_flight_service_address: Option, config: Config, metrics_ctx: MetricsContext, node_id: NodeId, @@ -144,41 +143,59 @@ pub async fn run( let mut subgraph_instance_managers = graph_core::subgraph_provider::SubgraphInstanceManagers::new(); - let amp_client = match amp_flight_service_address { - Some(amp_flight_service_address) => { - let addr = amp_flight_service_address - .parse() - .expect("Invalid Amp Flight service address"); - - let mut amp_client = amp::FlightClient::new(addr) - .await - .expect("Failed to connect to Amp Flight service"); - - if let 
Some(auth_token) = &env_vars.amp.flight_service_token { - amp_client.set_auth_token(auth_token); - } - - let amp_client = Arc::new(amp_client); - let amp_instance_manager = graph_core::amp_subgraph::Manager::new( - &logger_factory, - metrics_registry.cheap_clone(), - env_vars.cheap_clone(), - &cancel_token, - network_store.subgraph_store(), - link_resolver.cheap_clone(), - amp_client.cheap_clone(), + let amp_clients = { + let amp_chain_configs = config + .amp_chain_configs() + .expect("Failed to load Amp chain configs"); + let mut clients = std::collections::HashMap::new(); + for (chain_name, amp_chain_config) in amp_chain_configs { + debug!(logger, "Connecting to Amp Flight service"; + "chain" => &chain_name, + "host" => ?amp_chain_config.address.host(), + "port" => ?amp_chain_config.address.port() ); - - subgraph_instance_managers.add( - graph_core::subgraph_provider::SubgraphProcessingKind::Amp, - Arc::new(amp_instance_manager), - ); - - Some(amp_client) + match amp::FlightClient::new(amp_chain_config.address.clone()).await { + Ok(mut client) => { + if let Some(token) = &_chain_config.token { + client.set_auth_token(token); + } + info!(logger, "Amp Flight client connected"; + "chain" => &chain_name, + "host" => ?amp_chain_config.address.host() + ); + clients.insert(chain_name, Arc::new(client)); + } + Err(e) => { + error!(logger, "Failed to connect Amp Flight client"; + "chain" => &chain_name, + "error" => e.to_string() + ); + } + } } - None => None, + if clients.is_empty() { + warn!(logger, "Amp-powered subgraphs disabled"); + } + AmpClients::new(clients) }; + if !amp_clients.is_empty() { + let amp_instance_manager = graph_core::amp_subgraph::Manager::new( + &logger_factory, + metrics_registry.cheap_clone(), + env_vars.cheap_clone(), + &cancel_token, + network_store.subgraph_store(), + link_resolver.cheap_clone(), + amp_clients.clone(), + ); + + subgraph_instance_managers.add( + graph_core::subgraph_provider::SubgraphProcessingKind::Amp, + 
Arc::new(amp_instance_manager), + ); + } + let subgraph_instance_manager = graph_core::subgraph::SubgraphInstanceManager::new( &logger_factory, env_vars.cheap_clone(), @@ -189,7 +206,7 @@ pub async fn run( link_resolver.cheap_clone(), ipfs_service, arweave_service, - amp_client.cheap_clone(), + amp_clients.clone(), static_filters, ); @@ -216,7 +233,7 @@ pub async fn run( subgraph_provider.cheap_clone(), subgraph_store.clone(), panicking_subscription_manager, - amp_client, + amp_clients, blockchain_map, node_id.clone(), SubgraphVersionSwitchingMode::Instant, diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index b8385866d33..8250dbda561 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -10,6 +10,7 @@ use git_testament::{git_testament, CommitKind}; use graph::amp; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; use graph::components::link_resolver::LinkResolverContext; +use graph::components::network_provider::AmpClients; use graph::components::store::{BlockPtrForNumber, BlockStore, QueryPermit, Store}; use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; @@ -101,7 +102,7 @@ pub struct IndexNodeResolver { blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, bearer_token: Option, } @@ -114,7 +115,7 @@ where logger: &Logger, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, bearer_token: Option, blockchain_map: Arc, ) -> Self { @@ -125,7 +126,7 @@ where blockchain_map, store, link_resolver, - amp_client, + amp_clients, bearer_token, } } @@ -520,6 +521,17 @@ where let kind = BlockchainKind::from_manifest(&raw_yaml) .map_err(SubgraphManifestResolveError::ResolveError)?; + // Extract the network name from the raw yaml to look up the + // per-chain Amp client. 
+ let amp_client = raw_yaml + .get(serde_yaml::Value::String("dataSources".to_owned())) + .and_then(|ds| ds.as_sequence()) + .and_then(|ds| ds.first()) + .and_then(|ds| ds.as_mapping()) + .and_then(|ds| ds.get(serde_yaml::Value::String("network".to_owned()))) + .and_then(|n| n.as_str()) + .and_then(|network| self.amp_clients.get(network)); + let max_spec_version = ENV_VARS.max_spec_version.clone(); let result = match kind { @@ -529,7 +541,7 @@ where deployment_hash.clone(), raw_yaml, &self.link_resolver, - self.amp_client.cheap_clone(), + amp_client.cheap_clone(), &self.logger, max_spec_version, ) @@ -547,7 +559,7 @@ where deployment_hash.clone(), raw_yaml, &self.link_resolver, - self.amp_client.cheap_clone(), + amp_client, &self.logger, max_spec_version, ) diff --git a/server/index-node/src/server.rs b/server/index-node/src/server.rs index 00b62c09ca2..58cc5b5dfbd 100644 --- a/server/index-node/src/server.rs +++ b/server/index-node/src/server.rs @@ -1,8 +1,8 @@ use graph::{ amp, blockchain::BlockchainMap, - cheap_clone::CheapClone, components::{ + network_provider::AmpClients, server::server::{start, ServerHandle}, store::Store, }, @@ -17,7 +17,7 @@ pub struct IndexNodeServer { blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, } impl IndexNodeServer @@ -31,7 +31,7 @@ where blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, ) -> Self { let logger = logger_factory.component_logger( "IndexNodeServer", @@ -47,7 +47,7 @@ where blockchain_map, store, link_resolver, - amp_client, + amp_clients, } } @@ -68,7 +68,7 @@ where self.blockchain_map.clone(), store, self.link_resolver.clone(), - self.amp_client.cheap_clone(), + self.amp_clients.clone(), )); start(logger_for_service.clone(), port, move |req| { diff --git a/server/index-node/src/service.rs b/server/index-node/src/service.rs index 09ddfd29038..68c89ef301c 100644 --- a/server/index-node/src/service.rs +++ 
b/server/index-node/src/service.rs @@ -16,7 +16,7 @@ use graph::hyper::header::{ use graph::hyper::{body::Body, Method, Request, Response, StatusCode}; use graph::amp; -use graph::components::{server::query::ServerError, store::Store}; +use graph::components::{network_provider::AmpClients, server::query::ServerError, store::Store}; use graph::data::query::{Query, QueryError, QueryResult, QueryResults}; use graph::prelude::{q, serde_json}; use graph::slog::{debug, error, Logger}; @@ -46,7 +46,7 @@ pub struct IndexNodeService { store: Arc, explorer: Arc>, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, } impl IndexNodeService @@ -59,7 +59,7 @@ where blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, ) -> Self { let explorer = Arc::new(Explorer::new(store.clone())); @@ -69,7 +69,7 @@ where store, explorer, link_resolver, - amp_client, + amp_clients, } } @@ -143,7 +143,7 @@ where &logger, store, self.link_resolver.clone(), - self.amp_client.cheap_clone(), + self.amp_clients.clone(), validated.bearer_token, self.blockchain_map.clone(), ); diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 234890730e5..72aba3ad994 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -585,7 +585,7 @@ pub async fn setup_inner( link_resolver.cheap_clone(), ipfs_service, arweave_service, - None, + graph::components::network_provider::AmpClients::::default(), static_filters, )); @@ -620,7 +620,7 @@ pub async fn setup_inner( blockchain_map.cheap_clone(), stores.network_store.cheap_clone(), link_resolver.cheap_clone(), - None, + graph::components::network_provider::AmpClients::::default(), )); let panicking_subscription_manager = Arc::new(PanicSubscriptionManager {}); @@ -631,7 +631,7 @@ pub async fn setup_inner( subgraph_provider.cheap_clone(), subgraph_store.clone(), panicking_subscription_manager, - Option::>::None, + graph::components::network_provider::AmpClients::::default(), 
blockchain_map.clone(), node_id.clone(), SubgraphVersionSwitchingMode::Instant, From 79b939daffcc798878dc0766f7e49a5ea65ff010 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 13:57:52 +0200 Subject: [PATCH 05/15] node: replace Amp support check with config-based per-chain logging Improved Amp status logging in launcher.rs and run.rs to provide three distinct log levels based on config state: info when no chains have [amp] configuration (skip connection loop entirely), info with chain names when connections succeed, and warn when all configured chains fail to connect. Added two unit tests verifying AmpClients is_empty() drives Amp manager registration decisions. --- graph/src/components/network_provider/mod.rs | 24 ++++++++ node/src/launcher.rs | 65 ++++++++++++-------- node/src/manager/commands/run.rs | 65 ++++++++++++-------- 3 files changed, 104 insertions(+), 50 deletions(-) diff --git a/graph/src/components/network_provider/mod.rs b/graph/src/components/network_provider/mod.rs index 31cb199d11e..006c6f321f2 100644 --- a/graph/src/components/network_provider/mod.rs +++ b/graph/src/components/network_provider/mod.rs @@ -158,6 +158,30 @@ mod tests { assert!(clients.get("mainnet").is_none()); } + /// Verifies the condition that causes Amp manager registration: + /// `!amp_clients.is_empty()` is true when at least one chain has config. + #[test] + fn amp_manager_registered_when_chain_has_config() { + let mut map = HashMap::new(); + map.insert("mainnet".to_string(), Arc::new(42u32)); + let clients = AmpClients::new(map); + assert!( + !clients.is_empty(), + "Amp manager should be registered when at least one chain has config" + ); + } + + /// Verifies the condition that skips Amp manager registration: + /// `amp_clients.is_empty()` is true when no chains have config. 
+ #[test] + fn amp_manager_not_registered_without_config() { + let clients: AmpClients = AmpClients::new(HashMap::new()); + assert!( + clients.is_empty(), + "Amp manager should not be registered when no chains have config" + ); + } + /// Simulates the error path in downstream consumers: when a subgraph /// references a chain with no Amp client, the consumer should treat /// `get()` returning `None` as an error. diff --git a/node/src/launcher.rs b/node/src/launcher.rs index a7645dfe6a7..4b74ced6a6b 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -509,36 +509,51 @@ pub async fn run( let amp_chain_configs = config .amp_chain_configs() .expect("Failed to load Amp chain configs"); - let mut clients = std::collections::HashMap::new(); - for (chain_name, amp_chain_config) in amp_chain_configs { - debug!(logger, "Connecting to Amp Flight service"; - "chain" => &chain_name, - "host" => ?amp_chain_config.address.host(), - "port" => ?amp_chain_config.address.port() + + if amp_chain_configs.is_empty() { + info!( + logger, + "Amp support disabled — no chains have [amp] configuration" ); - match amp::FlightClient::new(amp_chain_config.address.clone()).await { - Ok(mut client) => { - if let Some(token) = &_chain_config.token { - client.set_auth_token(token); + AmpClients::new(std::collections::HashMap::new()) + } else { + let mut clients = std::collections::HashMap::new(); + for (chain_name, amp_chain_config) in &_chain_configs { + debug!(logger, "Connecting to Amp Flight service"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host(), + "port" => ?amp_chain_config.address.port() + ); + match amp::FlightClient::new(amp_chain_config.address.clone()).await { + Ok(mut client) => { + if let Some(token) = &_chain_config.token { + client.set_auth_token(token); + } + info!(logger, "Amp Flight client connected"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host() + ); + clients.insert(chain_name.clone(), 
Arc::new(client)); + } + Err(e) => { + error!(logger, "Failed to connect Amp Flight client"; + "chain" => chain_name.as_str(), + "error" => e.to_string() + ); } - info!(logger, "Amp Flight client connected"; - "chain" => &chain_name, - "host" => ?amp_chain_config.address.host() - ); - clients.insert(chain_name, Arc::new(client)); - } - Err(e) => { - error!(logger, "Failed to connect Amp Flight client"; - "chain" => &chain_name, - "error" => e.to_string() - ); } } + if clients.is_empty() { + warn!( + logger, + "Amp-powered subgraphs disabled — all configured chains failed to connect" + ); + } else { + let chain_names: Vec<&str> = clients.keys().map(|s| s.as_str()).collect(); + info!(logger, "Amp enabled for chains: {}", chain_names.join(", ")); + } + AmpClients::new(clients) } - if clients.is_empty() { - warn!(logger, "Amp-powered subgraphs disabled"); - } - AmpClients::new(clients) }; start_graphman_server(opt.graphman_port, graphman_server_config).await; diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index bf9754e80d2..3419e3bdc0a 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -147,36 +147,51 @@ pub async fn run( let amp_chain_configs = config .amp_chain_configs() .expect("Failed to load Amp chain configs"); - let mut clients = std::collections::HashMap::new(); - for (chain_name, amp_chain_config) in amp_chain_configs { - debug!(logger, "Connecting to Amp Flight service"; - "chain" => &chain_name, - "host" => ?amp_chain_config.address.host(), - "port" => ?amp_chain_config.address.port() + + if amp_chain_configs.is_empty() { + info!( + logger, + "Amp support disabled — no chains have [amp] configuration" ); - match amp::FlightClient::new(amp_chain_config.address.clone()).await { - Ok(mut client) => { - if let Some(token) = &_chain_config.token { - client.set_auth_token(token); + AmpClients::new(std::collections::HashMap::new()) + } else { + let mut clients = std::collections::HashMap::new(); 
+ for (chain_name, amp_chain_config) in &_chain_configs { + debug!(logger, "Connecting to Amp Flight service"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host(), + "port" => ?amp_chain_config.address.port() + ); + match amp::FlightClient::new(amp_chain_config.address.clone()).await { + Ok(mut client) => { + if let Some(token) = &_chain_config.token { + client.set_auth_token(token); + } + info!(logger, "Amp Flight client connected"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host() + ); + clients.insert(chain_name.clone(), Arc::new(client)); + } + Err(e) => { + error!(logger, "Failed to connect Amp Flight client"; + "chain" => chain_name.as_str(), + "error" => e.to_string() + ); } - info!(logger, "Amp Flight client connected"; - "chain" => &chain_name, - "host" => ?amp_chain_config.address.host() - ); - clients.insert(chain_name, Arc::new(client)); - } - Err(e) => { - error!(logger, "Failed to connect Amp Flight client"; - "chain" => &chain_name, - "error" => e.to_string() - ); } } + if clients.is_empty() { + warn!( + logger, + "Amp-powered subgraphs disabled — all configured chains failed to connect" + ); + } else { + let chain_names: Vec<&str> = clients.keys().map(|s| s.as_str()).collect(); + info!(logger, "Amp enabled for chains: {}", chain_names.join(", ")); + } + AmpClients::new(clients) } - if clients.is_empty() { - warn!(logger, "Amp-powered subgraphs disabled"); - } - AmpClients::new(clients) }; if !amp_clients.is_empty() { From 30eff6a394dcfd1ee59c561cc1d2e66064049f0a Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:05:44 +0200 Subject: [PATCH 06/15] node: thread context_dataset and context_table from config through Amp manifest resolution --- core/src/amp_subgraph/manager.rs | 14 +- core/src/subgraph/instance_manager.rs | 1 + core/src/subgraph/registrar.rs | 22 +- graph/src/amp/manifest/data_source/raw.rs | 426 ++++++++++++++---- 
graph/src/amp/manifest/mod.rs | 2 + graph/src/data/subgraph/mod.rs | 15 +- graph/src/data_source/mod.rs | 9 +- graph/src/data_source/subgraph.rs | 2 + node/src/launcher.rs | 15 +- node/src/manager/commands/run.rs | 10 +- server/index-node/src/resolver.rs | 2 + .../tests/chain/ethereum/manifest.rs | 9 + tests/src/fixture/mod.rs | 1 + 13 files changed, 423 insertions(+), 105 deletions(-) diff --git a/core/src/amp_subgraph/manager.rs b/core/src/amp_subgraph/manager.rs index 83688341c4e..c610ca5076c 100644 --- a/core/src/amp_subgraph/manager.rs +++ b/core/src/amp_subgraph/manager.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use alloy::primitives::BlockNumber; @@ -8,7 +9,7 @@ use graph::{ components::{ link_resolver::{LinkResolver, LinkResolverContext}, metrics::MetricsRegistry, - network_provider::AmpClients, + network_provider::{AmpChainConfig, AmpClients}, store::{DeploymentLocator, SubgraphStore}, subgraph::SubgraphInstanceManager, }, @@ -33,6 +34,7 @@ pub struct Manager { subgraph_store: Arc, link_resolver: Arc, amp_clients: AmpClients, + amp_chain_configs: HashMap, } impl Manager @@ -49,6 +51,7 @@ where subgraph_store: Arc, link_resolver: Arc, amp_clients: AmpClients, + amp_chain_configs: HashMap, ) -> Self { let logger = logger_factory.component_logger("AmpSubgraphManager", None); let logger_factory = logger_factory.with_parent(logger); @@ -63,6 +66,7 @@ where subgraph_store, link_resolver, amp_clients, + amp_chain_configs, } } } @@ -147,6 +151,13 @@ where } }; + let amp_context = network_name.as_deref().and_then(|chain| { + manager + .amp_chain_configs + .get(chain) + .map(|cfg| (cfg.context_dataset.clone(), cfg.context_table.clone())) + }); + let mut manifest = amp::Manifest::resolve::( &logger, manager.link_resolver.cheap_clone(), @@ -154,6 +165,7 @@ where manager.env_vars.max_spec_version.cheap_clone(), deployment.hash.cheap_clone(), raw_manifest, + amp_context, ) .await?; diff --git a/core/src/subgraph/instance_manager.rs 
b/core/src/subgraph/instance_manager.rs index b1dade1d3c5..85f97aba952 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -313,6 +313,7 @@ impl SubgraphInstanceManager { &deployment.hash, &link_resolver, amp_client, + None, &logger, ENV_VARS.max_spec_version.clone(), ) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index c671885ab3a..c0ba1b81088 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -1,11 +1,11 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use graph::amp; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; use graph::components::{ link_resolver::LinkResolverContext, - network_provider::{AmpChainNames, AmpClients}, + network_provider::{AmpChainConfig, AmpChainNames, AmpClients}, store::{DeploymentId, DeploymentLocator, SubscriptionManager}, subgraph::Settings, }; @@ -26,6 +26,7 @@ pub struct SubgraphRegistrar { store: Arc, subscription_manager: Arc, amp_clients: AmpClients, + amp_chain_configs: HashMap, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -48,6 +49,7 @@ where store: Arc, subscription_manager: Arc, amp_clients: AmpClients, + amp_chain_configs: HashMap, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -67,6 +69,7 @@ where store, subscription_manager, amp_clients, + amp_chain_configs, chains, node_id, version_switching_mode, @@ -299,10 +302,17 @@ where // Extract the network name from the raw manifest and resolve the // per-chain Amp client (if any). 
- let amp_client = network_name_from_raw(&raw).and_then(|network| { + let resolved_amp_chain = network_name_from_raw(&raw).map(|network| { let resolved = self.amp_chain_names.resolve(&Word::from(network)); - self.amp_clients.get(resolved.as_str()) + resolved.to_string() }); + let amp_client = resolved_amp_chain + .as_deref() + .and_then(|chain| self.amp_clients.get(chain)); + let amp_context = resolved_amp_chain + .as_deref() + .and_then(|chain| self.amp_chain_configs.get(chain)) + .map(|cfg| (cfg.context_dataset.clone(), cfg.context_table.clone())); // Give priority to deployment specific history_blocks value. let history_blocks = @@ -324,6 +334,7 @@ where self.version_switching_mode, &resolver, amp_client.cheap_clone(), + amp_context.clone(), history_blocks, &self.amp_chain_names, ) @@ -344,6 +355,7 @@ where self.version_switching_mode, &resolver, amp_client, + amp_context, history_blocks, &self.amp_chain_names, ) @@ -484,6 +496,7 @@ async fn create_subgraph_version, amp_client: Option>, + amp_context: Option<(String, String)>, history_blocks_override: Option, amp_chain_names: &AmpChainNames, ) -> Result { @@ -494,6 +507,7 @@ async fn create_subgraph_version, + amp_context: Option<(String, String)>, ) -> Result { let Self { name, @@ -87,7 +88,14 @@ impl RawDataSource { .map_err(|e| e.source_context("invalid `source`"))?; let transformer = transformer - .resolve(&logger, link_resolver, amp_client, input_schema, &source) + .resolve( + &logger, + link_resolver, + amp_client, + input_schema, + &source, + amp_context, + ) .await .map_err(|e| e.source_context("invalid `transformer`"))?; @@ -233,6 +241,7 @@ impl RawTransformer { amp_client: &impl amp::Client, input_schema: Option<&InputSchema>, source: &Source, + amp_context: Option<(String, String)>, ) -> Result { let Self { api_version, @@ -250,6 +259,7 @@ impl RawTransformer { tables, source, &abis, + amp_context, ) .await?; @@ -307,6 +317,7 @@ impl RawTransformer { tables: Vec, source: &Source, abis: &[Abi], + 
amp_context: Option<(String, String)>, ) -> Result, Error> { const MAX_TABLES: usize = 100; @@ -320,23 +331,27 @@ impl RawTransformer { ))); } - let table_futs = tables.into_iter().enumerate().map(|(i, table)| async move { - let logger = logger.new(slog::o!("table_name" => table.name.clone())); - debug!(logger, "Resolving table"; - "file" => ?&table.file - ); - - table - .resolve( - &logger, - link_resolver, - amp_client, - input_schema, - source, - abis, - ) - .await - .map_err(|e| e.source_context(format!("invalid `tables` at index {i}"))) + let table_futs = tables.into_iter().enumerate().map(|(i, table)| { + let amp_context = amp_context.clone(); + async move { + let logger = logger.new(slog::o!("table_name" => table.name.clone())); + debug!(logger, "Resolving table"; + "file" => ?&table.file + ); + + table + .resolve( + &logger, + link_resolver, + amp_client, + input_schema, + source, + abis, + amp_context, + ) + .await + .map_err(|e| e.source_context(format!("invalid `tables` at index {i}"))) + } }); try_join_all(table_futs).await @@ -429,6 +444,7 @@ impl RawTable { input_schema: Option<&InputSchema>, source: &Source, abis: &[Abi], + amp_context: Option<(String, String)>, ) -> Result { let Self { name, query, file } = self; @@ -454,9 +470,9 @@ impl RawTable { logger, amp_client, input_schema, - source, query, schema.clone(), + amp_context, ) .await?; @@ -551,9 +567,9 @@ impl RawTable { logger: &Logger, amp_client: &impl amp::Client, input_schema: Option<&InputSchema>, - source: &Source, query: ValidQuery, schema: Schema, + amp_context: Option<(String, String)>, ) -> Result { debug!(logger, "Resolving block range query builder"); @@ -571,79 +587,52 @@ impl RawTable { return Ok(BlockRangeQueryBuilder::new(query, block_number_column)); } - debug!(logger, "Resolving context query"); - let mut context_query: Option = None; - - // TODO: Context is embedded in the original query using INNER JOIN to ensure availability for every output row. 
- // This requires all source tables to match or exceed the expected query output size. - let context_sources_iter = source - .tables - .iter() - .map(|table| (source.dataset.as_str(), table.as_str())); - - for (dataset, table) in context_sources_iter { - let context_logger = logger.new(slog::o!( - "context_dataset" => dataset.to_string(), - "context_table" => table.to_string() - )); - debug!(context_logger, "Loading context schema"); - let schema_query = format!("SELECT * FROM {dataset}.{table}"); - let schema = match Self::resolve_schema(logger, amp_client, schema_query).await { - Ok(schema) => schema, - Err(e) => { - error!(context_logger, "Failed to load context schema"; - "e" => ?e - ); - continue; - } - }; - - let record_batch = RecordBatch::new_empty(schema.clone().into()); - let mut columns = Vec::new(); - - if need_block_hash_column { - let Ok((block_hash_column, _)) = auto_block_hash_decoder(&record_batch) else { - debug!( - context_logger, - "Context schema does not contain block hash column, skipping" - ); - continue; - }; - - columns.push(block_hash_column); - } - - if need_block_timestamp_column { - let Ok((block_timestamp_column, _)) = auto_block_timestamp_decoder(&record_batch) - else { - debug!( - context_logger, - "Context schema does not contain block timestamp column, skipping" - ); - continue; - }; - - columns.push(block_timestamp_column); - } - - debug!(context_logger, "Creating context query"); - context_query = Some(ContextQuery::new( - query, - block_number_column, - dataset, - table, - columns, - )); - break; + let (context_dataset, context_table) = amp_context.ok_or_else(|| { + Error::InvalidQuery(anyhow!( + "query requires context columns (block hash/timestamp) but no Amp context config is available" + )) + })?; + + debug!(logger, "Resolving context query"; + "context_dataset" => &context_dataset, + "context_table" => &context_table + ); + + let schema_query = format!("SELECT * FROM {context_dataset}.{context_table}"); + let 
context_schema = Self::resolve_schema(logger, amp_client, schema_query).await?; + + let context_record_batch = RecordBatch::new_empty(context_schema.into()); + let mut columns = Vec::new(); + + if need_block_hash_column { + let (block_hash_column, _) = + auto_block_hash_decoder(&context_record_batch).map_err(|_| { + Error::InvalidQuery(anyhow!( + "context table '{context_dataset}.{context_table}' does not contain a block hash column" + )) + })?; + columns.push(block_hash_column); } - if let Some(context_query) = context_query { - return Ok(BlockRangeQueryBuilder::new_with_context(context_query)); + if need_block_timestamp_column { + let (block_timestamp_column, _) = + auto_block_timestamp_decoder(&context_record_batch).map_err(|_| { + Error::InvalidQuery(anyhow!( + "context table '{context_dataset}.{context_table}' does not contain a block timestamp column" + )) + })?; + columns.push(block_timestamp_column); } - Err(Error::InvalidQuery(anyhow!( - "query is required to output block numbers, block hashes and block timestamps" - ))) + let context_query = ContextQuery::new( + query, + block_number_column, + &context_dataset, + &context_table, + columns, + ); + + Ok(BlockRangeQueryBuilder::new_with_context(context_query)) } } @@ -711,3 +700,260 @@ fn normalize_sql_ident(s: &str) -> String { Err(_e) => sqlparser_latest::ast::Ident::with_quote('"', s).to_string(), } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::amp::error::IsDeterministic; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use futures03::future::BoxFuture; + use std::collections::HashMap; + use std::sync::Mutex; + + #[derive(Debug, thiserror::Error)] + #[error("mock error: schema not found for query")] + struct MockError; + + impl IsDeterministic for MockError { + fn is_deterministic(&self) -> bool { + true + } + } + + /// A mock Amp client that returns pre-configured schemas keyed by query string. 
+ struct MockAmpClient { + schemas: Mutex>, + } + + impl MockAmpClient { + fn new() -> Self { + Self { + schemas: Mutex::new(HashMap::new()), + } + } + + fn add_schema(&self, query: &str, schema: Schema) { + self.schemas + .lock() + .unwrap() + .insert(query.to_string(), schema); + } + } + + impl amp::Client for MockAmpClient { + type Error = MockError; + + fn schema( + &self, + _logger: &slog::Logger, + query: impl ToString, + ) -> BoxFuture<'static, Result> { + let query_str = query.to_string(); + let schema = self.schemas.lock().unwrap().get(&query_str).cloned(); + Box::pin(async move { schema.ok_or(MockError) }) + } + + fn query( + &self, + _logger: &slog::Logger, + _query: impl ToString, + _request_metadata: Option, + ) -> futures03::stream::BoxStream<'static, Result> + { + Box::pin(futures03::stream::empty()) + } + } + + fn test_logger() -> slog::Logger { + slog::Logger::root(slog::Discard, slog::o!()) + } + + fn schema_without_hash() -> Schema { + Schema::new(vec![ + Field::new("_block_num", DataType::UInt64, false), + Field::new("value", DataType::Utf8, true), + ]) + } + + fn schema_with_all_columns() -> Schema { + Schema::new(vec![ + Field::new("_block_num", DataType::UInt64, false), + Field::new("block_hash", DataType::FixedSizeBinary(32), false), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("value", DataType::Utf8, true), + ]) + } + + fn context_schema_with_hash() -> Schema { + Schema::new(vec![ + Field::new("_block_num", DataType::UInt64, false), + Field::new("block_hash", DataType::FixedSizeBinary(32), false), + ]) + } + + fn make_valid_query(sql: &str) -> ValidQuery { + ValidQuery::new( + sql, + "my_dataset", + ["my_table"].iter().copied(), + &alloy::primitives::Address::ZERO, + std::iter::empty::<(&str, &alloy::json_abi::JsonAbi)>(), + ) + .unwrap() + } + + /// When a query lacks block hash/timestamp columns, the context CTE uses + /// context_dataset and context_table from config. 
+ #[tokio::test] + async fn context_query_uses_config() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_without_hash()); + client.add_schema( + "SELECT * FROM ctx_dataset.ctx_blocks", + context_schema_with_hash(), + ); + + let valid_query = make_valid_query(main_query); + + let result = RawTable::resolve_block_range_query_builder( + &logger, + &client, + None, + valid_query, + schema_without_hash(), + Some(("ctx_dataset".to_string(), "ctx_blocks".to_string())), + ) + .await; + + assert!( + result.is_ok(), + "Expected success when config provides context; got: {:?}", + result.err() + ); + } + + /// The old source.tables iteration is replaced — config values are the sole + /// source of context dataset and table. When no config is provided and context + /// columns are needed, resolution fails. + #[tokio::test] + async fn context_query_always_has_config() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_without_hash()); + + let valid_query = make_valid_query(main_query); + + let result = RawTable::resolve_block_range_query_builder( + &logger, + &client, + None, + valid_query, + schema_without_hash(), + None, + ) + .await; + + assert!( + result.is_err(), + "Expected error when amp_context is None and context columns are needed" + ); + let err_msg = format!("{:#}", result.unwrap_err()); + assert!( + err_msg.contains("no Amp context config"), + "Error should mention missing config; got: {err_msg}" + ); + } + + /// When the query already includes block hash and timestamp columns, + /// context config is not needed and resolution succeeds. 
+ #[tokio::test] + async fn context_query_not_needed_when_columns_present() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, block_hash, timestamp, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_with_all_columns()); + + let valid_query = make_valid_query(main_query); + + let result = RawTable::resolve_block_range_query_builder( + &logger, + &client, + None, + valid_query, + schema_with_all_columns(), + None, + ) + .await; + + assert!( + result.is_ok(), + "Expected success when all columns present; got: {:?}", + result.err() + ); + } + + /// AmpChainConfig context fields are threaded through the full resolution + /// chain (RawDataSource → RawTransformer → RawTable). + #[tokio::test] + async fn context_config_threaded_through_resolution() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_without_hash()); + client.add_schema( + "SELECT * FROM ctx_dataset.ctx_blocks", + context_schema_with_hash(), + ); + + let link_resolver = crate::components::link_resolver::FileLinkResolver::default(); + + let raw_data_source = RawDataSource { + name: "test_ds".to_string(), + kind: "amp".to_string(), + network: "mainnet".to_string(), + source: RawSource { + dataset: "my_dataset".to_string(), + tables: vec!["my_table".to_string()], + address: None, + start_block: None, + end_block: None, + }, + transformer: RawTransformer { + api_version: semver::Version::new(0, 0, 1), + abis: None, + tables: vec![RawTable { + name: "TestEntity".to_string(), + query: Some(main_query.to_string()), + file: None, + }], + }, + }; + + let result = raw_data_source + .resolve( + &logger, + &link_resolver, + &client, + None, + Some(("ctx_dataset".to_string(), "ctx_blocks".to_string())), + ) + .await; + + assert!( + result.is_ok(), + "Expected successful resolution with threaded context config; 
got: {:?}", + result.err() + ); + } +} diff --git a/graph/src/amp/manifest/mod.rs b/graph/src/amp/manifest/mod.rs index 028d567332c..87f39bb1d0d 100644 --- a/graph/src/amp/manifest/mod.rs +++ b/graph/src/amp/manifest/mod.rs @@ -44,6 +44,7 @@ impl Manifest { max_spec_version: Version, deployment: DeploymentHash, raw_manifest: serde_yaml::Mapping, + amp_context: Option<(String, String)>, ) -> Result { let unresolved_manifest = UnresolvedSubgraphManifest::::parse(deployment.cheap_clone(), raw_manifest) @@ -54,6 +55,7 @@ impl Manifest { &deployment, &link_resolver, Some(amp_client), + amp_context, logger, max_spec_version, ) diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 46154155319..ee652c3088e 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -822,6 +822,7 @@ impl UnvalidatedSubgraphManifest { raw: serde_yaml::Mapping, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, max_spec_version: semver::Version, ) -> Result { @@ -831,6 +832,7 @@ impl UnvalidatedSubgraphManifest { raw, resolver, amp_client, + amp_context, logger, max_spec_version, ) @@ -971,12 +973,20 @@ impl SubgraphManifest { raw: serde_yaml::Mapping, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, max_spec_version: semver::Version, ) -> Result { let unresolved = UnresolvedSubgraphManifest::parse(id.cheap_clone(), raw)?; let resolved = unresolved - .resolve(&id, resolver, amp_client, logger, max_spec_version) + .resolve( + &id, + resolver, + amp_client, + amp_context, + logger, + max_spec_version, + ) .await?; Ok(resolved) } @@ -1114,6 +1124,7 @@ impl UnresolvedSubgraphManifest { deployment_hash: &DeploymentHash, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, max_spec_version: semver::Version, ) -> Result, SubgraphManifestResolveError> { @@ -1166,10 +1177,12 @@ impl UnresolvedSubgraphManifest { }; let 
data_sources = try_join_all(data_sources.into_iter().enumerate().map(|(idx, ds)| { + let amp_context = amp_context.clone(); ds.resolve( deployment_hash, resolver, amp_client.cheap_clone(), + amp_context, logger, idx as u32, &spec_version, diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index cd34ca62857..f488e882313 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -360,6 +360,7 @@ impl UnresolvedDataSource { deployment_hash: &DeploymentHash, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, manifest_idx: u32, spec_version: &semver::Version, @@ -395,7 +396,13 @@ impl UnresolvedDataSource { } Self::Amp(raw_data_source) => match amp_client { Some(amp_client) => raw_data_source - .resolve(logger, resolver.as_ref(), amp_client.as_ref(), input_schema) + .resolve( + logger, + resolver.as_ref(), + amp_client.as_ref(), + input_schema, + amp_context, + ) .await .map(DataSource::Amp) .map_err(Error::from), diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index 0207aee4df3..38766d7d977 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -331,6 +331,7 @@ impl UnresolvedDataSource { &deployment_hash, &resolver, amp_client, + None, logger, LATEST_VERSION.clone(), ) @@ -383,6 +384,7 @@ impl UnresolvedDataSource { &manifest.id, resolver, amp_client.cheap_clone(), + None, logger, LATEST_VERSION.clone(), ) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 4b74ced6a6b..2d28049f936 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -276,6 +276,10 @@ fn build_subgraph_registrar( arweave_service: ArweaveService, ipfs_service: IpfsService, amp_clients: AmpClients, + amp_chain_configs: std::collections::HashMap< + String, + graph::components::network_provider::AmpChainConfig, + >, cancel_token: CancellationToken, amp_chain_names: Arc, ) -> Arc< @@ -304,6 +308,7 @@ where 
network_store.subgraph_store(), link_resolver.cheap_clone(), amp_clients.clone(), + amp_chain_configs.clone(), ); subgraph_instance_managers.add( @@ -352,6 +357,7 @@ where network_store.subgraph_store(), subscription_manager, amp_clients, + amp_chain_configs, blockchain_map, node_id.clone(), version_switching_mode, @@ -505,11 +511,11 @@ pub async fn run( &logger_factory, ); - let amp_clients = { - let amp_chain_configs = config - .amp_chain_configs() - .expect("Failed to load Amp chain configs"); + let amp_chain_configs = config + .amp_chain_configs() + .expect("Failed to load Amp chain configs"); + let amp_clients = { if amp_chain_configs.is_empty() { info!( logger, @@ -618,6 +624,7 @@ pub async fn run( arweave_service, ipfs_service, amp_clients, + amp_chain_configs, cancel_token, amp_chain_names, ); diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 3419e3bdc0a..08b884ff476 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -143,11 +143,11 @@ pub async fn run( let mut subgraph_instance_managers = graph_core::subgraph_provider::SubgraphInstanceManagers::new(); - let amp_clients = { - let amp_chain_configs = config - .amp_chain_configs() - .expect("Failed to load Amp chain configs"); + let amp_chain_configs = config + .amp_chain_configs() + .expect("Failed to load Amp chain configs"); + let amp_clients = { if amp_chain_configs.is_empty() { info!( logger, @@ -203,6 +203,7 @@ pub async fn run( network_store.subgraph_store(), link_resolver.cheap_clone(), amp_clients.clone(), + amp_chain_configs.clone(), ); subgraph_instance_managers.add( @@ -249,6 +250,7 @@ pub async fn run( subgraph_store.clone(), panicking_subscription_manager, amp_clients, + amp_chain_configs, blockchain_map, node_id.clone(), SubgraphVersionSwitchingMode::Instant, diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 8250dbda561..dc94fb49ac8 100644 --- a/server/index-node/src/resolver.rs +++ 
b/server/index-node/src/resolver.rs @@ -542,6 +542,7 @@ where raw_yaml, &self.link_resolver, amp_client.cheap_clone(), + None, &self.logger, max_spec_version, ) @@ -560,6 +561,7 @@ where raw_yaml, &self.link_resolver, amp_client, + None, &self.logger, max_spec_version, ) diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index a201a9c233e..63b9a36726c 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -145,6 +145,7 @@ async fn try_resolve_manifest( raw, &resolver, Option::>::None, + None, &LOGGER, max_spec_version, ) @@ -175,6 +176,7 @@ async fn resolve_unvalidated(text: &str) -> UnvalidatedSubgraphManifest { raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1330,6 +1332,7 @@ schema: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1373,6 +1376,7 @@ schema: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1450,6 +1454,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1529,6 +1534,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1619,6 +1625,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_1_2_0.clone(), ) @@ -1693,6 +1700,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_1_3_0.clone(), ) @@ -1844,6 +1852,7 @@ specVersion: 1.3.0 raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_1_3_0.clone(), ) diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 72aba3ad994..cdf09772365 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -632,6 +632,7 @@ pub async fn setup_inner( subgraph_store.clone(), panicking_subscription_manager, graph::components::network_provider::AmpClients::::default(), + std::collections::HashMap::new(), 
blockchain_map.clone(), node_id.clone(), SubgraphVersionSwitchingMode::Instant, From 69ac1a087e42d514a6732b0063b78282049bcb58 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:11:29 +0200 Subject: [PATCH 07/15] node: Amp-based block pointer resolution for start_block Add resolve_amp_start_block function that queries the Amp Flight service for a block hash at a given block number, using auto_block_hash_decoder to detect the block hash column from the returned RecordBatch. In create_subgraph_version, detect Amp subgraphs and use this new path instead of RPC-based block_pointer_from_number when start_block > 0, falling back to RPC on failure. --- core/src/subgraph/registrar.rs | 330 ++++++++++++++++++++++++++++++++- 1 file changed, 328 insertions(+), 2 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index c0ba1b81088..67e991a390d 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -13,7 +13,7 @@ use graph::data::{ subgraph::{schema::DeploymentCreate, Graft}, value::Word, }; -use graph::futures03::{self, future::TryFutureExt, Stream, StreamExt}; +use graph::futures03::{self, future::TryFutureExt, Stream, StreamExt, TryStreamExt}; use graph::prelude::{CreateSubgraphResult, SubgraphRegistrar as SubgraphRegistrarTrait, *}; use graph::util::futures::{retry_strategy, RETRY_DEFAULT_LIMIT}; use tokio_retry::Retry; @@ -420,6 +420,54 @@ where } } +/// Resolves a block pointer for an Amp subgraph by querying the Amp Flight +/// service for the block hash at the given `block_number`. 
+async fn resolve_amp_start_block( + amp_client: &AC, + logger: &Logger, + context_dataset: &str, + context_table: &str, + block_number: BlockNumber, +) -> Result { + let sql = format!( + "SELECT * FROM {}.{} WHERE _block_num = {}", + context_dataset, context_table, block_number + ); + + let mut stream = amp_client.query(logger, &sql, None); + + // Find the first Batch response, skipping any Reorg variants. + let data = loop { + match stream.try_next().await? { + Some(amp::client::ResponseBatch::Batch { data }) => break data, + Some(amp::client::ResponseBatch::Reorg(_)) => continue, + None => { + return Err(anyhow!( + "Amp query returned no batches for block {}", + block_number + )); + } + } + }; + + if data.num_rows() == 0 { + return Err(anyhow!( + "Amp query returned empty batch for block {}", + block_number + )); + } + + let (_col_name, decoder) = graph::amp::codec::utils::auto_block_hash_decoder(&data)?; + let hash = decoder.decode(0)?.ok_or_else(|| { + anyhow!( + "Amp query returned null block hash for block {}", + block_number + ) + })?; + + Ok(BlockPtr::new(hash.into(), block_number)) +} + /// Resolves the subgraph's earliest block async fn resolve_start_block( manifest: &SubgraphManifest, @@ -502,6 +550,10 @@ async fn create_subgraph_version Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); + // Keep copies for Amp start block resolution after the manifest is resolved. 
+ let amp_client_for_start_block = amp_client.clone(); + let amp_context_for_start_block = amp_context.clone(); + let unvalidated = UnvalidatedSubgraphManifest::::resolve( deployment.clone(), raw, @@ -550,7 +602,40 @@ async fn create_subgraph_version Some(block), - None => resolve_start_block(&manifest, &*chain, &logger).await?, + None => { + let min_start_block = + manifest.start_blocks().into_iter().min().expect( + "cannot identify minimum start block because there are no data sources", + ); + + match ( + min_start_block, + &amp_client_for_start_block, + &amp_context_for_start_block, + ) { + // Genesis block — no resolution needed. + (0, _, _) => None, + // Amp subgraph with start_block > 0 — try Amp-based resolution. + (min, Some(client), Some((dataset, table))) => { + match resolve_amp_start_block(client.as_ref(), &logger, dataset, table, min - 1) + .await + { + Ok(ptr) => Some(ptr), + Err(e) => { + warn!( + logger, + "Amp block pointer resolution failed, falling back to RPC"; + "error" => e.to_string(), + "block_number" => min - 1 + ); + resolve_start_block(&manifest, &*chain, &logger).await? + } + } + } + // Non-Amp subgraph — use RPC. + _ => resolve_start_block(&manifest, &*chain, &logger).await?, + } + } }; let base_block = match &manifest.graft { @@ -617,3 +702,244 @@ async fn create_subgraph_version) -> std::fmt::Result { + write!(f, "{}", self.0) + } + } + + impl std::error::Error for MockAmpError {} + + impl IsDeterministic for MockAmpError { + fn is_deterministic(&self) -> bool { + false + } + } + + #[derive(Clone)] + struct MockAmpClient { + /// Recorded queries (for assertion). + recorded_queries: Arc>>, + /// Batches to return from `query()`. If `Err`, the stream returns an error.
+ response: Arc, String>>, + } + + impl MockAmpClient { + fn new_ok(batches: Vec) -> Self { + Self { + recorded_queries: Arc::new(Mutex::new(Vec::new())), + response: Arc::new(Ok(batches)), + } + } + + fn new_err(msg: &str) -> Self { + Self { + recorded_queries: Arc::new(Mutex::new(Vec::new())), + response: Arc::new(Err(msg.to_string())), + } + } + + fn recorded_queries(&self) -> Vec { + self.recorded_queries.lock().unwrap().clone() + } + } + + impl amp::Client for MockAmpClient { + type Error = MockAmpError; + + fn schema( + &self, + _logger: &Logger, + _query: impl ToString, + ) -> BoxFuture<'static, Result> { + unimplemented!("schema not needed in tests") + } + + fn query( + &self, + _logger: &Logger, + query: impl ToString, + _request_metadata: Option, + ) -> BoxStream<'static, Result> { + let query_str = query.to_string(); + self.recorded_queries.lock().unwrap().push(query_str); + + let response = self.response.clone(); + Box::pin(graph::futures03::stream::iter(match response.as_ref() { + Ok(batches) => batches.iter().cloned().map(Ok).collect::>(), + Err(msg) => vec![Err(MockAmpError(msg.clone()))], + })) + } + } + + // -- Test helpers ----------------------------------------------------- + + /// Creates a RecordBatch with a single "block_hash" column containing one + /// 32-byte FixedSizeBinary value. 
+ fn make_block_hash_batch(hash: AllocBlockHash) -> RecordBatch { + let schema = Schema::new(vec![Field::new( + "block_hash", + DataType::FixedSizeBinary(32), + false, + )]); + let values: Vec<&[u8]> = vec![hash.as_slice()]; + RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new( + FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap(), + )], + ) + .unwrap() + } + + // -- Tests ------------------------------------------------------------ + + #[tokio::test] + async fn resolve_amp_start_block() { + let alloy_hash = AllocBlockHash::from([0xABu8; 32]); + let expected_hash: BlockHash = alloy_hash.into(); + let batch = make_block_hash_batch(alloy_hash); + let client = MockAmpClient::new_ok(vec![ResponseBatch::Batch { data: batch }]); + let logger = Logger::root(slog::Discard, o!()); + + let result = + super::resolve_amp_start_block(&client, &logger, "my_dataset", "blocks", 99).await; + + let ptr = result.expect("should succeed"); + assert_eq!(ptr.hash, expected_hash); + assert_eq!(ptr.number, 99); + + // Verify the SQL query. + let queries = client.recorded_queries(); + assert_eq!(queries.len(), 1); + assert_eq!( + queries[0], + "SELECT * FROM my_dataset.blocks WHERE _block_num = 99" + ); + } + + #[tokio::test] + async fn amp_subgraph_start_block_uses_amp_resolution() { + // When an Amp client + context are available and min_start_block > 0, + // the Amp path should be used and produce the correct BlockPtr. 
+ let alloy_hash = AllocBlockHash::from([0xCDu8; 32]); + let expected_hash: BlockHash = alloy_hash.into(); + let batch = make_block_hash_batch(alloy_hash); + let client = MockAmpClient::new_ok(vec![ResponseBatch::Batch { data: batch }]); + let logger = Logger::root(slog::Discard, o!()); + + // Simulate min_start_block = 100 → query block 99 + let block_number = 100 - 1; + let result = + super::resolve_amp_start_block(&client, &logger, "eth_mainnet", "blocks", block_number) + .await; + + let ptr = result.expect("should succeed"); + assert_eq!(ptr.hash, expected_hash); + assert_eq!(ptr.number, block_number); + + let queries = client.recorded_queries(); + assert_eq!( + queries[0], + "SELECT * FROM eth_mainnet.blocks WHERE _block_num = 99" + ); + } + + #[tokio::test] + async fn amp_start_block_falls_back_to_rpc() { + // When the Amp query fails, resolve_amp_start_block returns an error. + let client = MockAmpClient::new_err("network error"); + let logger = Logger::root(slog::Discard, o!()); + + let result = + super::resolve_amp_start_block(&client, &logger, "my_dataset", "blocks", 99).await; + + assert!( + result.is_err(), + "should return an error so caller falls back to RPC" + ); + assert!( + result.unwrap_err().to_string().contains("network error"), + "error should contain the original cause" + ); + } + + #[tokio::test] + async fn amp_start_block_zero_returns_none() { + // start_block = 0 means genesis — the caller should not invoke + // resolve_amp_start_block at all. We verify the matching logic inline: + // when min_start_block == 0, the result is None. 
+ let min_start_block: i32 = 0; + let mock_client = Arc::new(MockAmpClient::new_ok(vec![])); + let amp_client: Option> = Some(mock_client.clone()); + let amp_context: Option<(String, String)> = Some(("ds".to_string(), "blocks".to_string())); + + let result: Option = match (min_start_block, &amp_client, &amp_context) { + (0, _, _) => None, + _ => panic!("should not reach non-zero path"), + }; + + assert!( + result.is_none(), + "start_block=0 should produce None (genesis)" + ); + + // Verify the client was never called. + let queries = mock_client.recorded_queries(); + assert!( + queries.is_empty(), + "Amp client should not be queried for genesis" + ); + } + + #[tokio::test] + async fn resolve_amp_start_block_no_batches() { + // If the Amp query returns no batches at all, it should error. + let client = MockAmpClient::new_ok(vec![]); + let logger = Logger::root(slog::Discard, o!()); + + let result = super::resolve_amp_start_block(&client, &logger, "ds", "tbl", 50).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("no batches")); + } + + #[tokio::test] + async fn resolve_amp_start_block_empty_batch() { + // If the Amp query returns an empty batch (0 rows), it should error.
+ let schema = Schema::new(vec![Field::new( + "block_hash", + DataType::FixedSizeBinary(32), + false, + )]); + let empty_batch = RecordBatch::new_empty(Arc::new(schema)); + let client = MockAmpClient::new_ok(vec![ResponseBatch::Batch { data: empty_batch }]); + let logger = Logger::root(slog::Discard, o!()); + + let result = super::resolve_amp_start_block(&client, &logger, "ds", "tbl", 50).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("empty batch")); + } +} From c9f606310e5360f8a11086eff7b6fd23268285c6 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:13:33 +0200 Subject: [PATCH 08/15] node: remove global Amp Flight ENV var and CLI flag Remove the GRAPH_AMP_FLIGHT_SERVICE_ADDRESS CLI flag from Opt structs in opt.rs and manager.rs, remove GRAPH_AMP_FLIGHT_SERVICE_TOKEN env var from AmpEnv and Inner envconfig struct. These are now fully superseded by per-chain TOML config. Add unit test verifying AmpEnv constructs correctly without the removed fields. --- graph/src/env/amp.rs | 36 +++++++++++++++++++++++++----------- graph/src/env/mod.rs | 2 -- node/src/bin/manager.rs | 8 -------- node/src/opt.rs | 8 -------- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/graph/src/env/amp.rs b/graph/src/env/amp.rs index a6a02b194c3..e8cdd59ba7c 100644 --- a/graph/src/env/amp.rs +++ b/graph/src/env/amp.rs @@ -24,11 +24,6 @@ pub struct AmpEnv { /// /// Defaults to `600` seconds. pub query_retry_max_delay: Duration, - - /// Token used to authenticate Amp Flight gRPC service requests. - /// - /// Defaults to `None`. 
- pub flight_service_token: Option, } impl AmpEnv { @@ -65,12 +60,31 @@ impl AmpEnv { .amp_query_retry_max_delay_seconds .map(Duration::from_secs) .unwrap_or(Self::DEFAULT_QUERY_RETRY_MAX_DELAY), - flight_service_token: raw_env.amp_flight_service_token.as_ref().and_then(|value| { - if value.is_empty() { - return None; - } - Some(value.to_string()) - }), } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::env::ENV_VARS; + + #[test] + fn amp_env_constructs_without_flight_fields() { + // Verify that AmpEnv constructs correctly with only its remaining fields + // (the Flight service token field has been removed). The ENV_VARS static + // is constructed at process start; if AmpEnv still had that field, this + // access would fail to compile. + let amp = &ENV_VARS.amp; + assert_eq!(amp.max_buffer_size, AmpEnv::DEFAULT_MAX_BUFFER_SIZE); + assert_eq!(amp.max_block_range, AmpEnv::DEFAULT_MAX_BLOCK_RANGE); + assert_eq!( + amp.query_retry_min_delay, + AmpEnv::DEFAULT_QUERY_RETRY_MIN_DELAY + ); + assert_eq!( + amp.query_retry_max_delay, + AmpEnv::DEFAULT_QUERY_RETRY_MAX_DELAY + ); + } +} diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 23a6eaff579..cc7425764f6 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -616,8 +616,6 @@ struct Inner { amp_query_retry_min_delay_seconds: Option, #[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS")] amp_query_retry_max_delay_seconds: Option, - #[envconfig(from = "GRAPH_AMP_FLIGHT_SERVICE_TOKEN")] - amp_flight_service_token: Option, } #[derive(Clone, Debug)] diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 20e1a8fd5c1..731e234b6a9 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -106,14 +106,6 @@ pub struct Opt { #[clap(long, help = "version label, used for prometheus metrics")] pub version_label: Option, - #[clap( - long, - value_name = "{HOST:PORT|URL}", - env = "GRAPH_AMP_FLIGHT_SERVICE_ADDRESS", - help = "The address of the Amp Flight gRPC service" 
- )] - pub amp_flight_service_address: Option, - #[clap(subcommand)] pub cmd: Command, } diff --git a/node/src/opt.rs b/node/src/opt.rs index 9372d4f1472..9441921ceae 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -230,14 +230,6 @@ pub struct Opt { help = "Port for the graphman GraphQL server" )] pub graphman_port: u16, - - #[clap( - long, - value_name = "{HOST:PORT|URL}", - env = "GRAPH_AMP_FLIGHT_SERVICE_ADDRESS", - help = "The address of the Amp Flight gRPC service" - )] - pub amp_flight_service_address: Option, } impl From for config::Opt { From c990bd272d22a2d547a8adbc79eeaadc45799980 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:18:17 +0200 Subject: [PATCH 09/15] docs: document per-chain Amp configuration in docs/config.md --- docs/config.md | 58 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/docs/config.md b/docs/config.md index bcf15fc2c56..8245529c26a 100644 --- a/docs/config.md +++ b/docs/config.md @@ -114,9 +114,8 @@ The configuration for a chain `name` is specified in the section - `protocol`: the protocol type being indexed, default `ethereum` (alternatively `near`, `cosmos`,`arweave`,`starknet`) - `polling_interval`: the polling interval for the block ingestor (default 500ms) -- `amp`: the network name used by AMP for this chain; defaults to the chain name. - Set this when AMP uses a different name than graph-node (e.g., `amp = "ethereum-mainnet"` - on a chain named `mainnet`). +- `amp`: a TOML table configuring Amp Flight service for this chain. See + [Amp Configuration](#amp-configuration) below for details. - `provider`: a list of providers for that chain A `provider` is an object with the following characteristics: @@ -167,11 +166,17 @@ optimisations. 
ingestor = "block_ingestor_node" [chains.mainnet] shard = "vip" -amp = "ethereum-mainnet" provider = [ { label = "mainnet1", url = "http://..", features = [], headers = { Authorization = "Bearer foo" } }, { label = "mainnet2", url = "http://..", features = [ "archive", "traces" ] } ] +[chains.mainnet.amp] +address = "http://amp-flight.example.com:50051" +token = "my-secret-token" +context_dataset = "ethereum" +context_table = "blocks" +network = "ethereum-mainnet" + [chains.sepolia] shard = "primary" provider = [ { label = "sepolia", url = "http://..", features = [] } ] @@ -182,6 +187,51 @@ protocol = "near" provider = [ { label = "near", details = { type = "firehose", url = "https://..", key = "", features = ["compression", "filters"] } } ] ``` +### Amp Configuration + +Amp data sources use a Flight service to query the blockchain data. +Amp is configured per-chain using a TOML table under `[chains.<name>.amp]`. +When the `amp` section is absent for a chain, Amp is disabled for that chain. + +The `[chains.<name>.amp]` table supports the following fields: + +- `address` (String, **required**): The Amp Flight service endpoint + (e.g., `"http://amp-flight.example.com:50051"`). +- `token` (String, optional): Authentication token for the Amp Flight service. +- `context_dataset` (String, **required**): The dataset in the Amp Flight + service that contains the context table. This identifies the logical + grouping (dataset) within the Flight service where the block-level context + table resides. +- `context_table` (String, **required**): The table within the context dataset + that provides block-level context (block hash, block number, and timestamp). + This should typically point to the blocks table (not transactions or logs), + since it reliably contains one row per block with the block hash. +- `network` (String, optional): The Amp network name when it differs from + the graph-node chain name (e.g., `"ethereum-mainnet"` on a chain named + `mainnet`).
Defaults to the chain name if omitted. + +Example: + +```toml +[chains.mainnet.amp] +address = "http://amp-flight.example.com:50051" +token = "my-secret-token" +context_dataset = "ethereum" +context_table = "blocks" +network = "ethereum-mainnet" +``` + +#### Removed ENV/CLI flags + +The following environment variables and CLI flags have been removed in favor +of per-chain TOML configuration: + +- `GRAPH_AMP_FLIGHT_SERVICE_ADDRESS` environment variable / + `--amp-flight-service-address` CLI flag — use `address` in + `[chains.<name>.amp]` instead. +- `GRAPH_AMP_FLIGHT_SERVICE_TOKEN` environment variable — use `token` in + `[chains.<name>.amp]` instead. + ### Controlling the number of subgraphs using a provider **This feature is experimental and might be removed in a future release** From bb632f764cf2dc7c6ec1072eb995028ad7365271 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:19:36 +0200 Subject: [PATCH 10/15] docs: add tests for Amp config fixture parsing Verify that full_config.toml parses successfully with the [chains.mainnet.amp] table form and that all five Amp field values (address, token, context_dataset, context_table, network) are correctly deserialized.
--- node/src/config.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/node/src/config.rs b/node/src/config.rs index 9c04cee5b6a..1c17c8fbdaf 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -2490,4 +2490,42 @@ fdw_pool_size = [ "expected chain name in error, got: {msg}" ); } + + #[test] + fn parse_full_config() { + let content = read_resource_as_string("full_config.toml"); + let actual: Config = toml::from_str(&content).unwrap(); + + let mainnet = actual + .chains + .chains + .get("mainnet") + .expect("mainnet chain should exist"); + assert!( + mainnet.amp.is_some(), + "mainnet should have a non-None amp field" + ); + } + + #[test] + fn parse_full_config_amp_values() { + let content = read_resource_as_string("full_config.toml"); + let actual: Config = toml::from_str(&content).unwrap(); + + let mainnet = actual + .chains + .chains + .get("mainnet") + .expect("mainnet chain should exist"); + let amp = mainnet + .amp + .as_ref() + .expect("mainnet should have amp config"); + + assert_eq!(amp.address, "http://localhost:50051"); + assert_eq!(amp.token, Some("secret-token".to_string())); + assert_eq!(amp.context_dataset, "eth"); + assert_eq!(amp.context_table, "blocks"); + assert_eq!(amp.network, Some("ethereum-mainnet".to_string())); + } } From c184644ae1efa402bac65544165383763e6b5f63 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:22:14 +0200 Subject: [PATCH 11/15] core, graph, server: extract shared network_name_from_raw_manifest function Consolidates the duplicated YAML traversal pattern (dataSources[0].network) into a single pub fn in graph::data::subgraph, replacing four identical copies across registrar, instance_manager, amp_subgraph/manager, and index-node resolver. 
--- core/src/amp_subgraph/manager.rs | 10 +---- core/src/subgraph/instance_manager.rs | 13 +++--- core/src/subgraph/registrar.rs | 16 +------- graph/src/data/subgraph/mod.rs | 59 +++++++++++++++++++++++++++ server/index-node/src/resolver.rs | 11 ++--- 5 files changed, 71 insertions(+), 38 deletions(-) diff --git a/core/src/amp_subgraph/manager.rs b/core/src/amp_subgraph/manager.rs index c610ca5076c..e86640fef2e 100644 --- a/core/src/amp_subgraph/manager.rs +++ b/core/src/amp_subgraph/manager.rs @@ -13,6 +13,7 @@ use graph::{ store::{DeploymentLocator, SubgraphStore}, subgraph::SubgraphInstanceManager, }, + data::subgraph::network_name_from_raw_manifest, env::EnvVars, log::factory::LoggerFactory, prelude::CheapClone, @@ -122,14 +123,7 @@ where // Extract the network name from the raw manifest to look // up the per-chain Amp client. - let network_name = raw_manifest - .get(serde_yaml::Value::String("dataSources".to_owned())) - .and_then(|ds| ds.as_sequence()) - .and_then(|ds| ds.first()) - .and_then(|ds| ds.as_mapping()) - .and_then(|ds| ds.get(serde_yaml::Value::String("network".to_owned()))) - .and_then(|n| n.as_str()) - .map(|s| s.to_owned()); + let network_name = network_name_from_raw_manifest(&raw_manifest); let amp_client = match &network_name { Some(network) => match manager.amp_clients.get(network) { diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 85f97aba952..a9d47bd0bd0 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -18,7 +18,9 @@ use graph::components::metrics::subgraph::DeploymentStatusMetric; use graph::components::network_provider::AmpClients; use graph::components::store::SourceableStore; use graph::components::subgraph::ProofOfIndexingVersion; -use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6}; +use graph::data::subgraph::{ + network_name_from_raw_manifest, UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6, +}; use 
graph::data::value::Word; use graph::data_source::causality_region::CausalityRegionSeq; use graph::env::EnvVars; @@ -270,13 +272,8 @@ impl SubgraphInstanceManager { // Look up the per-chain Amp client based on the network from the // raw manifest (before the manifest is moved into parse). - let amp_client = raw_manifest - .get(serde_yaml::Value::String("dataSources".to_owned())) - .and_then(|ds| ds.as_sequence()) - .and_then(|ds| ds.first()) - .and_then(|ds| ds.as_mapping()) - .and_then(|ds| ds.get(serde_yaml::Value::String("network".to_owned()))) - .and_then(|n| n.as_str()) + let amp_client = network_name_from_raw_manifest(&raw_manifest) + .as_deref() .and_then(|network| self.amp_clients.get(network)); let manifest = diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 67e991a390d..c96d1d0d9da 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -10,7 +10,7 @@ use graph::components::{ subgraph::Settings, }; use graph::data::{ - subgraph::{schema::DeploymentCreate, Graft}, + subgraph::{network_name_from_raw_manifest, schema::DeploymentCreate, Graft}, value::Word, }; use graph::futures03::{self, future::TryFutureExt, Stream, StreamExt, TryStreamExt}; @@ -302,7 +302,7 @@ where // Extract the network name from the raw manifest and resolve the // per-chain Amp client (if any). - let resolved_amp_chain = network_name_from_raw(&raw).map(|network| { + let resolved_amp_chain = network_name_from_raw_manifest(&raw).map(|network| { let resolved = self.amp_chain_names.resolve(&Word::from(network)); resolved.to_string() }); @@ -518,18 +518,6 @@ async fn resolve_graft_block( }) } -/// Extracts the network name from the first data source in a raw manifest. 
-fn network_name_from_raw(raw: &serde_yaml::Mapping) -> Option { - use serde_yaml::Value; - raw.get(Value::String("dataSources".to_owned())) - .and_then(|ds| ds.as_sequence()) - .and_then(|ds| ds.first()) - .and_then(|ds| ds.as_mapping()) - .and_then(|ds| ds.get(Value::String("network".to_owned()))) - .and_then(|n| n.as_str()) - .map(|s| s.to_owned()) -} - async fn create_subgraph_version( logger: &Logger, store: Arc, diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index ee652c3088e..14960d713dc 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -1383,6 +1383,21 @@ fn display_vector(input: &[impl std::fmt::Display]) -> impl std::fmt::Display { format!("[{}]", formatted_errors) } +/// Extracts the network name from the first data source in a raw manifest YAML mapping. +/// +/// Navigates `dataSources[0].network` and returns the network name as an owned string, +/// or `None` if any step in the path is missing. +pub fn network_name_from_raw_manifest(raw: &serde_yaml::Mapping) -> Option { + use serde_yaml::Value; + raw.get(Value::String("dataSources".to_owned())) + .and_then(|ds| ds.as_sequence()) + .and_then(|ds| ds.first()) + .and_then(|ds| ds.as_mapping()) + .and_then(|ds| ds.get(Value::String("network".to_owned()))) + .and_then(|n| n.as_str()) + .map(|s| s.to_owned()) +} + #[test] fn test_subgraph_name_validation() { assert!(SubgraphName::new("a").is_ok()); @@ -1429,3 +1444,47 @@ fn test_display_vector() { format!("{}", manifest_validation_error) ) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn network_name_from_raw_manifest_extracts_network() { + use serde_yaml::{Mapping, Value}; + + let mut ds = Mapping::new(); + ds.insert( + Value::String("network".to_owned()), + Value::String("mainnet".to_owned()), + ); + + let mut raw = Mapping::new(); + raw.insert( + Value::String("dataSources".to_owned()), + Value::Sequence(vec![Value::Mapping(ds)]), + ); + + assert_eq!( + 
network_name_from_raw_manifest(&raw), + Some("mainnet".to_string()) + ); + } + + #[test] + fn network_name_from_raw_manifest_returns_none_when_missing() { + use serde_yaml::{Mapping, Value}; + + // Empty mapping — no dataSources key at all + let empty = Mapping::new(); + assert_eq!(network_name_from_raw_manifest(&empty), None); + + // dataSources is an empty sequence + let mut raw = Mapping::new(); + raw.insert( + Value::String("dataSources".to_owned()), + Value::Sequence(vec![]), + ); + assert_eq!(network_name_from_raw_manifest(&raw), None); + } +} diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index dc94fb49ac8..5314fae0898 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -14,7 +14,7 @@ use graph::components::network_provider::AmpClients; use graph::components::store::{BlockPtrForNumber, BlockStore, QueryPermit, Store}; use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; -use graph::data::subgraph::{status, DeploymentFeatures}; +use graph::data::subgraph::{network_name_from_raw_manifest, status, DeploymentFeatures}; use graph::data::value::Object; use graph::futures03::TryFutureExt; use graph::prelude::*; @@ -523,13 +523,8 @@ where // Extract the network name from the raw yaml to look up the // per-chain Amp client. 
- let amp_client = raw_yaml - .get(serde_yaml::Value::String("dataSources".to_owned())) - .and_then(|ds| ds.as_sequence()) - .and_then(|ds| ds.first()) - .and_then(|ds| ds.as_mapping()) - .and_then(|ds| ds.get(serde_yaml::Value::String("network".to_owned()))) - .and_then(|n| n.as_str()) + let amp_client = network_name_from_raw_manifest(&raw_yaml) + .as_deref() .and_then(|network| self.amp_clients.get(network)); let max_spec_version = ENV_VARS.max_spec_version.clone(); From 91706293f3149a0ffad1c03985f5778d0a162c05 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:24:18 +0200 Subject: [PATCH 12/15] core, graph: add context() helper method to AmpChainConfig --- core/src/amp_subgraph/manager.rs | 2 +- core/src/subgraph/registrar.rs | 2 +- graph/src/components/network_provider/mod.rs | 22 ++++++++++++++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/amp_subgraph/manager.rs b/core/src/amp_subgraph/manager.rs index e86640fef2e..2ecdb32f09e 100644 --- a/core/src/amp_subgraph/manager.rs +++ b/core/src/amp_subgraph/manager.rs @@ -149,7 +149,7 @@ where manager .amp_chain_configs .get(chain) - .map(|cfg| (cfg.context_dataset.clone(), cfg.context_table.clone())) + .map(|cfg| cfg.context()) }); let mut manifest = amp::Manifest::resolve::( diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index c96d1d0d9da..6cbcc35e6cf 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -312,7 +312,7 @@ where let amp_context = resolved_amp_chain .as_deref() .and_then(|chain| self.amp_chain_configs.get(chain)) - .map(|cfg| (cfg.context_dataset.clone(), cfg.context_table.clone())); + .map(|cfg| cfg.context()); // Give priority to deployment specific history_blocks value. 
let history_blocks = diff --git a/graph/src/components/network_provider/mod.rs b/graph/src/components/network_provider/mod.rs index 006c6f321f2..b9601ba988a 100644 --- a/graph/src/components/network_provider/mod.rs +++ b/graph/src/components/network_provider/mod.rs @@ -42,6 +42,13 @@ pub struct AmpChainConfig { pub network: Option, } +impl AmpChainConfig { + /// Returns the context dataset and table as a tuple. + pub fn context(&self) -> (String, String) { + (self.context_dataset.clone(), self.context_table.clone()) + } +} + /// Holds per-chain Amp Flight clients, keyed by chain name. /// /// This wrapper is used to pass per-chain Amp clients through the system @@ -118,6 +125,21 @@ impl AmpChainNames { mod tests { use super::*; + #[test] + fn amp_chain_config_context_returns_tuple() { + let cfg = AmpChainConfig { + address: "http://localhost:50051".parse().unwrap(), + token: None, + context_dataset: "eth_mainnet".to_string(), + context_table: "blocks".to_string(), + network: None, + }; + assert_eq!( + cfg.context(), + ("eth_mainnet".to_string(), "blocks".to_string()) + ); + } + #[test] fn amp_chain_names_resolve_known_alias() { let mut map = HashMap::new(); From 004eb1edad05d8ea97271de226523587ff7116b2 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:26:07 +0200 Subject: [PATCH 13/15] graph, node: move normalize_sql_ident to shared location and normalize context idents at config load time --- graph/src/amp/manifest/data_source/raw.rs | 32 +++------ graph/src/amp/sql/mod.rs | 27 +++++++ node/src/config.rs | 85 ++++++++++++++++++++++- 3 files changed, 120 insertions(+), 24 deletions(-) diff --git a/graph/src/amp/manifest/data_source/raw.rs b/graph/src/amp/manifest/data_source/raw.rs index 4a7b2c3c3a7..57f3a0c7d08 100644 --- a/graph/src/amp/manifest/data_source/raw.rs +++ b/graph/src/amp/manifest/data_source/raw.rs @@ -8,7 +8,6 @@ use anyhow::anyhow; use arrow::{array::RecordBatch, datatypes::Schema}; use 
futures03::future::try_join_all; use itertools::Itertools; -use lazy_regex::regex_is_match; use semver::Version; use serde::Deserialize; use slog::{debug, Logger}; @@ -22,7 +21,9 @@ use crate::{ auto_block_hash_decoder, auto_block_number_decoder, auto_block_timestamp_decoder, }, error::IsDeterministic, - sql::{BlockRangeQueryBuilder, ContextQuery, ValidQuery}, + sql::{ + normalize_sql_ident, validate_ident, BlockRangeQueryBuilder, ContextQuery, ValidQuery, + }, }, components::link_resolver::{LinkResolver, LinkResolverContext}, data::subgraph::DeploymentHash, @@ -80,7 +81,8 @@ impl RawDataSource { let logger = logger.new(slog::o!("data_source" => name.clone())); debug!(logger, "Resolving data source"); - validate_ident(&name).map_err(|e| e.source_context("invalid `name`"))?; + validate_ident(&name) + .map_err(|e| Error::InvalidValue(e).source_context("invalid `name`"))?; Self::validate_kind(kind)?; let source = source @@ -380,7 +382,8 @@ impl RawAbi { ) -> Result { let Self { name, file } = self; - validate_ident(&name).map_err(|e| e.source_context("invalid `name`"))?; + validate_ident(&name) + .map_err(|e| Error::InvalidValue(e).source_context("invalid `name`"))?; let contract = Self::resolve_contract(logger, link_resolver, file).await?; Ok(Abi { name, contract }) @@ -448,7 +451,8 @@ impl RawTable { ) -> Result { let Self { name, query, file } = self; - validate_ident(&name).map_err(|e| e.source_context("invalid `name`"))?; + validate_ident(&name) + .map_err(|e| Error::InvalidValue(e).source_context("invalid `name`"))?; let query = match Self::resolve_query(query, source, abis)? 
{ Some(query) => query, None => Self::resolve_file(logger, link_resolver, file, source, abis).await?, @@ -459,7 +463,7 @@ impl RawTable { for field in schema.fields() { validate_ident(field.name()).map_err(|e| { - e.source_context(format!( + Error::InvalidValue(e).source_context(format!( "invalid query output schema: invalid column '{}'", field.name() )) @@ -685,22 +689,6 @@ impl IsDeterministic for Error { } } -fn validate_ident(s: &str) -> Result<(), Error> { - if !regex_is_match!("^[a-zA-Z_][a-zA-Z0-9_-]{0,100}$", s) { - return Err(Error::InvalidValue( - anyhow!("invalid identifier '{s}': must start with a letter or an underscore, and contain only letters, numbers, hyphens, and underscores") - )); - } - Ok(()) -} - -fn normalize_sql_ident(s: &str) -> String { - match validate_ident(s) { - Ok(()) => s.to_lowercase(), - Err(_e) => sqlparser_latest::ast::Ident::with_quote('"', s).to_string(), - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/graph/src/amp/sql/mod.rs b/graph/src/amp/sql/mod.rs index 02355895afa..b6cc50036ed 100644 --- a/graph/src/amp/sql/mod.rs +++ b/graph/src/amp/sql/mod.rs @@ -1,3 +1,30 @@ pub mod query_builder; pub use self::query_builder::{BlockRangeQueryBuilder, ContextQuery, ValidQuery}; + +use anyhow::anyhow; +use lazy_regex::regex_is_match; + +/// Validates that `s` is a simple SQL identifier: starts with a letter or +/// underscore and contains only `[a-zA-Z0-9_-]`, up to 101 characters. +pub fn validate_ident(s: &str) -> Result<(), anyhow::Error> { + if !regex_is_match!("^[a-zA-Z_][a-zA-Z0-9_-]{0,100}$", s) { + return Err(anyhow!( + "invalid identifier '{s}': must start with a letter or an underscore, \ + and contain only letters, numbers, hyphens, and underscores" + )); + } + Ok(()) +} + +/// Normalizes a SQL identifier for safe interpolation into SQL `FROM` clauses. +/// +/// Simple identifiers (matching `validate_ident`) are lowercased and returned +/// unquoted. 
Identifiers with special characters are double-quoted using +/// `sqlparser_latest::ast::Ident::with_quote`. +pub fn normalize_sql_ident(s: &str) -> String { + match validate_ident(s) { + Ok(()) => s.to_lowercase(), + Err(_) => sqlparser_latest::ast::Ident::with_quote('"', s).to_string(), + } +} diff --git a/node/src/config.rs b/node/src/config.rs index 1c17c8fbdaf..1c332437748 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -1,4 +1,5 @@ use graph::{ + amp::sql::normalize_sql_ident, anyhow::Error, blockchain::BlockchainKind, components::network_provider::{AmpChainConfig, AmpChainNames, ChainName}, @@ -155,8 +156,8 @@ impl Config { AmpChainConfig { address: uri, token: amp.token.clone(), - context_dataset: amp.context_dataset.clone(), - context_table: amp.context_table.clone(), + context_dataset: normalize_sql_ident(&.context_dataset), + context_table: normalize_sql_ident(&.context_table), network: amp.network.clone(), }, ); @@ -2528,4 +2529,84 @@ fdw_pool_size = [ assert_eq!(amp.context_table, "blocks"); assert_eq!(amp.network, Some("ethereum-mainnet".to_string())); } + + #[test] + fn amp_chain_config_normalizes_context_idents() { + let section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "ns/data@v1" + context_table = "my/table@2" + "#, + ) + .unwrap(); + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let map = config.amp_chain_configs().unwrap(); + let mainnet = map.get("mainnet").expect("mainnet should be in map"); + + // Identifiers with special characters should be 
double-quoted + assert_eq!(mainnet.context_dataset, "\"ns/data@v1\""); + assert_eq!(mainnet.context_table, "\"my/table@2\""); + } + + #[test] + fn amp_chain_config_lowercases_simple_context_idents() { + let section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "Eth" + context_table = "Blocks" + "#, + ) + .unwrap(); + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let map = config.amp_chain_configs().unwrap(); + let mainnet = map.get("mainnet").expect("mainnet should be in map"); + + // Simple identifiers should be lowercased and unquoted + assert_eq!(mainnet.context_dataset, "eth"); + assert_eq!(mainnet.context_table, "blocks"); + } } From bd74ea567a8903892c860dc04ca28121c3ffa933 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:46:58 +0200 Subject: [PATCH 14/15] core, graph: only use Amp start block resolver for Amp subgraphs --- core/src/subgraph/registrar.rs | 9 +++++---- graph/src/data/subgraph/mod.rs | 6 ++++++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 6cbcc35e6cf..7b4f63cbfea 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -430,7 +430,7 @@ async fn resolve_amp_start_block( block_number: BlockNumber, ) -> Result { let sql = format!( - "SELECT * FROM {}.{} WHERE _block_num = {}", + "SELECT * FROM {}.{} WHERE _block_num = {} LIMIT 1", context_dataset, context_table, block_number ); @@ -569,6 +569,7 @@ async 
fn create_subgraph_version(resolved_name.clone()) @@ -604,7 +605,7 @@ async fn create_subgraph_version None, // Amp subgraph with start_block > 0 — try Amp-based resolution. - (min, Some(client), Some((dataset, table))) => { + (min, Some(client), Some((dataset, table))) if is_amp_subgraph => { match resolve_amp_start_block(client.as_ref(), &logger, dataset, table, min - 1) .await { @@ -823,7 +824,7 @@ mod tests { assert_eq!(queries.len(), 1); assert_eq!( queries[0], - "SELECT * FROM my_dataset.blocks WHERE _block_num = 99" + "SELECT * FROM my_dataset.blocks WHERE _block_num = 99 LIMIT 1" ); } @@ -850,7 +851,7 @@ mod tests { let queries = client.recorded_queries(); assert_eq!( queries[0], - "SELECT * FROM eth_mainnet.blocks WHERE _block_num = 99" + "SELECT * FROM eth_mainnet.blocks WHERE _block_num = 99 LIMIT 1" ); } diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 14960d713dc..bfd9a5909ad 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -991,6 +991,12 @@ impl SubgraphManifest { Ok(resolved) } + pub fn is_amp_subgraph(&self) -> bool { + self.data_sources + .iter() + .all(|ds| matches!(ds, DataSource::Amp(_))) + } + pub fn network_name(&self) -> String { // Assume the manifest has been validated, ensuring network names are homogenous self.data_sources From 4c2cea4d78162dd40ff898e5c692b67230afeea2 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Wed, 18 Feb 2026 15:19:26 +0200 Subject: [PATCH 15/15] core, graph, node: use ChainName instead of String --- core/src/amp_subgraph/manager.rs | 8 +++---- core/src/subgraph/instance_manager.rs | 2 +- core/src/subgraph/registrar.rs | 15 +++++++------ graph/src/components/network_provider/mod.rs | 20 +++++++++--------- graph/src/data/subgraph/mod.rs | 12 +++++++---- node/src/config.rs | 22 +++++++++++++------- node/src/launcher.rs | 11 +++++----- server/index-node/src/resolver.rs | 2 +- 8 files changed, 51 insertions(+), 
41 deletions(-) diff --git a/core/src/amp_subgraph/manager.rs b/core/src/amp_subgraph/manager.rs index 2ecdb32f09e..121de23f735 100644 --- a/core/src/amp_subgraph/manager.rs +++ b/core/src/amp_subgraph/manager.rs @@ -9,7 +9,7 @@ use graph::{ components::{ link_resolver::{LinkResolver, LinkResolverContext}, metrics::MetricsRegistry, - network_provider::{AmpChainConfig, AmpClients}, + network_provider::{AmpChainConfig, AmpClients, ChainName}, store::{DeploymentLocator, SubgraphStore}, subgraph::SubgraphInstanceManager, }, @@ -35,7 +35,7 @@ pub struct Manager { subgraph_store: Arc, link_resolver: Arc, amp_clients: AmpClients, - amp_chain_configs: HashMap, + amp_chain_configs: HashMap, } impl Manager @@ -52,7 +52,7 @@ where subgraph_store: Arc, link_resolver: Arc, amp_clients: AmpClients, - amp_chain_configs: HashMap, + amp_chain_configs: HashMap, ) -> Self { let logger = logger_factory.component_logger("AmpSubgraphManager", None); let logger_factory = logger_factory.with_parent(logger); @@ -145,7 +145,7 @@ where } }; - let amp_context = network_name.as_deref().and_then(|chain| { + let amp_context = network_name.as_ref().and_then(|chain| { manager .amp_chain_configs .get(chain) diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index a9d47bd0bd0..cae8c2141da 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -273,7 +273,7 @@ impl SubgraphInstanceManager { // Look up the per-chain Amp client based on the network from the // raw manifest (before the manifest is moved into parse). 
let amp_client = network_name_from_raw_manifest(&raw_manifest) - .as_deref() + .as_ref() .and_then(|network| self.amp_clients.get(network)); let manifest = diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7b4f63cbfea..8227f1912c5 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -3,6 +3,7 @@ use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use graph::amp; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; +use graph::components::network_provider::ChainName; use graph::components::{ link_resolver::LinkResolverContext, network_provider::{AmpChainConfig, AmpChainNames, AmpClients}, @@ -26,7 +27,7 @@ pub struct SubgraphRegistrar { store: Arc, subscription_manager: Arc, amp_clients: AmpClients, - amp_chain_configs: HashMap, + amp_chain_configs: HashMap, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -49,7 +50,7 @@ where store: Arc, subscription_manager: Arc, amp_clients: AmpClients, - amp_chain_configs: HashMap, + amp_chain_configs: HashMap, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -302,15 +303,13 @@ where // Extract the network name from the raw manifest and resolve the // per-chain Amp client (if any). 
- let resolved_amp_chain = network_name_from_raw_manifest(&raw).map(|network| { - let resolved = self.amp_chain_names.resolve(&Word::from(network)); - resolved.to_string() - }); + let resolved_amp_chain = network_name_from_raw_manifest(&raw) + .map(|network| self.amp_chain_names.resolve(&network)); let amp_client = resolved_amp_chain - .as_deref() + .as_ref() .and_then(|chain| self.amp_clients.get(chain)); let amp_context = resolved_amp_chain - .as_deref() + .as_ref() .and_then(|chain| self.amp_chain_configs.get(chain)) .map(|cfg| cfg.context()); diff --git a/graph/src/components/network_provider/mod.rs b/graph/src/components/network_provider/mod.rs index b9601ba988a..d0ba4595875 100644 --- a/graph/src/components/network_provider/mod.rs +++ b/graph/src/components/network_provider/mod.rs @@ -55,18 +55,18 @@ impl AmpChainConfig { /// instead of a single global `Option>`. Use `get(chain_name)` to /// retrieve the client for a specific chain. pub struct AmpClients { - clients: HashMap>, + clients: HashMap>, } impl AmpClients { /// Creates a new `AmpClients` from a map of chain names to clients. - pub fn new(clients: HashMap>) -> Self { + pub fn new(clients: HashMap>) -> Self { Self { clients } } /// Returns the Amp client for the given chain, or `None` if no client /// is configured for that chain. 
- pub fn get(&self, chain_name: &str) -> Option> { + pub fn get(&self, chain_name: &ChainName) -> Option> { self.clients.get(chain_name).cloned() } @@ -166,18 +166,18 @@ mod tests { #[test] fn amp_clients_returns_client_for_configured_chain() { let mut map = HashMap::new(); - map.insert("mainnet".to_string(), Arc::new(42u32)); + map.insert(ChainName::from("mainnet"), Arc::new(42u32)); let clients = AmpClients::new(map); - let client = clients.get("mainnet"); + let client = clients.get(&ChainName::from("mainnet")); assert!(client.is_some()); assert_eq!(*client.unwrap(), 42); } #[test] fn amp_clients_returns_none_for_unconfigured_chain() { - let map: HashMap> = HashMap::new(); + let map: HashMap> = HashMap::new(); let clients = AmpClients::new(map); - assert!(clients.get("mainnet").is_none()); + assert!(clients.get(&ChainName::from("mainnet")).is_none()); } /// Verifies the condition that causes Amp manager registration: @@ -185,7 +185,7 @@ mod tests { #[test] fn amp_manager_registered_when_chain_has_config() { let mut map = HashMap::new(); - map.insert("mainnet".to_string(), Arc::new(42u32)); + map.insert(ChainName::from("mainnet"), Arc::new(42u32)); let clients = AmpClients::new(map); assert!( !clients.is_empty(), @@ -210,12 +210,12 @@ mod tests { #[test] fn amp_clients_error_for_unconfigured_amp_chain() { let mut map = HashMap::new(); - map.insert("mainnet".to_string(), Arc::new(1u32)); + map.insert(ChainName::from("mainnet"), Arc::new(1u32)); let clients = AmpClients::new(map); // "matic" is not configured. 
let result = clients - .get("matic") + .get(&ChainName::from("matic")) .ok_or_else(|| "Amp is not configured for chain 'matic'".to_string()); assert!(result.is_err()); assert_eq!( diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index bfd9a5909ad..d0ca5e65a93 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -14,7 +14,11 @@ pub mod status; pub use features::{SubgraphFeature, SubgraphFeatureValidationError}; -use crate::{cheap_clone::CheapClone, components::store::BLOCK_NUMBER_MAX, object}; +use crate::{ + cheap_clone::CheapClone, + components::{network_provider::ChainName, store::BLOCK_NUMBER_MAX}, + object, +}; use anyhow::{anyhow, Context, Error}; use futures03::future::try_join_all; use itertools::Itertools; @@ -1393,7 +1397,7 @@ fn display_vector(input: &[impl std::fmt::Display]) -> impl std::fmt::Display { /// /// Navigates `dataSources[0].network` and returns the network name as an owned string, /// or `None` if any step in the path is missing. 
-pub fn network_name_from_raw_manifest(raw: &serde_yaml::Mapping) -> Option { +pub fn network_name_from_raw_manifest(raw: &serde_yaml::Mapping) -> Option { use serde_yaml::Value; raw.get(Value::String("dataSources".to_owned())) .and_then(|ds| ds.as_sequence()) @@ -1401,7 +1405,7 @@ pub fn network_name_from_raw_manifest(raw: &serde_yaml::Mapping) -> Option Result> { + pub fn amp_chain_configs(&self) -> Result> { let mut map = HashMap::new(); for (chain_name, chain) in &self.chains.chains { if let Some(amp) = &chain.amp { @@ -152,7 +152,7 @@ impl Config { ) })?; map.insert( - chain_name.clone(), + chain_name.as_str().into(), AmpChainConfig { address: uri, token: amp.token.clone(), @@ -1375,8 +1375,8 @@ mod tests { use crate::config::{default_polling_interval, ChainSection, Web3Rule}; use super::{ - AmpConfig, Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, - Web3Provider, + AmpConfig, Chain, ChainName, Config, FirehoseProvider, Provider, ProviderDetails, Shard, + Transport, Web3Provider, }; use graph::blockchain::BlockchainKind; use graph::firehose::SubgraphLimit; @@ -2425,9 +2425,11 @@ fdw_pool_size = [ // Only mainnet (with amp) should be in the map assert_eq!(map.len(), 1); - assert!(!map.contains_key("sepolia")); + assert!(!map.contains_key(&ChainName::from("sepolia"))); - let mainnet = map.get("mainnet").expect("mainnet should be in map"); + let mainnet = map + .get(&ChainName::from("mainnet")) + .expect("mainnet should be in map"); assert_eq!(mainnet.address.to_string(), "http://localhost:50051/"); assert_eq!(mainnet.token.as_deref(), Some("my-token")); assert_eq!(mainnet.context_dataset, "eth"); @@ -2563,7 +2565,9 @@ fdw_pool_size = [ }; let map = config.amp_chain_configs().unwrap(); - let mainnet = map.get("mainnet").expect("mainnet should be in map"); + let mainnet = map + .get(&ChainName::from("mainnet")) + .expect("mainnet should be in map"); // Identifiers with special characters should be double-quoted 
assert_eq!(mainnet.context_dataset, "\"ns/data@v1\""); @@ -2603,7 +2607,9 @@ fdw_pool_size = [ }; let map = config.amp_chain_configs().unwrap(); - let mainnet = map.get("mainnet").expect("mainnet should be in map"); + let mainnet = map + .get(&ChainName::from("mainnet")) + .expect("mainnet should be in map"); // Simple identifiers should be lowercased and unquoted assert_eq!(mainnet.context_dataset, "eth"); diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 2d28049f936..018946a68f0 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, io::{BufRead, BufReader}, path::Path, time::Duration, @@ -6,8 +7,11 @@ use std::{ use anyhow::Result; use git_testament::{git_testament, render_testament}; -use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; use graph::components::subgraph::Settings; +use graph::components::{ + link_resolver::{ArweaveClient, FileSizeLimit}, + network_provider::ChainName, +}; use graph::data::graphql::load_manager::LoadManager; use graph::endpoint::EndpointMetrics; use graph::env::EnvVars; @@ -276,10 +280,7 @@ fn build_subgraph_registrar( arweave_service: ArweaveService, ipfs_service: IpfsService, amp_clients: AmpClients, - amp_chain_configs: std::collections::HashMap< - String, - graph::components::network_provider::AmpChainConfig, - >, + amp_chain_configs: HashMap, cancel_token: CancellationToken, amp_chain_names: Arc, ) -> Arc< diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 5314fae0898..bee5fe3c1df 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -524,7 +524,7 @@ where // Extract the network name from the raw yaml to look up the // per-chain Amp client. let amp_client = network_name_from_raw_manifest(&raw_yaml) - .as_deref() + .as_ref() .and_then(|network| self.amp_clients.get(network)); let max_spec_version = ENV_VARS.max_spec_version.clone();