From 7001f551f57061d6f086bc47d2a42c619d020079 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:59:35 +0200 Subject: [PATCH 1/3] Fix stale ERC20 total supply --- .../src/api/handlers/addresses.rs | 38 ++++++++ .../atlas-server/src/api/handlers/tokens.rs | 51 +++++------ .../crates/atlas-server/src/indexer/batch.rs | 24 +++++ .../atlas-server/src/indexer/indexer.rs | 66 +++++++++++++- .../atlas-server/src/indexer/metadata.rs | 14 +-- .../tests/integration/addresses.rs | 88 +++++++++++++++++++ .../atlas-server/tests/integration/tokens.rs | 31 +++++++ .../20240109000001_recompute_erc20_supply.sql | 15 ++++ 8 files changed, 284 insertions(+), 43 deletions(-) create mode 100644 backend/migrations/20240109000001_recompute_erc20_supply.sql diff --git a/backend/crates/atlas-server/src/api/handlers/addresses.rs b/backend/crates/atlas-server/src/api/handlers/addresses.rs index 9cb0c98..3658d20 100644 --- a/backend/crates/atlas-server/src/api/handlers/addresses.rs +++ b/backend/crates/atlas-server/src/api/handlers/addresses.rs @@ -3,6 +3,7 @@ use axum::{ Json, }; use serde::{Deserialize, Serialize}; +use sqlx::PgPool; use std::sync::Arc; use crate::api::error::ApiResult; @@ -66,6 +67,32 @@ fn default_limit() -> u32 { 20 } +async fn get_indexed_erc20_total_supply( + pool: &PgPool, + address: &str, +) -> Result { + let (supply,): (bigdecimal::BigDecimal,) = sqlx::query_as( + "SELECT COALESCE(SUM(balance), 0) + FROM erc20_balances + WHERE contract_address = $1 AND balance > 0", + ) + .bind(address) + .fetch_one(pool) + .await?; + + Ok(supply) +} + +async fn has_erc20_transfers(pool: &PgPool, address: &str) -> Result { + let (has_transfers,): (bool,) = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM erc20_transfers WHERE contract_address = $1)") + .bind(address) + .fetch_one(pool) + .await?; + + Ok(has_transfers) +} + pub async fn list_addresses( State(state): State>, Query(filters): Query, @@ -221,6 +248,17 @@ pub async fn get_address( .fetch_optional(&state.pool) .await?; + let erc20_contract = match erc20_contract { + Some(mut erc20) => { + if erc20.total_supply.is_none() || has_erc20_transfers(&state.pool, &address).await? { + erc20.total_supply = + Some(get_indexed_erc20_total_supply(&state.pool, &address).await?); + } + Some(erc20) + } + None => None, + }; + // Merge the data match (base_addr, nft_contract, erc20_contract) { // Found in addresses table and is an NFT contract diff --git a/backend/crates/atlas-server/src/api/handlers/tokens.rs b/backend/crates/atlas-server/src/api/handlers/tokens.rs index bd4f9c4..7fc012d 100644 --- a/backend/crates/atlas-server/src/api/handlers/tokens.rs +++ b/backend/crates/atlas-server/src/api/handlers/tokens.rs @@ -3,6 +3,7 @@ use axum::{ Json, }; use chrono::Utc; +use sqlx::PgPool; use std::sync::Arc; use crate::api::error::ApiResult; @@ -50,6 +51,22 @@ pub struct TokenDetailResponse { pub transfer_count: i64, } +async fn get_indexed_total_supply( + pool: &PgPool, + address: &str, +) -> Result { + let (supply,): (bigdecimal::BigDecimal,) = sqlx::query_as( + "SELECT COALESCE(SUM(balance), 0) + FROM erc20_balances + WHERE contract_address = $1 AND balance > 0", + ) + .bind(address) + .fetch_one(pool) + .await?; + + Ok(supply) +} + /// GET /api/tokens/:address - Get token details pub async fn get_token( State(state): State>, @@ -80,16 +97,8 @@ pub async fn get_token( .fetch_one(&state.pool) .await?; - // Compute total_supply from balances if not set - if contract.total_supply.is_none() { - let computed_supply: Option<(bigdecimal::BigDecimal,)> = sqlx::query_as( - "SELECT COALESCE(SUM(balance), 0) FROM erc20_balances WHERE contract_address = $1 AND balance > 0", - ) - .bind(&address) - .fetch_optional(&state.pool) - .await?; - - contract.total_supply = computed_supply.map(|(s,)| s); + if transfer_count.0 > 0 || contract.total_supply.is_none() { + contract.total_supply = Some(get_indexed_total_supply(&state.pool, &address).await?); } Ok(Json(TokenDetailResponse { @@ -124,29 +133,15 @@ pub async fn get_token_holders( .fetch_one(&state.pool) .await?; - // Get total supply for percentage calculation - // First try to get it from the contract, if NULL compute from sum of balances - let total_supply: Option = { + let total_supply = if total.0 > 0 { + Some(get_indexed_total_supply(&state.pool, &address).await?) + } else { let stored: Option<(Option,)> = sqlx::query_as("SELECT total_supply FROM erc20_contracts WHERE address = $1") .bind(&address) .fetch_optional(&state.pool) .await?; - - match stored { - Some((Some(ts),)) => Some(ts), - _ => { - // Compute from sum of balances - let computed: Option<(bigdecimal::BigDecimal,)> = sqlx::query_as( - "SELECT COALESCE(SUM(balance), 0) FROM erc20_balances - WHERE contract_address = $1 AND balance > 0", - ) - .bind(&address) - .fetch_optional(&state.pool) - .await?; - computed.map(|(s,)| s) - } - } + stored.and_then(|(supply,)| supply) }; let balances: Vec = sqlx::query_as( diff --git a/backend/crates/atlas-server/src/indexer/batch.rs b/backend/crates/atlas-server/src/indexer/batch.rs index 7c49f1c..4510710 100644 --- a/backend/crates/atlas-server/src/indexer/batch.rs +++ b/backend/crates/atlas-server/src/indexer/batch.rs @@ -104,6 +104,9 @@ pub(crate) struct BlockBatch { // erc20_balances — aggregated deltas per (address, contract) pub(crate) balance_map: HashMap<(String, String), BalanceDelta>, + // erc20 total supply deltas — aggregated per contract from mint/burn events + pub(crate) supply_map: HashMap, + // Contracts newly discovered in this batch. // These are NOT merged into the persistent known_* sets until after a // successful write, so a failed write doesn't leave the in-memory sets @@ -158,6 +161,16 @@ impl BlockBatch { entry.last_block = entry.last_block.max(block); } + /// Add a total supply delta for a contract. + /// Only mint and burn transfers should touch this accumulator. + pub(crate) fn apply_supply_delta(&mut self, contract: String, delta: BigDecimal) { + let entry = self + .supply_map + .entry(contract) + .or_insert(BigDecimal::from(0)); + *entry += delta; + } + pub(crate) fn materialize_blocks(&self, indexed_at: DateTime) -> Vec { debug_assert_eq!(self.b_numbers.len(), self.b_hashes.len()); debug_assert_eq!(self.b_numbers.len(), self.b_parent_hashes.len()); @@ -257,6 +270,17 @@ mod tests { assert_eq!(entry.last_block, 100); } + #[test] + fn apply_supply_delta_accumulates_by_contract() { + let mut batch = BlockBatch::new(); + let contract = "0xtoken".to_string(); + + batch.apply_supply_delta(contract.clone(), BigDecimal::from(100)); + batch.apply_supply_delta(contract.clone(), BigDecimal::from(-25)); + + assert_eq!(batch.supply_map[&contract], BigDecimal::from(75)); + } + #[test] fn materialize_blocks_preserves_parallel_block_fields() { let mut batch = BlockBatch::new(); diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index deccc07..939cda7 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -614,7 +614,9 @@ impl Indexer { // Aggregate balance deltas — multiple transfers in the same batch // for the same (address, contract) pair are summed in Rust, // so we only need one DB upsert per unique pair. - if from != ZERO_ADDRESS { + if from == ZERO_ADDRESS { + batch.apply_supply_delta(contract.clone(), value.clone()); + } else { batch.apply_balance_delta( from, contract.clone(), @@ -622,7 +624,9 @@ impl Indexer { block_num as i64, ); } - if to != ZERO_ADDRESS { + if to == ZERO_ADDRESS { + batch.apply_supply_delta(contract.clone(), -value); + } else { batch.apply_balance_delta( to, contract.clone(), @@ -673,6 +677,7 @@ impl Indexer { ec_addresses, ec_first_seen_blocks, balance_map, + supply_map, last_block, .. } = batch; @@ -800,6 +805,26 @@ impl Indexer { .await?; } + if !supply_map.is_empty() { + let mut supply_contracts = Vec::with_capacity(supply_map.len()); + let mut supply_deltas = Vec::with_capacity(supply_map.len()); + for (contract, delta) in supply_map { + supply_contracts.push(contract); + supply_deltas.push(delta.to_string()); + } + + let params: [&(dyn ToSql + Sync); 2] = [&supply_contracts, &supply_deltas]; + pg_tx + .execute( + "UPDATE erc20_contracts AS c + SET total_supply = COALESCE(c.total_supply, 0) + s.supply_delta::numeric + FROM unnest($1::text[], $2::text[]) AS s(contract_address, supply_delta) + WHERE c.address = s.contract_address", + ¶ms, + ) + .await?; + } + if update_watermark { let last_value = last_block.to_string(); pg_tx @@ -1049,6 +1074,43 @@ mod tests { let contract = batch.ec_addresses[0].clone(); let to = "0x2222222222222222222222222222222222222222"; assert!(batch.balance_map.contains_key(&(to.to_string(), contract))); + assert_eq!( + batch.supply_map["0x3333333333333333333333333333333333333333"], + BigDecimal::from(1000) + ); + } + + #[test] + fn collect_erc20_burn_tracks_negative_supply_delta() { + let mut batch = BlockBatch::new(); + let known_erc20 = HashSet::new(); + let known_nft = HashSet::new(); + + let logs = serde_json::json!([{ + "address": "0x3333333333333333333333333333333333333333", + "topics": [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x0000000000000000000000001111111111111111111111111111111111111111", + "0x0000000000000000000000000000000000000000000000000000000000000000" + ], + "data": "0x00000000000000000000000000000000000000000000000000000000000003e8", + "blockNumber": "0x1", + "transactionHash": "0x0000000000000000000000000000000000000000000000000000000000000001", + "transactionIndex": "0x0", + "blockHash": "0x0000000000000000000000000000000000000000000000000000000000000001", + "logIndex": "0x0", + "removed": false + }]); + + let mut fb = empty_fetched_block(1); + fb.receipts = vec![make_receipt(logs)]; + Indexer::collect_block(&mut batch, &known_erc20, &known_nft, fb); + + assert_eq!(batch.balance_map.len(), 1); + assert_eq!( + batch.supply_map["0x3333333333333333333333333333333333333333"], + BigDecimal::from(-1000) + ); } #[test] diff --git a/backend/crates/atlas-server/src/indexer/metadata.rs b/backend/crates/atlas-server/src/indexer/metadata.rs index 2e7906b..8d604ba 100644 --- a/backend/crates/atlas-server/src/indexer/metadata.rs +++ b/backend/crates/atlas-server/src/indexer/metadata.rs @@ -5,7 +5,6 @@ use alloy::{ sol, }; use anyhow::Result; -use bigdecimal::BigDecimal; use sqlx::PgPool; use std::{str::FromStr, sync::Arc, time::Duration}; @@ -29,7 +28,6 @@ sol! { function name() external view returns (string memory); function symbol() external view returns (string memory); function decimals() external view returns (uint8); - function totalSupply() external view returns (uint256); } } @@ -283,7 +281,7 @@ async fn fetch_nft_contract_metadata( Ok(()) } -/// Fetch ERC-20 contract metadata (name, symbol, decimals, totalSupply) +/// Fetch ERC-20 contract metadata (name, symbol, decimals) async fn fetch_erc20_contract_metadata( pool: &PgPool, provider: &HttpProvider, @@ -301,20 +299,11 @@ async fn fetch_erc20_contract_metadata( // Fetch decimals let decimals = contract.decimals().call().await.ok().map(|r| r as i16); - // Fetch totalSupply - let total_supply = contract - .totalSupply() - .call() - .await - .ok() - .map(|r| BigDecimal::from_str(&r.to_string()).unwrap_or_default()); - sqlx::query( "UPDATE erc20_contracts SET name = COALESCE($2, name), symbol = COALESCE($3, symbol), decimals = COALESCE($4, decimals), - total_supply = COALESCE($5, total_supply), metadata_fetched = true WHERE address = $1", ) @@ -322,7 +311,6 @@ async fn fetch_erc20_contract_metadata( .bind(name) .bind(symbol) .bind(decimals) - .bind(total_supply) .execute(pool) .await?; diff --git a/backend/crates/atlas-server/tests/integration/addresses.rs b/backend/crates/atlas-server/tests/integration/addresses.rs index eda4fa4..863baf9 100644 --- a/backend/crates/atlas-server/tests/integration/addresses.rs +++ b/backend/crates/atlas-server/tests/integration/addresses.rs @@ -10,6 +10,7 @@ use crate::common; const ADDR: &str = "0x5000000000000000000000000000000000000001"; const ADDR_TO: &str = "0x5000000000000000000000000000000000000002"; +const ERC20_ADDR: &str = "0x5000000000000000000000000000000000000010"; const TX_HASH_A: &str = "0x5000000000000000000000000000000000000000000000000000000000000001"; const TX_HASH_B: &str = "0x5000000000000000000000000000000000000000000000000000000000000002"; @@ -66,6 +67,68 @@ async fn seed_address_data(pool: &sqlx::PgPool) { } } +async fn seed_erc20_address_data(pool: &sqlx::PgPool) { + sqlx::query( + "INSERT INTO addresses (address, is_contract, first_seen_block, tx_count) + VALUES ($1, $2, $3, $4) + ON CONFLICT (address) DO NOTHING", + ) + .bind(ERC20_ADDR) + .bind(true) + .bind(5000i64) + .bind(1i32) + .execute(pool) + .await + .expect("seed erc20 address"); + + sqlx::query( + "INSERT INTO erc20_contracts (address, name, symbol, decimals, total_supply, first_seen_block) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (address) DO NOTHING", + ) + .bind(ERC20_ADDR) + .bind("Address Token") + .bind("ATK") + .bind(18i16) + .bind(bigdecimal::BigDecimal::from(500_000i64)) + .bind(5000i64) + .execute(pool) + .await + .expect("seed erc20 contract"); + + for (holder, balance) in [(ADDR, 700_000i64), (ADDR_TO, 300_000i64)] { + sqlx::query( + "INSERT INTO erc20_balances (address, contract_address, balance, last_updated_block) + VALUES ($1, $2, $3, $4) + ON CONFLICT (address, contract_address) DO NOTHING", + ) + .bind(holder) + .bind(ERC20_ADDR) + .bind(bigdecimal::BigDecimal::from(balance)) + .bind(5000i64) + .execute(pool) + .await + .expect("seed erc20 balance"); + } + + sqlx::query( + "INSERT INTO erc20_transfers (tx_hash, log_index, contract_address, from_address, to_address, value, block_number, timestamp) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (tx_hash, log_index, block_number) DO NOTHING", + ) + .bind(TX_HASH_A) + .bind(0i32) + .bind(ERC20_ADDR) + .bind(ADDR) + .bind(ADDR_TO) + .bind(bigdecimal::BigDecimal::from(10_000i64)) + .bind(5000i64) + .bind(1_700_005_000i64) + .execute(pool) + .await + .expect("seed erc20 transfer"); +} + #[test] fn get_address_detail() { common::run(async { @@ -114,3 +177,28 @@ fn get_address_transactions() { assert_eq!(data.len(), 2); }); } + +#[test] +fn get_erc20_address_detail_prefers_indexed_supply() { + common::run(async { + let pool = common::pool(); + seed_address_data(pool).await; + seed_erc20_address_data(pool).await; + + let app = common::test_router(); + let response = app + .oneshot( + Request::builder() + .uri(format!("/api/addresses/{}", ERC20_ADDR)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = common::json_body(response).await; + assert_eq!(body["address_type"].as_str().unwrap(), "erc20"); + assert_eq!(body["total_supply"].as_str().unwrap(), "1000000"); + }); +} diff --git a/backend/crates/atlas-server/tests/integration/tokens.rs b/backend/crates/atlas-server/tests/integration/tokens.rs index 0684a1e..91b680a 100644 --- a/backend/crates/atlas-server/tests/integration/tokens.rs +++ b/backend/crates/atlas-server/tests/integration/tokens.rs @@ -215,6 +215,37 @@ fn get_token_detail() { assert_eq!(body["symbol"].as_str().unwrap(), "TTA"); assert_eq!(body["holder_count"].as_i64().unwrap(), 2); assert_eq!(body["transfer_count"].as_i64().unwrap(), 1); + assert_eq!(body["total_supply"].as_str().unwrap(), "1000000"); + }); +} + +#[test] +fn get_token_detail_prefers_indexed_supply_over_stale_stored_value() { + common::run(async { + let pool = common::pool(); + seed_token_data(pool).await; + + sqlx::query("UPDATE erc20_contracts SET total_supply = $2 WHERE address = $1") + .bind(TOKEN_A) + .bind(bigdecimal::BigDecimal::from(700_000i64)) + .execute(pool) + .await + .expect("update stale total supply"); + + let app = common::test_router(); + let response = app + .oneshot( + Request::builder() + .uri(format!("/api/tokens/{}", TOKEN_A)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = common::json_body(response).await; + assert_eq!(body["total_supply"].as_str().unwrap(), "1000000"); }); } diff --git a/backend/migrations/20240109000001_recompute_erc20_supply.sql b/backend/migrations/20240109000001_recompute_erc20_supply.sql new file mode 100644 index 0000000..2692c50 --- /dev/null +++ b/backend/migrations/20240109000001_recompute_erc20_supply.sql @@ -0,0 +1,15 @@ +-- Recompute ERC-20 total supply from indexed balances. +-- Metadata fetches only capture a point-in-time value, which goes stale for +-- mintable or burnable tokens during historical indexing. +UPDATE erc20_contracts AS c +SET total_supply = COALESCE(b.total_supply, 0) +FROM ( + SELECT + erc20_contracts.address, + COALESCE(SUM(balance), 0) AS total_supply + FROM erc20_contracts + LEFT JOIN erc20_balances ON erc20_balances.contract_address = erc20_contracts.address + AND erc20_balances.balance > 0 + GROUP BY erc20_contracts.address +) AS b +WHERE c.address = b.address; From 18d683abd2616b418bc7f5c0d20789776db6d01e Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:21:12 +0200 Subject: [PATCH 2/3] Drop ERC20 supply backfill migration --- .../20240109000001_recompute_erc20_supply.sql | 15 --------------- 1 file changed, 15 deletions(-) delete mode 100644 backend/migrations/20240109000001_recompute_erc20_supply.sql diff --git a/backend/migrations/20240109000001_recompute_erc20_supply.sql b/backend/migrations/20240109000001_recompute_erc20_supply.sql deleted file mode 100644 index 2692c50..0000000 --- a/backend/migrations/20240109000001_recompute_erc20_supply.sql +++ /dev/null @@ -1,15 +0,0 @@ --- Recompute ERC-20 total supply from indexed balances. --- Metadata fetches only capture a point-in-time value, which goes stale for --- mintable or burnable tokens during historical indexing. -UPDATE erc20_contracts AS c -SET total_supply = COALESCE(b.total_supply, 0) -FROM ( - SELECT - erc20_contracts.address, - COALESCE(SUM(balance), 0) AS total_supply - FROM erc20_contracts - LEFT JOIN erc20_balances ON erc20_balances.contract_address = erc20_contracts.address - AND erc20_balances.balance > 0 - GROUP BY erc20_contracts.address -) AS b -WHERE c.address = b.address; From 35b1c288128559e329c26f993fa62a7ddc337b40 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:38:52 +0200 Subject: [PATCH 3/3] Gate ERC20 supply overrides on full history --- .../src/api/handlers/addresses.rs | 13 +----- .../atlas-server/src/api/handlers/mod.rs | 15 +++++++ .../atlas-server/src/api/handlers/tokens.rs | 5 ++- .../atlas-server/src/indexer/indexer.rs | 40 +++++++++++++++++++ backend/crates/atlas-server/src/lib.rs | 1 + backend/crates/atlas-server/src/main.rs | 1 + backend/crates/atlas-server/src/state_keys.rs | 1 + .../tests/integration/addresses.rs | 15 +++++++ .../atlas-server/tests/integration/tokens.rs | 15 +++++++ 9 files changed, 93 insertions(+), 13 deletions(-) create mode 100644 backend/crates/atlas-server/src/state_keys.rs diff --git a/backend/crates/atlas-server/src/api/handlers/addresses.rs b/backend/crates/atlas-server/src/api/handlers/addresses.rs index 3658d20..7fd2f13 100644 --- a/backend/crates/atlas-server/src/api/handlers/addresses.rs +++ b/backend/crates/atlas-server/src/api/handlers/addresses.rs @@ -7,6 +7,7 @@ use sqlx::PgPool; use std::sync::Arc; use crate::api::error::ApiResult; +use crate::api::handlers::has_complete_erc20_supply_history; use crate::api::AppState; use atlas_common::{Address, AtlasError, NftToken, PaginatedResponse, Pagination, Transaction}; @@ -83,16 +84,6 @@ async fn get_indexed_erc20_total_supply( Ok(supply) } -async fn has_erc20_transfers(pool: &PgPool, address: &str) -> Result { - let (has_transfers,): (bool,) = - sqlx::query_as("SELECT EXISTS(SELECT 1 FROM erc20_transfers WHERE contract_address = $1)") - .bind(address) - .fetch_one(pool) - .await?; - - Ok(has_transfers) -} - pub async fn list_addresses( State(state): State>, Query(filters): Query, @@ -250,7 +241,7 @@ pub async fn get_address( let erc20_contract = match erc20_contract { Some(mut erc20) => { - if erc20.total_supply.is_none() || has_erc20_transfers(&state.pool, &address).await? { + if has_complete_erc20_supply_history(&state.pool).await? { erc20.total_supply = Some(get_indexed_erc20_total_supply(&state.pool, &address).await?); } diff --git a/backend/crates/atlas-server/src/api/handlers/mod.rs b/backend/crates/atlas-server/src/api/handlers/mod.rs index d4c2076..f804941 100644 --- a/backend/crates/atlas-server/src/api/handlers/mod.rs +++ b/backend/crates/atlas-server/src/api/handlers/mod.rs @@ -16,6 +16,8 @@ pub mod transactions; use atlas_common::{Block, BLOCK_COLUMNS}; use sqlx::PgPool; +use crate::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY; + pub async fn get_latest_block(pool: &PgPool) -> Result, sqlx::Error> { sqlx::query_as(&format!( "SELECT {} FROM blocks ORDER BY number DESC LIMIT 1", @@ -24,6 +26,19 @@ pub async fn get_latest_block(pool: &PgPool) -> Result, sqlx::Erro .fetch_optional(pool) .await } + +pub async fn has_complete_erc20_supply_history(pool: &PgPool) -> Result { + let value: Option<(String,)> = + sqlx::query_as("SELECT value FROM indexer_state WHERE key = $1 LIMIT 1") + .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY) + .fetch_optional(pool) + .await?; + + Ok(matches!( + value.as_ref().map(|(value,)| value.as_str()), + Some("true") + )) +} fn exact_count_sql(table_name: &str) -> Result<&'static str, sqlx::Error> { match table_name { "transactions" => Ok("SELECT COUNT(*) FROM transactions"), diff --git a/backend/crates/atlas-server/src/api/handlers/tokens.rs b/backend/crates/atlas-server/src/api/handlers/tokens.rs index 7fc012d..7e8323c 100644 --- a/backend/crates/atlas-server/src/api/handlers/tokens.rs +++ b/backend/crates/atlas-server/src/api/handlers/tokens.rs @@ -7,6 +7,7 @@ use sqlx::PgPool; use std::sync::Arc; use crate::api::error::ApiResult; +use crate::api::handlers::has_complete_erc20_supply_history; use crate::api::handlers::stats::WindowQuery; use crate::api::AppState; use atlas_common::{ @@ -97,7 +98,7 @@ pub async fn get_token( .fetch_one(&state.pool) .await?; - if transfer_count.0 > 0 || contract.total_supply.is_none() { + if has_complete_erc20_supply_history(&state.pool).await? { contract.total_supply = Some(get_indexed_total_supply(&state.pool, &address).await?); } @@ -133,7 +134,7 @@ pub async fn get_token_holders( .fetch_one(&state.pool) .await?; - let total_supply = if total.0 > 0 { + let total_supply = if has_complete_erc20_supply_history(&state.pool).await? { Some(get_indexed_total_supply(&state.pool, &address).await?) } else { let stored: Option<(Option,)> = diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index 939cda7..a3704c2 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -25,6 +25,7 @@ use super::fetcher::{ }; use crate::config::Config; use crate::head::HeadTracker; +use crate::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY; /// Partition size: 10 million blocks per partition const PARTITION_SIZE: u64 = 10_000_000; @@ -118,6 +119,12 @@ impl Indexer { // Get starting block let start_block = self.get_start_block().await?; + let erc20_supply_history_status = self.get_erc20_supply_history_status().await?; + let mut erc20_supply_backfill_pending = matches!(erc20_supply_history_status, Some(false)) + || (erc20_supply_history_status.is_none() && start_block == 0); + if erc20_supply_history_status.is_none() && start_block == 0 { + self.set_erc20_supply_history_complete(false).await?; + } tracing::info!("Starting indexing from block {}", start_block); // Load known contracts into memory to avoid a SELECT per transfer @@ -197,6 +204,10 @@ impl Indexer { tracing::debug!("Chain head: {}, current: {}", head, current_block); if current_block > head { + if erc20_supply_backfill_pending { + self.set_erc20_supply_history_complete(true).await?; + erc20_supply_backfill_pending = false; + } // At head, wait for new blocks tokio::time::sleep(Duration::from_secs(1)).await; continue; @@ -393,6 +404,11 @@ impl Indexer { current_block = end_block + 1; + if erc20_supply_backfill_pending && current_block > head { + self.set_erc20_supply_history_complete(true).await?; + erc20_supply_backfill_pending = false; + } + // Log progress after every batch let elapsed = last_log_time.elapsed(); let blocks_per_sec = batch_size as f64 / elapsed.as_secs_f64(); @@ -958,6 +974,30 @@ impl Indexer { .await?; Ok(()) } + + async fn get_erc20_supply_history_status(&self) -> Result> { + let value: Option<(String,)> = + sqlx::query_as("SELECT value FROM indexer_state WHERE key = $1 LIMIT 1") + .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY) + .fetch_optional(&self.pool) + .await?; + + Ok(value.map(|(value,)| value == "true")) + } + + async fn set_erc20_supply_history_complete(&self, complete: bool) -> Result<()> { + sqlx::query( + "INSERT INTO indexer_state (key, value, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at", + ) + .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY) + .bind(if complete { "true" } else { "false" }) + .execute(&self.pool) + .await?; + + Ok(()) + } } #[cfg(test)] diff --git a/backend/crates/atlas-server/src/lib.rs b/backend/crates/atlas-server/src/lib.rs index f9da928..9d7d862 100644 --- a/backend/crates/atlas-server/src/lib.rs +++ b/backend/crates/atlas-server/src/lib.rs @@ -4,3 +4,4 @@ pub mod config; pub mod faucet; pub mod head; pub mod indexer; +pub mod state_keys; diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs index 14d932c..f192460 100644 --- a/backend/crates/atlas-server/src/main.rs +++ b/backend/crates/atlas-server/src/main.rs @@ -14,6 +14,7 @@ mod config; mod faucet; mod head; mod indexer; +mod state_keys; /// Retry delays for exponential backoff (in seconds) const RETRY_DELAYS: &[u64] = &[5, 10, 20, 30, 60]; diff --git a/backend/crates/atlas-server/src/state_keys.rs b/backend/crates/atlas-server/src/state_keys.rs new file mode 100644 index 0000000..f8df2e5 --- /dev/null +++ b/backend/crates/atlas-server/src/state_keys.rs @@ -0,0 +1 @@ +pub const ERC20_SUPPLY_HISTORY_COMPLETE_KEY: &str = "erc20_supply_history_complete"; diff --git a/backend/crates/atlas-server/tests/integration/addresses.rs b/backend/crates/atlas-server/tests/integration/addresses.rs index 863baf9..e7ce92a 100644 --- a/backend/crates/atlas-server/tests/integration/addresses.rs +++ b/backend/crates/atlas-server/tests/integration/addresses.rs @@ -5,6 +5,7 @@ use axum::{ use tower::ServiceExt; use crate::common; +use atlas_server::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY; // Block range: 5000-5999 @@ -129,6 +130,19 @@ async fn seed_erc20_address_data(pool: &sqlx::PgPool) { .expect("seed erc20 transfer"); } +async fn set_erc20_supply_history_complete(pool: &sqlx::PgPool, complete: bool) { + sqlx::query( + "INSERT INTO indexer_state (key, value, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at", + ) + .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY) + .bind(if complete { "true" } else { "false" }) + .execute(pool) + .await + .expect("set erc20 supply history completeness"); +} + #[test] fn get_address_detail() { common::run(async { @@ -184,6 +198,7 @@ fn get_erc20_address_detail_prefers_indexed_supply() { let pool = common::pool(); seed_address_data(pool).await; seed_erc20_address_data(pool).await; + set_erc20_supply_history_complete(pool, true).await; let app = common::test_router(); let response = app diff --git a/backend/crates/atlas-server/tests/integration/tokens.rs b/backend/crates/atlas-server/tests/integration/tokens.rs index 91b680a..8844842 100644 --- a/backend/crates/atlas-server/tests/integration/tokens.rs +++ b/backend/crates/atlas-server/tests/integration/tokens.rs @@ -5,6 +5,7 @@ use axum::{ use tower::ServiceExt; use crate::common; +use atlas_server::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY; // Block range: 6000-6999 @@ -168,6 +169,19 @@ async fn seed_token_chart_data(pool: &sqlx::PgPool) { .expect("seed token chart transfer"); } +async fn set_erc20_supply_history_complete(pool: &sqlx::PgPool, complete: bool) { + sqlx::query( + "INSERT INTO indexer_state (key, value, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at", + ) + .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY) + .bind(if complete { "true" } else { "false" }) + .execute(pool) + .await + .expect("set erc20 supply history completeness"); +} + #[test] fn list_tokens() { common::run(async { @@ -224,6 +238,7 @@ fn get_token_detail_prefers_indexed_supply_over_stale_stored_value() { common::run(async { let pool = common::pool(); seed_token_data(pool).await; + set_erc20_supply_history_complete(pool, true).await; sqlx::query("UPDATE erc20_contracts SET total_supply = $2 WHERE address = $1") .bind(TOKEN_A)