-
Notifications
You must be signed in to change notification settings - Fork 0
fix: stale ERC20 total supply #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,9 +3,11 @@ use axum::{ | |
| Json, | ||
| }; | ||
| use chrono::Utc; | ||
| use sqlx::PgPool; | ||
| use std::sync::Arc; | ||
|
|
||
| use crate::api::error::ApiResult; | ||
| use crate::api::handlers::has_complete_erc20_supply_history; | ||
| use crate::api::handlers::stats::WindowQuery; | ||
| use crate::api::AppState; | ||
| use atlas_common::{ | ||
|
|
@@ -50,6 +52,22 @@ pub struct TokenDetailResponse { | |
| pub transfer_count: i64, | ||
| } | ||
|
|
||
| async fn get_indexed_total_supply( | ||
| pool: &PgPool, | ||
| address: &str, | ||
| ) -> Result<bigdecimal::BigDecimal, sqlx::Error> { | ||
| let (supply,): (bigdecimal::BigDecimal,) = sqlx::query_as( | ||
| "SELECT COALESCE(SUM(balance), 0) | ||
| FROM erc20_balances | ||
| WHERE contract_address = $1 AND balance > 0", | ||
| ) | ||
| .bind(address) | ||
| .fetch_one(pool) | ||
| .await?; | ||
|
|
||
| Ok(supply) | ||
| } | ||
|
|
||
| /// GET /api/tokens/:address - Get token details | ||
| pub async fn get_token( | ||
| State(state): State<Arc<AppState>>, | ||
|
|
@@ -80,16 +98,8 @@ pub async fn get_token( | |
| .fetch_one(&state.pool) | ||
| .await?; | ||
|
|
||
| // Compute total_supply from balances if not set | ||
| if contract.total_supply.is_none() { | ||
| let computed_supply: Option<(bigdecimal::BigDecimal,)> = sqlx::query_as( | ||
| "SELECT COALESCE(SUM(balance), 0) FROM erc20_balances WHERE contract_address = $1 AND balance > 0", | ||
| ) | ||
| .bind(&address) | ||
| .fetch_optional(&state.pool) | ||
| .await?; | ||
|
|
||
| contract.total_supply = computed_supply.map(|(s,)| s); | ||
| if has_complete_erc20_supply_history(&state.pool).await? { | ||
| contract.total_supply = Some(get_indexed_total_supply(&state.pool, &address).await?); | ||
|
Comment on lines
+101
to
+102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The incomplete-history path still exposes indexed supply.
Also applies to: 137-145 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| Ok(Json(TokenDetailResponse { | ||
|
|
@@ -124,29 +134,15 @@ pub async fn get_token_holders( | |
| .fetch_one(&state.pool) | ||
| .await?; | ||
|
|
||
| // Get total supply for percentage calculation | ||
| // First try to get it from the contract, if NULL compute from sum of balances | ||
| let total_supply: Option<bigdecimal::BigDecimal> = { | ||
| let total_supply = if has_complete_erc20_supply_history(&state.pool).await? { | ||
| Some(get_indexed_total_supply(&state.pool, &address).await?) | ||
| } else { | ||
| let stored: Option<(Option<bigdecimal::BigDecimal>,)> = | ||
| sqlx::query_as("SELECT total_supply FROM erc20_contracts WHERE address = $1") | ||
| .bind(&address) | ||
| .fetch_optional(&state.pool) | ||
| .await?; | ||
|
|
||
| match stored { | ||
| Some((Some(ts),)) => Some(ts), | ||
| _ => { | ||
| // Compute from sum of balances | ||
| let computed: Option<(bigdecimal::BigDecimal,)> = sqlx::query_as( | ||
| "SELECT COALESCE(SUM(balance), 0) FROM erc20_balances | ||
| WHERE contract_address = $1 AND balance > 0", | ||
| ) | ||
| .bind(&address) | ||
| .fetch_optional(&state.pool) | ||
| .await?; | ||
| computed.map(|(s,)| s) | ||
| } | ||
| } | ||
| stored.and_then(|(supply,)| supply) | ||
| }; | ||
|
|
||
| let balances: Vec<Erc20Balance> = sqlx::query_as( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ use super::fetcher::{ | |
| }; | ||
| use crate::config::Config; | ||
| use crate::head::HeadTracker; | ||
| use crate::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY; | ||
|
|
||
| /// Partition size: 10 million blocks per partition | ||
| const PARTITION_SIZE: u64 = 10_000_000; | ||
|
|
@@ -118,6 +119,12 @@ impl Indexer { | |
|
|
||
| // Get starting block | ||
| let start_block = self.get_start_block().await?; | ||
| let erc20_supply_history_status = self.get_erc20_supply_history_status().await?; | ||
| let mut erc20_supply_backfill_pending = matches!(erc20_supply_history_status, Some(false)) | ||
| || (erc20_supply_history_status.is_none() && start_block == 0); | ||
| if erc20_supply_history_status.is_none() && start_block == 0 { | ||
| self.set_erc20_supply_history_complete(false).await?; | ||
| } | ||
|
Comment on lines
+122
to
+127
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don’t mark supply history complete while gaps can still exist. This state machine never checks for outstanding failed blocks. If a block exhausts retries and is parked in Also applies to: 207-210, 407-410 🤖 Prompt for AI Agents |
||
| tracing::info!("Starting indexing from block {}", start_block); | ||
|
|
||
| // Load known contracts into memory to avoid a SELECT per transfer | ||
|
|
@@ -197,6 +204,10 @@ impl Indexer { | |
| tracing::debug!("Chain head: {}, current: {}", head, current_block); | ||
|
|
||
| if current_block > head { | ||
| if erc20_supply_backfill_pending { | ||
| self.set_erc20_supply_history_complete(true).await?; | ||
| erc20_supply_backfill_pending = false; | ||
| } | ||
| // At head, wait for new blocks | ||
| tokio::time::sleep(Duration::from_secs(1)).await; | ||
| continue; | ||
|
|
@@ -393,6 +404,11 @@ impl Indexer { | |
|
|
||
| current_block = end_block + 1; | ||
|
|
||
| if erc20_supply_backfill_pending && current_block > head { | ||
| self.set_erc20_supply_history_complete(true).await?; | ||
| erc20_supply_backfill_pending = false; | ||
| } | ||
|
|
||
| // Log progress after every batch | ||
| let elapsed = last_log_time.elapsed(); | ||
| let blocks_per_sec = batch_size as f64 / elapsed.as_secs_f64(); | ||
|
|
@@ -614,15 +630,19 @@ impl Indexer { | |
| // Aggregate balance deltas — multiple transfers in the same batch | ||
| // for the same (address, contract) pair are summed in Rust, | ||
| // so we only need one DB upsert per unique pair. | ||
| if from != ZERO_ADDRESS { | ||
| if from == ZERO_ADDRESS { | ||
| batch.apply_supply_delta(contract.clone(), value.clone()); | ||
| } else { | ||
| batch.apply_balance_delta( | ||
| from, | ||
| contract.clone(), | ||
| -value.clone(), | ||
| block_num as i64, | ||
| ); | ||
| } | ||
| if to != ZERO_ADDRESS { | ||
| if to == ZERO_ADDRESS { | ||
| batch.apply_supply_delta(contract.clone(), -value); | ||
| } else { | ||
| batch.apply_balance_delta( | ||
| to, | ||
| contract.clone(), | ||
|
|
@@ -673,6 +693,7 @@ impl Indexer { | |
| ec_addresses, | ||
| ec_first_seen_blocks, | ||
| balance_map, | ||
| supply_map, | ||
| last_block, | ||
| .. | ||
| } = batch; | ||
|
|
@@ -800,6 +821,26 @@ impl Indexer { | |
| .await?; | ||
| } | ||
|
|
||
| if !supply_map.is_empty() { | ||
| let mut supply_contracts = Vec::with_capacity(supply_map.len()); | ||
| let mut supply_deltas = Vec::with_capacity(supply_map.len()); | ||
| for (contract, delta) in supply_map { | ||
| supply_contracts.push(contract); | ||
| supply_deltas.push(delta.to_string()); | ||
| } | ||
|
|
||
| let params: [&(dyn ToSql + Sync); 2] = [&supply_contracts, &supply_deltas]; | ||
| pg_tx | ||
| .execute( | ||
| "UPDATE erc20_contracts AS c | ||
| SET total_supply = COALESCE(c.total_supply, 0) + s.supply_delta::numeric | ||
| FROM unnest($1::text[], $2::text[]) AS s(contract_address, supply_delta) | ||
| WHERE c.address = s.contract_address", | ||
| ¶ms, | ||
| ) | ||
| .await?; | ||
| } | ||
|
|
||
| if update_watermark { | ||
| let last_value = last_block.to_string(); | ||
| pg_tx | ||
|
|
@@ -933,6 +974,30 @@ impl Indexer { | |
| .await?; | ||
| Ok(()) | ||
| } | ||
|
|
||
| async fn get_erc20_supply_history_status(&self) -> Result<Option<bool>> { | ||
| let value: Option<(String,)> = | ||
| sqlx::query_as("SELECT value FROM indexer_state WHERE key = $1 LIMIT 1") | ||
| .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY) | ||
| .fetch_optional(&self.pool) | ||
| .await?; | ||
|
|
||
| Ok(value.map(|(value,)| value == "true")) | ||
| } | ||
|
|
||
| async fn set_erc20_supply_history_complete(&self, complete: bool) -> Result<()> { | ||
| sqlx::query( | ||
| "INSERT INTO indexer_state (key, value, updated_at) | ||
| VALUES ($1, $2, NOW()) | ||
| ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at", | ||
| ) | ||
| .bind(ERC20_SUPPLY_HISTORY_COMPLETE_KEY) | ||
| .bind(if complete { "true" } else { "false" }) | ||
| .execute(&self.pool) | ||
| .await?; | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
@@ -1049,6 +1114,43 @@ mod tests { | |
| let contract = batch.ec_addresses[0].clone(); | ||
| let to = "0x2222222222222222222222222222222222222222"; | ||
| assert!(batch.balance_map.contains_key(&(to.to_string(), contract))); | ||
| assert_eq!( | ||
| batch.supply_map["0x3333333333333333333333333333333333333333"], | ||
| BigDecimal::from(1000) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn collect_erc20_burn_tracks_negative_supply_delta() { | ||
| let mut batch = BlockBatch::new(); | ||
| let known_erc20 = HashSet::new(); | ||
| let known_nft = HashSet::new(); | ||
|
|
||
| let logs = serde_json::json!([{ | ||
| "address": "0x3333333333333333333333333333333333333333", | ||
| "topics": [ | ||
| "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", | ||
| "0x0000000000000000000000001111111111111111111111111111111111111111", | ||
| "0x0000000000000000000000000000000000000000000000000000000000000000" | ||
| ], | ||
| "data": "0x00000000000000000000000000000000000000000000000000000000000003e8", | ||
| "blockNumber": "0x1", | ||
| "transactionHash": "0x0000000000000000000000000000000000000000000000000000000000000001", | ||
| "transactionIndex": "0x0", | ||
| "blockHash": "0x0000000000000000000000000000000000000000000000000000000000000001", | ||
| "logIndex": "0x0", | ||
| "removed": false | ||
| }]); | ||
|
|
||
| let mut fb = empty_fetched_block(1); | ||
| fb.receipts = vec![make_receipt(logs)]; | ||
| Indexer::collect_block(&mut batch, &known_erc20, &known_nft, fb); | ||
|
|
||
| assert_eq!(batch.balance_map.len(), 1); | ||
| assert_eq!( | ||
| batch.supply_map["0x3333333333333333333333333333333333333333"], | ||
| BigDecimal::from(-1000) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.