Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }

# Metrics
metrics = "0.24"
metrics-exporter-prometheus = "0.16"

# Config
dotenvy = "0.15"

Expand All @@ -55,7 +59,7 @@ hex = "0.4"
chrono = { version = "0.4", features = ["serde"] }

# Testing
testcontainers = "0.27"
testcontainers = { version = "0.27", features = ["blocking"] }
testcontainers-modules = { version = "0.15", features = ["postgres"] }

# CLI
Expand Down
2 changes: 2 additions & 0 deletions backend/crates/atlas-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
dotenvy = { workspace = true }
bigdecimal = { workspace = true }
hex = { workspace = true }
Expand Down
46 changes: 46 additions & 0 deletions backend/crates/atlas-server/src/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,34 @@ impl Deref for ApiError {
}
}

fn error_type(err: &AtlasError) -> Option<&'static str> {
match err {
AtlasError::Database(_) => Some("database"),
AtlasError::Internal(_) => Some("internal"),
AtlasError::Config(_) => Some("config"),
AtlasError::Rpc(_) => Some("rpc_request"),
AtlasError::MetadataFetch(_) => Some("metadata_fetch"),
_ => None,
}
}

impl IntoResponse for ApiError {
fn into_response(self) -> Response {
use atlas_common::AtlasError;

let status =
StatusCode::from_u16(self.0.status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);

// Increment error counter for Prometheus alerting
if let Some(error_type) = error_type(&self.0) {
metrics::counter!(
"atlas_errors_total",
"component" => "api",
"error_type" => error_type
)
.increment(1);
}

// Determine the client-facing message based on error type.
// Internal details are logged server-side to avoid leaking stack traces or
// database internals to callers.
Expand Down Expand Up @@ -119,6 +140,31 @@ mod tests {
use super::*;
use axum::body::to_bytes;

#[test]
fn error_type_maps_expected_variants() {
assert_eq!(
error_type(&AtlasError::Database(sqlx::Error::RowNotFound)),
Some("database")
);
assert_eq!(
error_type(&AtlasError::Internal("x".to_string())),
Some("internal")
);
assert_eq!(
error_type(&AtlasError::Config("x".to_string())),
Some("config")
);
assert_eq!(
error_type(&AtlasError::Rpc("x".to_string())),
Some("rpc_request")
);
assert_eq!(
error_type(&AtlasError::MetadataFetch("x".to_string())),
Some("metadata_fetch")
);
assert_eq!(error_type(&AtlasError::NotFound("x".to_string())), None);
}

#[tokio::test]
async fn too_many_requests_sets_retry_after_header_and_body() {
let response = ApiError(AtlasError::TooManyRequests {
Expand Down
5 changes: 5 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ mod tests {
let head_tracker = Arc::new(crate::head::HeadTracker::empty(10));
let (tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.build_recorder()
.handle();
Arc::new(AppState {
pool,
block_events_tx: tx,
Expand All @@ -178,6 +181,8 @@ mod tests {
background_color_light: None,
success_color: None,
error_color: None,
metrics: crate::metrics::Metrics::new(),
prometheus_handle,
})
}

Expand Down
194 changes: 194 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::sync::Arc;

use crate::api::AppState;

const MAX_INDEXER_AGE_MINUTES: i64 = 5;

#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
}

fn readiness_status(
latest_indexed_at: Option<DateTime<Utc>>,
now: DateTime<Utc>,
) -> (StatusCode, HealthResponse) {
let Some(indexed_at) = latest_indexed_at else {
return (
StatusCode::SERVICE_UNAVAILABLE,
HealthResponse {
status: "not_ready",
reason: Some("indexer state unavailable".to_string()),
},
);
};

let age = now - indexed_at;
if age > chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES) {
return (
StatusCode::SERVICE_UNAVAILABLE,
HealthResponse {
status: "not_ready",
reason: Some(format!(
"indexer stale: last block indexed {}s ago",
age.num_seconds()
)),
},
);
}

(
StatusCode::OK,
HealthResponse {
status: "ready",
reason: None,
},
)
}

/// GET /health/live — liveness probe (process is alive)
pub async fn liveness() -> impl IntoResponse {
Json(HealthResponse {
status: "ok",
reason: None,
})
}

/// GET /health/ready — readiness probe (DB reachable, indexer fresh)
pub async fn readiness(State(state): State<Arc<AppState>>) -> impl IntoResponse {
// Check DB connectivity
if let Err(e) = sqlx::query("SELECT 1").execute(&state.pool).await {
tracing::warn!(error = %e, "readiness database check failed");
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not_ready",
reason: Some("database unreachable".to_string()),
}),
);
}

let latest = match super::status::latest_indexed_block(state.as_ref()).await {
Ok(latest) => latest,
Err(e) => {
tracing::warn!(error = %e, "readiness indexer state check failed");
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not_ready",
reason: Some("indexer state unavailable".to_string()),
}),
);
}
};

let (status, body) = readiness_status(latest.map(|(_, indexed_at)| indexed_at), Utc::now());
(status, Json(body))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::head::HeadTracker;
use crate::metrics::Metrics;
use axum::body::to_bytes;
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
use tokio::sync::broadcast;

fn app_state(pool: sqlx::PgPool, head_tracker: Arc<HeadTracker>) -> Arc<AppState> {
let (block_tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.build_recorder()
.handle();

Arc::new(AppState {
pool,
block_events_tx: block_tx,
da_events_tx: da_tx,
head_tracker,
rpc_url: String::new(),
da_tracking_enabled: false,
faucet: None,
chain_id: 1,
chain_name: "Test Chain".to_string(),
chain_logo_url: None,
accent_color: None,
background_color_dark: None,
background_color_light: None,
success_color: None,
error_color: None,
metrics: Metrics::new(),
prometheus_handle,
})
}

async fn json_response(response: axum::response::Response) -> (StatusCode, serde_json::Value) {
let status = response.status();
let body = to_bytes(response.into_body(), usize::MAX)
.await
.expect("read response body");
let json = serde_json::from_slice(&body).expect("parse json response");
(status, json)
}

#[tokio::test]
async fn liveness_returns_ok() {
let (status, json) = json_response(liveness().await.into_response()).await;

assert_eq!(status, StatusCode::OK);
assert_eq!(json["status"], "ok");
assert!(json.get("reason").is_none());
}

#[tokio::test]
async fn readiness_returns_unavailable_when_database_is_down() {
let pool = PgPoolOptions::new()
.connect_lazy("postgres://postgres:postgres@127.0.0.1:1/atlas")
.expect("create lazy pool");
let state = app_state(pool, Arc::new(HeadTracker::empty(10)));

let (status, json) = json_response(readiness(State(state)).await.into_response()).await;

assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(json["status"], "not_ready");
assert_eq!(json["reason"], "database unreachable");
}

#[test]
fn readiness_returns_unavailable_when_indexer_state_is_missing() {
let (status, body) = readiness_status(None, Utc::now());
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body.status, "not_ready");
assert_eq!(body.reason.as_deref(), Some("indexer state unavailable"));
}

#[test]
fn readiness_returns_unavailable_for_stale_indexer_state() {
let (status, body) = readiness_status(
Some(Utc::now() - chrono::Duration::minutes(MAX_INDEXER_AGE_MINUTES + 1)),
Utc::now(),
);
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body.status, "not_ready");
assert!(body
.reason
.as_deref()
.expect("reason string")
.contains("indexer stale"));
}

#[test]
fn readiness_returns_ready_for_fresh_indexer_state() {
let (status, body) = readiness_status(Some(Utc::now()), Utc::now());
assert_eq!(status, StatusCode::OK);
assert_eq!(body.status, "ready");
assert!(body.reason.is_none());
}
}
63 changes: 63 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use axum::extract::State;
use std::sync::Arc;

use crate::api::AppState;

/// GET /metrics — Prometheus text format
pub async fn metrics(State(state): State<Arc<AppState>>) -> String {
state.prometheus_handle.render()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::head::HeadTracker;
use crate::metrics::Metrics;
use sqlx::postgres::PgPoolOptions;
use std::sync::OnceLock;
use tokio::sync::broadcast;

fn test_prometheus_handle() -> metrics_exporter_prometheus::PrometheusHandle {
static PROMETHEUS_HANDLE: OnceLock<metrics_exporter_prometheus::PrometheusHandle> =
OnceLock::new();

PROMETHEUS_HANDLE
.get_or_init(crate::metrics::install_prometheus_recorder)
.clone()
}

#[tokio::test]
async fn metrics_handler_renders_prometheus_output() {
let pool = PgPoolOptions::new()
.connect_lazy("postgres://test@localhost:5432/test")
.expect("lazy pool");
let (block_tx, _) = broadcast::channel(1);
let (da_tx, _) = broadcast::channel(1);
let prometheus_handle = test_prometheus_handle();
let recorder_metrics = Metrics::new();
recorder_metrics.set_indexer_head_block(42);
let state = Arc::new(AppState {
pool,
block_events_tx: block_tx,
da_events_tx: da_tx,
head_tracker: Arc::new(HeadTracker::empty(10)),
rpc_url: String::new(),
da_tracking_enabled: false,
faucet: None,
chain_id: 1,
chain_name: "Test Chain".to_string(),
chain_logo_url: None,
accent_color: None,
background_color_dark: None,
background_color_light: None,
success_color: None,
error_color: None,
metrics: recorder_metrics,
prometheus_handle,
});

let body = super::metrics(State(state)).await;

assert!(body.contains("atlas_indexer_head_block"));
}
}
2 changes: 2 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ pub mod blocks;
pub mod config;
pub mod etherscan;
pub mod faucet;
pub mod health;
pub mod logs;
pub mod metrics;
pub mod nfts;
pub mod proxy;
pub mod search;
Expand Down
Loading
Loading