Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use graph_store_postgres::add_chain;
use graph_store_postgres::find_chain;
use graph_store_postgres::update_chain_name;
use graph_store_postgres::BlockStore;
use graph_store_postgres::ChainStatus;
use graph_store_postgres::ChainStore;
use graph_store_postgres::PoolCoordinator;
use graph_store_postgres::ScopedFutureExt;
Expand Down Expand Up @@ -254,7 +253,7 @@ pub async fn change_block_cache_shard(

let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?;

store.add_chain_store(&chain,ChainStatus::Ingestible, true).await?;
store.add_chain_store(&chain, true).await?;

// Drop the foreign key constraint on deployment_schemas
sql_query(
Expand Down
59 changes: 14 additions & 45 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use graph::{
blockchain::ChainIdentifier,
components::store::{BlockStore as BlockStoreTrait, QueryPermit},
derive::CheapClone,
prelude::{error, info, BlockNumber, BlockPtr, Logger, ENV_VARS},
prelude::{error, info, warn, BlockNumber, BlockPtr, Logger, ENV_VARS},
slog::o,
};
use graph::{
Expand All @@ -37,14 +37,6 @@ pub const FAKE_NETWORK_SHARED: &str = "fake_network_shared";
// To be incremented on each breaking change to the database.
const SUPPORTED_DB_VERSION: i64 = 3;

/// The status of a chain: whether we can only read from the chain, or
/// whether it is ok to ingest from it, too
#[derive(Copy, Clone)]
pub enum ChainStatus {
ReadOnly,
Ingestible,
}

pub mod primary {
use std::convert::TryFrom;

Expand Down Expand Up @@ -278,39 +270,22 @@ impl BlockStore {
});
let block_store = Self { inner };

/// Check that the configuration for `chain` hasn't changed so that
/// it is ok to ingest from it
fn chain_ingestible(
logger: &Logger,
chain: &primary::Chain,
shard: &Shard,
// ident: &ChainIdentifier,
) -> bool {
if &chain.shard != shard {
error!(
logger,
"the chain {} is stored in shard {} but is configured for shard {}",
chain.name,
chain.shard,
shard
);
return false;
}
true
}

// For each configured chain, add a chain store
for (chain_name, shard) in chains {
if let Some(chain) = existing_chains
.iter()
.find(|chain| chain.name == chain_name)
{
let status = if chain_ingestible(&block_store.logger, chain, &shard) {
ChainStatus::Ingestible
} else {
ChainStatus::ReadOnly
};
block_store.add_chain_store(chain, status, false).await?;
if chain.shard != shard {
warn!(
&block_store.logger,
"the chain {} is stored in shard {} but is configured for shard {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message would be even clearer if it said

"the chain {} is stored in shard {} but is configured for shard {}; ignoring config and using shard {chain.shard}"

chain.name,
chain.shard,
shard
);
}
block_store.add_chain_store(chain, false).await?;
};
}

Expand All @@ -326,9 +301,7 @@ impl BlockStore {
.iter()
.filter(|chain| !configured_chains.contains(&chain.name))
{
block_store
.add_chain_store(chain, ChainStatus::ReadOnly, false)
.await?;
block_store.add_chain_store(chain, false).await?;
}
Ok(block_store)
}
Expand Down Expand Up @@ -376,7 +349,6 @@ impl BlockStore {
pub async fn add_chain_store(
&self,
chain: &primary::Chain,
status: ChainStatus,
create: bool,
) -> Result<Arc<ChainStore>, StoreError> {
let pool = self
Expand All @@ -395,7 +367,6 @@ impl BlockStore {
logger,
chain.name.clone(),
chain.storage.clone(),
status,
sender,
pool,
ENV_VARS.store.recent_blocks_cache_capacity,
Expand Down Expand Up @@ -457,9 +428,7 @@ impl BlockStore {
async {
match primary::find_chain(conn, &chain).await? {
Some(chain) => {
let chain_store = this
.add_chain_store(&chain, ChainStatus::ReadOnly, false)
.await?;
let chain_store = this.add_chain_store(&chain, false).await?;
Ok(Some(chain_store))
}
None => Ok(None),
Expand Down Expand Up @@ -605,7 +574,7 @@ impl BlockStore {
})
.ok_or_else(|| anyhow!("unable to find shard for network {}", network))?;
let chain = primary::add_chain(&mut conn, network, shard, ident).await?;
self.add_chain_store(&chain, ChainStatus::Ingestible, true)
self.add_chain_store(&chain, true)
.await
.map_err(anyhow::Error::from)
}
Expand Down
11 changes: 1 addition & 10 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ use graph::{ensure, internal_error};

use self::recent_blocks_cache::RecentBlocksCache;
use crate::AsyncPgConnection;
use crate::{
block_store::ChainStatus, chain_head_listener::ChainHeadUpdateSender, pool::ConnectionPool,
};
use crate::{chain_head_listener::ChainHeadUpdateSender, pool::ConnectionPool};

/// Our own internal notion of a block
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -2139,7 +2137,6 @@ pub struct ChainStore {
pool: ConnectionPool,
pub chain: String,
pub(crate) storage: data::Storage,
status: ChainStatus,
chain_head_update_sender: ChainHeadUpdateSender,
// TODO: We currently only use this cache for
// [`ChainStore::ancestor_block`], but it could very well be expanded to
Expand All @@ -2164,7 +2161,6 @@ impl ChainStore {
logger: Logger,
chain: String,
storage: data::Storage,
status: ChainStatus,
chain_head_update_sender: ChainHeadUpdateSender,
pool: ConnectionPool,
recent_blocks_cache_capacity: usize,
Expand All @@ -2182,7 +2178,6 @@ impl ChainStore {
pool,
chain,
storage,
status,
chain_head_update_sender,
recent_blocks_cache,
blocks_by_hash_cache,
Expand All @@ -2193,10 +2188,6 @@ impl ChainStore {
}
}

pub fn is_ingestible(&self) -> bool {
matches!(self.status, ChainStatus::Ingestible)
}

/// Execute a cached query, avoiding thundering herd for identical requests.
/// Returns `(result, was_cached)`.
async fn cached_lookup<K, T, F>(
Expand Down
1 change: 0 additions & 1 deletion store/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ pub mod layout_for_tests {

pub use self::block_store::primary::{add_chain, find_chain, update_chain_name};
pub use self::block_store::BlockStore;
pub use self::block_store::ChainStatus;
pub use self::chain_head_listener::ChainHeadUpdateListener;
pub use self::chain_store::{ChainStore, ChainStoreMetrics, Storage};
pub use self::detail::DeploymentDetail;
Expand Down