Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom {
}
}

// TODO: configuration for which servers to start
fn start_servers(
config: &Arc<Config>,
query: &Arc<Query>,
metrics: &Metrics,
salt_rwlock: &Arc<RwLock<String>>,
) -> (rest::Handle, ElectrumRPC) {
let rest_server = rest::start(Arc::clone(config), Arc::clone(query));
let electrum_server = ElectrumRPC::start(
Arc::clone(config),
Arc::clone(query),
metrics,
Arc::clone(salt_rwlock),
);
(rest_server, electrum_server)
}

fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<()> {
let (block_hash_notify, block_hash_receive) = channel::bounded(1);
let signal = Waiter::start(block_hash_receive);
Expand Down Expand Up @@ -94,10 +111,15 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
&metrics,
));

// Pre-caching is pure cache warming; run it in the background so it doesn't
// delay startup. The file is still read upfront to fail fast on a bad path.
if let Some(ref precache_file) = config.precache_scripts {
let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string())
.expect("cannot load scripts to precache");
precache::precache(&chain, precache_scripthashes);
let precache_chain = Arc::clone(&chain);
thread::spawn(move || {
precache::precache(&precache_chain, precache_scripthashes);
});
}

info!("loading mempool");
Expand All @@ -107,12 +129,6 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
Arc::clone(&config),
)));

while !Mempool::update(&mempool, &daemon, &tip)? {
// Mempool syncing was aborted because the chain tip moved;
// Index the new block(s) and try again.
tip = indexer.update(&daemon)?;
}

#[cfg(feature = "liquid")]
let asset_db = config.asset_db_path.as_ref().map(|db_dir| {
let asset_db = Arc::new(RwLock::new(AssetRegistry::new(db_dir.clone())));
Expand All @@ -129,15 +145,25 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
asset_db,
));

// TODO: configuration for which servers to start
let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query));
let electrum_server = ElectrumRPC::start(
Arc::clone(&config),
Arc::clone(&query),
&metrics,
Arc::clone(&salt_rwlock),
);
// With --serve-during-mempool-sync, the servers start serving chain-based queries
// right away while the mempool syncs; /health/ready reports mempool_synced=false
// until the initial sync completes, keeping the instance out of LB rotation.
let mut servers = if config.serve_during_mempool_sync {
Some(start_servers(&config, &query, &metrics, &salt_rwlock))
} else {
None
};

while !Mempool::update(&mempool, &daemon, &tip)? {
// Mempool syncing was aborted because the chain tip moved;
// Index the new block(s) and try again.
tip = indexer.update(&daemon)?;
}

let (rest_server, electrum_server) = match servers.take() {
Some(servers) => servers,
None => start_servers(&config, &query, &metrics, &salt_rwlock),
};
info!("startup complete");

let main_loop_count = metrics.gauge(MetricOpts::new(
Expand Down
11 changes: 11 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ pub struct Config {
pub index_unspendables: bool,
pub cors: Option<String>,
pub precache_scripts: Option<String>,
/// Start the REST and Electrum servers before the initial mempool sync completes.
/// Chain-based queries are fully correct during the sync; mempool-derived data is
/// incomplete until /health/ready reports mempool_synced=true, so readiness probes
/// must use that endpoint instead of a TCP check when this is enabled.
pub serve_during_mempool_sync: bool,
pub utxos_limit: usize,
pub electrum_txs_limit: usize,
pub electrum_banner: String,
Expand Down Expand Up @@ -215,6 +220,11 @@ impl Config {
.help("Path to file with list of scripts to pre-cache")
.takes_value(true)
)
.arg(
Arg::with_name("serve_during_mempool_sync")
.long("serve-during-mempool-sync")
.help("Start the REST/Electrum servers before the initial mempool sync completes. Requires readiness checks to use /health/ready instead of a TCP probe.")
)
.arg(
Arg::with_name("utxos_limit")
.long("utxos-limit")
Expand Down Expand Up @@ -517,6 +527,7 @@ impl Config {
index_unspendables: m.is_present("index_unspendables"),
cors: m.value_of("cors").map(|s| s.to_string()),
precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()),
serve_during_mempool_sync: m.is_present("serve_during_mempool_sync"),
db_block_cache_mb: value_t_or_exit!(m, "db_block_cache_mb", usize),
db_parallelism: value_t_or_exit!(m, "db_parallelism", usize),
db_write_buffer_size_mb: value_t_or_exit!(m, "db_write_buffer_size_mb", usize),
Expand Down
18 changes: 18 additions & 0 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub struct Mempool {
edges: HashMap<OutPoint, (Txid, u32)>, // OutPoint -> (spending_txid, spending_vin)
recent: ArrayDeque<TxOverview, RECENT_TXS_SIZE, Wrapping>, // The N most recent txs to enter the mempool
backlog_stats: (BacklogStats, Instant),
// Whether the initial sync with bitcoind's mempool has completed at least once.
// Until then, mempool-derived data (unconfirmed history, outspends, backlog stats)
// is incomplete; exposed via /health/ready for readiness checks.
synced: bool,

// monitoring
latency: HistogramVec, // mempool requests latency
Expand Down Expand Up @@ -81,6 +85,7 @@ impl Mempool {
BacklogStats::default(),
Instant::now() - Duration::from_secs(BACKLOG_STATS_TTL),
),
synced: false,
latency: metrics.histogram_vec(
HistogramOpts::new("mempool_latency", "Mempool requests latency (in seconds)"),
&["part"],
Expand All @@ -105,6 +110,10 @@ impl Mempool {
self.config.network_type
}

pub fn is_synced(&self) -> bool {
self.synced
}

pub fn lookup_txn(&self, txid: &Txid) -> Option<Transaction> {
self.txstore.get(txid).cloned()
}
Expand Down Expand Up @@ -576,6 +585,7 @@ impl Mempool {
.set(new_txids.len() as f64);

if new_txids.is_empty() {
Self::mark_synced(mempool);
return Ok(true);
}

Expand Down Expand Up @@ -648,9 +658,17 @@ impl Mempool {
}

trace!("mempool is synced");
Self::mark_synced(mempool);

Ok(true)
}

fn mark_synced(mempool: &Arc<RwLock<Mempool>>) {
if !mempool.read().unwrap().synced {
mempool.write().unwrap().synced = true;
info!("initial mempool sync complete");
}
}
}

fn prune_history_entries(
Expand Down
4 changes: 4 additions & 0 deletions src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ impl Query {
self.mempool.read().unwrap()
}

pub fn mempool_synced(&self) -> bool {
self.mempool.read().unwrap().is_synced()
}

#[trace]
pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
let txid = self.daemon.broadcast_raw(txhex)?;
Expand Down
22 changes: 22 additions & 0 deletions src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,28 @@ fn handle_request(
path.get(3),
path.get(4),
) {
(&Method::GET, Some(&"health"), Some(&"ready"), None, None, None) => {
// The chain index is always synced by the time the server is listening; the
// mempool may still be performing its initial sync when started with
// --serve-during-mempool-sync. 503 keeps not-fully-synced instances out of
// load balancer rotation while still being reachable for diagnostics.
let mempool_synced = query.mempool_synced();
let status = if mempool_synced {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
Ok(Response::builder()
.status(status)
.header("Content-Type", "application/json")
.header("Cache-Control", "no-cache")
.body(Full::new(Bytes::from(format!(
"{{\"chain_synced\":true,\"mempool_synced\":{}}}",
mempool_synced
))))
.unwrap())
}

(&Method::GET, Some(&"blocks"), Some(&"tip"), Some(&"hash"), None, None) => http_message(
StatusCode::OK,
query.chain().best_hash().to_string(),
Expand Down
1 change: 1 addition & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl TestRunner {
index_unspendables: false,
cors: None,
precache_scripts: None,
serve_during_mempool_sync: false,
utxos_limit: 100,
electrum_txs_limit: 100,
electrum_banner: "".into(),
Expand Down
Loading