Conversation
If it starts rejecting flow, does it also return an error on the
Pull request overview
Adds a configurable safeguard to reject user RPC requests when the local ClickHouse backup DB grows beyond a configured size, protecting archive flow and exposing the threshold via metrics.
Changes:
- Introduces `disk_max_size_to_accept_user_rpc_mb` CLI/config and propagates it into ingress runtime state.
- Adds a `DiskFull` JSON-RPC error and rejects user RPC when the ClickHouse local backup exceeds the threshold.
- Exposes the configured threshold as a metric and adds an integration test covering reject→drain→accept behavior.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/cli.rs | Adds new CLI/env config for max disk size threshold (MB) |
| src/lib.rs | Publishes threshold metric and passes threshold (bytes) into ingress |
| src/metrics.rs | Adds gauge metric for the configured threshold |
| src/jsonrpc.rs | Adds DiskFull JSON-RPC error and mappings |
| src/ingress/mod.rs | Enforces threshold by rejecting requests based on shared disk-size state |
| src/indexer/click/mod.rs | Tracks backup DB disk size globally and updates it from ClickHouse backup metrics; adds integration test |
| src/utils.rs | Adds helpers for generating bundles with predictable tx input sizes (test support) |
| Cargo.toml | Adds parking_lot dependency |
```rust
metrics::BUILD_INFO_METRICS.info(env!("CARGO_PKG_VERSION"), env!("GIT_HASH")).set(1);
metrics::CLICKHOUSE_METRICS
    .disk_max_size_to_accept_user_rpc_bytes()
    .set(args.disk_max_size_to_accept_user_rpc_mb * 1024 * 1024);
```
Converting *_mb to bytes via * 1024 * 1024 can overflow u64 for large CLI/env values and panic in debug builds (or wrap in release depending on build settings). Use checked_mul (and handle failure) or saturating_mul when computing bytes.
```diff
-    .set(args.disk_max_size_to_accept_user_rpc_mb * 1024 * 1024);
+    .set(args.disk_max_size_to_accept_user_rpc_mb.saturating_mul(1024 * 1024));
```
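If startup should fail loudly rather than clamp, a `checked_mul` variant along these lines would also address the comment (a minimal sketch; the helper name and the panic-on-overflow behavior are illustrative, not part of the PR):

```rust
/// Convert the configured threshold (in MiB, per the 1024 * 1024 factor) to bytes,
/// refusing to proceed if the multiplication would overflow u64.
/// Sketch only: real startup code may prefer returning a config error over panicking.
fn threshold_bytes(mib: u64) -> u64 {
    mib.checked_mul(1024 * 1024)
        .expect("disk_max_size_to_accept_user_rpc_mb is too large to express in bytes")
}
```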
```rust
/// Maximum local ClickHouse backup disk size in MB above which user RPC (e.g. eth_sendBundle)
/// is rejected with disk full. Defaults to 1024 MB (1 GiB).
#[clap(
    long = "disk-max-size-to-accept-user-rpc-mb",
    default_value_t = 1024,
    env = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC",
    id = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC"
)]
pub disk_max_size_to_accept_user_rpc_mb: u64,
```
The flag is documented as 'MB' but the runtime conversion uses 1024 * 1024 (MiB). Either update docs/flag naming to MiB (recommended for clarity) or change the conversion to use decimal MB (1_000_000) to match the current documentation. Also consider aligning the env var name with the unit (e.g., ..._MB or ..._MIB) to avoid ambiguity.
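For concreteness, the two units differ by roughly 5% at the default value (a minimal sketch; the constant names are illustrative and not taken from the PR):

```rust
// Decimal megabyte, matching the doc comment's "MB".
const BYTES_PER_MB: u64 = 1_000_000;
// Binary mebibyte, matching the current `* 1024 * 1024` conversion.
const BYTES_PER_MIB: u64 = 1_048_576;

// With the default of 1024, the two readings differ by ~50 MB:
// 1024 * BYTES_PER_MB  = 1_024_000_000 bytes
// 1024 * BYTES_PER_MIB = 1_073_741_824 bytes (exactly 1 GiB)
```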
```diff
 Self::MethodNotFound(_) => StatusCode::NOT_FOUND,
 Self::RateLimited => StatusCode::TOO_MANY_REQUESTS,
-Self::Internal | Self::Unknown(_) => StatusCode::INTERNAL_SERVER_ERROR,
+Self::Internal | Self::Unknown(_) | Self::DiskFull => StatusCode::INTERNAL_SERVER_ERROR,
```
Mapping DiskFull to HTTP 500 makes it look like a server crash rather than a capacity/backpressure condition. Consider using 507 Insufficient Storage (closest semantic match) or 503 Service Unavailable so clients can distinguish this from internal failures.
```diff
-Self::Internal | Self::Unknown(_) | Self::DiskFull => StatusCode::INTERNAL_SERVER_ERROR,
+Self::Internal | Self::Unknown(_) => StatusCode::INTERNAL_SERVER_ERROR,
+Self::DiskFull => StatusCode::INSUFFICIENT_STORAGE,
```
```rust
    self.bundle_receipts_size = size;
}

pub(crate) fn disk_size(&self) -> u64 {
    self.bundles_size + self.bundle_receipts_size
```
Summing two u64 sizes with + can overflow and wrap/panic depending on build settings. Using saturating_add (or checked_add with fallback/logging) avoids incorrect disk-size readings when sizes get large.
```diff
-    self.bundles_size + self.bundle_receipts_size
+    self.bundles_size.saturating_add(self.bundle_receipts_size)
```
src/ingress/mod.rs (Outdated)
```rust
if CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().disk_size() >
    ingress.disk_max_size_to_accept_user_rpc
{
    return JsonRpcResponse::error(Value::Null, JsonRpcError::DiskFull);
```
This takes a global mutex on every user RPC request (hot path). Since the value is just a size snapshot, consider switching to atomics (e.g., AtomicU64 for total size, or two atomics and saturating_add on read) to avoid contention and reduce tail latency under load.
```diff
-if CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().disk_size() >
-    ingress.disk_max_size_to_accept_user_rpc
-{
-    return JsonRpcResponse::error(Value::Null, JsonRpcError::DiskFull);
+if let Ok(backup_disk_size) = CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.try_lock() {
+    if backup_disk_size.disk_size() > ingress.disk_max_size_to_accept_user_rpc {
+        return JsonRpcResponse::error(Value::Null, JsonRpcError::DiskFull);
+    }
```
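A lock-free variant along the lines described above might look roughly like this (a minimal sketch; the static names and update path are illustrative, not the identifiers used in src/indexer/click/mod.rs):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative globals standing in for the shared backup-size state.
static BUNDLES_SIZE: AtomicU64 = AtomicU64::new(0);
static BUNDLE_RECEIPTS_SIZE: AtomicU64 = AtomicU64::new(0);

/// Writer side: called whenever the ClickHouse backup metrics are refreshed.
fn update_backup_sizes(bundles: u64, receipts: u64) {
    BUNDLES_SIZE.store(bundles, Ordering::Relaxed);
    BUNDLE_RECEIPTS_SIZE.store(receipts, Ordering::Relaxed);
}

/// Reader side (per-request hot path): two relaxed loads, no mutex contention.
fn backup_disk_size() -> u64 {
    BUNDLES_SIZE
        .load(Ordering::Relaxed)
        .saturating_add(BUNDLE_RECEIPTS_SIZE.load(Ordering::Relaxed))
}
```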
```rust
    (DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB * 1024 * 1024) as usize /
        (BUNDLE_TX_COUNT * BUNDLE_TX_INPUT_SIZE);

const FLOWPROXY_START_DELAY_MS: Duration = Duration::from_millis(800);
```
The test uses a fixed sleep to wait for the server to start, which is a common source of flaky CI failures. Prefer polling a readiness condition (e.g., hitting /readyz or retrying the RPC until it responds) with a bounded timeout instead of sleeping a fixed duration.
```rust
tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await;

let reqwest_client = reqwest::Client::default();
let signer = PrivateKeySigner::random();
```
The test uses a fixed sleep to wait for the server to start, which is a common source of flaky CI failures. Prefer polling a readiness condition (e.g., hitting /readyz or retrying the RPC until it responds) with a bounded timeout instead of sleeping a fixed duration.
```diff
-tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await;
-
-let reqwest_client = reqwest::Client::default();
-let signer = PrivateKeySigner::random();
+let reqwest_client = reqwest::Client::default();
+let signer = PrivateKeySigner::random();
+let readiness_client = reqwest_client.clone();
+tokio::time::timeout(FLOWPROXY_START_DELAY_MS, async move {
+    loop {
+        match readiness_client.get(&user_url).send().await {
+            Ok(_) => break,
+            Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
+        }
+    }
+})
+.await
+.expect("flowproxy did not become ready in time");
```
```rust
        (BUNDLE_TX_COUNT * BUNDLE_TX_INPUT_SIZE);

const FLOWPROXY_START_DELAY_MS: Duration = Duration::from_millis(800);
// Asume 100ms per bundle to clickhouse (it's a LOT)
```
Correct spelling of 'Asume' to 'Assume'.
```diff
-// Asume 100ms per bundle to clickhouse (it's a LOT)
+// Assume 100ms per bundle to clickhouse (it's a LOT)
```
Good point, added.
This PR adds a new parameter, `disk_max_size_to_accept_user_rpc_mb`. When the user RPC detects that the local DB backup size exceeds `disk_max_size_to_accept_user_rpc_mb`, it starts rejecting flow. This allows us to configure it so that we never lose archive flow.

It also adds a metric for `disk_max_size_to_accept_user_rpc_mb` to allow adding alarms easily.