Real-time ERC20 token balance tracking service with SSE (Server-Sent Events) support.
- Real-time balance updates via SSE
- Multicall3 for efficient batch balance queries
- WebSocket subscriptions for ERC20 Transfer events
- WETH wrap/unwrap event listening (Deposit/Withdrawal)
- WebSocket auto-reconnect with automatic resubscription on disconnect
- Block-aware snapshot updates (stale update protection via block number comparison)
- Multi-chain support (Ethereum, Arbitrum, Sepolia)
- Session-based token list management
- Shared subscriptions for multiple clients watching the same wallet
- Token list caching with TTL (5 hours)
- Token limit per session (max 1000 tokens)
- Diff-based updates (first SSE event is full snapshot, all subsequent events are diffs only)
- Event batching via calls queue (300ms delay combines rapid events into a single multicall)
Creates a new session with token lists to watch. Must be called before SSE connection.
POST /{chain_id}/sessions/{owner}
Content-Type: application/json
{
"tokensListsUrls": ["https://tokens.coingecko.com/uniswap/all.json"],
"customTokens": ["0xTokenAddress1", "0xTokenAddress2"]
}Response:
| Status | Description |
|---|---|
200 OK |
Session created successfully |
400 Bad Request |
Both tokensListsUrls and customTokens are empty, or token limit exceeded |
Example:
curl -X POST http://localhost:8080/1/sessions/0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045 \
-H "Content-Type: application/json" \
-d '{"tokensListsUrls": ["https://tokens.coingecko.com/uniswap/all.json"]}'Adds more tokens to an existing session.
PUT /{chain_id}/sessions/{owner}
Content-Type: application/json
{
"tokensListsUrls": ["https://another-list.json"],
"customTokens": ["0xNewTokenAddress"]
}Response:
| Status | Description |
|---|---|
200 OK |
Session updated successfully |
400 Bad Request |
Both fields empty or token limit exceeded |
404 Not Found |
Session does not exist |
Subscribe to real-time balance updates. Requires an active session.
curl -N http://localhost:8080/sse/{chain_id}/balances/{owner}Example (Ethereum mainnet):
curl -N http://localhost:8080/sse/1/balances/0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045SSE Events:
| Event | Description |
|---|---|
balance_update |
First event is the full snapshot of all watched token balances. Every subsequent event is a diff only — it contains only the balances that changed since the last update. This applies to both on-chain event triggers and periodic snapshot refreshes. |
error |
Error message |
Response format:
event: balance_update
data: {"balances":{"0xToken1Address":"1000000","0xToken2Address":"500000"}}
event: error
data: {"code":500,"message":"Error description"}
curl http://localhost:8080/{chain_id}/balance/{owner}/{token}Example:
curl http://localhost:8080/1/balance/0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045/0xdAC17F958D2ee523a2206206994597C13D831ec7All error responses follow this structure:
{
"code": 400,
"message": "Bad request: tokens_lists_urls && custom_tokens are empty"
}- Create session with token lists URLs and/or custom tokens
- Connect to SSE to receive real-time updates
- (Optional) Update session to add more tokens dynamically
sequenceDiagram
participant Client
participant Server
participant Blockchain
Client->>Server: POST /1/sessions/0x... (token lists)
Server-->>Client: 200 OK
Client->>Server: GET /sse/1/balances/0x...
Server-->>Client: SSE: balance_update (full snapshot)
loop On ERC20 Transfer / WETH Deposit/Withdrawal
Blockchain-->>Server: Event detected (batched via queue, 300ms delay)
Server-->>Client: SSE: balance_update (diff only)
end
loop Every snapshot interval
Server-->>Client: SSE: balance_update (diff only)
end
Client->>Server: PUT /1/sessions/0x... (add tokens)
Server-->>Client: 200 OK
flowchart TB
subgraph Client
FE[Frontend App]
end
subgraph API["API Layer (Axum)"]
CS["POST /{chain_id}/sessions/{owner}<br/>Create Session"]
US["PUT /{chain_id}/sessions/{owner}<br/>Update Session"]
SSE["GET /sse/{chain_id}/balances/{owner}<br/>SSE Stream"]
end
subgraph SessionMgr["Session Manager"]
SM[SessionManager]
TLF[TokenListFetcher<br/>HTTP fetch + 5h cache]
SubMgr[SubscriptionManager<br/>session registry + cleanup]
end
subgraph Session["Per-Session State (Subscription)"]
Snap["BalanceSnapshot<br/>HashMap<Address, Balance><br/>block-guarded updates"]
BC["broadcast::channel<br/>fan-out to SSE clients"]
Tokens["Watched Tokens<br/>HashSet<Address>"]
CT["CancellationToken"]
end
subgraph Watcher["Watcher (4 background tasks per session)"]
T1["Task 1: Snapshot Updater<br/>periodic timer + sync_notify"]
T2a["Task 2a: ERC20 Transfer Listener<br/>topic: from = owner"]
T2b["Task 2b: ERC20 Transfer Listener<br/>topic: to = owner"]
T3["Task 3: WETH9 Listener<br/>Deposit / Withdrawal"]
T4["Task 4: Queue Result Receiver<br/>reads batched results"]
end
subgraph Queue["CallsQueue (300ms debounce)"]
CQ["Pending tokens map<br/>upsert_delayed_call()"]
FL["flush() → process_batch()"]
end
subgraph Blockchain["Blockchain (via Alchemy)"]
WS["WebSocket Provider<br/>log subscriptions"]
HTTP["HTTP Provider<br/>multicall reads"]
MC["Multicall3<br/>tryBlockAndAggregate"]
end
%% Client → API
FE -->|"POST (token lists + custom tokens)"| CS
FE -->|"PUT (add tokens)"| US
FE <-->|"SSE connection"| SSE
%% API → Session Manager
CS --> SM
US --> SM
SSE --> SubMgr
%% Session Manager internals
SM --> TLF
SM --> SubMgr
TLF -->|"fetch token lists"| HTTP
%% Session Manager → Session
SubMgr -->|"create / update"| Session
SM -->|"spawn once"| Watcher
%% Watcher → WS subscriptions
T2a -->|"subscribe_logs"| WS
T2b -->|"subscribe_logs"| WS
T3 -->|"subscribe_logs"| WS
%% WS events → Queue
T2a -->|"upsert_delayed_call"| CQ
T2b -->|"upsert_delayed_call"| CQ
T3 -->|"upsert_delayed_call"| CQ
%% Queue → Multicall
CQ -->|"300ms debounce"| FL
FL -->|"fetch_balances_via_multicall"| MC
MC --> HTTP
%% Queue results → Watcher → Snapshot → Broadcast
FL -->|"QueueMessage"| T4
T4 -->|"update_balances_and_take_diff"| Snap
T4 -->|"broadcast diff"| BC
%% Snapshot updater → Multicall → Snapshot → Broadcast
T1 -->|"fetch all balances"| MC
T1 -->|"update_balances_and_take_diff"| Snap
T1 -->|"broadcast diff"| BC
%% Broadcast → SSE
BC -->|"BalanceEvent"| SSE
%% Cleanup
SubMgr -->|"idle > 5s, 0 clients"| CT
CT -->|"cancel"| Watcher
%% SSE disconnect
SSE -->|"stream dropped → unsubscribe"| SubMgr
| Variable | Description | Default |
|---|---|---|
HTTP_BIND |
Server bind address | 0.0.0.0:8080 |
ALCHEMY_API_KEY |
Alchemy API key (required) | - |
MULTICALL_ADDRESS |
Multicall3 contract address | 0xcA11bde05977b3631167028862bE2a173976CA11 |
SNAPSHOT_INTERVAL |
Balance snapshot interval in seconds | 60 |
MAX_WATCHED_TOKENS_LIMIT |
Maximum tokens per session | 1000 |
ALLOWED_ORIGINS |
Comma-separated CORS origins | * (all) |
# Set required environment variable
export ALCHEMY_API_KEY=your_alchemy_api_key
# Run
cargo run# Build
docker-compose build
# Run
docker-compose up -d
# View logs
docker-compose logs -f| Network | Chain ID |
|---|---|
| Ethereum Mainnet | 1 |
| Arbitrum One | 42161 |
| Sepolia Testnet | 11155111 |
The service subscribes to the following on-chain events via WebSocket:
| Event | Contract | Description |
|---|---|---|
Transfer(address indexed from, address indexed to, uint256 value) |
ERC20 tokens | Triggered when tokens are transferred to/from the watched wallet |
Deposit(address indexed dst, uint256 wad) |
WETH | Triggered when ETH is wrapped to WETH |
Withdrawal(address indexed src, uint256 wad) |
WETH | Triggered when WETH is unwrapped to ETH |
When any of these events occur, the service fetches the updated balance for the affected token plus the native ETH balance, and broadcasts only the changed balances to connected clients.
| Limit | Value | Description |
|---|---|---|
| Max tokens per session | 1,000 | Maximum number of tokens that can be watched per session |
| Token list cache TTL | 5 hours | Token lists are cached to reduce HTTP requests |
| Session idle TTL | 60 seconds | Sessions with no active SSE clients are cleaned up |
| Broadcast channel capacity | 256 | Maximum pending events per subscription |
src/
├── main.rs # Entry point
├── args.rs # CLI arguments
├── app_state.rs # Application state
├── app_error.rs # Error types
├── api/ # HTTP handlers
│ ├── balance.rs # Single balance endpoint
│ ├── balances.rs # SSE balances stream
│ ├── create_session.rs # Session creation
│ └── update_session.rs # Session update
├── config/ # Configuration
├── domain/ # Domain models
│ ├── events.rs # Balance events
│ ├── network.rs # Network types
│ └── token.rs # Token types
├── evm/ # EVM contracts (ERC20, Multicall3)
├── routes/ # Router setup
├── services/ # Business logic
│ ├── subscription_manager.rs # Shared subscriptions
│ ├── watcher.rs # Balance watchers
│ ├── calls_queue.rs # Batched balance fetch queue (300ms debounce)
│ ├── balances.rs # Multicall service
│ └── token_list_fetcher.rs # Token list fetcher
├── infra/ # Infrastructure (providers)
└── tracing/ # Logging setup
- Health check endpoint -
/healthfor load balancers and monitoring - Prometheus metrics - Track subscriptions, RPC latency, WebSocket reconnections
- Token limit per session - Max 1000 tokens per session with validation
- Session expiry/cleanup - TTL for idle sessions, background cleanup task
- Graceful shutdown - Cancel watchers and close connections on SIGTERM
- WebSocket reconnection - Auto-reconnect and resubscribe on WS disconnect
- Sync state after reconnect - Re-fetch all balances after WS reconnect to recover events missed during disconnect
- Event batching - Debounce rapid events via
CallsQueue(300ms delay) and combine balance requests into a single multicall to reduce RPC usage - Token list validation - HTTPS only, domain blocklist, schema validation
- Token list fetch retry - Exponential backoff on failures
- SSE heartbeat - Periodic
:pingto prevent proxy timeouts
- WETH wrap/unwrap listening - Handle Deposit/Withdrawal events
- Token lists caching - Cache with TTL (5h) to reduce HTTP requests
- CoW Protocol order events - Listen for ETH order settlements
- ETH transactions listening - Monitor native balance changes
- Reorgs handling - Detect and handle chain reorganizations
- Balance change metadata - Include txHash, blockNumber, previousBalance
- Batch balance endpoint - One-off multi-token queries without SSE
- Allowances tracking - ERC20 Approval events and allowances in snapshot
- OpenAPI docs - Auto-generate API docs with utoipa
MIT