Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use axum::{
Json,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::sync::Arc;

use crate::api::error::ApiResult;
use crate::api::handlers::has_complete_erc20_supply_history;
use crate::api::AppState;
use atlas_common::{Address, AtlasError, NftToken, PaginatedResponse, Pagination, Transaction};

Expand Down Expand Up @@ -66,6 +68,22 @@ fn default_limit() -> u32 {
20
}

/// Derive a token's total supply from indexed holder balances: the sum of
/// every positive `erc20_balances.balance` row for `address`.
///
/// Callers should gate this behind `has_complete_erc20_supply_history`,
/// since the sum is only trustworthy once the full transfer history has
/// been indexed.
async fn get_indexed_erc20_total_supply(
    pool: &PgPool,
    address: &str,
) -> Result<bigdecimal::BigDecimal, sqlx::Error> {
    let query = sqlx::query_as(
        "SELECT COALESCE(SUM(balance), 0)
         FROM erc20_balances
         WHERE contract_address = $1 AND balance > 0",
    )
    .bind(address);

    // COALESCE guarantees exactly one row even when no holders exist,
    // so fetch_one cannot fail with RowNotFound here.
    let (supply,): (bigdecimal::BigDecimal,) = query.fetch_one(pool).await?;
    Ok(supply)
}

pub async fn list_addresses(
State(state): State<Arc<AppState>>,
Query(filters): Query<AddressFilters>,
Expand Down Expand Up @@ -221,6 +239,17 @@ pub async fn get_address(
.fetch_optional(&state.pool)
.await?;

let erc20_contract = match erc20_contract {
Some(mut erc20) => {
if has_complete_erc20_supply_history(&state.pool).await? {
erc20.total_supply =
Some(get_indexed_erc20_total_supply(&state.pool, &address).await?);
}
Some(erc20)
}
None => None,
};

// Merge the data
match (base_addr, nft_contract, erc20_contract) {
// Found in addresses table and is an NFT contract
Expand Down
15 changes: 15 additions & 0 deletions backend/crates/atlas-server/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub mod transactions;
use atlas_common::{Block, BLOCK_COLUMNS};
use sqlx::PgPool;

use crate::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY;

pub async fn get_latest_block(pool: &PgPool) -> Result<Option<Block>, sqlx::Error> {
sqlx::query_as(&format!(
"SELECT {} FROM blocks ORDER BY number DESC LIMIT 1",
Expand All @@ -24,6 +26,19 @@ pub async fn get_latest_block(pool: &PgPool) -> Result<Option<Block>, sqlx::Erro
.fetch_optional(pool)
.await
}

/// Report whether the indexer has recorded a complete ERC-20 supply
/// history, i.e. the `indexer_state` row keyed by
/// `ERC20_SUPPLY_HISTORY_COMPLETE_KEY` holds the literal string "true".
///
/// A missing row or any other stored value counts as incomplete.
pub async fn has_complete_erc20_supply_history(pool: &PgPool) -> Result<bool, sqlx::Error> {
    let row: Option<(String,)> =
        sqlx::query_as("SELECT value FROM indexer_state WHERE key = $1 LIMIT 1")
            .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY)
            .fetch_optional(pool)
            .await?;

    Ok(row.is_some_and(|(flag,)| flag == "true"))
}

fn exact_count_sql(table_name: &str) -> Result<&'static str, sqlx::Error> {
match table_name {
"transactions" => Ok("SELECT COUNT(*) FROM transactions"),
Expand Down
52 changes: 24 additions & 28 deletions backend/crates/atlas-server/src/api/handlers/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use axum::{
Json,
};
use chrono::Utc;
use sqlx::PgPool;
use std::sync::Arc;

use crate::api::error::ApiResult;
use crate::api::handlers::has_complete_erc20_supply_history;
use crate::api::handlers::stats::WindowQuery;
use crate::api::AppState;
use atlas_common::{
Expand Down Expand Up @@ -50,6 +52,22 @@ pub struct TokenDetailResponse {
pub transfer_count: i64,
}

/// Compute a token's total supply from the indexed `erc20_balances` table
/// by summing all strictly positive holder balances for `address`.
///
/// NOTE(review): this duplicates `get_indexed_erc20_total_supply` in
/// `handlers/addresses.rs`; consider hoisting one shared helper into
/// `handlers/mod.rs`.
async fn get_indexed_total_supply(
    pool: &PgPool,
    address: &str,
) -> Result<bigdecimal::BigDecimal, sqlx::Error> {
    // COALESCE ensures a single zero row when the contract has no holders,
    // so fetch_one always has a row to return.
    sqlx::query_as(
        "SELECT COALESCE(SUM(balance), 0)
         FROM erc20_balances
         WHERE contract_address = $1 AND balance > 0",
    )
    .bind(address)
    .fetch_one(pool)
    .await
    .map(|(supply,): (bigdecimal::BigDecimal,)| supply)
}

/// GET /api/tokens/:address - Get token details
pub async fn get_token(
State(state): State<Arc<AppState>>,
Expand Down Expand Up @@ -80,16 +98,8 @@ pub async fn get_token(
.fetch_one(&state.pool)
.await?;

// Compute total_supply from balances if not set
if contract.total_supply.is_none() {
let computed_supply: Option<(bigdecimal::BigDecimal,)> = sqlx::query_as(
"SELECT COALESCE(SUM(balance), 0) FROM erc20_balances WHERE contract_address = $1 AND balance > 0",
)
.bind(&address)
.fetch_optional(&state.pool)
.await?;

contract.total_supply = computed_supply.map(|(s,)| s);
if has_complete_erc20_supply_history(&state.pool).await? {
contract.total_supply = Some(get_indexed_total_supply(&state.pool, &address).await?);
Comment on lines +101 to +102
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The incomplete-history path still exposes indexed supply.

backend/crates/atlas-server/src/indexer/indexer.rs now advances erc20_contracts.total_supply from indexed mint/burn deltas. When has_complete_erc20_supply_history() is false, get_token leaves that field untouched and get_token_holders falls back to it, so a fresh sync can still return a partial supply before the completeness flag flips. The false branch needs a separate trusted source, or it should suppress total_supply until history is complete.

Also applies to: 137-145

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-server/src/api/handlers/tokens.rs` around lines 101 -
102, The current logic lets indexed supply leak when history is incomplete: when
has_complete_erc20_supply_history(&state.pool) is false you must not surface
erc20_contracts.total_supply (or any value derived from
get_indexed_total_supply) to callers of get_token or get_token_holders; change
the false-branch so that get_token clears/suppresses contract.total_supply (set
to None) and ensure get_token_holders does not fall back to the stored
erc20_contracts.total_supply value—either query a trusted on-chain source
instead or return no total_supply until has_complete_erc20_supply_history
returns true; apply the same change to the other occurrence around lines 137-145
where total_supply is currently used.

}

Ok(Json(TokenDetailResponse {
Expand Down Expand Up @@ -124,29 +134,15 @@ pub async fn get_token_holders(
.fetch_one(&state.pool)
.await?;

// Get total supply for percentage calculation
// First try to get it from the contract, if NULL compute from sum of balances
let total_supply: Option<bigdecimal::BigDecimal> = {
let total_supply = if has_complete_erc20_supply_history(&state.pool).await? {
Some(get_indexed_total_supply(&state.pool, &address).await?)
} else {
let stored: Option<(Option<bigdecimal::BigDecimal>,)> =
sqlx::query_as("SELECT total_supply FROM erc20_contracts WHERE address = $1")
.bind(&address)
.fetch_optional(&state.pool)
.await?;

match stored {
Some((Some(ts),)) => Some(ts),
_ => {
// Compute from sum of balances
let computed: Option<(bigdecimal::BigDecimal,)> = sqlx::query_as(
"SELECT COALESCE(SUM(balance), 0) FROM erc20_balances
WHERE contract_address = $1 AND balance > 0",
)
.bind(&address)
.fetch_optional(&state.pool)
.await?;
computed.map(|(s,)| s)
}
}
stored.and_then(|(supply,)| supply)
};

let balances: Vec<Erc20Balance> = sqlx::query_as(
Expand Down
24 changes: 24 additions & 0 deletions backend/crates/atlas-server/src/indexer/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ pub(crate) struct BlockBatch {
// erc20_balances — aggregated deltas per (address, contract)
pub(crate) balance_map: HashMap<(String, String), BalanceDelta>,

// erc20 total supply deltas — aggregated per contract from mint/burn events
pub(crate) supply_map: HashMap<String, BigDecimal>,

// Contracts newly discovered in this batch.
// These are NOT merged into the persistent known_* sets until after a
// successful write, so a failed write doesn't leave the in-memory sets
Expand Down Expand Up @@ -158,6 +161,16 @@ impl BlockBatch {
entry.last_block = entry.last_block.max(block);
}

/// Add a total supply delta for a contract.
/// Only mint and burn transfers should touch this accumulator.
pub(crate) fn apply_supply_delta(&mut self, contract: String, delta: BigDecimal) {
let entry = self
.supply_map
.entry(contract)
.or_insert(BigDecimal::from(0));
*entry += delta;
}

pub(crate) fn materialize_blocks(&self, indexed_at: DateTime<Utc>) -> Vec<Block> {
debug_assert_eq!(self.b_numbers.len(), self.b_hashes.len());
debug_assert_eq!(self.b_numbers.len(), self.b_parent_hashes.len());
Expand Down Expand Up @@ -257,6 +270,17 @@ mod tests {
assert_eq!(entry.last_block, 100);
}

#[test]
fn apply_supply_delta_accumulates_by_contract() {
    // Two deltas for one contract must net out into a single entry.
    let mut batch = BlockBatch::new();
    let token = "0xtoken".to_string();

    batch.apply_supply_delta(token.clone(), BigDecimal::from(100));
    batch.apply_supply_delta(token.clone(), BigDecimal::from(-25));

    let net = batch.supply_map.get(&token).expect("delta was recorded");
    assert_eq!(*net, BigDecimal::from(75));
}

#[test]
fn materialize_blocks_preserves_parallel_block_fields() {
let mut batch = BlockBatch::new();
Expand Down
106 changes: 104 additions & 2 deletions backend/crates/atlas-server/src/indexer/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::fetcher::{
};
use crate::config::Config;
use crate::head::HeadTracker;
use crate::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY;

/// Partition size: 10 million blocks per partition
const PARTITION_SIZE: u64 = 10_000_000;
Expand Down Expand Up @@ -118,6 +119,12 @@ impl Indexer {

// Get starting block
let start_block = self.get_start_block().await?;
let erc20_supply_history_status = self.get_erc20_supply_history_status().await?;
let mut erc20_supply_backfill_pending = matches!(erc20_supply_history_status, Some(false))
|| (erc20_supply_history_status.is_none() && start_block == 0);
if erc20_supply_history_status.is_none() && start_block == 0 {
self.set_erc20_supply_history_complete(false).await?;
}
Comment on lines +122 to +127
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t mark supply history complete while gaps can still exist.

This state machine never checks for outstanding failed blocks. If a block exhausts retries and is parked in failed_blocks, current_block > head can still flip ERC20_SUPPLY_HISTORY_COMPLETE_KEY to true, and once the key is true later gaps will not turn it back off. Since the API uses this flag to trust indexed supply, that can expose incorrect totals after any permanent hole in the indexed history. Tie the flag to a gap-free condition, and reset it when a block is deferred into failed_blocks.

Also applies to: 207-210, 407-410

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-server/src/indexer/indexer.rs` around lines 122 - 127,
The logic that sets ERC20 supply-history completion (via
get_erc20_supply_history_status and set_erc20_supply_history_complete) must only
mark complete when there are no outstanding gaps or permanently failed blocks;
update the code so it checks the failed_blocks set (or parked failed count) and
any deferred/failed retry state before setting the flag, and ensure any code
paths that move a block into failed_blocks immediately call
set_erc20_supply_history_complete(false) to reset the flag; apply this change to
the same pattern found around the other similar blocks (the other
get_/set_erc20_supply_history_status usages) so the flag represents a gap-free
condition and is cleared when a block is deferred to failed_blocks.

tracing::info!("Starting indexing from block {}", start_block);

// Load known contracts into memory to avoid a SELECT per transfer
Expand Down Expand Up @@ -197,6 +204,10 @@ impl Indexer {
tracing::debug!("Chain head: {}, current: {}", head, current_block);

if current_block > head {
if erc20_supply_backfill_pending {
self.set_erc20_supply_history_complete(true).await?;
erc20_supply_backfill_pending = false;
}
// At head, wait for new blocks
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
Expand Down Expand Up @@ -393,6 +404,11 @@ impl Indexer {

current_block = end_block + 1;

if erc20_supply_backfill_pending && current_block > head {
self.set_erc20_supply_history_complete(true).await?;
erc20_supply_backfill_pending = false;
}

// Log progress after every batch
let elapsed = last_log_time.elapsed();
let blocks_per_sec = batch_size as f64 / elapsed.as_secs_f64();
Expand Down Expand Up @@ -614,15 +630,19 @@ impl Indexer {
// Aggregate balance deltas — multiple transfers in the same batch
// for the same (address, contract) pair are summed in Rust,
// so we only need one DB upsert per unique pair.
if from != ZERO_ADDRESS {
if from == ZERO_ADDRESS {
batch.apply_supply_delta(contract.clone(), value.clone());
} else {
batch.apply_balance_delta(
from,
contract.clone(),
-value.clone(),
block_num as i64,
);
}
if to != ZERO_ADDRESS {
if to == ZERO_ADDRESS {
batch.apply_supply_delta(contract.clone(), -value);
} else {
batch.apply_balance_delta(
to,
contract.clone(),
Expand Down Expand Up @@ -673,6 +693,7 @@ impl Indexer {
ec_addresses,
ec_first_seen_blocks,
balance_map,
supply_map,
last_block,
..
} = batch;
Expand Down Expand Up @@ -800,6 +821,26 @@ impl Indexer {
.await?;
}

if !supply_map.is_empty() {
let mut supply_contracts = Vec::with_capacity(supply_map.len());
let mut supply_deltas = Vec::with_capacity(supply_map.len());
for (contract, delta) in supply_map {
supply_contracts.push(contract);
supply_deltas.push(delta.to_string());
}

let params: [&(dyn ToSql + Sync); 2] = [&supply_contracts, &supply_deltas];
pg_tx
.execute(
"UPDATE erc20_contracts AS c
SET total_supply = COALESCE(c.total_supply, 0) + s.supply_delta::numeric
FROM unnest($1::text[], $2::text[]) AS s(contract_address, supply_delta)
WHERE c.address = s.contract_address",
&params,
)
.await?;
}

if update_watermark {
let last_value = last_block.to_string();
pg_tx
Expand Down Expand Up @@ -933,6 +974,30 @@ impl Indexer {
.await?;
Ok(())
}

/// Read the raw ERC-20 supply-history flag from `indexer_state`.
///
/// Returns `None` when the key has never been written, `Some(true)` only
/// for the exact stored value "true", and `Some(false)` otherwise.
async fn get_erc20_supply_history_status(&self) -> Result<Option<bool>> {
    let row: Option<(String,)> =
        sqlx::query_as("SELECT value FROM indexer_state WHERE key = $1 LIMIT 1")
            .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY)
            .fetch_optional(&self.pool)
            .await?;

    Ok(row.map(|(flag,)| flag == "true"))
}

/// Persist the ERC-20 supply-history completeness flag as "true"/"false"
/// under `ERC20_SUPPLY_HISTORY_COMPLETE_KEY`, upserting the
/// `indexer_state` row and refreshing `updated_at`.
async fn set_erc20_supply_history_complete(&self, complete: bool) -> Result<()> {
    let value = if complete { "true" } else { "false" };

    sqlx::query(
        "INSERT INTO indexer_state (key, value, updated_at)
         VALUES ($1, $2, NOW())
         ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at",
    )
    .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY)
    .bind(value)
    .execute(&self.pool)
    .await
    .map(|_| ())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1049,6 +1114,43 @@ mod tests {
let contract = batch.ec_addresses[0].clone();
let to = "0x2222222222222222222222222222222222222222";
assert!(batch.balance_map.contains_key(&(to.to_string(), contract)));
assert_eq!(
batch.supply_map["0x3333333333333333333333333333333333333333"],
BigDecimal::from(1000)
);
}

// A burn is a Transfer whose `to` topic is the zero address. Collecting it
// must debit the sender's balance AND record a negative supply delta for
// the emitting contract; no balance entry is created for the zero address.
#[test]
fn collect_erc20_burn_tracks_negative_supply_delta() {
let mut batch = BlockBatch::new();
let known_erc20 = HashSet::new();
let known_nft = HashSet::new();

// One ERC-20 Transfer log: from 0x1111… to the zero address,
// value 0x3e8 (= 1000), emitted by contract 0x3333….
let logs = serde_json::json!([{
"address": "0x3333333333333333333333333333333333333333",
"topics": [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x0000000000000000000000001111111111111111111111111111111111111111",
"0x0000000000000000000000000000000000000000000000000000000000000000"
],
"data": "0x00000000000000000000000000000000000000000000000000000000000003e8",
"blockNumber": "0x1",
"transactionHash": "0x0000000000000000000000000000000000000000000000000000000000000001",
"transactionIndex": "0x0",
"blockHash": "0x0000000000000000000000000000000000000000000000000000000000000001",
"logIndex": "0x0",
"removed": false
}]);

let mut fb = empty_fetched_block(1);
fb.receipts = vec![make_receipt(logs)];
Indexer::collect_block(&mut batch, &known_erc20, &known_nft, fb);

// Only the sender's balance delta is tracked (zero address excluded)...
assert_eq!(batch.balance_map.len(), 1);
// ...and the contract's supply shrinks by the burned amount.
assert_eq!(
batch.supply_map["0x3333333333333333333333333333333333333333"],
BigDecimal::from(-1000)
);
}

#[test]
Expand Down
Loading
Loading