diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 7a2db69..65697ff 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -41,6 +41,10 @@ anyhow = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +# Metrics +metrics = "0.24" +metrics-exporter-prometheus = "0.16" + # Config dotenvy = "0.15" @@ -55,7 +59,7 @@ hex = "0.4" chrono = { version = "0.4", features = ["serde"] } # Testing -testcontainers = "0.27" +testcontainers = { version = "0.27", features = ["blocking"] } testcontainers-modules = { version = "0.15", features = ["postgres"] } # CLI diff --git a/backend/crates/atlas-server/Cargo.toml b/backend/crates/atlas-server/Cargo.toml index 08c8e82..fdf9b4e 100644 --- a/backend/crates/atlas-server/Cargo.toml +++ b/backend/crates/atlas-server/Cargo.toml @@ -23,6 +23,8 @@ thiserror = { workspace = true } anyhow = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +metrics = { workspace = true } +metrics-exporter-prometheus = { workspace = true } dotenvy = { workspace = true } bigdecimal = { workspace = true } hex = { workspace = true } diff --git a/backend/crates/atlas-server/src/api/error.rs b/backend/crates/atlas-server/src/api/error.rs index ef93aa0..7b0adb3 100644 --- a/backend/crates/atlas-server/src/api/error.rs +++ b/backend/crates/atlas-server/src/api/error.rs @@ -44,6 +44,17 @@ impl Deref for ApiError { } } +fn error_type(err: &AtlasError) -> Option<&'static str> { + match err { + AtlasError::Database(_) => Some("database"), + AtlasError::Internal(_) => Some("internal"), + AtlasError::Config(_) => Some("config"), + AtlasError::Rpc(_) => Some("rpc_request"), + AtlasError::MetadataFetch(_) => Some("metadata_fetch"), + _ => None, + } +} + impl IntoResponse for ApiError { fn into_response(self) -> Response { use atlas_common::AtlasError; @@ -51,6 +62,16 @@ impl IntoResponse for ApiError { let status = StatusCode::from_u16(self.0.status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + // Increment error counter for Prometheus alerting + if let Some(error_type) = error_type(&self.0) { + metrics::counter!( + "atlas_errors_total", + "component" => "api", + "error_type" => error_type + ) + .increment(1); + } + // Determine the client-facing message based on error type. // Internal details are logged server-side to avoid leaking stack traces or // database internals to callers. @@ -119,6 +140,31 @@ mod tests { use super::*; use axum::body::to_bytes; + #[test] + fn error_type_maps_expected_variants() { + assert_eq!( + error_type(&AtlasError::Database(sqlx::Error::RowNotFound)), + Some("database") + ); + assert_eq!( + error_type(&AtlasError::Internal("x".to_string())), + Some("internal") + ); + assert_eq!( + error_type(&AtlasError::Config("x".to_string())), + Some("config") + ); + assert_eq!( + error_type(&AtlasError::Rpc("x".to_string())), + Some("rpc_request") + ); + assert_eq!( + error_type(&AtlasError::MetadataFetch("x".to_string())), + Some("metadata_fetch") + ); + assert_eq!(error_type(&AtlasError::NotFound("x".to_string())), None); + } + #[tokio::test] async fn too_many_requests_sets_retry_after_header_and_body() { let response = ApiError(AtlasError::TooManyRequests { diff --git a/backend/crates/atlas-server/src/api/handlers/faucet.rs b/backend/crates/atlas-server/src/api/handlers/faucet.rs index 978f823..949ce29 100644 --- a/backend/crates/atlas-server/src/api/handlers/faucet.rs +++ b/backend/crates/atlas-server/src/api/handlers/faucet.rs @@ -162,6 +162,9 @@ mod tests { let head_tracker = Arc::new(crate::head::HeadTracker::empty(10)); let (tx, _) = broadcast::channel(1); let (da_tx, _) = broadcast::channel(1); + let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new() + .build_recorder() + .handle(); Arc::new(AppState { pool, block_events_tx: tx, @@ -180,6 +183,8 @@ mod tests { background_color_light: None, success_color: None, error_color: None, + metrics: crate::metrics::Metrics::new(), + prometheus_handle, }) } diff --git a/backend/crates/atlas-server/src/api/handlers/health.rs b/backend/crates/atlas-server/src/api/handlers/health.rs new file mode 100644 index 0000000..e2df0b7 --- /dev/null +++ b/backend/crates/atlas-server/src/api/handlers/health.rs @@ -0,0 +1,196 @@ +use axum::{extract::State, http::StatusCode, response::IntoResponse, Json}; +use chrono::{DateTime, Utc}; +use serde::Serialize; +use std::sync::Arc; + +use crate::api::AppState; + +const MAX_INDEXER_AGE_MINUTES: i64 = 5; + +#[derive(Serialize)] +struct HealthResponse { + status: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + reason: Option, +} + +fn readiness_status( + latest_indexed_at: Option>, + now: DateTime, +) -> (StatusCode, HealthResponse) { + let Some(indexed_at) = latest_indexed_at else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + HealthResponse { + status: "not_ready", + reason: Some("indexer state unavailable".to_string()), + }, + ); + }; + + let age = now - indexed_at; + if age > chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES) { + return ( + StatusCode::SERVICE_UNAVAILABLE, + HealthResponse { + status: "not_ready", + reason: Some(format!( + "indexer stale: last block indexed {}s ago", + age.num_seconds() + )), + }, + ); + } + + ( + StatusCode::OK, + HealthResponse { + status: "ready", + reason: None, + }, + ) +} + +/// GET /health/live — liveness probe (process is alive) +pub async fn liveness() -> impl IntoResponse { + Json(HealthResponse { + status: "ok", + reason: None, + }) +} + +/// GET /health/ready — readiness probe (DB reachable, indexer fresh) +pub async fn readiness(State(state): State>) -> impl IntoResponse { + // Check DB connectivity + if let Err(e) = sqlx::query("SELECT 1").execute(&state.pool).await { + tracing::warn!(error = %e, "readiness database check failed"); + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(HealthResponse { + status: "not_ready", + reason: Some("database unreachable".to_string()), + }), + ); + } + + let latest = match super::status::latest_indexed_block(state.as_ref()).await { + Ok(latest) => latest, + Err(e) => { + tracing::warn!(error = %e, "readiness indexer state check failed"); + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(HealthResponse { + status: "not_ready", + reason: Some("indexer state unavailable".to_string()), + }), + ); + } + }; + + let (status, body) = readiness_status(latest.map(|(_, indexed_at)| indexed_at), Utc::now()); + (status, Json(body)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::head::HeadTracker; + use crate::metrics::Metrics; + use axum::body::to_bytes; + use sqlx::postgres::PgPoolOptions; + use std::sync::Arc; + use tokio::sync::broadcast; + + fn app_state(pool: sqlx::PgPool, head_tracker: Arc) -> Arc { + let (block_tx, _) = broadcast::channel(1); + let (da_tx, _) = broadcast::channel(1); + let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new() + .build_recorder() + .handle(); + + Arc::new(AppState { + pool, + block_events_tx: block_tx, + da_events_tx: da_tx, + head_tracker, + rpc_url: String::new(), + da_tracking_enabled: false, + faucet: None, + chain_id: 1, + chain_name: "Test Chain".to_string(), + chain_logo_url: None, + chain_logo_url_light: None, + chain_logo_url_dark: None, + accent_color: None, + background_color_dark: None, + background_color_light: None, + success_color: None, + error_color: None, + metrics: Metrics::new(), + prometheus_handle, + }) + } + + async fn json_response(response: axum::response::Response) -> (StatusCode, serde_json::Value) { + let status = response.status(); + let body = to_bytes(response.into_body(), usize::MAX) + .await + .expect("read response body"); + let json = serde_json::from_slice(&body).expect("parse json response"); + (status, json) + } + + #[tokio::test] + async fn liveness_returns_ok() { + let (status, json) = json_response(liveness().await.into_response()).await; + + assert_eq!(status, StatusCode::OK); + assert_eq!(json["status"], "ok"); + assert!(json.get("reason").is_none()); + } + + #[tokio::test] + async fn readiness_returns_unavailable_when_database_is_down() { + let pool = PgPoolOptions::new() + .connect_lazy("postgres://postgres:postgres@127.0.0.1:1/atlas") + .expect("create lazy pool"); + let state = app_state(pool, Arc::new(HeadTracker::empty(10))); + + let (status, json) = json_response(readiness(State(state)).await.into_response()).await; + + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(json["status"], "not_ready"); + assert_eq!(json["reason"], "database unreachable"); + } + + #[test] + fn readiness_returns_unavailable_when_indexer_state_is_missing() { + let (status, body) = readiness_status(None, Utc::now()); + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body.status, "not_ready"); + assert_eq!(body.reason.as_deref(), Some("indexer state unavailable")); + } + + #[test] + fn readiness_returns_unavailable_for_stale_indexer_state() { + let (status, body) = readiness_status( + Some(Utc::now() - chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES + 1)), + Utc::now(), + ); + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body.status, "not_ready"); + assert!(body + .reason + .as_deref() + .expect("reason string") + .contains("indexer stale")); + } + + #[test] + fn readiness_returns_ready_for_fresh_indexer_state() { + let (status, body) = readiness_status(Some(Utc::now()), Utc::now()); + assert_eq!(status, StatusCode::OK); + assert_eq!(body.status, "ready"); + assert!(body.reason.is_none()); + } +} diff --git a/backend/crates/atlas-server/src/api/handlers/metrics.rs b/backend/crates/atlas-server/src/api/handlers/metrics.rs new file mode 100644 index 0000000..ec459ce --- /dev/null +++ b/backend/crates/atlas-server/src/api/handlers/metrics.rs @@ -0,0 +1,65 @@ +use axum::extract::State; +use std::sync::Arc; + +use crate::api::AppState; + +/// GET /metrics — Prometheus text format +pub async fn metrics(State(state): State>) -> String { + state.prometheus_handle.render() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::head::HeadTracker; + use crate::metrics::Metrics; + use sqlx::postgres::PgPoolOptions; + use std::sync::OnceLock; + use tokio::sync::broadcast; + + fn test_prometheus_handle() -> metrics_exporter_prometheus::PrometheusHandle { + static PROMETHEUS_HANDLE: OnceLock = + OnceLock::new(); + + PROMETHEUS_HANDLE + .get_or_init(crate::metrics::install_prometheus_recorder) + .clone() + } + + #[tokio::test] + async fn metrics_handler_renders_prometheus_output() { + let pool = PgPoolOptions::new() + .connect_lazy("postgres://test@localhost:5432/test") + .expect("lazy pool"); + let (block_tx, _) = broadcast::channel(1); + let (da_tx, _) = broadcast::channel(1); + let prometheus_handle = test_prometheus_handle(); + let recorder_metrics = Metrics::new(); + recorder_metrics.set_indexer_head_block(42); + let state = Arc::new(AppState { + pool, + block_events_tx: block_tx, + da_events_tx: da_tx, + head_tracker: Arc::new(HeadTracker::empty(10)), + rpc_url: String::new(), + da_tracking_enabled: false, + faucet: None, + chain_id: 1, + chain_name: "Test Chain".to_string(), + chain_logo_url: None, + chain_logo_url_light: None, + chain_logo_url_dark: None, + accent_color: None, + background_color_dark: None, + background_color_light: None, + success_color: None, + error_color: None, + metrics: recorder_metrics, + prometheus_handle, + }); + + let body = super::metrics(State(state)).await; + + assert!(body.contains("atlas_indexer_head_block")); + } +} diff --git a/backend/crates/atlas-server/src/api/handlers/mod.rs b/backend/crates/atlas-server/src/api/handlers/mod.rs index f804941..68ff301 100644 --- a/backend/crates/atlas-server/src/api/handlers/mod.rs +++ b/backend/crates/atlas-server/src/api/handlers/mod.rs @@ -3,7 +3,9 @@ pub mod blocks; pub mod config; pub mod etherscan; pub mod faucet; +pub mod health; pub mod logs; +pub mod metrics; pub mod nfts; pub mod proxy; pub mod search; diff --git a/backend/crates/atlas-server/src/api/handlers/sse.rs b/backend/crates/atlas-server/src/api/handlers/sse.rs index 16c60ae..6e8ddf2 100644 --- a/backend/crates/atlas-server/src/api/handlers/sse.rs +++ b/backend/crates/atlas-server/src/api/handlers/sse.rs @@ -13,6 +13,7 @@ use crate::api::handlers::get_latest_block; use crate::api::AppState; use crate::head::HeadTracker; use crate::indexer::DaSseUpdate; +use crate::metrics::{Metrics, SseConnectionGuard}; use atlas_common::Block; use sqlx::PgPool; use tracing::warn; @@ -45,8 +46,11 @@ fn make_event_stream( head_tracker: Arc, mut block_rx: broadcast::Receiver<()>, mut da_rx: broadcast::Receiver>, + metrics: Option, ) -> impl Stream> + Send { async_stream::stream! { + // Guard decrements the SSE connection gauge when the stream is dropped + let _guard = metrics.map(SseConnectionGuard::new); let mut last_block_number: Option = None; match head_tracker.latest().await { @@ -147,6 +151,7 @@ pub async fn block_events( state.head_tracker.clone(), state.block_events_tx.subscribe(), state.da_events_tx.subscribe(), + Some(state.metrics.clone()), ); Sse::new(stream).keep_alive( axum::response::sse::KeepAlive::new() @@ -281,7 +286,13 @@ mod tests { let (tx, _) = broadcast::channel::<()>(16); let (da_tx, _) = broadcast::channel::>(16); - let stream = make_event_stream(dummy_pool(), tracker, tx.subscribe(), da_tx.subscribe()); + let stream = make_event_stream( + dummy_pool(), + tracker, + tx.subscribe(), + da_tx.subscribe(), + None, + ); tokio::pin!(stream); // Drop sender so loop terminates after the initial seed. @@ -313,6 +324,7 @@ mod tests { tracker.clone(), tx.subscribe(), da_tx.subscribe(), + None, ); tokio::pin!(stream); @@ -353,6 +365,7 @@ mod tests { tracker.clone(), tx.subscribe(), da_tx.subscribe(), + None, ); tokio::pin!(stream); @@ -397,7 +410,13 @@ mod tests { let (tx, _) = broadcast::channel::<()>(16); let (da_tx, _) = broadcast::channel::>(1); - let stream = make_event_stream(dummy_pool(), tracker, tx.subscribe(), da_tx.subscribe()); + let stream = make_event_stream( + dummy_pool(), + tracker, + tx.subscribe(), + da_tx.subscribe(), + None, + ); tokio::pin!(stream); let _ = tokio::time::timeout(Duration::from_secs(1), stream.next()) diff --git a/backend/crates/atlas-server/src/api/handlers/status.rs b/backend/crates/atlas-server/src/api/handlers/status.rs index 92baa41..72707e6 100644 --- a/backend/crates/atlas-server/src/api/handlers/status.rs +++ b/backend/crates/atlas-server/src/api/handlers/status.rs @@ -28,9 +28,11 @@ pub struct ChainStatus { pub indexed_at: String, } -async fn latest_height_and_indexed_at(state: &AppState) -> Result<(i64, String), sqlx::Error> { +pub(super) async fn latest_indexed_block( + state: &AppState, +) -> Result)>, sqlx::Error> { if let Some(block) = state.head_tracker.latest().await { - return Ok((block.number, block.indexed_at.to_rfc3339())); + return Ok(Some((block.number, block.indexed_at))); } // Fallback: single key-value lookup from indexer_state (sub-ms, avoids blocks table) @@ -40,8 +42,12 @@ async fn latest_height_and_indexed_at(state: &AppState) -> Result<(i64, String), .fetch_optional(&state.pool) .await?; - if let Some((block_height, updated_at)) = row { - return Ok((block_height, updated_at.to_rfc3339())); + Ok(row) +} + +async fn latest_height_and_indexed_at(state: &AppState) -> Result<(i64, String), sqlx::Error> { + if let Some((block_height, indexed_at)) = latest_indexed_block(state).await? { + return Ok((block_height, indexed_at.to_rfc3339())); } Ok((0, String::new())) @@ -104,6 +110,9 @@ mod tests { let pool = sqlx::postgres::PgPoolOptions::new() .connect_lazy("postgres://test@localhost:5432/test") .expect("lazy pool"); + let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new() + .build_recorder() + .handle(); State(Arc::new(AppState { pool, block_events_tx: block_tx, @@ -122,6 +131,8 @@ mod tests { background_color_light: None, success_color: None, error_color: None, + metrics: crate::metrics::Metrics::new(), + prometheus_handle, })) } diff --git a/backend/crates/atlas-server/src/api/mod.rs b/backend/crates/atlas-server/src/api/mod.rs index 559ba18..465fc7b 100644 --- a/backend/crates/atlas-server/src/api/mod.rs +++ b/backend/crates/atlas-server/src/api/mod.rs @@ -1,7 +1,8 @@ pub mod error; pub mod handlers; -use axum::{routing::get, Router}; +use axum::{middleware, routing::get, Router}; +use metrics_exporter_prometheus::PrometheusHandle; use sqlx::PgPool; use std::sync::Arc; use std::time::Duration; @@ -13,6 +14,7 @@ use tower_http::trace::TraceLayer; use crate::faucet::SharedFaucetBackend; use crate::head::HeadTracker; use crate::indexer::DaSseUpdate; +use crate::metrics::Metrics; pub struct AppState { pub pool: PgPool, @@ -32,6 +34,8 @@ pub struct AppState { pub background_color_light: Option, pub success_color: Option, pub error_color: Option, + pub metrics: Metrics, + pub prometheus_handle: PrometheusHandle, } /// Build the Axum router. @@ -172,8 +176,12 @@ pub fn build_router(state: Arc, cors_origin: Option) -> Router .route("/api/status", get(handlers::status::get_status)) // Config (white-label branding) .route("/api/config", get(handlers::config::get_config)) + // Metrics + .route("/metrics", get(handlers::metrics::metrics)) // Health - .route("/health", get(|| async { "OK" })); + .route("/health", get(|| async { "OK" })) + .route("/health/live", get(handlers::health::liveness)) + .route("/health/ready", get(handlers::health::readiness)); if state.faucet.is_some() { router = router @@ -189,7 +197,9 @@ pub fn build_router(state: Arc, cors_origin: Option) -> Router axum::http::StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10), )) - // Merge SSE routes (no TimeoutLayer so connections stay alive) + // HTTP metrics middleware — placed after routing so MatchedPath is available + .layer(middleware::from_fn(crate::metrics::http_metrics_middleware)) + // Merge SSE routes without TimeoutLayer so connections stay alive .merge(sse_routes) // Shared layers applied to all routes .layer(build_cors_layer(cors_origin)) @@ -264,6 +274,9 @@ mod tests { let head_tracker = Arc::new(crate::head::HeadTracker::empty(10)); let (tx, _) = broadcast::channel(1); let (da_tx, _) = broadcast::channel(1); + let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new() + .build_recorder() + .handle(); Arc::new(AppState { pool, block_events_tx: tx, @@ -282,6 +295,8 @@ mod tests { background_color_light: None, success_color: None, error_color: None, + metrics: Metrics::new(), + prometheus_handle, }) } diff --git a/backend/crates/atlas-server/src/cli.rs b/backend/crates/atlas-server/src/cli.rs index 5dbdc83..d4464c9 100644 --- a/backend/crates/atlas-server/src/cli.rs +++ b/backend/crates/atlas-server/src/cli.rs @@ -385,6 +385,15 @@ pub struct LogArgs { help = "Log filter directive (e.g. info, atlas_server=debug)" )] pub level: String, + + #[arg( + long = "atlas.log.format", + env = "LOG_FORMAT", + default_value = "text", + value_name = "FORMAT", + help = "Log output format: text or json" + )] + pub format: String, } // ── db subcommand ───────────────────────────────────────────────────────────── diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs index bacb9f9..b36ec4f 100644 --- a/backend/crates/atlas-server/src/config.rs +++ b/backend/crates/atlas-server/src/config.rs @@ -490,6 +490,7 @@ mod tests_from_run_args { }, log: cli::LogArgs { level: "info".to_string(), + format: "text".to_string(), }, } } diff --git a/backend/crates/atlas-server/src/indexer/batch.rs b/backend/crates/atlas-server/src/indexer/batch.rs index 4510710..713501e 100644 --- a/backend/crates/atlas-server/src/indexer/batch.rs +++ b/backend/crates/atlas-server/src/indexer/batch.rs @@ -192,6 +192,10 @@ impl BlockBatch { }) .collect() } + + pub(crate) fn last_block_timestamp(&self) -> Option { + self.b_timestamps.last().copied() + } } #[cfg(test)] @@ -305,4 +309,13 @@ mod tests { assert_eq!(blocks[0].transaction_count, 3); assert_eq!(blocks[0].indexed_at, indexed_at); } + + #[test] + fn last_block_timestamp_returns_latest_collected_timestamp() { + let mut batch = BlockBatch::new(); + batch.b_timestamps.push(1_700_000_001); + batch.b_timestamps.push(1_700_000_042); + + assert_eq!(batch.last_block_timestamp(), Some(1_700_000_042)); + } } diff --git a/backend/crates/atlas-server/src/indexer/da_worker.rs b/backend/crates/atlas-server/src/indexer/da_worker.rs index 97c40cf..dd34013 100644 --- a/backend/crates/atlas-server/src/indexer/da_worker.rs +++ b/backend/crates/atlas-server/src/indexer/da_worker.rs @@ -24,6 +24,7 @@ use std::time::Duration; use tokio::sync::broadcast; use super::evnode::EvnodeClient; +use crate::metrics::Metrics; /// Maximum blocks processed per cycle. const BATCH_SIZE: i64 = 100; @@ -72,6 +73,7 @@ pub struct DaWorker { >, >, da_events_tx: broadcast::Sender>, + metrics: Metrics, } impl DaWorker { @@ -81,6 +83,7 @@ impl DaWorker { concurrency: u32, requests_per_second: u32, da_events_tx: broadcast::Sender>, + metrics: Metrics, ) -> Result { let concurrency = NonZeroU32::new(concurrency) .ok_or_else(|| anyhow::anyhow!("DA_WORKER_CONCURRENCY must be greater than 0"))?; @@ -93,20 +96,22 @@ impl DaWorker { requests_per_second, rate_limiter: Arc::new(RateLimiter::direct(Quota::per_second(rate))), da_events_tx, + metrics, }) } pub async fn run(&self) -> Result<()> { tracing::info!( - "DA worker started (concurrency: {}, rate_limit: {} req/s)", - self.concurrency, - self.requests_per_second + concurrency = self.concurrency, + rate_limit_rps = self.requests_per_second, + "DA worker started" ); loop { let processed = self.process_blocks(BATCH_SIZE).await?; if processed > 0 { - tracing::info!("DA worker cycle: processed {}", processed); + self.metrics.record_da_blocks_processed(processed as u64); + tracing::info!(processed, "DA worker cycle complete"); } else { tokio::time::sleep(IDLE_SLEEP).await; } @@ -136,6 +141,7 @@ impl DaWorker { let pool = &self.pool; let client = &self.client; let rate_limiter = &self.rate_limiter; + let metrics = &self.metrics; let results: Vec> = stream::iter(blocks) .map(|(block_number,)| async move { @@ -156,20 +162,23 @@ impl DaWorker { }), Ok(_) => None, Err(e) => { + metrics.error("da_worker", "da_upsert"); tracing::warn!( - "Failed to upsert DA status for block {}: {}", - block_number, - e + block = block_number, + error = %e, + "failed to upsert DA status" ); None } } } Err(e) => { + metrics.record_da_rpc_error(); + metrics.error("da_worker", "da_fetch"); tracing::warn!( - "Failed to fetch DA status for block {}: {}", - block_number, - e + block = block_number, + error = %e, + "failed to fetch DA status" ); None } @@ -199,9 +208,16 @@ mod tests { #[tokio::test] async fn new_rejects_zero_concurrency() { let (tx, _) = broadcast::channel(1); - let err = DaWorker::new(test_pool(), "http://localhost:7331", 0, 50, tx) - .err() - .expect("zero concurrency should fail"); + let err = DaWorker::new( + test_pool(), + "http://localhost:7331", + 0, + 50, + tx, + Metrics::new(), + ) + .err() + .expect("zero concurrency should fail"); assert!(err .to_string() @@ -211,9 +227,16 @@ mod tests { #[tokio::test] async fn new_rejects_zero_rate_limit() { let (tx, _) = broadcast::channel(1); - let err = DaWorker::new(test_pool(), "http://localhost:7331", 4, 0, tx) - .err() - .expect("zero rate limit should fail"); + let err = DaWorker::new( + test_pool(), + "http://localhost:7331", + 4, + 0, + tx, + Metrics::new(), + ) + .err() + .expect("zero rate limit should fail"); assert!(err .to_string() @@ -223,7 +246,15 @@ mod tests { #[tokio::test] async fn notify_da_updates_sends_full_batch() { let (tx, mut rx) = broadcast::channel(1); - let worker = DaWorker::new(test_pool(), "http://localhost:7331", 4, 50, tx).unwrap(); + let worker = DaWorker::new( + test_pool(), + "http://localhost:7331", + 4, + 50, + tx, + Metrics::new(), + ) + .unwrap(); let updates = vec![ DaSseUpdate { block_number: 10, @@ -248,7 +279,15 @@ mod tests { #[tokio::test] async fn notify_da_updates_skips_empty_batch() { let (tx, mut rx) = broadcast::channel(1); - let worker = DaWorker::new(test_pool(), "http://localhost:7331", 4, 50, tx).unwrap(); + let worker = DaWorker::new( + test_pool(), + "http://localhost:7331", + 4, + 50, + tx, + Metrics::new(), + ) + .unwrap(); worker.notify_da_updates(&[]); diff --git a/backend/crates/atlas-server/src/indexer/evnode.rs b/backend/crates/atlas-server/src/indexer/evnode.rs index fb8d51d..cca741b 100644 --- a/backend/crates/atlas-server/src/indexer/evnode.rs +++ b/backend/crates/atlas-server/src/indexer/evnode.rs @@ -100,11 +100,12 @@ impl EvnodeClient { .unwrap_or(*RETRY_DELAYS_MS.last().unwrap()); tracing::warn!( - "ev-node GetBlock failed for height {} (attempt {}): {}. Retrying in {}ms", height, - attempt + 1, - last_error.as_ref().unwrap(), - delay_ms, + attempt = attempt + 1, + max_retries = MAX_RETRIES, + error = %last_error.as_ref().unwrap(), + retry_in_ms = delay_ms, + "ev-node GetBlock failed" ); tokio::time::sleep(Duration::from_millis(delay_ms)).await; diff --git a/backend/crates/atlas-server/src/indexer/fetcher.rs b/backend/crates/atlas-server/src/indexer/fetcher.rs index b7b8f26..dbcc464 100644 --- a/backend/crates/atlas-server/src/indexer/fetcher.rs +++ b/backend/crates/atlas-server/src/indexer/fetcher.rs @@ -7,6 +7,8 @@ use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; +use crate::metrics::Metrics; + /// Retry delays for RPC calls (in seconds) const RPC_RETRY_DELAYS: &[u64] = &[2, 5, 10, 20, 30]; const RPC_MAX_RETRIES: usize = 10; @@ -46,11 +48,12 @@ pub(crate) async fn fetch_blocks_batch( start_block: u64, count: usize, rate_limiter: &SharedRateLimiter, + metrics: &Metrics, ) -> Vec { tracing::debug!( - "Fetching batch: blocks {} to {}", start_block, - start_block + count as u64 - 1 + end_block = start_block + count as u64 - 1, + "fetching batch" ); // Wait for rate limiter - we're making 2*count RPC calls in one HTTP request @@ -95,12 +98,14 @@ pub(crate) async fn fetch_blocks_batch( .copied() .unwrap_or(*RPC_RETRY_DELAYS.last().unwrap_or(&30)); + metrics.record_rpc_request("error"); + metrics.error("rpc", "rpc_request"); tracing::warn!( - "RPC batch request failed (attempt {}/{}): {}. Retrying in {}s...", - attempt + 1, - RPC_MAX_RETRIES, - e, - delay + attempt = attempt + 1, + max_retries = RPC_MAX_RETRIES, + error = %e, + retry_in_secs = delay, + "RPC batch request failed" ); last_error = Some(format!("HTTP request failed: {}", e)); @@ -114,12 +119,13 @@ pub(crate) async fn fetch_blocks_batch( Ok(resp) => { if attempt > 0 { tracing::info!( - "RPC batch request succeeded after {} retries (blocks {} to {})", - attempt, + retries = attempt, start_block, - start_block + count as u64 - 1 + end_block = start_block + count as u64 - 1, + "RPC batch request recovered" ); } + metrics.record_rpc_request("success"); batch_response = Some(resp); break; } @@ -129,12 +135,13 @@ pub(crate) async fn fetch_blocks_batch( .copied() .unwrap_or(*RPC_RETRY_DELAYS.last().unwrap_or(&30)); + metrics.error("rpc", "rpc_parse"); tracing::warn!( - "Failed to parse RPC response (attempt {}/{}): {}. Retrying in {}s...", - attempt + 1, - RPC_MAX_RETRIES, - e, - delay + attempt = attempt + 1, + max_retries = RPC_MAX_RETRIES, + error = %e, + retry_in_secs = delay, + "failed to parse RPC response" ); last_error = Some(format!("Failed to parse response: {}", e)); @@ -213,7 +220,11 @@ pub(crate) async fn fetch_blocks_batch( // Combine block + receipts into a single result match (block_result, receipts_result) { (Ok(block), Ok(receipts)) => { - tracing::debug!("Block {} complete ({} receipts)", block_num, receipts.len()); + tracing::debug!( + block = block_num, + receipts = receipts.len(), + "block complete" + ); results.push(FetchResult::Success(Box::new(FetchedBlock { number: block_num, block, @@ -221,14 +232,14 @@ pub(crate) async fn fetch_blocks_batch( }))); } (Err(e), _) => { - tracing::warn!("Failed to fetch block {}: {}", block_num, e); + tracing::warn!(block = block_num, error = %e, "failed to fetch block"); results.push(FetchResult::Error { block_num, error: e, }); } (_, Err(e)) => { - tracing::warn!("Failed to fetch receipts for block {}: {}", block_num, e); + tracing::warn!(block = block_num, error = %e, "failed to fetch receipts"); results.push(FetchResult::Error { block_num, error: e, @@ -241,14 +252,17 @@ pub(crate) async fn fetch_blocks_batch( } /// Get block number with internal retry logic for network failures -pub(crate) async fn get_block_number_with_retry(provider: &HttpProvider) -> Result { +pub(crate) async fn get_block_number_with_retry( + provider: &HttpProvider, + metrics: &Metrics, +) -> Result { let mut last_error = None; for attempt in 0..RPC_MAX_RETRIES { match provider.get_block_number().await { Ok(block_num) => { if attempt > 0 { - tracing::info!("RPC connection restored after {} retries", attempt); + tracing::info!(retries = attempt, "RPC connection restored"); } return Ok(block_num); } @@ -258,12 +272,13 @@ pub(crate) async fn get_block_number_with_retry(provider: &HttpProvider) -> Resu .copied() .unwrap_or(*RPC_RETRY_DELAYS.last().unwrap_or(&30)); + metrics.error("rpc", "rpc_request"); tracing::warn!( - "RPC request failed (attempt {}/{}): {}. Retrying in {}s...", - attempt + 1, - RPC_MAX_RETRIES, - e, - delay + attempt = attempt + 1, + max_retries = RPC_MAX_RETRIES, + error = %e, + retry_in_secs = delay, + "RPC request failed" ); last_error = Some(e); diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index a3704c2..5656e48 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -25,6 +25,7 @@ use super::fetcher::{ }; use crate::config::Config; use crate::head::HeadTracker; +use crate::metrics::Metrics; use crate::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY; /// Partition size: 10 million blocks per partition @@ -45,6 +46,7 @@ pub struct Indexer { block_events_tx: broadcast::Sender<()>, /// Shared in-memory tracker for the latest committed head and replay tail head_tracker: Arc, + metrics: Metrics, } impl Indexer { @@ -53,6 +55,7 @@ impl Indexer { config: Config, block_events_tx: broadcast::Sender<()>, head_tracker: Arc, + metrics: Metrics, ) -> Self { Self { pool, @@ -61,6 +64,7 @@ impl Indexer { current_max_partition: std::sync::atomic::AtomicU64::new(0), block_events_tx, head_tracker, + metrics, } } @@ -81,7 +85,7 @@ impl Indexer { let (client, connection) = tokio_postgres::connect(database_url, tls).await?; tokio::spawn(async move { if let Err(e) = connection.await { - tracing::error!("copy connection error: {}", e); + tracing::error!(error = %e, "copy connection error"); } }); Ok(client) @@ -89,7 +93,7 @@ impl Indexer { let (client, connection) = tokio_postgres::connect(database_url, NoTls).await?; tokio::spawn(async move { if let Err(e) = connection.await { - tracing::error!("copy connection error: {}", e); + tracing::error!(error = %e, "copy connection error"); } }); Ok(client) @@ -108,11 +112,11 @@ impl Indexer { let rps = NonZeroU32::new(self.config.rpc_requests_per_second) .unwrap_or(NonZeroU32::new(100).unwrap()); let rate_limiter: SharedRateLimiter = Arc::new(RateLimiter::direct(Quota::per_second(rps))); - tracing::info!("Rate limiting RPC requests to {} req/sec", rps); + tracing::info!(rps = %rps, "rate limiting RPC requests"); // Handle reindex flag if self.config.reindex { - tracing::warn!("Reindex flag set - truncating all tables"); + tracing::warn!("reindex flag set, truncating all tables"); self.head_tracker.clear().await; self.truncate_tables().await?; } @@ -125,20 +129,34 @@ impl Indexer { if erc20_supply_history_status.is_none() && start_block == 0 { self.set_erc20_supply_history_complete(false).await?; } - tracing::info!("Starting indexing from block {}", start_block); + tracing::info!(start_block, "starting indexing"); + + let latest_indexed_block = self.head_tracker.latest().await; + let mut indexed_head = latest_indexed_block + .as_ref() + .map(|block| block.number as u64); + if let Some(block) = latest_indexed_block.as_ref() { + self.metrics.set_indexer_head_block(block.number as u64); + self.metrics + .set_indexer_head_block_timestamp(block.timestamp); + } + + let mut known_missing_blocks = self.get_missing_block_count().await?; + self.metrics + .set_indexer_missing_blocks(known_missing_blocks); // Load known contracts into memory to avoid a SELECT per transfer let mut known_erc20: HashSet = self.load_known_erc20().await?; - tracing::info!("Loaded {} known ERC-20 contracts", known_erc20.len()); + tracing::info!(count = known_erc20.len(), "loaded known ERC-20 contracts"); let mut known_nft: HashSet = self.load_known_nft().await?; - tracing::info!("Loaded {} known NFT contracts", known_nft.len()); + tracing::info!(count = known_nft.len(), "loaded known NFT contracts"); let num_workers = self.config.fetch_workers as usize; let rpc_batch_size = self.config.rpc_batch_size as usize; tracing::info!( - "Starting {} fetch workers with {} blocks per RPC batch", - num_workers, - rpc_batch_size + workers = num_workers, + rpc_batch_size, + "starting fetch workers" ); // Channels for work distribution and results @@ -159,9 +177,10 @@ impl Indexer { let limiter = Arc::clone(&rate_limiter); let client = http_client.clone(); let url = rpc_url.clone(); + let worker_metrics = self.metrics.clone(); tokio::spawn(async move { - tracing::debug!("Worker {} started", worker_id); + tracing::debug!(worker_id, "worker started"); while let Ok(work_item) = work_rx.recv().await { // Fetch batch of blocks using JSON-RPC batching let results = fetch_blocks_batch( @@ -170,6 +189,7 @@ impl Indexer { work_item.start_block, work_item.count, &limiter, + &worker_metrics, ) .await; @@ -180,7 +200,7 @@ impl Indexer { } } } - tracing::debug!("Worker {} shutting down", worker_id); + tracing::debug!(worker_id, "worker shutting down"); }); } @@ -193,7 +213,7 @@ impl Indexer { loop { // Get chain head with retry - let head = match get_block_number_with_retry(&provider).await { + let head = match get_block_number_with_retry(&provider, &self.metrics).await { Ok(h) => h, Err(e) => { // This should only happen after all retries exhausted (very unlikely) @@ -201,7 +221,10 @@ impl Indexer { return Err(e); } }; - tracing::debug!("Chain head: {}, current: {}", head, current_block); + self.metrics.set_chain_head_block(head); + self.metrics + .set_indexer_lag_blocks(lag_blocks(head, indexed_head, start_block)); + tracing::debug!(chain_head = head, current = current_block, "chain head"); if current_block > head { if erc20_supply_backfill_pending { @@ -213,14 +236,16 @@ impl Indexer { continue; } + let processing_start = std::time::Instant::now(); + // Calculate batch end let end_block = (current_block + self.config.batch_size - 1).min(head); let batch_size = (end_block - current_block + 1) as usize; tracing::debug!( - "Fetching batch: {} to {} ({} blocks)", - current_block, - end_block, - batch_size + start = current_block, + end = end_block, + blocks = batch_size, + "fetching batch" ); // Ensure partitions exist for this batch range @@ -243,9 +268,9 @@ impl Indexer { block += count as u64; } tracing::debug!( - "Sent {} blocks to workers in batches of {}", - batch_size, - blocks_per_batch + blocks = batch_size, + batch_size = blocks_per_batch, + "sent blocks to workers" ); }); @@ -271,7 +296,7 @@ impl Indexer { } } Some(FetchResult::Error { block_num, error }) => { - tracing::warn!("Block {} failed to fetch: {}", block_num, error); + tracing::warn!(block = block_num, error = %error, "block failed to fetch"); failed_blocks.push((block_num, error)); blocks_received += 1; // Skip this block for now, continue with others @@ -298,6 +323,8 @@ impl Indexer { // so this is safe even if the DB write is slow. If write_batch fails // the indexer retries the same blocks and head_tracker ignores // non-advancing publishes. + let head_block_timestamp = batch.last_block_timestamp(); + let actual_head_block = batch.last_block; let committed_blocks = batch.materialize_blocks(Utc::now()); self.head_tracker .publish_committed_batch(committed_blocks) @@ -305,7 +332,12 @@ impl Indexer { let _ = self.block_events_tx.send(()); // One DB transaction for the entire batch + let db_write_start = std::time::Instant::now(); self.write_batch(&mut copy_client, batch, true).await?; + self.metrics + .record_db_write_duration(db_write_start.elapsed().as_secs_f64()); + self.metrics + .record_block_processing_duration(processing_start.elapsed().as_secs_f64()); // Write succeeded — now safe to update the persistent in-memory sets known_erc20.extend(new_erc20); @@ -318,9 +350,9 @@ impl Indexer { if !failed_blocks.is_empty() { let block_nums: Vec = failed_blocks.iter().map(|(n, _)| *n).collect(); tracing::warn!( - "Retrying {} failed blocks: {:?}", - failed_blocks.len(), - block_nums + count = failed_blocks.len(), + blocks = ?block_nums, + "retrying failed blocks" ); // Retry up to 3 times with increasing delay @@ -331,19 +363,25 @@ impl Indexer { let delay = Duration::from_secs(attempt * 2); // 2s, 4s, 6s tracing::info!( - "Retry attempt {} for {} blocks (waiting {:?})", attempt, - failed_blocks.len(), - delay + blocks = failed_blocks.len(), + delay_secs = delay.as_secs(), + "retry attempt for failed blocks" ); tokio::time::sleep(delay).await; let mut still_failed = Vec::new(); for (block_num, last_error) in failed_blocks { // Fetch single block - let results = - fetch_blocks_batch(&http_client, &rpc_url, block_num, 1, &rate_limiter) - .await; + let results = fetch_blocks_batch( + &http_client, + &rpc_url, + block_num, + 1, + &rate_limiter, + &self.metrics, + ) + .await; match results.into_iter().next() { Some(FetchResult::Success(fetched)) => { @@ -364,7 +402,7 @@ impl Indexer { .await?; known_erc20.extend(new_erc20); known_nft.extend(new_nft); - tracing::info!("Block {} retry succeeded", block_num); + tracing::info!(block = block_num, "block retry succeeded"); } Some(FetchResult::Error { error, .. }) => { still_failed.push((block_num, error)); @@ -379,10 +417,13 @@ impl Indexer { // Store any remaining failures in failed_blocks table if !failed_blocks.is_empty() { + self.metrics + .record_failed_blocks(failed_blocks.len() as u64); + self.metrics.error("indexer", "block_fetch"); tracing::error!( - "Storing {} blocks in failed_blocks table after 3 retries: {:?}", - failed_blocks.len(), - failed_blocks.iter().map(|(n, _)| n).collect::>() + count = failed_blocks.len(), + blocks = ?failed_blocks.iter().map(|(n, _)| n).collect::>(), + "storing blocks in failed_blocks table after 3 retries" ); for (block_num, error) in &failed_blocks { @@ -399,31 +440,51 @@ impl Indexer { .execute(&self.pool) .await?; } + + known_missing_blocks += failed_blocks.len() as u64; + self.metrics + .set_indexer_missing_blocks(known_missing_blocks); } } current_block = end_block + 1; + indexed_head = Some(actual_head_block); if erc20_supply_backfill_pending && current_block > head { self.set_erc20_supply_history_complete(true).await?; erc20_supply_backfill_pending = false; } - // Log progress after every batch + // Record metrics and log progress + self.metrics.record_blocks_indexed(batch_size as u64); + self.metrics.set_indexer_head_block(actual_head_block); + if let Some(timestamp) = head_block_timestamp { + self.metrics.set_indexer_head_block_timestamp(timestamp); + } + self.metrics + .set_indexer_lag_blocks(lag_blocks(head, indexed_head, start_block)); + + // Full cycle timing includes time between batches such as head sleep. let elapsed = last_log_time.elapsed(); + self.metrics.record_batch_duration(elapsed.as_secs_f64()); let blocks_per_sec = batch_size as f64 / elapsed.as_secs_f64(); let progress = (end_block as f64 / head as f64) * 100.0; tracing::info!( - "Batch complete: {} to {} ({} blocks in {:.2}s = {:.1} blocks/sec) | Progress: {:.2}%", - end_block - batch_size as u64 + 1, end_block, batch_size, elapsed.as_secs_f64(), blocks_per_sec, progress + start_block = end_block - batch_size as u64 + 1, + end_block, + blocks = batch_size, + elapsed_secs = format_args!("{:.2}", elapsed.as_secs_f64()), + blocks_per_sec = format_args!("{:.1}", blocks_per_sec), + progress_pct = format_args!("{:.2}", progress), + "batch complete" ); last_log_time = std::time::Instant::now(); // If we hit the head (batch smaller than configured), sleep to avoid tight loop if (batch_size as u64) < self.config.batch_size { - tracing::debug!("At chain head, sleeping for 1s"); + tracing::debug!("at chain head, sleeping"); tokio::time::sleep(Duration::from_secs(1)).await; } } @@ -890,6 +951,13 @@ impl Indexer { } } + async fn get_missing_block_count(&self) -> Result { + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM failed_blocks") + .fetch_one(&self.pool) + .await?; + Ok(count.0.max(0) as u64) + } + async fn ensure_partitions_exist(&self, block_number: u64) -> Result<()> { use std::sync::atomic::Ordering; @@ -933,10 +1001,10 @@ impl Indexer { let partition_end = partition_start + PARTITION_SIZE; tracing::info!( - "Creating partitions for block range {} to {} (p{})", - partition_start, - partition_end, - p + partition = p, + range_start = partition_start, + range_end = partition_end, + "creating partitions" ); // Create partitions for all partitioned tables @@ -961,17 +1029,18 @@ impl Indexer { // Update our tracked max self.current_max_partition .store(partition_num, Ordering::Relaxed); - tracing::info!("Partitions up to p{} ready", partition_num); + tracing::info!(max_partition = partition_num, "partitions ready"); Ok(()) } async fn truncate_tables(&self) -> Result<()> { sqlx::query( "TRUNCATE blocks, transactions, addresses, nft_contracts, nft_tokens, nft_transfers, - erc20_contracts, erc20_transfers, erc20_balances, event_logs, proxy_contracts, indexer_state CASCADE" + erc20_contracts, erc20_transfers, erc20_balances, event_logs, proxy_contracts, + indexer_state, failed_blocks CASCADE", ) - .execute(&self.pool) - .await?; + .execute(&self.pool) + .await?; Ok(()) } @@ -1000,6 +1069,14 @@ impl Indexer { } } +fn lag_blocks(chain_head: u64, indexed_head: Option, start_block: u64) -> u64 { + match indexed_head { + Some(indexed_head) => chain_head.saturating_sub(indexed_head), + None if chain_head < start_block => 0, + None => chain_head - start_block + 1, + } +} + #[cfg(test)] mod tests { use super::*; @@ -1442,4 +1519,21 @@ mod tests { let state = &batch.nft_token_map[&(contract, "42".to_string())]; assert_eq!(state.owner, "0x3333333333333333333333333333333333333333"); } + + #[test] + fn lag_blocks_uses_indexed_head_when_available() { + assert_eq!(lag_blocks(100, Some(90), 0), 10); + } + + #[test] + fn lag_blocks_counts_from_start_block_when_no_head_is_indexed() { + assert_eq!(lag_blocks(100, None, 95), 6); + assert_eq!(lag_blocks(100, None, 0), 101); + } + + #[test] + fn lag_blocks_clamps_to_zero_when_chain_head_is_before_start_block() { + assert_eq!(lag_blocks(50, None, 100), 0); + assert_eq!(lag_blocks(50, Some(60), 0), 0); + } } diff --git a/backend/crates/atlas-server/src/indexer/metadata.rs b/backend/crates/atlas-server/src/indexer/metadata.rs index 8d604ba..f015922 100644 --- a/backend/crates/atlas-server/src/indexer/metadata.rs +++ b/backend/crates/atlas-server/src/indexer/metadata.rs @@ -9,6 +9,7 @@ use sqlx::PgPool; use std::{str::FromStr, sync::Arc, time::Duration}; use crate::config::Config; +use crate::metrics::Metrics; // ERC-721 interface sol! { @@ -38,10 +39,11 @@ pub struct MetadataFetcher { config: Config, client: reqwest::Client, provider: Arc, + metrics: Metrics, } impl MetadataFetcher { - pub fn new(pool: PgPool, config: Config) -> Result { + pub fn new(pool: PgPool, config: Config, metrics: Metrics) -> Result { let client = reqwest::Client::builder() .timeout(Duration::from_secs(30)) .build() @@ -54,13 +56,14 @@ impl MetadataFetcher { config, client, provider, + metrics, }) } pub async fn run(&self) -> Result<()> { tracing::info!( - "Starting metadata fetcher with {} workers", - self.config.metadata_fetch_workers + workers = self.config.metadata_fetch_workers, + "starting metadata fetcher" ); loop { @@ -95,27 +98,35 @@ impl MetadataFetcher { return Ok(false); } - tracing::debug!("Fetching metadata for {} NFT contracts", contracts.len()); + tracing::debug!(count = contracts.len(), "fetching NFT contract metadata"); let mut handles = Vec::new(); for (address,) in contracts { let pool = self.pool.clone(); let provider = self.provider.clone(); + let m = self.metrics.clone(); handles.push(tokio::spawn(async move { - if let Err(e) = fetch_nft_contract_metadata(&pool, &provider, &address).await { - tracing::debug!( - "Failed to fetch NFT contract metadata for {}: {}", - address, - e - ); - // Mark as fetched to avoid infinite retries - let _ = sqlx::query( - "UPDATE nft_contracts SET metadata_fetched = true WHERE address = $1", - ) - .bind(&address) - .execute(&pool) - .await; + match fetch_nft_contract_metadata(&pool, &provider, &address).await { + Ok(()) => { + m.record_metadata_contract_fetched("nft"); + } + Err(e) => { + m.record_metadata_error("nft"); + m.error("metadata", "metadata_fetch"); + tracing::debug!( + address = %address, + error = %e, + "failed to fetch NFT contract metadata" + ); + // Mark as fetched to avoid infinite retries + let _ = sqlx::query( + "UPDATE nft_contracts SET metadata_fetched = true WHERE address = $1", + ) + .bind(&address) + .execute(&pool) + .await; + } } })); @@ -146,27 +157,35 @@ impl MetadataFetcher { return Ok(false); } - tracing::debug!("Fetching metadata for {} ERC-20 contracts", contracts.len()); + tracing::debug!(count = contracts.len(), "fetching ERC-20 contract metadata"); let mut handles = Vec::new(); for (address,) in contracts { let pool = self.pool.clone(); let provider = self.provider.clone(); + let m = self.metrics.clone(); handles.push(tokio::spawn(async move { - if let Err(e) = fetch_erc20_contract_metadata(&pool, &provider, &address).await { - tracing::debug!( - "Failed to fetch ERC-20 contract metadata for {}: {}", - address, - e - ); - // Mark as fetched to avoid infinite retries - let _ = sqlx::query( - "UPDATE erc20_contracts SET metadata_fetched = true WHERE address = $1", - ) - .bind(&address) - .execute(&pool) - .await; + match fetch_erc20_contract_metadata(&pool, &provider, &address).await { + Ok(()) => { + m.record_metadata_contract_fetched("erc20"); + } + Err(e) => { + m.record_metadata_error("erc20"); + m.error("metadata", "metadata_fetch"); + tracing::debug!( + address = %address, + error = %e, + "failed to fetch ERC-20 contract metadata" + ); + // Mark as fetched to avoid infinite retries + let _ = sqlx::query( + "UPDATE erc20_contracts SET metadata_fetched = true WHERE address = $1", + ) + .bind(&address) + .execute(&pool) + .await; + } } })); @@ -200,7 +219,7 @@ impl MetadataFetcher { return Ok(false); } - tracing::debug!("Fetching metadata for {} NFT tokens", tokens.len()); + tracing::debug!(count = tokens.len(), "fetching NFT token metadata"); let mut handles = Vec::new(); for (contract_address, token_id, token_uri) in tokens { @@ -209,10 +228,10 @@ impl MetadataFetcher { let provider = self.provider.clone(); let ipfs_gateway = self.config.ipfs_gateway.clone(); let retry_attempts = self.config.metadata_retry_attempts; + let m = self.metrics.clone(); handles.push(tokio::spawn(async move { - // Errors are logged inside fetch_and_store_token_metadata at debug level - let _ = fetch_and_store_token_metadata( + match fetch_and_store_token_metadata( &pool, &client, &provider, @@ -221,7 +240,16 @@ impl MetadataFetcher { token_uri.as_deref(), retry_attempts, ) - .await; + .await + { + Ok(()) => { + m.record_metadata_token_fetched(); + } + Err(_) => { + m.record_metadata_error("token"); + m.error("metadata", "metadata_fetch"); + } + } })); if handles.len() >= self.config.metadata_fetch_workers as usize { @@ -277,7 +305,7 @@ async fn fetch_nft_contract_metadata( .execute(pool) .await?; - tracing::debug!("Fetched NFT contract metadata for {}", contract_address); + tracing::debug!(address = %contract_address, "fetched NFT contract metadata"); Ok(()) } @@ -314,7 +342,7 @@ async fn fetch_erc20_contract_metadata( .execute(pool) .await?; - tracing::debug!("Fetched ERC-20 contract metadata for {}", contract_address); + tracing::debug!(address = %contract_address, "fetched ERC-20 contract metadata"); Ok(()) } @@ -350,10 +378,10 @@ async fn fetch_and_store_token_metadata( } Err(e) => { tracing::debug!( - "Failed to fetch tokenURI for {}:{}: {}", - contract_address, - token_id, - e + contract = %contract_address, + token_id = %token_id, + error = %e, + "failed to fetch tokenURI" ); // Mark as fetched to avoid retrying forever sqlx::query( @@ -414,10 +442,10 @@ async fn fetch_and_store_token_metadata( .await?; tracing::debug!( - "NFT {}:{} has direct image URI ({})", - contract_address, - token_id, - content_type + contract = %contract_address, + token_id = %token_id, + content_type, + "NFT has direct image URI" ); return Ok(()); } @@ -484,10 +512,10 @@ async fn fetch_and_store_token_metadata( // Log at debug level since this is often expected (non-standard NFTs) tracing::debug!( - "Failed to fetch metadata for {}:{}: {}", - contract_address, - token_id, - last_error.as_deref().unwrap_or("Unknown error") + contract = %contract_address, + token_id = %token_id, + error = last_error.as_deref().unwrap_or("Unknown error"), + "failed to fetch token metadata" ); Err(anyhow::anyhow!( diff --git a/backend/crates/atlas-server/src/lib.rs b/backend/crates/atlas-server/src/lib.rs index 9d7d862..717e75e 100644 --- a/backend/crates/atlas-server/src/lib.rs +++ b/backend/crates/atlas-server/src/lib.rs @@ -4,4 +4,5 @@ pub mod config; pub mod faucet; pub mod head; pub mod indexer; +pub mod metrics; pub mod state_keys; diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs index acea112..8b3f59d 100644 --- a/backend/crates/atlas-server/src/main.rs +++ b/backend/crates/atlas-server/src/main.rs @@ -14,17 +14,29 @@ mod config; mod faucet; mod head; mod indexer; +mod metrics; mod state_keys; /// Retry delays for exponential backoff (in seconds) const RETRY_DELAYS: &[u64] = &[5, 10, 20, 30, 60]; const MAX_RETRY_DELAY: u64 = 60; -fn init_tracing(filter: &str) { - tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(filter)) - .with(tracing_subscriber::fmt::layer()) - .init(); +fn init_tracing(filter: &str, format: &str) { + let env_filter = tracing_subscriber::EnvFilter::new(filter); + match format { + "json" => { + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer().json()) + .init(); + } + _ => { + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer()) + .init(); + } + } } fn required_db_url(db_url: &str) -> Result<&str> { @@ -203,7 +215,7 @@ async fn main() -> Result<()> { match cli.command { cli::Command::Run(args) => run(*args).await, cli::Command::Migrate(args) => { - init_tracing(&args.log.level); + init_tracing(&args.log.level, &args.log.format); tracing::info!("Running database migrations"); let database_url = required_db_url(&args.db.url)?; atlas_common::db::run_migrations(database_url).await?; @@ -220,9 +232,13 @@ async fn main() -> Result<()> { } async fn run(args: cli::RunArgs) -> Result<()> { - init_tracing(&args.log.level); + init_tracing(&args.log.level, &args.log.format); tracing::info!("Starting Atlas Server"); + // Install Prometheus metrics recorder + let prometheus_handle = metrics::install_prometheus_recorder(); + let metrics = metrics::Metrics::new(); + let config = config::Config::from_run_args(args.clone())?; let faucet_config = config::FaucetConfig::from_faucet_args(&args.faucet)?; @@ -249,9 +265,9 @@ async fn run(args: cli::RunArgs) -> Result<()> { None }; - tracing::info!("Fetching chain ID from RPC"); + tracing::info!("fetching chain ID from RPC"); let chain_id = fetch_chain_id(&config.rpc_url).await?; - tracing::info!("Chain ID: {}", chain_id); + tracing::info!(chain_id, "chain ID fetched"); tracing::info!("Running database migrations"); atlas_common::db::run_migrations(&config.database_url).await?; @@ -270,6 +286,26 @@ async fn run(args: cli::RunArgs) -> Result<()> { head::HeadTracker::bootstrap(&api_pool, config.sse_replay_buffer_blocks).await? }); + // Set max pool size gauges + metrics.set_db_pool_max("api", config.api_db_max_connections as f64); + metrics.set_db_pool_max("indexer", config.indexer_db_max_connections as f64); + + // Spawn pool stats sampler + { + let api_pool_ref = api_pool.clone(); + let indexer_pool_ref = indexer_pool.clone(); + let metrics_ref = metrics.clone(); + tokio::spawn(async move { + loop { + metrics_ref.set_db_pool_size("api", api_pool_ref.size() as f64); + metrics_ref.set_db_pool_idle("api", api_pool_ref.num_idle() as f64); + metrics_ref.set_db_pool_size("indexer", indexer_pool_ref.size() as f64); + metrics_ref.set_db_pool_idle("indexer", indexer_pool_ref.num_idle() as f64); + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); + } + let state = Arc::new(api::AppState { pool: api_pool, block_events_tx: block_events_tx.clone(), @@ -288,6 +324,8 @@ async fn run(args: cli::RunArgs) -> Result<()> { background_color_light: config.background_color_light.clone(), success_color: config.success_color.clone(), error_color: config.error_color.clone(), + metrics: metrics.clone(), + prometheus_handle, }); let da_pool = indexer_pool.clone(); @@ -296,6 +334,7 @@ async fn run(args: cli::RunArgs) -> Result<()> { config.clone(), block_events_tx, head_tracker, + metrics.clone(), ); tokio::spawn(async move { if let Err(e) = run_with_retry(|| indexer.run()).await { @@ -309,9 +348,9 @@ async fn run(args: cli::RunArgs) -> Result<()> { .as_deref() .expect("DA tracking requires EVNODE_URL"); tracing::info!( - "DA tracking enabled (workers: {}, rate_limit: {} req/s)", - config.da_worker_concurrency, - config.da_rpc_requests_per_second + workers = config.da_worker_concurrency, + rate_limit_rps = config.da_rpc_requests_per_second, + "DA tracking enabled" ); let da_worker = indexer::DaWorker::new( da_pool, @@ -319,6 +358,7 @@ async fn run(args: cli::RunArgs) -> Result<()> { config.da_worker_concurrency, config.da_rpc_requests_per_second, da_events_tx, + metrics.clone(), )?; tokio::spawn(async move { if let Err(e) = run_with_retry(|| da_worker.run()).await { @@ -329,10 +369,14 @@ async fn run(args: cli::RunArgs) -> Result<()> { let metadata_pool = indexer_pool; let metadata_config = config.clone(); + let metadata_metrics = metrics.clone(); tokio::spawn(async move { if let Err(e) = run_with_retry(|| async { - let fetcher = - indexer::MetadataFetcher::new(metadata_pool.clone(), metadata_config.clone())?; + let fetcher = indexer::MetadataFetcher::new( + metadata_pool.clone(), + metadata_config.clone(), + metadata_metrics.clone(), + )?; fetcher.run().await }) .await @@ -343,7 +387,7 @@ async fn run(args: cli::RunArgs) -> Result<()> { let app = api::build_router(state, config.cors_origin.clone()); let addr = format!("{}:{}", config.api_host, config.api_port); - tracing::info!("API listening on {}", addr); + tracing::info!(addr = %addr, "API listening"); let listener = tokio::net::TcpListener::bind(&addr).await?; axum::serve(listener, app) @@ -354,23 +398,23 @@ async fn run(args: cli::RunArgs) -> Result<()> { } async fn check(args: cli::RunArgs) -> Result<()> { - init_tracing(&args.log.level); + init_tracing(&args.log.level, &args.log.format); let config = config::Config::from_run_args(args.clone())?; config::FaucetConfig::from_faucet_args(&args.faucet)?; // Test DB connectivity - tracing::info!("Testing database connectivity..."); + tracing::info!("testing database connectivity"); let pool = atlas_common::db::create_pool(&config.database_url, 1).await?; sqlx::query("SELECT 1").execute(&pool).await?; - tracing::info!("Database OK"); + tracing::info!("database OK"); // Test RPC connectivity - tracing::info!("Testing RPC connectivity..."); + tracing::info!("testing RPC connectivity"); let chain_id = fetch_chain_id(&config.rpc_url).await?; - tracing::info!("RPC OK — chain_id={}", chain_id); + tracing::info!(chain_id, "RPC OK"); - tracing::info!("Configuration is valid"); + tracing::info!("configuration is valid"); Ok(()) } @@ -489,10 +533,10 @@ where .unwrap_or(MAX_RETRY_DELAY); tracing::error!( - "Fatal error (internal retries exhausted): {}. Restarting in {}s (attempt {})...", - e, - delay, - retry_count + 1 + error = %e, + restart_in_secs = delay, + attempt = retry_count + 1, + "fatal error, internal retries exhausted" ); tokio::time::sleep(Duration::from_secs(delay)).await; diff --git a/backend/crates/atlas-server/src/metrics.rs b/backend/crates/atlas-server/src/metrics.rs new file mode 100644 index 0000000..d298642 --- /dev/null +++ b/backend/crates/atlas-server/src/metrics.rs @@ -0,0 +1,345 @@ +use axum::{ + extract::{MatchedPath, Request}, + middleware::Next, + response::Response, +}; +use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; +use metrics_exporter_prometheus::{Matcher, PrometheusHandle}; +use std::sync::OnceLock; +use std::time::Instant; + +/// Install the Prometheus recorder and return a handle for rendering metrics. +pub fn install_prometheus_recorder() -> PrometheusHandle { + static PROMETHEUS_HANDLE: OnceLock = OnceLock::new(); + + PROMETHEUS_HANDLE + .get_or_init(|| { + metrics_exporter_prometheus::PrometheusBuilder::new() + .set_buckets_for_metric( + Matcher::Full("atlas_indexer_block_processing_duration_seconds".to_string()), + &[ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, + ], + ) + .expect("valid processing duration buckets") + .install_recorder() + .expect("failed to install Prometheus recorder") + }) + .clone() +} + +/// Central metrics registry. +/// +/// All metric handles are resolved once at startup after the Prometheus recorder +/// is installed. The struct is `Clone` (metric handles are internally `Arc`). +#[derive(Clone)] +pub struct Metrics { + _private: (), // force construction via Metrics::new() +} + +impl Default for Metrics { + fn default() -> Self { + Self::new() + } +} + +impl Metrics { + /// Create all metric handles and register their descriptions. + /// Must be called after `install_prometheus_recorder()`. + pub fn new() -> Self { + // -- HTTP -- + describe_counter!( + "atlas_http_requests_total", + "Total HTTP requests by method, path, and status" + ); + describe_histogram!( + "atlas_http_request_duration_seconds", + "HTTP request latency in seconds" + ); + + // -- Indexer -- + describe_counter!( + "atlas_indexer_blocks_indexed_total", + "Total blocks successfully indexed" + ); + describe_gauge!("atlas_indexer_head_block", "Latest indexed block number"); + describe_gauge!( + "atlas_indexer_head_block_timestamp_seconds", + "Chain timestamp of the latest indexed block" + ); + describe_gauge!( + "atlas_indexer_chain_head_block", + "Latest block on chain (from RPC)" + ); + describe_gauge!( + "atlas_indexer_lag_blocks", + "Difference between chain head and latest indexed block" + ); + describe_gauge!( + "atlas_indexer_missing_blocks", + "Known unresolved missing blocks persisted in failed_blocks" + ); + describe_histogram!( + "atlas_indexer_batch_duration_seconds", + "Time per full indexer batch cycle" + ); + describe_histogram!( + "atlas_indexer_block_processing_duration_seconds", + "Time spent actively processing a batch, excluding idle sleep" + ); + describe_histogram!( + "atlas_indexer_db_write_duration_seconds", + "Time for DB COPY+INSERT per batch" + ); + describe_counter!( + "atlas_indexer_failed_blocks_total", + "Blocks that permanently failed after retries" + ); + describe_counter!( + "atlas_indexer_rpc_requests_total", + "RPC batch requests by status" + ); + + // -- DA Worker -- + describe_counter!( + "atlas_da_blocks_processed_total", + "DA status checks completed" + ); + describe_counter!("atlas_da_rpc_errors_total", "Failed DA RPC calls"); + + // -- Metadata Fetcher -- + describe_counter!( + "atlas_metadata_contracts_fetched_total", + "Contract metadata successfully fetched by type" + ); + describe_counter!( + "atlas_metadata_tokens_fetched_total", + "NFT token metadata successfully fetched" + ); + describe_counter!( + "atlas_metadata_errors_total", + "Failed metadata fetches by type" + ); + + // -- SSE -- + describe_gauge!( + "atlas_sse_active_connections", + "Current number of SSE client connections" + ); + + // -- DB Pools -- + describe_gauge!("atlas_db_pool_size", "Total connections in pool"); + describe_gauge!("atlas_db_pool_idle", "Idle connections in pool"); + describe_gauge!("atlas_db_pool_max", "Max configured connections for pool"); + + // -- Errors -- + describe_counter!( + "atlas_errors_total", + "All errors by component and error_type, for alerting" + ); + + Self { _private: () } + } + + // -- Indexer helpers -- + + pub fn record_blocks_indexed(&self, count: u64) { + counter!("atlas_indexer_blocks_indexed_total").increment(count); + } + + pub fn set_indexer_head_block(&self, block: u64) { + gauge!("atlas_indexer_head_block").set(block as f64); + } + + pub fn set_indexer_head_block_timestamp(&self, timestamp_seconds: i64) { + gauge!("atlas_indexer_head_block_timestamp_seconds").set(timestamp_seconds as f64); + } + + pub fn set_chain_head_block(&self, block: u64) { + gauge!("atlas_indexer_chain_head_block").set(block as f64); + } + + pub fn set_indexer_lag_blocks(&self, lag: u64) { + gauge!("atlas_indexer_lag_blocks").set(lag as f64); + } + + pub fn set_indexer_missing_blocks(&self, count: u64) { + gauge!("atlas_indexer_missing_blocks").set(count as f64); + } + + pub fn record_batch_duration(&self, seconds: f64) { + histogram!("atlas_indexer_batch_duration_seconds").record(seconds); + } + + pub fn record_block_processing_duration(&self, seconds: f64) { + histogram!("atlas_indexer_block_processing_duration_seconds").record(seconds); + } + + pub fn record_db_write_duration(&self, seconds: f64) { + histogram!("atlas_indexer_db_write_duration_seconds").record(seconds); + } + + pub fn record_failed_blocks(&self, count: u64) { + counter!("atlas_indexer_failed_blocks_total").increment(count); + } + + pub fn record_rpc_request(&self, status: &str) { + counter!("atlas_indexer_rpc_requests_total", "status" => status.to_string()).increment(1); + } + + // -- DA Worker helpers -- + + pub fn record_da_blocks_processed(&self, count: u64) { + counter!("atlas_da_blocks_processed_total").increment(count); + } + + pub fn record_da_rpc_error(&self) { + counter!("atlas_da_rpc_errors_total").increment(1); + } + + // -- Metadata Fetcher helpers -- + + pub fn record_metadata_contract_fetched(&self, contract_type: &str) { + counter!("atlas_metadata_contracts_fetched_total", "type" => contract_type.to_string()) + .increment(1); + } + + pub fn record_metadata_token_fetched(&self) { + counter!("atlas_metadata_tokens_fetched_total").increment(1); + } + + pub fn record_metadata_error(&self, metadata_type: &str) { + counter!("atlas_metadata_errors_total", "type" => metadata_type.to_string()).increment(1); + } + + // -- SSE helpers -- + + pub fn increment_sse_connections(&self) { + gauge!("atlas_sse_active_connections").increment(1.0); + } + + pub fn decrement_sse_connections(&self) { + gauge!("atlas_sse_active_connections").decrement(1.0); + } + + // -- DB Pool helpers -- + + pub fn set_db_pool_size(&self, pool_name: &str, size: f64) { + gauge!("atlas_db_pool_size", "pool" => pool_name.to_string()).set(size); + } + + pub fn set_db_pool_idle(&self, pool_name: &str, idle: f64) { + gauge!("atlas_db_pool_idle", "pool" => pool_name.to_string()).set(idle); + } + + pub fn set_db_pool_max(&self, pool_name: &str, max: f64) { + gauge!("atlas_db_pool_max", "pool" => pool_name.to_string()).set(max); + } + + // -- Error helper -- + + /// Increment the error counter with component and error_type labels. + /// Call this alongside `tracing::error!` / `tracing::warn!` at error sites. + pub fn error(&self, component: &str, error_type: &str) { + counter!( + "atlas_errors_total", + "component" => component.to_string(), + "error_type" => error_type.to_string() + ) + .increment(1); + } +} + +/// Guard that decrements the SSE connection gauge on drop. +pub struct SseConnectionGuard { + metrics: Metrics, +} + +impl SseConnectionGuard { + pub fn new(metrics: Metrics) -> Self { + metrics.increment_sse_connections(); + Self { metrics } + } +} + +impl Drop for SseConnectionGuard { + fn drop(&mut self) { + self.metrics.decrement_sse_connections(); + } +} + +/// Axum middleware that records HTTP request metrics. +/// +/// Uses `MatchedPath` to get the route pattern (e.g. `/api/blocks/{number}`) +/// instead of the concrete path, preventing label cardinality explosion. +pub async fn http_metrics_middleware(request: Request, next: Next) -> Response { + let method = request.method().to_string(); + let path = request + .extensions() + .get::() + .map(|p| p.as_str().to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + let start = Instant::now(); + let response = next.run(request).await; + let elapsed = start.elapsed().as_secs_f64(); + + let status = response.status().as_u16().to_string(); + + counter!( + "atlas_http_requests_total", + "method" => method.clone(), + "path" => path.clone(), + "status" => status + ) + .increment(1); + + histogram!( + "atlas_http_request_duration_seconds", + "method" => method, + "path" => path + ) + .record(elapsed); + + response +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn metrics_new_does_not_panic_without_recorder() { + // The metrics crate uses a no-op recorder by default, + // so Metrics::new() should not panic even without install_prometheus_recorder(). + let _m = Metrics::new(); + } + + #[test] + fn install_prometheus_recorder_is_idempotent() { + let first = install_prometheus_recorder(); + let second = install_prometheus_recorder(); + let metrics = Metrics::new(); + metrics.set_indexer_head_block(1); + + assert!(first.render().contains("atlas_indexer_head_block")); + assert!(second.render().contains("atlas_indexer_head_block")); + } + + #[test] + fn new_metrics_render_when_emitted() { + let handle = install_prometheus_recorder(); + let metrics = Metrics::new(); + metrics.set_indexer_missing_blocks(3); + metrics.set_indexer_head_block(42); + metrics.set_indexer_head_block_timestamp(1_700_000_042); + metrics.set_indexer_lag_blocks(7); + metrics.record_block_processing_duration(0.1); + + let body = handle.render(); + assert!(body.contains("atlas_indexer_missing_blocks")); + assert!(body.contains("atlas_indexer_head_block_timestamp_seconds")); + assert!(body.contains("atlas_indexer_lag_blocks")); + assert!(body.contains("atlas_indexer_block_processing_duration_seconds")); + } +} diff --git a/backend/crates/atlas-server/tests/integration/addresses.rs b/backend/crates/atlas-server/tests/integration/addresses.rs index e7ce92a..c1b8fd2 100644 --- a/backend/crates/atlas-server/tests/integration/addresses.rs +++ b/backend/crates/atlas-server/tests/integration/addresses.rs @@ -147,7 +147,7 @@ async fn set_erc20_supply_history_complete(pool: &sqlx::PgPool, complete: bool) fn get_address_detail() { common::run(async { let pool = common::pool(); - seed_address_data(pool).await; + seed_address_data(&pool).await; let app = common::test_router(); let response = app @@ -172,7 +172,7 @@ fn get_address_detail() { fn get_address_transactions() { common::run(async { let pool = common::pool(); - seed_address_data(pool).await; + seed_address_data(&pool).await; let app = common::test_router(); let response = app @@ -196,9 +196,9 @@ fn get_address_transactions() { fn get_erc20_address_detail_prefers_indexed_supply() { common::run(async { let pool = common::pool(); - seed_address_data(pool).await; - seed_erc20_address_data(pool).await; - set_erc20_supply_history_complete(pool, true).await; + seed_address_data(&pool).await; + seed_erc20_address_data(&pool).await; + set_erc20_supply_history_complete(&pool, true).await; let app = common::test_router(); let response = app diff --git a/backend/crates/atlas-server/tests/integration/blocks.rs b/backend/crates/atlas-server/tests/integration/blocks.rs index bb56f46..18d4da4 100644 --- a/backend/crates/atlas-server/tests/integration/blocks.rs +++ b/backend/crates/atlas-server/tests/integration/blocks.rs @@ -32,7 +32,7 @@ async fn seed_blocks(pool: &sqlx::PgPool) { fn list_blocks_paginated() { common::run(async { let pool = common::pool(); - seed_blocks(pool).await; + seed_blocks(&pool).await; let app = common::test_router(); let response = app @@ -61,7 +61,7 @@ fn list_blocks_paginated() { fn get_block_by_number() { common::run(async { let pool = common::pool(); - seed_blocks(pool).await; + seed_blocks(&pool).await; let app = common::test_router(); let response = app diff --git a/backend/crates/atlas-server/tests/integration/common.rs b/backend/crates/atlas-server/tests/integration/common.rs index e895452..5934595 100644 --- a/backend/crates/atlas-server/tests/integration/common.rs +++ b/backend/crates/atlas-server/tests/integration/common.rs @@ -2,8 +2,10 @@ use axum::Router; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; use std::sync::{Arc, LazyLock}; -use testcontainers::runners::AsyncRunner; -use testcontainers::ContainerAsync; +use std::time::Duration; +use std::{env, process::Command}; +use testcontainers::runners::SyncRunner; +use testcontainers::{Container, ImageExt}; use testcontainers_modules::postgres::Postgres; use tokio::sync::broadcast; @@ -11,57 +13,84 @@ use atlas_server::api::{build_router, AppState}; use atlas_server::head::HeadTracker; struct TestEnv { - runtime: tokio::runtime::Runtime, - pool: PgPool, - _container: ContainerAsync, + database_url: String, + _container: Option>, } -// Single LazyLock: runtime + container + pool, all initialized together. -static ENV: LazyLock = LazyLock::new(|| { - let runtime = tokio::runtime::Runtime::new().expect("create test runtime"); +fn docker_available() -> bool { + Command::new("docker") + .arg("version") + .output() + .map(|output| output.status.success()) + .unwrap_or(false) +} - let (pool, container) = runtime.block_on(async { +fn init_env() -> Result { + let (database_url, container) = if let Ok(database_url) = env::var("ATLAS_TEST_DATABASE_URL") { + (database_url, None) + } else if docker_available() { let container = Postgres::default() + .with_startup_timeout(Duration::from_secs(180)) .start() - .await - .expect("Failed to start Postgres container"); - - let host = container.get_host().await.expect("get host"); - let port = container.get_host_port_ipv4(5432).await.expect("get port"); - - let database_url = format!("postgres://postgres:postgres@{}:{}/postgres", host, port); - - let pool = PgPoolOptions::new() - .max_connections(10) - .connect(&database_url) - .await - .expect("Failed to create pool"); - - sqlx::migrate!("../../migrations") - .run(&pool) - .await - .expect("Failed to run migrations"); - - (pool, container) - }); - - TestEnv { - runtime, - pool, + .map_err(|error| format!("failed to start Postgres test container: {error}"))?; + let host = container + .get_host() + .map_err(|error| format!("failed to get test container host: {error}"))?; + let port = container + .get_host_port_ipv4(5432) + .map_err(|error| format!("failed to get test container port: {error}"))?; + ( + format!("postgres://postgres:postgres@{}:{}/postgres", host, port), + Some(container), + ) + } else { + return Err("Docker is unavailable and ATLAS_TEST_DATABASE_URL is not set".to_string()); + }; + + tokio::runtime::Runtime::new() + .expect("create migration runtime") + .block_on(async { + let pool = PgPoolOptions::new() + .max_connections(10) + .connect(&database_url) + .await + .map_err(|error| format!("failed to create test pool: {error}"))?; + + sqlx::migrate!("../../migrations") + .run(&pool) + .await + .map_err(|error| format!("failed to run test migrations: {error}"))?; + + pool.close().await; + Ok::<(), String>(()) + })?; + + Ok(TestEnv { + database_url, _container: container, - } -}); + }) +} -pub fn pool() -> &'static PgPool { - &ENV.pool +// Single LazyLock: test database configuration, shared across tests. +static ENV: LazyLock> = LazyLock::new(init_env); + +pub fn pool() -> PgPool { + let env = ENV.as_ref().expect("integration test environment"); + PgPoolOptions::new() + .max_connections(10) + .connect_lazy(&env.database_url) + .expect("create lazy pool") } pub fn test_router() -> Router { - let pool = pool().clone(); + let pool = pool(); let head_tracker = Arc::new(HeadTracker::empty(10)); let (tx, _) = broadcast::channel(1); let (da_tx, _) = broadcast::channel(1); + let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new() + .build_recorder() + .handle(); let state = Arc::new(AppState { pool, block_events_tx: tx, @@ -80,14 +109,23 @@ pub fn test_router() -> Router { background_color_light: None, success_color: None, error_color: None, + metrics: atlas_server::metrics::Metrics::new(), + prometheus_handle, }); build_router(state, None) } -/// Run an async test block on the shared runtime. +/// Run an async test block when the integration database is available. pub fn run>(f: F) { - ENV.runtime.block_on(f); + if let Err(error) = ENV.as_ref() { + eprintln!("skipping integration test: {error}"); + return; + } + + tokio::runtime::Runtime::new() + .expect("create test runtime") + .block_on(f); } /// Helper to parse a JSON response body. diff --git a/backend/crates/atlas-server/tests/integration/nfts.rs b/backend/crates/atlas-server/tests/integration/nfts.rs index de57e6a..34bf9b9 100644 --- a/backend/crates/atlas-server/tests/integration/nfts.rs +++ b/backend/crates/atlas-server/tests/integration/nfts.rs @@ -86,7 +86,7 @@ async fn seed_nft_data(pool: &sqlx::PgPool) { fn list_nft_collections() { common::run(async { let pool = common::pool(); - seed_nft_data(pool).await; + seed_nft_data(&pool).await; let app = common::test_router(); let response = app @@ -109,7 +109,7 @@ fn list_nft_collections() { fn list_collection_tokens() { common::run(async { let pool = common::pool(); - seed_nft_data(pool).await; + seed_nft_data(&pool).await; let app = common::test_router(); let response = app @@ -142,7 +142,7 @@ fn list_collection_tokens() { fn get_collection_transfers() { common::run(async { let pool = common::pool(); - seed_nft_data(pool).await; + seed_nft_data(&pool).await; let app = common::test_router(); let response = app diff --git a/backend/crates/atlas-server/tests/integration/schema.rs b/backend/crates/atlas-server/tests/integration/schema.rs index 306c960..e8d72fd 100644 --- a/backend/crates/atlas-server/tests/integration/schema.rs +++ b/backend/crates/atlas-server/tests/integration/schema.rs @@ -7,8 +7,9 @@ fn migrations_are_idempotent() { // sqlx tracks applied migrations in _sqlx_migrations; re-running is a no-op. // This verifies the tracking table is intact and no migration errors on repeat. common::run(async { + let pool = common::pool(); sqlx::migrate!("../../migrations") - .run(common::pool()) + .run(&pool) .await .expect("migrations should be idempotent"); }); @@ -19,10 +20,11 @@ fn migrations_are_idempotent() { #[test] fn all_expected_tables_exist() { common::run(async { + let pool = common::pool(); let tables: Vec = sqlx::query_scalar( "SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename", ) - .fetch_all(common::pool()) + .fetch_all(&pool) .await .expect("query tables"); @@ -57,13 +59,14 @@ fn all_expected_tables_exist() { #[test] fn partitioned_tables_have_initial_partition() { common::run(async { + let pool = common::pool(); // Each partitioned table must have its _p0 partition created by the migration. let partitions: Vec = sqlx::query_scalar( "SELECT relname::text FROM pg_class WHERE relname LIKE '%_p0' AND relkind = 'r' ORDER BY relname", ) - .fetch_all(common::pool()) + .fetch_all(&pool) .await .expect("query partitions"); @@ -87,6 +90,7 @@ fn partitioned_tables_have_initial_partition() { #[test] fn key_indexes_exist() { common::run(async { + let pool = common::pool(); // Include both regular indexes (relkind='i') and partitioned indexes (relkind='I'). let indexes: Vec = sqlx::query_scalar( "SELECT c.relname::text FROM pg_class c @@ -94,7 +98,7 @@ fn key_indexes_exist() { WHERE n.nspname = 'public' AND c.relkind IN ('i', 'I') ORDER BY c.relname", ) - .fetch_all(common::pool()) + .fetch_all(&pool) .await .expect("query indexes"); @@ -130,10 +134,11 @@ fn key_indexes_exist() { fn pg_trgm_extension_is_installed() { // Required for fuzzy search indexes on token names / symbols. common::run(async { + let pool = common::pool(); let exists: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm')", ) - .fetch_one(common::pool()) + .fetch_one(&pool) .await .expect("query extension"); @@ -160,7 +165,7 @@ fn duplicate_block_number_is_rejected() { .bind(block_number) .bind(format!("0x{:064x}", block_number)) .bind(format!("0x{:064x}", block_number - 1)) - .execute(pool) + .execute(&pool) .await .expect("insert block"); @@ -173,7 +178,7 @@ fn duplicate_block_number_is_rejected() { .bind(block_number) .bind(format!("0x{:064x}", block_number + 9999)) .bind(format!("0x{:064x}", block_number - 1)) - .execute(pool) + .execute(&pool) .await; assert!(result.is_err(), "duplicate block number should be rejected"); @@ -198,7 +203,7 @@ fn duplicate_transaction_is_rejected() { .bind(block_number) .bind(format!("0x{:064x}", block_number + 1)) .bind(format!("0x{:064x}", block_number - 1)) - .execute(pool) + .execute(&pool) .await .expect("insert parent block"); @@ -210,7 +215,7 @@ fn duplicate_transaction_is_rejected() { ) .bind(&tx_hash) .bind(block_number) - .execute(pool) + .execute(&pool) .await .expect("insert tx"); @@ -222,7 +227,7 @@ fn duplicate_transaction_is_rejected() { ) .bind(&tx_hash) .bind(block_number) - .execute(pool) + .execute(&pool) .await; assert!(result.is_err(), "duplicate transaction should be rejected"); @@ -248,8 +253,11 @@ fn duplicate_erc20_transfer_is_rejected() { .bind(block_number) }; - insert().execute(pool).await.expect("insert erc20 transfer"); - let result = insert().execute(pool).await; + insert() + .execute(&pool) + .await + .expect("insert erc20 transfer"); + let result = insert().execute(&pool).await; assert!( result.is_err(), diff --git a/backend/crates/atlas-server/tests/integration/search.rs b/backend/crates/atlas-server/tests/integration/search.rs index 3745be0..c73073a 100644 --- a/backend/crates/atlas-server/tests/integration/search.rs +++ b/backend/crates/atlas-server/tests/integration/search.rs @@ -78,7 +78,7 @@ async fn seed_search_data(pool: &sqlx::PgPool) { fn search_by_block_hash() { common::run(async { let pool = common::pool(); - seed_search_data(pool).await; + seed_search_data(&pool).await; // Search by block hash (66 chars = 0x + 64 hex) let block_hash = format!("0x{:064x}", SEARCH_BLOCK); @@ -111,7 +111,7 @@ fn search_by_block_hash() { fn search_by_tx_hash() { common::run(async { let pool = common::pool(); - seed_search_data(pool).await; + seed_search_data(&pool).await; let app = common::test_router(); let response = app @@ -141,7 +141,7 @@ fn search_by_tx_hash() { fn search_by_address() { common::run(async { let pool = common::pool(); - seed_search_data(pool).await; + seed_search_data(&pool).await; let app = common::test_router(); let response = app diff --git a/backend/crates/atlas-server/tests/integration/status.rs b/backend/crates/atlas-server/tests/integration/status.rs index 6fea231..6e74e18 100644 --- a/backend/crates/atlas-server/tests/integration/status.rs +++ b/backend/crates/atlas-server/tests/integration/status.rs @@ -88,7 +88,7 @@ fn status_returns_chain_info() { fn stats_charts_return_exact_bucket_count_for_non_aligned_window() { common::run(async { let pool = common::pool(); - seed_chart_data(pool).await; + seed_chart_data(&pool).await; let app = common::test_router(); diff --git a/backend/crates/atlas-server/tests/integration/tokens.rs b/backend/crates/atlas-server/tests/integration/tokens.rs index 8844842..8a57ab3 100644 --- a/backend/crates/atlas-server/tests/integration/tokens.rs +++ b/backend/crates/atlas-server/tests/integration/tokens.rs @@ -186,7 +186,7 @@ async fn set_erc20_supply_history_complete(pool: &sqlx::PgPool, complete: bool) fn list_tokens() { common::run(async { let pool = common::pool(); - seed_token_data(pool).await; + seed_token_data(&pool).await; let app = common::test_router(); let response = app @@ -209,7 +209,7 @@ fn list_tokens() { fn get_token_detail() { common::run(async { let pool = common::pool(); - seed_token_data(pool).await; + seed_token_data(&pool).await; let app = common::test_router(); let response = app @@ -237,13 +237,13 @@ fn get_token_detail() { fn get_token_detail_prefers_indexed_supply_over_stale_stored_value() { common::run(async { let pool = common::pool(); - seed_token_data(pool).await; - set_erc20_supply_history_complete(pool, true).await; + seed_token_data(&pool).await; + set_erc20_supply_history_complete(&pool, true).await; sqlx::query("UPDATE erc20_contracts SET total_supply = $2 WHERE address = $1") .bind(TOKEN_A) .bind(bigdecimal::BigDecimal::from(700_000i64)) - .execute(pool) + .execute(&pool) .await .expect("update stale total supply"); @@ -268,7 +268,7 @@ fn get_token_detail_prefers_indexed_supply_over_stale_stored_value() { fn get_token_holders() { common::run(async { let pool = common::pool(); - seed_token_data(pool).await; + seed_token_data(&pool).await; let app = common::test_router(); let response = app @@ -296,7 +296,7 @@ fn get_token_holders() { fn get_tx_erc20_transfers() { common::run(async { let pool = common::pool(); - seed_token_data(pool).await; + seed_token_data(&pool).await; let app = common::test_router(); let response = app @@ -323,8 +323,8 @@ fn get_tx_erc20_transfers() { fn get_token_chart_returns_exact_bucket_count_for_non_aligned_window() { common::run(async { let pool = common::pool(); - seed_token_data(pool).await; - seed_token_chart_data(pool).await; + seed_token_data(&pool).await; + seed_token_chart_data(&pool).await; let app = common::test_router(); let response = app diff --git a/backend/crates/atlas-server/tests/integration/transactions.rs b/backend/crates/atlas-server/tests/integration/transactions.rs index a57b19c..b3f4f53 100644 --- a/backend/crates/atlas-server/tests/integration/transactions.rs +++ b/backend/crates/atlas-server/tests/integration/transactions.rs @@ -59,7 +59,7 @@ async fn seed_transactions(pool: &sqlx::PgPool) { fn list_transactions() { common::run(async { let pool = common::pool(); - seed_transactions(pool).await; + seed_transactions(&pool).await; let app = common::test_router(); let response = app @@ -83,7 +83,7 @@ fn list_transactions() { fn get_transaction_by_hash() { common::run(async { let pool = common::pool(); - seed_transactions(pool).await; + seed_transactions(&pool).await; let app = common::test_router(); let response = app @@ -126,7 +126,7 @@ fn get_transaction_not_found() { fn get_block_transactions() { common::run(async { let pool = common::pool(); - seed_transactions(pool).await; + seed_transactions(&pool).await; let app = common::test_router(); let response = app