Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions core/src/amp_subgraph/manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use alloy::primitives::BlockNumber;
Expand All @@ -8,9 +9,11 @@ use graph::{
components::{
link_resolver::{LinkResolver, LinkResolverContext},
metrics::MetricsRegistry,
network_provider::{AmpChainConfig, AmpClients, ChainName},
store::{DeploymentLocator, SubgraphStore},
subgraph::SubgraphInstanceManager,
},
data::subgraph::network_name_from_raw_manifest,
env::EnvVars,
log::factory::LoggerFactory,
prelude::CheapClone,
Expand All @@ -31,7 +34,8 @@ pub struct Manager<SS, NC> {
monitor: Monitor,
subgraph_store: Arc<SS>,
link_resolver: Arc<dyn LinkResolver>,
amp_client: Arc<NC>,
amp_clients: AmpClients<NC>,
amp_chain_configs: HashMap<ChainName, AmpChainConfig>,
}

impl<SS, NC> Manager<SS, NC>
Expand All @@ -47,7 +51,8 @@ where
cancel_token: &CancellationToken,
subgraph_store: Arc<SS>,
link_resolver: Arc<dyn LinkResolver>,
amp_client: Arc<NC>,
amp_clients: AmpClients<NC>,
amp_chain_configs: HashMap<ChainName, AmpChainConfig>,
) -> Self {
let logger = logger_factory.component_logger("AmpSubgraphManager", None);
let logger_factory = logger_factory.with_parent(logger);
Expand All @@ -61,7 +66,8 @@ where
monitor,
subgraph_store,
link_resolver,
amp_client,
amp_clients,
amp_chain_configs,
}
}
}
Expand Down Expand Up @@ -112,16 +118,48 @@ where
.await
.context("failed to load subgraph manifest")?;

let raw_manifest = serde_yaml::from_slice(&manifest_bytes)
let raw_manifest: serde_yaml::Mapping = serde_yaml::from_slice(&manifest_bytes)
.context("failed to parse subgraph manifest")?;

// Extract the network name from the raw manifest to look
// up the per-chain Amp client.
let network_name = network_name_from_raw_manifest(&raw_manifest);

let amp_client = match &network_name {
Some(network) => match manager.amp_clients.get(network) {
Some(client) => client,
None => {
anyhow::bail!(
"Amp is not configured for chain '{}'; \
cannot start Amp subgraph '{}'",
network,
deployment.hash
);
}
},
None => {
anyhow::bail!(
"no network name found in manifest for Amp subgraph '{}'",
deployment.hash
);
}
};

let amp_context = network_name.as_ref().and_then(|chain| {
manager
.amp_chain_configs
.get(chain)
.map(|cfg| cfg.context())
});

let mut manifest = amp::Manifest::resolve::<graph_chain_ethereum::Chain, _>(
&logger,
manager.link_resolver.cheap_clone(),
manager.amp_client.cheap_clone(),
amp_client.cheap_clone(),
manager.env_vars.max_spec_version.cheap_clone(),
deployment.hash.cheap_clone(),
raw_manifest,
amp_context,
)
.await?;

Expand All @@ -139,7 +177,7 @@ where
let runner_context = runner::Context::new(
&logger,
&manager.env_vars.amp,
manager.amp_client.cheap_clone(),
amp_client,
store,
deployment.hash.cheap_clone(),
manifest,
Expand Down
20 changes: 15 additions & 5 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
use graph::components::metrics::gas::GasMetrics;
use graph::components::metrics::subgraph::DeploymentStatusMetric;
use graph::components::network_provider::AmpClients;
use graph::components::store::SourceableStore;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::data::subgraph::{
network_name_from_raw_manifest, UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6,
};
use graph::data::value::Word;
use graph::data_source::causality_region::CausalityRegionSeq;
use graph::env::EnvVars;
Expand All @@ -41,7 +44,7 @@ pub struct SubgraphInstanceManager<S: SubgraphStore, AC> {
link_resolver: Arc<dyn LinkResolver>,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
amp_client: Option<Arc<AC>>,
amp_clients: AmpClients<AC>,
static_filters: bool,
env_vars: Arc<EnvVars>,

Expand Down Expand Up @@ -175,7 +178,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
link_resolver: Arc<dyn LinkResolver>,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
amp_client: Option<Arc<AC>>,
amp_clients: AmpClients<AC>,
static_filters: bool,
) -> Self {
let logger = logger_factory.component_logger("SubgraphInstanceManager", None);
Expand All @@ -189,7 +192,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
instances: SubgraphKeepAlive::new(sg_metrics),
link_resolver,
ipfs_service,
amp_client,
amp_clients,
static_filters,
env_vars,
arweave_service,
Expand Down Expand Up @@ -267,6 +270,12 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
let subgraph_store = self.subgraph_store.cheap_clone();
let registry = self.metrics_registry.cheap_clone();

// Look up the per-chain Amp client based on the network from the
// raw manifest (before the manifest is moved into parse).
let amp_client = network_name_from_raw_manifest(&raw_manifest)
.as_ref()
.and_then(|network| self.amp_clients.get(network));

let manifest =
UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), raw_manifest)?;

Expand Down Expand Up @@ -300,7 +309,8 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
.resolve(
&deployment.hash,
&link_resolver,
self.amp_client.cheap_clone(),
amp_client,
None,
&logger,
ENV_VARS.max_spec_version.clone(),
)
Expand Down
Loading