From 55aa6484d7786af934a0ced73a191d63cd217322 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 4 Feb 2026 15:51:02 -0300 Subject: [PATCH 01/17] proper-shutdown --- Cargo.lock | 6 +- Cargo.toml | 5 +- src/cli.rs | 18 +++++ src/indexer/click/mod.rs | 41 +++++++---- src/indexer/mod.rs | 17 +++-- src/ingress/mod.rs | 19 +++-- src/lib.rs | 71 +++++++++++------- src/main.rs | 150 ++++++++++++++++++++++++++++++++++++--- src/runner/mod.rs | 135 ----------------------------------- 9 files changed, 263 insertions(+), 199 deletions(-) delete mode 100644 src/runner/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 96862a40..67377c06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2931,6 +2931,7 @@ dependencies = [ "tikv-jemallocator", "time", "tokio", + "tokio-util", "tracing", "tracing-subscriber 0.3.22", "uuid", @@ -5467,7 +5468,7 @@ dependencies = [ [[package]] name = "rbuilder-primitives" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder?rev=95323c0c6b1ac742a5716aba1942f3e06b8b5241#95323c0c6b1ac742a5716aba1942f3e06b8b5241" +source = "git+https://github.com/flashbots/rbuilder?rev=e3b49692d5b4353c62abe828245a44c390f7bec2#e3b49692d5b4353c62abe828245a44c390f7bec2" dependencies = [ "ahash", "alloy-consensus", @@ -5499,7 +5500,6 @@ dependencies = [ "ssz_types", "thiserror 1.0.69", "time", - "tracing", "tree_hash 0.8.0", "tree_hash_derive 0.8.0", "typenum", @@ -5509,7 +5509,7 @@ dependencies = [ [[package]] name = "rbuilder-utils" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder?rev=95323c0c6b1ac742a5716aba1942f3e06b8b5241#95323c0c6b1ac742a5716aba1942f3e06b8b5241" +source = "git+https://github.com/flashbots/rbuilder?rev=e3b49692d5b4353c62abe828245a44c390f7bec2#e3b49692d5b4353c62abe828245a44c390f7bec2" dependencies = [ "alloy-primitives 1.5.1", "clickhouse", diff --git a/Cargo.toml b/Cargo.toml index cdf8d767..2f44f801 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ revm-primitives = { version = "21.0.2", default-features = false } revm-interpreter = { version = "29.0.1", default-features = false } # rbuilder -rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "95323c0c6b1ac742a5716aba1942f3e06b8b5241" } -rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "95323c0c6b1ac742a5716aba1942f3e06b8b5241", features = [ +rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2" } +rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2", features = [ "test-utils" ] } @@ -34,6 +34,7 @@ tokio = { version = "1", default-features = false, features = [ "macros" ] } futures = { version = "0.3" } +tokio-util = "0.7.12" # allocator tikv-jemallocator = { version = "0.6", optional = true } diff --git a/src/cli.rs b/src/cli.rs index cb436a8f..32e6deca 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -98,6 +98,24 @@ pub struct ClickhouseArgs { default_value_t = MAX_DISK_BACKUP_SIZE_BYTES )] pub backup_disk_max_size_bytes: u64, + + /// Send timeout in milliseconds for ClickHouse HTTP requests. Defaults to 2_000. + #[arg( + long = "indexer.clickhouse.send-timeout-ms", + env = "CLICKHOUSE_SEND_TIMEOUT_MS", + id = "CLICKHOUSE_SEND_TIMEOUT_MS", + default_value_t = 2_000 + )] + pub send_timeout_ms: u64, + + /// End-to-end timeout in milliseconds for ClickHouse HTTP requests. Defaults to 3_000. 
+ #[arg( + long = "indexer.clickhouse.end-timeout-ms", + env = "CLICKHOUSE_END_TIMEOUT_MS", + id = "CLICKHOUSE_END_TIMEOUT_MS", + default_value_t = 3_000 + )] + pub end_timeout_ms: u64, } /// Arguments required to setup file-based parquet indexing. diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index 4ffa8c69..1e9d7b1e 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -20,6 +20,7 @@ use rbuilder_utils::{ }, tasks::TaskExecutor, }; +use tokio::task::JoinHandle; mod models; @@ -105,7 +106,7 @@ impl ClickhouseIndexer { receivers: OrderReceivers, task_executor: TaskExecutor, validation: bool, - ) { + ) -> Vec> { let client = config_from_clickhouse_args(&args, validation).into(); tracing::info!("Running with clickhouse indexer"); @@ -117,18 +118,28 @@ impl ClickhouseIndexer { ) .expect("could not create disk backup"); - spawn_clickhouse_inserter_and_backup::( - &client, - receivers.bundle_rx, - &task_executor, - args.bundles_table_name, - builder_name.clone(), - disk_backup.clone(), - args.backup_memory_max_size_bytes, - TARGET_INDEXER, - ); + let send_timeout = Duration::from_millis(args.send_timeout_ms); + let end_timeout = Duration::from_millis(args.end_timeout_ms); + + let bundle_inserter_join_handle = + spawn_clickhouse_inserter_and_backup::( + &client, + receivers.bundle_rx, + &task_executor, + args.bundles_table_name, + builder_name.clone(), + disk_backup.clone(), + args.backup_memory_max_size_bytes, + send_timeout, + end_timeout, + TARGET_INDEXER, + ); - spawn_clickhouse_inserter_and_backup::( + let bundle_receipt_inserter_join_handle = spawn_clickhouse_inserter_and_backup::< + BundleReceipt, + BundleReceiptRow, + MetricsWrapper, + >( &client, receivers.bundle_receipt_rx, &task_executor, @@ -136,8 +147,11 @@ impl ClickhouseIndexer { builder_name.clone(), disk_backup.clone(), args.backup_memory_max_size_bytes, + send_timeout, + end_timeout, TARGET_INDEXER, ); + vec![bundle_inserter_join_handle, bundle_receipt_inserter_join_handle] } } @@ -240,6 +254,7 @@ pub(crate) mod tests { } } + ///Only for testing purposes. impl From for ClickhouseArgs { fn from(config: ClickhouseClientConfig) -> Self { Self { @@ -252,6 +267,8 @@ pub(crate) mod tests { backup_memory_max_size_bytes: 1024 * 1024 * 10, // 10MiB backup_disk_database_path: default_disk_backup_database_path(), backup_disk_max_size_bytes: 1024 * 1024 * 100, // 100MiB + send_timeout_ms: 2_000, + end_timeout_ms: 3_000, } } } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index b95132af..912c4c43 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -4,7 +4,8 @@ use std::fmt::Debug; use rbuilder_utils::tasks::TaskExecutor; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task::JoinHandle}; +use tracing::error; use crate::{ cli::IndexerArgs, @@ -77,33 +78,37 @@ impl OrderSenders { pub struct Indexer; impl Indexer { + /// Returns the IndexerHandle to send dada and a vector of join handles to the indexer tasks so we can wait for them to finish on shutdown. 
pub fn run( args: IndexerArgs, builder_name: String, task_executor: TaskExecutor, - ) -> IndexerHandle { + ) -> (IndexerHandle, Vec>) { let (senders, receivers) = OrderSenders::new(); match (args.clickhouse, args.parquet) { (None, None) => { MockIndexer.run(receivers, task_executor); - IndexerHandle::new(senders) + (IndexerHandle::new(senders), vec![]) } (Some(clickhouse), None) => { let validation = false; - ClickhouseIndexer::run( + let join_handles = ClickhouseIndexer::run( clickhouse, builder_name, receivers, task_executor, validation, ); - IndexerHandle::new(senders) + (IndexerHandle::new(senders), join_handles) } (None, Some(parquet)) => { ParquetIndexer::run(parquet, builder_name, receivers, task_executor) .expect("failed to start parquet indexer"); - IndexerHandle::new(senders) + error!( + "Parquet indexer does not support proper shutdown, returning empty join handles" + ); + (IndexerHandle::new(senders), vec![]) } (Some(_), Some(_)) => { unreachable!("Cannot specify both clickhouse and parquet indexer"); diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 24037a36..4441e34a 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -52,6 +52,7 @@ use std::{ }; use time::UtcDateTime; use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; use tracing::*; pub mod error; @@ -132,7 +133,7 @@ impl OrderflowIngress { /// Perform maintenance task for internal orderflow ingress state. #[tracing::instrument(skip_all, name = "ingress_maintanance")] - pub async fn maintenance(&self) { + pub fn maintenance(&self) { let len_before = self.entities.len(); tracing::info!(entries = len_before, "starting state maintenance"); @@ -183,8 +184,8 @@ impl OrderflowIngress { let entity = Entity::Signer(signer); - if ingress.rate_limiting_enabled && - let Some(mut data) = ingress.entity_data(entity) + if ingress.rate_limiting_enabled + && let Some(mut data) = ingress.entity_data(entity) { if data.rate_limit.count() > ingress.rate_limit_count { tracing::trace!("rate limited request"); @@ -271,8 +272,8 @@ impl OrderflowIngress { let response = match result { Ok(eth) => JsonRpcResponse::result(request.id, eth), Err(error) => { - if error.is_validation() && - let Some(mut data) = ingress.entity_data(entity) + if error.is_validation() + && let Some(mut data) = ingress.entity_data(entity) { data.scores.score_mut(received_at.into()).invalid_requests += 1; } @@ -664,7 +665,7 @@ impl OrderflowIngress { } self.indexer_handle.index_bundle(bundle.clone()); - + info!(id = ?bundle.uuid(), "DX bundle sent"); self.send_bundle(bundle).await } @@ -790,7 +791,7 @@ impl IngressSocket { Self { reply_socket: socket, ingress_state, task_executor, certs_rx, acceptor_builder } } - pub async fn listen(mut self) { + pub async fn listen(mut self, cancellation_token: CancellationToken) { loop { tokio::select! 
{ Some(certs) = self.certs_rx.recv() => { @@ -826,6 +827,10 @@ impl IngressSocket { } }); } + _ = cancellation_token.cancelled() => { + info!("Cancellation token cancelled, stopping ingress socket listener"); + break; + } } } diff --git a/src/lib.rs b/src/lib.rs index 12f919b3..cb08cdd9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,11 +9,11 @@ use crate::{ client::{HttpClientPool, default_http_builder}, http::spawn_http_forwarder, }, + indexer::IndexerHandle, ingress::IngressSocket, metrics::IngressMetrics, primitives::{AcceptorBuilder, SslAcceptorBuilderExt, SystemBundleDecoder}, priority::workers::PriorityWorkers, - runner::CliContext, statics::LOCAL_PEER_STORE, }; use alloy_signer_local::PrivateKeySigner; @@ -30,6 +30,7 @@ use forwarder::{IngressForwarders, PeerHandle}; use msg_socket::RepSocket; use msg_transport::tcp_tls::{self, TcpTls}; use prometric::exporter::ExporterBuilder; +use rbuilder_utils::tasks::TaskExecutor; use reqwest::Url; use std::{ fs, @@ -38,8 +39,9 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::net::TcpListener; -use tracing::level_filters::LevelFilter; +use tokio::{net::TcpListener, select}; +use tokio_util::sync::CancellationToken; +use tracing::{info, level_filters::LevelFilter}; use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt}; pub mod cli; @@ -48,7 +50,7 @@ use cli::OrderflowIngressArgs; pub mod ingress; use ingress::OrderflowIngress; -use crate::{cache::OrderCache, indexer::Indexer}; +use crate::cache::OrderCache; pub mod builderhub; mod cache; @@ -61,13 +63,17 @@ pub mod metrics; pub mod primitives; pub mod priority; pub mod rate_limit; -pub mod runner; pub mod statics; pub mod trace; pub mod utils; pub mod validation; -pub async fn run(args: OrderflowIngressArgs, ctx: CliContext) -> eyre::Result<()> { +pub async fn run( + args: OrderflowIngressArgs, + task_executor: TaskExecutor, + indexer_handle: IndexerHandle, + cancellation_token: CancellationToken, +) -> eyre::Result<()> { fdlimit::raise_fd_limit()?; if let Some(ref metrics_addr) = args.metrics { @@ -85,14 +91,25 @@ pub async fn run(args: OrderflowIngressArgs, ctx: CliContext) -> eyre::Result<() } else { None }; - run_with_listeners(args, user_listener, builder_listener, ctx).await + run_with_listeners( + args, + user_listener, + builder_listener, + task_executor, + indexer_handle, + cancellation_token, + ) + .await } +/// Cancellation is a little ugly, just added enough to make it drop indexer_handle but it's a mix of cancellation_token + task_executor which also has a shutdown method. pub async fn run_with_listeners( args: OrderflowIngressArgs, user_listener: TcpListener, builder_listener: Option, - ctx: CliContext, + task_executor: TaskExecutor, + indexer_handle: IndexerHandle, + cancellation_token: CancellationToken, ) -> eyre::Result<()> { // Initialize tracing. 
let registry = tracing_subscriber::registry().with( @@ -104,8 +121,6 @@ pub async fn run_with_listeners( let _ = registry.with(tracing_subscriber::fmt::layer()).try_init(); } - let indexer_handle = Indexer::run(args.indexing, args.builder_name, ctx.task_executor.clone()); - let orderflow_signer = match args.orderflow_signer { Some(signer) => { tracing::warn!( @@ -154,10 +169,10 @@ pub async fn run_with_listeners( peer_update_config, builder_hub, peers.clone(), - ctx.task_executor.clone(), + task_executor.clone(), ); - ctx.task_executor + task_executor .spawn_critical("run_update_peers", peer_updater.run(args.peer_update_interval_s)); (socket, certs_rx) } else { @@ -190,14 +205,10 @@ pub async fn run_with_listeners( }; let peer_store = peer_store.register(peer); - let (peer_updater, certs_rx) = PeersUpdater::new( - peer_update_config, - peer_store, - peers.clone(), - ctx.task_executor.clone(), - ); + let (peer_updater, certs_rx) = + PeersUpdater::new(peer_update_config, peer_store, peers.clone(), task_executor.clone()); - ctx.task_executor + task_executor .spawn_critical("run_update_peers", peer_updater.run(args.peer_update_interval_s)); (socket, certs_rx) }; @@ -213,7 +224,7 @@ pub async fn run_with_listeners( builder_url.to_string(), // Use 1 client here, this is still using HTTP/1.1 with internal connection pooling. HttpClientPool::new(NonZero::new(1).unwrap(), || client.clone()), - &ctx.task_executor, + &task_executor, )?; IngressForwarders::new(local_sender, peers, orderflow_signer, workers.clone()) @@ -252,12 +263,22 @@ pub async fn run_with_listeners( }); // Spawn a state maintenance task. - tokio::spawn({ + let cancellation_token_clone = cancellation_token.clone(); + task_executor.spawn({ let ingress = ingress.clone(); async move { loop { - tokio::time::sleep(Duration::from_secs(60)).await; - ingress.maintenance().await; + info!("starting state maintenance!!"); + select! { + _ = cancellation_token_clone.cancelled() => { + info!("Cancellation token cancelled, stopping state maintenance"); + break; + } + _ = tokio::time::sleep(Duration::from_secs(60)) => { + ingress.maintenance(); + } + } + ingress.maintenance(); } } }); @@ -268,9 +289,9 @@ pub async fn run_with_listeners( certs_rx, ingress.clone(), acceptor_builder, - ctx.task_executor.clone(), + task_executor.clone(), ); - ctx.task_executor.spawn(ingress_socket.listen()); + task_executor.spawn(ingress_socket.listen(cancellation_token)); // Spawn user facing HTTP server for accepting bundles and raw transactions. let user_router = Router::new() diff --git a/src/main.rs b/src/main.rs index 8f5b85bb..c1b0e1d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,12 @@ +use std::time::Duration; + use clap::Parser; -use flowproxy::{ - cli::OrderflowIngressArgs, - runner::{CliContext, CliRunner}, - trace::init_tracing, -}; +use flowproxy::{cli::OrderflowIngressArgs, indexer::Indexer, trace::init_tracing}; +use futures::{StreamExt, stream::FuturesUnordered}; +use rbuilder_utils::tasks::{PanickedTaskError, TaskManager}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; #[cfg(all(feature = "jemalloc", unix))] type AllocatorInner = tikv_jemallocator::Jemalloc; @@ -33,12 +36,141 @@ fn main() { .enable_all() .build() .expect("failed to create runtime"); + let task_manager = TaskManager::new(tokio_runtime.handle().clone()); + info!("Main task started"); + + // Executes the main task command until it finished or ctrl-c was fired. 
+ // IMPORTANT: flowproxy::run has no nice cancellation and will be stopped being polled abruptly so + // it must not contain any critical tasks that need proper shutdown. + tokio_runtime.block_on(run_with_shutdown(args, task_manager)); + + info!("Main task finished. Shutting down tokio runtime"); + + if let Err(error) = wait_tokio_runtime_shutdown(tokio_runtime, Duration::from_secs(5)) { + error!(?error, "Flow proxy terminated with error"); + } +} - let runner = CliRunner::from_runtime(tokio_runtime); +async fn run_with_shutdown(args: OrderflowIngressArgs, mut task_manager: TaskManager) { + warn!("starting blocking"); + let task_executor = task_manager.executor(); + let (indexer_handle, indexer_join_handles) = + Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); + let cancellation_token = CancellationToken::new(); + let main_task = flowproxy::run(args, task_executor, indexer_handle, cancellation_token.clone()); + match run_to_completion_or_panic(&mut task_manager, run_until_ctrl_c(main_task)).await { + Ok(()) => { + tracing::warn!(target = "cli", "shutting down gracefully"); + } + Err(err) => { + tracing::error!(?err, target = "cli", "shutting down due to error"); + } + } + // This kills some tasks launched by flowproxy::run that release the last references to the indexer_handle. + cancellation_token.cancel(); + // At this point all the rpc was abruptly dropped which dropped the indexer_handle and that will allow the indexer core + // to process all pending data and start shutting down. + wait_for_critical_tasks(indexer_join_handles, Duration::from_secs(20)).await; + // We already have a chance to critical tasks to finish by themselves, so we can now call the graceful shutdown. + task_manager.graceful_shutdown_with_timeout(Duration::from_secs(20)); +} - let command = |ctx: CliContext| flowproxy::run(args, ctx); +/// Consider move this to rbuilder-utils. +/// Waits for critical_tasks to finish by themselves up to grateful_timeout. +/// After they finish or grateful_timeout is reached, we call task_manager.graceful_shutdown_with_timeout(abort_timeout) and +async fn wait_for_critical_tasks(critical_tasks: Vec>, grateful_timeout: Duration) { + let mut critical_tasks: FuturesUnordered<_> = critical_tasks.into_iter().collect(); + let critical_deadline = tokio::time::Instant::now() + grateful_timeout; + loop { + tokio::select! { + biased; + result = critical_tasks.next() => { + match result { + Some(Err(err)) => error!(?err, "Critical task handle await error"), + Some(Ok(())) => {} + None => { + info!("All critical tasks finished ok"); + break; + } + } + } + _ = tokio::time::sleep_until(critical_deadline) => { + error!(pendig_task_count = critical_tasks.len(), "Critical tasks shutdown timeout reached"); + break; + } + } + } +} + +fn wait_tokio_runtime_shutdown( + tokio_runtime: tokio::runtime::Runtime, + timeout: Duration, +) -> Result<(), std::sync::mpsc::RecvTimeoutError> { + // `drop(tokio_runtime)` would block the current thread until its pools + // (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop + // it on a separate thread and wait for up to 5 seconds for this operation to + // complete. 
+ let (tx, rx) = std::sync::mpsc::channel(); + std::thread::Builder::new() + .name("tokio-runtime-shutdown".to_string()) + .spawn(move || { + drop(tokio_runtime); + let _ = tx.send(()); + }) + .unwrap(); + + rx.recv_timeout(timeout).inspect_err(|err| { + tracing::debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out"); + }) +} + +/// Runs the future to completion or until: +/// - `ctrl-c` is received. +/// - `SIGTERM` is received (unix only). +async fn run_until_ctrl_c(fut: F) -> Result<(), E> +where + F: Future>, + E: Send + Sync + 'static + From, +{ + let ctrl_c = tokio::signal::ctrl_c(); + + let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; + let sigterm = stream.recv(); + let sigterm = Box::pin(sigterm); + let ctrl_c = Box::pin(ctrl_c); + let fut = Box::pin(fut); + + tokio::select! { + _ = ctrl_c => { + tracing::info!("Received ctrl-c"); + }, + _ = sigterm => { + tracing::info!("Received SIGTERM"); + }, + res = fut => res?, + } + + Ok(()) +} - if let Err(e) = runner.run_command_until_exit(command) { - eprintln!("Orderflow proxy terminated with error: {e}"); +/// Runs the given future to completion or until a critical task panicked. +/// +/// Returns the error if a task panicked, or the given future returned an error. +async fn run_to_completion_or_panic(tasks: &mut TaskManager, fut: F) -> Result<(), E> +where + F: Future>, + E: Send + Sync + From + 'static, +{ + { + let fut = Box::pin(fut); + tokio::select! { + task_manager_result = tasks => { + if let Err(panicked_error) = task_manager_result { + return Err(panicked_error.into()); + } + }, + res = fut => res?, + } } + Ok(()) } diff --git a/src/runner/mod.rs b/src/runner/mod.rs deleted file mode 100644 index e1c91373..00000000 --- a/src/runner/mod.rs +++ /dev/null @@ -1,135 +0,0 @@ -//! Asynchronous task runner utilities. - -use std::{future::Future, time::Duration}; - -use rbuilder_utils::tasks::{PanickedTaskError, TaskExecutor, TaskManager}; - -#[derive(Debug, Clone)] -pub struct CliContext { - pub task_executor: TaskExecutor, -} - -/// Executes CLI commands. -/// -/// Provides utilities for running a cli command to completion. -#[derive(Debug)] -#[non_exhaustive] -pub struct CliRunner { - tokio_runtime: tokio::runtime::Runtime, -} - -impl CliRunner { - /// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime). - pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self { - Self { tokio_runtime } - } -} - -// === impl CliRunner === - -impl CliRunner { - /// Executes the given _async_ command on the tokio runtime until the command future resolves or - /// until the process receives a `SIGINT` or `SIGTERM` signal. - /// - /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made - /// to drive their shutdown to completion after the command has finished. 
- pub fn run_command_until_exit( - self, - command: impl FnOnce(CliContext) -> F, - ) -> Result<(), E> - where - F: Future>, - E: Send + Sync + From + From + 'static, - { - let tokio_runtime = self.tokio_runtime; - let mut task_manager = TaskManager::new(tokio_runtime.handle().clone()); - let task_executor = task_manager.executor(); - let context = CliContext { task_executor }; - - // Executes the command until it finished or ctrl-c was fired - let command_res = tokio_runtime.block_on(run_to_completion_or_panic( - &mut task_manager, - run_until_ctrl_c(command(context)), - )); - - if command_res.is_err() { - tracing::error!(target: "cli", "shutting down due to error"); - } else { - tracing::debug!(target: "cli", "shutting down gracefully"); - // after the command has finished or exit signal was received we shutdown the task - // manager which fires the shutdown signal to all tasks spawned via the task - // executor and awaiting on tasks spawned with graceful shutdown - task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5)); - } - - // `drop(tokio_runtime)` would block the current thread until its pools - // (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop - // it on a separate thread and wait for up to 5 seconds for this operation to - // complete. - let (tx, rx) = std::sync::mpsc::channel(); - std::thread::Builder::new() - .name("tokio-runtime-shutdown".to_string()) - .spawn(move || { - drop(tokio_runtime); - let _ = tx.send(()); - }) - .unwrap(); - - let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| { - tracing::debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out"); - }); - - command_res - } -} - -/// Runs the future to completion or until: -/// - `ctrl-c` is received. -/// - `SIGTERM` is received (unix only). -async fn run_until_ctrl_c(fut: F) -> Result<(), E> -where - F: Future>, - E: Send + Sync + 'static + From, -{ - let ctrl_c = tokio::signal::ctrl_c(); - - let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; - let sigterm = stream.recv(); - let sigterm = Box::pin(sigterm); - let ctrl_c = Box::pin(ctrl_c); - let fut = Box::pin(fut); - - tokio::select! { - _ = ctrl_c => { - tracing::info!("Received ctrl-c"); - }, - _ = sigterm => { - tracing::info!("Received SIGTERM"); - }, - res = fut => res?, - } - - Ok(()) -} - -/// Runs the given future to completion or until a critical task panicked. -/// -/// Returns the error if a task panicked, or the given future returned an error. -async fn run_to_completion_or_panic(tasks: &mut TaskManager, fut: F) -> Result<(), E> -where - F: Future>, - E: Send + Sync + From + 'static, -{ - { - let fut = Box::pin(fut); - tokio::select! 
{ - task_manager_result = tasks => { - if let Err(panicked_error) = task_manager_result { - return Err(panicked_error.into()); - } - }, - res = fut => res?, - } - } - Ok(()) -} From 548e42dece6b55a97637982883551abb9af84a91 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 4 Feb 2026 15:55:56 -0300 Subject: [PATCH 02/17] polish --- src/ingress/mod.rs | 2 +- src/main.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 4441e34a..168f84b2 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -665,7 +665,7 @@ impl OrderflowIngress { } self.indexer_handle.index_bundle(bundle.clone()); - info!(id = ?bundle.uuid(), "DX bundle sent"); + self.send_bundle(bundle).await } diff --git a/src/main.rs b/src/main.rs index c1b0e1d5..b7c84178 100644 --- a/src/main.rs +++ b/src/main.rs @@ -51,6 +51,8 @@ fn main() { } } +/// This time out should be enough for the inserter to flush all pending clickhouse data (timeout is clickhouse usually a few secs) and local DB data (disk flush time). +const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20); async fn run_with_shutdown(args: OrderflowIngressArgs, mut task_manager: TaskManager) { warn!("starting blocking"); let task_executor = task_manager.executor(); @@ -70,9 +72,9 @@ async fn run_with_shutdown(args: OrderflowIngressArgs, mut task_manager: TaskMan cancellation_token.cancel(); // At this point all the rpc was abruptly dropped which dropped the indexer_handle and that will allow the indexer core // to process all pending data and start shutting down. - wait_for_critical_tasks(indexer_join_handles, Duration::from_secs(20)).await; + wait_for_critical_tasks(indexer_join_handles, SHUTDOWN_TIMEOUT).await; // We already have a chance to critical tasks to finish by themselves, so we can now call the graceful shutdown. - task_manager.graceful_shutdown_with_timeout(Duration::from_secs(20)); + task_manager.graceful_shutdown_with_timeout(SHUTDOWN_TIMEOUT); } /// Consider move this to rbuilder-utils. 
From 80d5818550e3030204d1f2e5e6877e6b8343f5fb Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 4 Feb 2026 16:39:18 -0300 Subject: [PATCH 03/17] test/lint --- src/indexer/click/mod.rs | 5 +++- src/indexer/mod.rs | 3 ++- src/ingress/mod.rs | 8 +++---- src/lib.rs | 3 ++- src/main.rs | 50 +++++++++++----------------------------- src/utils.rs | 36 +++++++++++++++++++++++++++++ tests/common/mod.rs | 12 ++++++++-- 7 files changed, 72 insertions(+), 45 deletions(-) diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index 1e9d7b1e..a8348f11 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -169,6 +169,7 @@ pub(crate) mod tests { }, tests::{bundle_receipt_example, system_bundle_example}, }, + utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks}, }; use clickhouse::{Client as ClickhouseClient, error::Result as ClickhouseResult}; use rbuilder_utils::{ @@ -385,7 +386,7 @@ pub(crate) mod tests { let (senders, receivers) = OrderSenders::new(); let validation = false; - ClickhouseIndexer::run( + let indexer_join_handles = ClickhouseIndexer::run( config.into(), builder_name.clone(), receivers, @@ -397,6 +398,8 @@ pub(crate) mod tests { let system_bundle = system_bundle_example(); let system_bundle_row = (system_bundle.clone(), builder_name.clone()).into(); senders.bundle_tx.send(system_bundle.clone()).await.unwrap(); + drop(senders); + wait_for_critical_tasks(indexer_join_handles, SHUTDOWN_TIMEOUT).await; // Wait a bit for bundle to be actually processed before shutting down. tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 912c4c43..0fea37e5 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -78,7 +78,8 @@ impl OrderSenders { pub struct Indexer; impl Indexer { - /// Returns the IndexerHandle to send dada and a vector of join handles to the indexer tasks so we can wait for them to finish on shutdown. + /// Returns the IndexerHandle to send data and a vector of join handles to the indexer tasks so + /// we can wait for them to finish on shutdown. pub fn run( args: IndexerArgs, builder_name: String, diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 168f84b2..5613a93d 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -184,8 +184,8 @@ impl OrderflowIngress { let entity = Entity::Signer(signer); - if ingress.rate_limiting_enabled - && let Some(mut data) = ingress.entity_data(entity) + if ingress.rate_limiting_enabled && + let Some(mut data) = ingress.entity_data(entity) { if data.rate_limit.count() > ingress.rate_limit_count { tracing::trace!("rate limited request"); @@ -272,8 +272,8 @@ impl OrderflowIngress { let response = match result { Ok(eth) => JsonRpcResponse::result(request.id, eth), Err(error) => { - if error.is_validation() - && let Some(mut data) = ingress.entity_data(entity) + if error.is_validation() && + let Some(mut data) = ingress.entity_data(entity) { data.scores.score_mut(received_at.into()).invalid_requests += 1; } diff --git a/src/lib.rs b/src/lib.rs index cb08cdd9..09eb1a7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,7 +102,8 @@ pub async fn run( .await } -/// Cancellation is a little ugly, just added enough to make it drop indexer_handle but it's a mix of cancellation_token + task_executor which also has a shutdown method. +/// Cancellation is a little ugly, just added enough to make it drop indexer_handle but it's a mix +/// of cancellation_token + task_executor which also has a shutdown method. 
pub async fn run_with_listeners( args: OrderflowIngressArgs, user_listener: TcpListener, diff --git a/src/main.rs b/src/main.rs index b7c84178..688c7f84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,12 @@ use std::time::Duration; use clap::Parser; -use flowproxy::{cli::OrderflowIngressArgs, indexer::Indexer, trace::init_tracing}; +use flowproxy::{ + cli::OrderflowIngressArgs, + indexer::Indexer, + trace::init_tracing, + utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks}, +}; use futures::{StreamExt, stream::FuturesUnordered}; use rbuilder_utils::tasks::{PanickedTaskError, TaskManager}; use tokio::task::JoinHandle; @@ -40,8 +45,8 @@ fn main() { info!("Main task started"); // Executes the main task command until it finished or ctrl-c was fired. - // IMPORTANT: flowproxy::run has no nice cancellation and will be stopped being polled abruptly so - // it must not contain any critical tasks that need proper shutdown. + // IMPORTANT: flowproxy::run has no nice cancellation and will be stopped being polled abruptly + // so it must not contain any critical tasks that need proper shutdown. tokio_runtime.block_on(run_with_shutdown(args, task_manager)); info!("Main task finished. Shutting down tokio runtime"); @@ -51,8 +56,6 @@ fn main() { } } -/// This time out should be enough for the inserter to flush all pending clickhouse data (timeout is clickhouse usually a few secs) and local DB data (disk flush time). -const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20); async fn run_with_shutdown(args: OrderflowIngressArgs, mut task_manager: TaskManager) { warn!("starting blocking"); let task_executor = task_manager.executor(); @@ -68,42 +71,17 @@ async fn run_with_shutdown(args: OrderflowIngressArgs, mut task_manager: TaskMan tracing::error!(?err, target = "cli", "shutting down due to error"); } } - // This kills some tasks launched by flowproxy::run that release the last references to the indexer_handle. + // This kills some tasks launched by flowproxy::run that release the last references to the + // indexer_handle. cancellation_token.cancel(); - // At this point all the rpc was abruptly dropped which dropped the indexer_handle and that will allow the indexer core - // to process all pending data and start shutting down. + // At this point all the rpc was abruptly dropped which dropped the indexer_handle and that will + // allow the indexer core to process all pending data and start shutting down. wait_for_critical_tasks(indexer_join_handles, SHUTDOWN_TIMEOUT).await; - // We already have a chance to critical tasks to finish by themselves, so we can now call the graceful shutdown. + // We already have a chance to critical tasks to finish by themselves, so we can now call the + // graceful shutdown. task_manager.graceful_shutdown_with_timeout(SHUTDOWN_TIMEOUT); } -/// Consider move this to rbuilder-utils. -/// Waits for critical_tasks to finish by themselves up to grateful_timeout. -/// After they finish or grateful_timeout is reached, we call task_manager.graceful_shutdown_with_timeout(abort_timeout) and -async fn wait_for_critical_tasks(critical_tasks: Vec>, grateful_timeout: Duration) { - let mut critical_tasks: FuturesUnordered<_> = critical_tasks.into_iter().collect(); - let critical_deadline = tokio::time::Instant::now() + grateful_timeout; - loop { - tokio::select! 
{ - biased; - result = critical_tasks.next() => { - match result { - Some(Err(err)) => error!(?err, "Critical task handle await error"), - Some(Ok(())) => {} - None => { - info!("All critical tasks finished ok"); - break; - } - } - } - _ = tokio::time::sleep_until(critical_deadline) => { - error!(pendig_task_count = critical_tasks.len(), "Critical tasks shutdown timeout reached"); - break; - } - } - } -} - fn wait_tokio_runtime_shutdown( tokio_runtime: tokio::runtime::Runtime, timeout: Duration, diff --git a/src/utils.rs b/src/utils.rs index 846515cf..a0cd21ba 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,8 +1,11 @@ use alloy_eips::eip2718::EIP4844_TX_TYPE_ID; use alloy_primitives::Bytes; use alloy_rlp::{Buf as _, Header}; +use futures::{StreamExt, stream::FuturesUnordered}; use std::time::{Duration, Instant}; use time::UtcDateTime; +use tokio::task::JoinHandle; +use tracing::{error, info}; use uuid::Uuid; use crate::{statics::START, validation::MAINNET_CHAIN_ID}; @@ -282,3 +285,36 @@ pub mod testutils { } } } + +/// This time out should be enough for the inserter to flush all pending clickhouse data (timeout is +/// clickhouse usually a few secs) and local DB data (disk flush time). +pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20); + +/// Consider move this to rbuilder-utils. +/// Waits for critical_tasks to finish by themselves up to grateful_timeout. +pub async fn wait_for_critical_tasks( + critical_tasks: Vec>, + grateful_timeout: Duration, +) { + let mut critical_tasks: FuturesUnordered<_> = critical_tasks.into_iter().collect(); + let critical_deadline = tokio::time::Instant::now() + grateful_timeout; + loop { + tokio::select! { + biased; + result = critical_tasks.next() => { + match result { + Some(Err(err)) => error!(?err, "Critical task handle await error"), + Some(Ok(())) => {} + None => { + info!("All critical tasks finished ok"); + break; + } + } + } + _ = tokio::time::sleep_until(critical_deadline) => { + error!(pendig_task_count = critical_tasks.len(), "Critical tasks shutdown timeout reached"); + break; + } + } + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7adf4587..99819062 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -10,9 +10,9 @@ use axum::{Router, extract::State, routing::post}; use flowproxy::{ cli::OrderflowIngressArgs, consts::FLASHBOTS_SIGNATURE_HEADER, + indexer::Indexer, ingress::maybe_decompress, jsonrpc::{JSONRPC_VERSION_2, JsonRpcError, JsonRpcRequest, JsonRpcResponse}, - runner::CliContext, }; use hyper::{HeaderMap, header}; use rbuilder_primitives::serialize::RawBundle; @@ -20,6 +20,7 @@ use revm_primitives::keccak256; use serde::de::DeserializeOwned; use serde_json::{Value, json}; use tokio::{net::TcpListener, sync::mpsc}; +use tokio_util::sync::CancellationToken; pub(crate) struct IngressClient { pub(crate) url: String, @@ -36,12 +37,19 @@ pub(crate) async fn spawn_ingress_with_args( let task_manager = rbuilder_utils::tasks::TaskManager::current(); + let task_executor = task_manager.executor(); + let (indexer_handle, _indexer_join_handles) = + Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); + let cancellation_token = CancellationToken::new(); + tokio::spawn(async move { flowproxy::run_with_listeners( args, user_listener, builder_listener, - CliContext { task_executor: task_manager.executor() }, + task_executor, + indexer_handle, + cancellation_token, ) .await .unwrap(); From 7d6928200150cce82f7c960a8dfd1fff48c0697e Mon Sep 17 00:00:00 2001 From: Daniel Xifra 
Date: Wed, 4 Feb 2026 16:46:17 -0300 Subject: [PATCH 04/17] lint --- src/indexer/click/models.rs | 2 +- src/main.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/indexer/click/models.rs b/src/indexer/click/models.rs index 5f9b363e..1755705f 100644 --- a/src/indexer/click/models.rs +++ b/src/indexer/click/models.rs @@ -255,7 +255,7 @@ impl From<(SystemBundle, String)> for BundleRow { .unwrap_or_default(), // Decoded bundles always have a uuid. internal_uuid: decoded.uuid, - replacement_uuid: decoded.replacement_data.clone().map(|r| r.key.id), + replacement_uuid: decoded.replacement_data.map(|r| r.key.id), replacement_nonce: bundle.raw_bundle.metadata.replacement_nonce, signer_address: Some(bundle.metadata.signer), builder_name, diff --git a/src/main.rs b/src/main.rs index 688c7f84..b4b461f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,9 +7,7 @@ use flowproxy::{ trace::init_tracing, utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks}, }; -use futures::{StreamExt, stream::FuturesUnordered}; use rbuilder_utils::tasks::{PanickedTaskError, TaskManager}; -use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; From dc00c4a4e4ae2f0273d2fc3d9b52624f3d5dfd7d Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 4 Feb 2026 18:18:58 -0300 Subject: [PATCH 05/17] trying to fix tests --- tests/common/mod.rs | 10 +++++----- tests/network.rs | 16 ++++++++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 99819062..30af3b65 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -37,12 +37,12 @@ pub(crate) async fn spawn_ingress_with_args( let task_manager = rbuilder_utils::tasks::TaskManager::current(); - let task_executor = task_manager.executor(); - let (indexer_handle, _indexer_join_handles) = - Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); - let cancellation_token = CancellationToken::new(); - tokio::spawn(async move { + let task_executor = task_manager.executor(); + let (indexer_handle, _indexer_join_handles) = + Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); + let cancellation_token = CancellationToken::new(); + flowproxy::run_with_listeners( args, user_listener, diff --git a/tests/network.rs b/tests/network.rs index c3c9c4a2..09894164 100644 --- a/tests/network.rs +++ b/tests/network.rs @@ -15,6 +15,7 @@ use crate::common::spawn_ingress_with_args; /// This tests proper order propagation between 2 proxies. 
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn network_e2e_bundle_tx_works() { + println!("---- DX starting network e2e tcp test"); let _ = tracing_subscriber::fmt::try_init(); info!("starting network e2e tcp test"); @@ -31,7 +32,7 @@ async fn network_e2e_bundle_tx_works() { PathBuf::from_str("./tests/testdata/certificates/cert_1.pem").unwrap(); args.client_certificate_pem_file = PathBuf::from_str("./tests/testdata/certificates/cert_1.pem").unwrap(); - args.system_listen_addr = "[::1]:0".parse().unwrap(); + args.system_listen_addr = "127.0.0.1:0".parse().unwrap(); let client1 = spawn_ingress_with_args(args.clone()).await; args.builder_url = Some(builder2.url()); @@ -39,7 +40,7 @@ async fn network_e2e_bundle_tx_works() { PathBuf::from_str("./tests/testdata/certificates/cert_2.pem").unwrap(); args.client_certificate_pem_file = PathBuf::from_str("./tests/testdata/certificates/cert_2.pem").unwrap(); - args.system_listen_addr = "[::1]:0".parse().unwrap(); + args.system_listen_addr = "127.0.0.1:0".parse().unwrap(); let client2 = spawn_ingress_with_args(args.clone()).await; args.builder_url = Some(builder3.url()); @@ -47,7 +48,7 @@ async fn network_e2e_bundle_tx_works() { PathBuf::from_str("./tests/testdata/certificates/cert_3.pem").unwrap(); args.client_certificate_pem_file = PathBuf::from_str("./tests/testdata/certificates/cert_3.pem").unwrap(); - args.system_listen_addr = "[::1]:0".parse().unwrap(); + args.system_listen_addr = "127.0.0.1:0".parse().unwrap(); let _client3 = spawn_ingress_with_args(args.clone()).await; // args.builder_url = Some(builder4.url()); @@ -60,26 +61,29 @@ async fn network_e2e_bundle_tx_works() { let mut builders = [builder1, builder2, builder3 /* builder4 */]; + println!("---- DX 2222222"); // Wait for the proxies to be ready and connected to each other. tokio::time::sleep(Duration::from_secs(10)).await; - + println!("---- DX 3333333"); let raw_tx = TxEnvelope::random(&mut rng).encoded_2718().into(); let response = client1.send_raw_tx(&raw_tx).await; info!("sent raw tx from client1"); assert!(response.status().is_success()); - + println!("---- DX 44444444 {}", builders.len()); for (i, b) in builders.iter_mut().enumerate() { let received = b.recv::().await.unwrap(); assert_eq!(received, raw_tx); debug!(?i, "builder received tx from client1"); + println!("builder received tx from client1 {i}"); } - + println!("---- DX 55555555"); let bundle = RawBundle::random(&mut rng); let response = client2.send_bundle(&bundle).await; info!("sent raw bundle from client2"); assert!(response.status().is_success()); for (i, b) in builders.iter_mut().enumerate() { + println!("---- DX 66666 {i}"); let mut received = b.recv::().await.unwrap(); debug!(?i, "builder received raw bundle from client2"); From acdcef3bf0e185d35f9c6ee9522c251e83a948ee Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 4 Feb 2026 18:24:27 -0300 Subject: [PATCH 06/17] removed println! --- tests/network.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/network.rs b/tests/network.rs index 09894164..a3f4f5a9 100644 --- a/tests/network.rs +++ b/tests/network.rs @@ -15,7 +15,6 @@ use crate::common::spawn_ingress_with_args; /// This tests proper order propagation between 2 proxies. 
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn network_e2e_bundle_tx_works() { - println!("---- DX starting network e2e tcp test"); let _ = tracing_subscriber::fmt::try_init(); info!("starting network e2e tcp test"); @@ -61,29 +60,26 @@ async fn network_e2e_bundle_tx_works() { let mut builders = [builder1, builder2, builder3 /* builder4 */]; - println!("---- DX 2222222"); // Wait for the proxies to be ready and connected to each other. tokio::time::sleep(Duration::from_secs(10)).await; - println!("---- DX 3333333"); + let raw_tx = TxEnvelope::random(&mut rng).encoded_2718().into(); let response = client1.send_raw_tx(&raw_tx).await; info!("sent raw tx from client1"); assert!(response.status().is_success()); - println!("---- DX 44444444 {}", builders.len()); + for (i, b) in builders.iter_mut().enumerate() { let received = b.recv::().await.unwrap(); assert_eq!(received, raw_tx); debug!(?i, "builder received tx from client1"); - println!("builder received tx from client1 {i}"); } - println!("---- DX 55555555"); + let bundle = RawBundle::random(&mut rng); let response = client2.send_bundle(&bundle).await; info!("sent raw bundle from client2"); assert!(response.status().is_success()); for (i, b) in builders.iter_mut().enumerate() { - println!("---- DX 66666 {i}"); let mut received = b.recv::().await.unwrap(); debug!(?i, "builder received raw bundle from client2"); From 62dbf47df2a4d341be649c8e60131c61e19cd7d2 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 4 Feb 2026 19:14:38 -0300 Subject: [PATCH 07/17] avoid Global executor already set error log --- tests/common/mod.rs | 14 ++++++++------ tests/ingress.rs | 3 ++- tests/network.rs | 8 ++++---- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 30af3b65..3e153b1e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -16,6 +16,7 @@ use flowproxy::{ }; use hyper::{HeaderMap, header}; use rbuilder_primitives::serialize::RawBundle; +use rbuilder_utils::tasks::TaskManager; use revm_primitives::keccak256; use serde::de::DeserializeOwned; use serde_json::{Value, json}; @@ -30,15 +31,13 @@ pub(crate) struct IngressClient { pub(crate) async fn spawn_ingress_with_args( args: OrderflowIngressArgs, + task_manager: &TaskManager, ) -> IngressClient { let user_listener = TcpListener::bind(&args.user_listen_addr).await.unwrap(); let builder_listener = None; let address = user_listener.local_addr().unwrap(); - - let task_manager = rbuilder_utils::tasks::TaskManager::current(); - + let task_executor = task_manager.executor(); tokio::spawn(async move { - let task_executor = task_manager.executor(); let (indexer_handle, _indexer_join_handles) = Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); let cancellation_token = CancellationToken::new(); @@ -63,11 +62,14 @@ pub(crate) async fn spawn_ingress_with_args( } #[allow(dead_code)] -pub(crate) async fn spawn_ingress(builder_url: Option) -> IngressClient { +pub(crate) async fn spawn_ingress( + builder_url: Option, + task_manager: &TaskManager, +) -> IngressClient { let mut args = OrderflowIngressArgs::default().gzip_enabled().disable_builder_hub(); args.peer_update_interval_s = 5; args.builder_url = builder_url; - spawn_ingress_with_args(args).await + spawn_ingress_with_args(args, task_manager).await } impl IngressClient { diff --git a/tests/ingress.rs b/tests/ingress.rs index 236f291a..5aa2f6b6 100644 --- a/tests/ingress.rs +++ b/tests/ingress.rs @@ -34,7 +34,8 @@ mod assert { async fn 
ingress_http_e2e() { let mut rng = rand::rng(); let mut builder = BuilderReceiver::spawn().await; - let client = spawn_ingress(Some(builder.url())).await; + let task_manager = rbuilder_utils::tasks::TaskManager::current(); + let client = spawn_ingress(Some(builder.url()), &task_manager).await; let empty = json!({}); let response = diff --git a/tests/network.rs b/tests/network.rs index a3f4f5a9..7cd24a8b 100644 --- a/tests/network.rs +++ b/tests/network.rs @@ -17,7 +17,7 @@ use crate::common::spawn_ingress_with_args; async fn network_e2e_bundle_tx_works() { let _ = tracing_subscriber::fmt::try_init(); info!("starting network e2e tcp test"); - + let task_manager = rbuilder_utils::tasks::TaskManager::current(); let mut rng = rand::rng(); let builder1 = BuilderReceiver::spawn().await; let builder2 = BuilderReceiver::spawn().await; @@ -32,7 +32,7 @@ async fn network_e2e_bundle_tx_works() { args.client_certificate_pem_file = PathBuf::from_str("./tests/testdata/certificates/cert_1.pem").unwrap(); args.system_listen_addr = "127.0.0.1:0".parse().unwrap(); - let client1 = spawn_ingress_with_args(args.clone()).await; + let client1 = spawn_ingress_with_args(args.clone(), &task_manager).await; args.builder_url = Some(builder2.url()); args.server_certificate_pem_file = @@ -40,7 +40,7 @@ async fn network_e2e_bundle_tx_works() { args.client_certificate_pem_file = PathBuf::from_str("./tests/testdata/certificates/cert_2.pem").unwrap(); args.system_listen_addr = "127.0.0.1:0".parse().unwrap(); - let client2 = spawn_ingress_with_args(args.clone()).await; + let client2 = spawn_ingress_with_args(args.clone(), &task_manager).await; args.builder_url = Some(builder3.url()); args.server_certificate_pem_file = @@ -48,7 +48,7 @@ async fn network_e2e_bundle_tx_works() { args.client_certificate_pem_file = PathBuf::from_str("./tests/testdata/certificates/cert_3.pem").unwrap(); args.system_listen_addr = "127.0.0.1:0".parse().unwrap(); - let _client3 = spawn_ingress_with_args(args.clone()).await; + let _client3 = spawn_ingress_with_args(args.clone(), &task_manager).await; // args.builder_url = Some(builder4.url()); // args.server_certificate_pem_file = From 3871ee27aa224a3c0bf4fab9899539673bdf64a3 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Thu, 5 Feb 2026 14:38:29 +0100 Subject: [PATCH 08/17] fix: IPv4/IPv6 issues + TLS SNI --- src/builderhub/mod.rs | 11 +++++++++-- src/lib.rs | 6 +++--- tests/network.rs | 6 +++--- tests/testdata/openssl.cnf | 3 ++- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/builderhub/mod.rs b/src/builderhub/mod.rs index e193eb97..dfec9fd5 100644 --- a/src/builderhub/mod.rs +++ b/src/builderhub/mod.rs @@ -81,8 +81,15 @@ impl Peer { /// /// Reference: pub async fn system_api(&self) -> io::Result> { - // NOTE: Needed for integration tests where port is not known upfront. This is also more - // flexible in the case some instances won't run with that default port. + // If `ip` is a valid loopback socket address, use it directly. This is the case in + // integration tests. + if let Ok(addr) = self.ip.parse::() && + addr.ip().is_loopback() + { + return Ok(Some(addr)); + } + + // Parse the provided port, set to DEFAULT_SYSTEM_PORT if not provided. 
let port = self.ip.split(':').nth(1).and_then(|p| p.parse().ok()).unwrap_or(DEFAULT_SYSTEM_PORT); diff --git a/src/lib.rs b/src/lib.rs index 09eb1a7c..ac1444c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,7 +192,7 @@ pub async fn run_with_listeners( let tls = tcp_tls::Server::new(acceptor.into()); let mut socket = RepSocket::new(TcpTls::Server(tls)); socket.bind(args.system_listen_addr).await.expect("to bind system listener"); - let port = socket.local_addr().expect("bound").port(); + let local = socket.local_addr().unwrap(); let peer = Peer { name: local_signer.to_string(), @@ -200,8 +200,8 @@ pub async fn run_with_listeners( ecdsa_pubkey_address: local_signer, ..Default::default() }, - ip: format!("127.0.0.1:{port}"), - dns_name: "localhost".to_owned(), + ip: format!("{}:{}", local.ip(), local.port()), + dns_name: "localhost".to_string(), instance: InstanceData { tls_cert: peer_cert }, }; let peer_store = peer_store.register(peer); diff --git a/tests/network.rs b/tests/network.rs index 7cd24a8b..5f878a59 100644 --- a/tests/network.rs +++ b/tests/network.rs @@ -8,7 +8,7 @@ use common::BuilderReceiver; mod common; use flowproxy::{cli::OrderflowIngressArgs, utils::testutils::Random as _}; use rbuilder_primitives::serialize::RawBundle; -use tracing::{debug, info}; +use tracing::info; use crate::common::spawn_ingress_with_args; @@ -71,7 +71,7 @@ async fn network_e2e_bundle_tx_works() { for (i, b) in builders.iter_mut().enumerate() { let received = b.recv::().await.unwrap(); assert_eq!(received, raw_tx); - debug!(?i, "builder received tx from client1"); + info!(?i, "builder received tx from client1"); } let bundle = RawBundle::random(&mut rng); @@ -81,7 +81,7 @@ async fn network_e2e_bundle_tx_works() { for (i, b) in builders.iter_mut().enumerate() { let mut received = b.recv::().await.unwrap(); - debug!(?i, "builder received raw bundle from client2"); + info!(?i, "builder received raw bundle from client2"); assert!(received.metadata.signing_address.is_some()); assert!(received.metadata.bundle_hash.is_some()); diff --git a/tests/testdata/openssl.cnf b/tests/testdata/openssl.cnf index a8142f1f..fc30d3cb 100644 --- a/tests/testdata/openssl.cnf +++ b/tests/testdata/openssl.cnf @@ -12,4 +12,5 @@ subjectAltName = @alt_names [alt_names] DNS.1 = localhost -IP.1 = 127.0.0.1 \ No newline at end of file +IP.1 = 127.0.0.1 +IP.2 = ::1 From a5311fcf0b66c433339e6e106315921ff9da5eb6 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Thu, 5 Feb 2026 12:30:38 -0300 Subject: [PATCH 09/17] min fixes --- src/lib.rs | 1 - src/main.rs | 3 +-- src/utils.rs | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ac1444c4..89ed1244 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -279,7 +279,6 @@ pub async fn run_with_listeners( ingress.maintenance(); } } - ingress.maintenance(); } } }); diff --git a/src/main.rs b/src/main.rs index b4b461f4..a27dd414 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use flowproxy::{ }; use rbuilder_utils::tasks::{PanickedTaskError, TaskManager}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; +use tracing::{error, info}; #[cfg(all(feature = "jemalloc", unix))] type AllocatorInner = tikv_jemallocator::Jemalloc; @@ -55,7 +55,6 @@ fn main() { } async fn run_with_shutdown(args: OrderflowIngressArgs, mut task_manager: TaskManager) { - warn!("starting blocking"); let task_executor = task_manager.executor(); let (indexer_handle, indexer_join_handles) = Indexer::run(args.indexing.clone(), args.builder_name.clone(), 
task_executor.clone()); diff --git a/src/utils.rs b/src/utils.rs index a0cd21ba..4ed7cc5a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -312,7 +312,7 @@ pub async fn wait_for_critical_tasks( } } _ = tokio::time::sleep_until(critical_deadline) => { - error!(pendig_task_count = critical_tasks.len(), "Critical tasks shutdown timeout reached"); + error!(pending_task_count = critical_tasks.len(), "Critical tasks shutdown timeout reached"); break; } } From 47a4f913c7d71eec2dd12f23d4996d1cb54afbac Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Thu, 5 Feb 2026 16:36:51 -0300 Subject: [PATCH 10/17] done --- Cargo.lock | 1 + Cargo.toml | 1 + src/cli.rs | 11 +++++ src/indexer/click/mod.rs | 89 ++++++++++++++++++++++++++++++++-------- src/ingress/mod.rs | 9 +++- src/jsonrpc.rs | 12 ++++-- src/lib.rs | 1 + 7 files changed, 102 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67377c06..6e3650f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2909,6 +2909,7 @@ dependencies = [ "msg-socket", "msg-transport", "openssl", + "parking_lot", "parquet", "prometheus", "prometric", diff --git a/Cargo.toml b/Cargo.toml index 2f44f801..105e11ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ rand = "0.9.2" derive_more = { version = "2", features = ["deref", "from", "into"] } fdlimit = "0.3.0" rayon = "1.11.0" +parking_lot = { version = "0.12.3" } # serialization serde = { version = "1", features = ["derive"] } diff --git a/src/cli.rs b/src/cli.rs index 32e6deca..7aeb7772 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -266,6 +266,16 @@ pub struct OrderflowIngressArgs { #[clap(long = "http.enable-gzip", default_value_t = false)] pub gzip_enabled: bool, + /// Maximum local ClickHouse backup disk size in MB above which user RPC (e.g. eth_sendBundle) + /// is rejected with disk full. Defaults to 1024 MB (1 GiB). + #[clap( + long = "disk-max-size-to-accept-user-rpc-mb", + default_value_t = 1024, + env = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC", + id = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC" + )] + pub disk_max_size_to_accept_user_rpc_mb: u64, + /// The interval in seconds to update the peer list from BuilderHub. #[clap( long = "peer.update-interval-s", @@ -342,6 +352,7 @@ impl Default for OrderflowIngressArgs { score_bucket_s: 4, log_json: false, gzip_enabled: false, + disk_max_size_to_accept_user_rpc_mb: 1024, tcp_small_clients: NonZero::new(1).expect("non-zero"), tcp_big_clients: 0, io_threads: 4, diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index a8348f11..58e2b5b5 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -1,6 +1,6 @@ //! Indexing functionality powered by Clickhouse. -use std::{fmt::Debug, time::Duration}; +use std::{fmt::Debug, marker::PhantomData, sync::LazyLock, time::Duration}; use crate::{ cli::ClickhouseArgs, @@ -11,6 +11,7 @@ use crate::{ metrics::CLICKHOUSE_METRICS, primitives::{BundleReceipt, SystemBundle}, }; +use parking_lot::Mutex; use rbuilder_utils::{ clickhouse::{ Quantities, @@ -34,9 +35,59 @@ fn config_from_clickhouse_args(args: &ClickhouseArgs, validation: bool) -> Click } } -struct MetricsWrapper; +/// little global (puaj) info to easily get the current clickhouse disk size. 
+#[derive(Default)] +pub(crate) struct ClickhouseLocalBackupDiskSize { + bundles_size: u64, + bundle_receipts_size: u64, +} + +impl ClickhouseLocalBackupDiskSize { + pub(crate) fn set_bundles_size(&mut self, size: u64) { + self.bundles_size = size; + } + pub(crate) fn set_bundle_receipts_size(&mut self, size: u64) { + self.bundle_receipts_size = size; + } + pub(crate) fn disk_size(&self) -> u64 { + self.bundles_size + self.bundle_receipts_size + } +} + +/// We store here the current disk size of the backup database to avoid querying the metrics since +/// that would include a string map access. +pub(crate) static CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE: LazyLock< + Mutex, +> = LazyLock::new(|| Mutex::new(ClickhouseLocalBackupDiskSize::default())); + +/// Callback invoked when disk backup size is set. Implement this trait to observe size updates. +pub(crate) trait DiskBackupSizeCallback: Send + Sync { + fn on_disk_backup_size(size_bytes: u64); +} + +struct UpdateBundleSizeCallback; + +impl DiskBackupSizeCallback for UpdateBundleSizeCallback { + fn on_disk_backup_size(size_bytes: u64) { + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().set_bundles_size(size_bytes); + } +} + +struct UpdateBundleReceiptsSizeCallback; + +impl DiskBackupSizeCallback for UpdateBundleReceiptsSizeCallback { + fn on_disk_backup_size(size_bytes: u64) { + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().set_bundle_receipts_size(size_bytes); + } +} -impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper { +struct MetricsWrapper(PhantomData) +where + F: DiskBackupSizeCallback; + +impl rbuilder_utils::clickhouse::backup::metrics::Metrics + for MetricsWrapper +{ fn increment_write_failures(err: String) { CLICKHOUSE_METRICS.write_failures(err).inc(); } @@ -60,6 +111,7 @@ impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper { } fn set_disk_backup_size(size_bytes: u64, batches: usize, order: &'static str) { + F::on_disk_backup_size(size_bytes); CLICKHOUSE_METRICS.backup_size_bytes(order, "disk").set(size_bytes); CLICKHOUSE_METRICS.backup_size_batches(order, "disk").set(batches); } @@ -121,24 +173,27 @@ impl ClickhouseIndexer { let send_timeout = Duration::from_millis(args.send_timeout_ms); let end_timeout = Duration::from_millis(args.end_timeout_ms); - let bundle_inserter_join_handle = - spawn_clickhouse_inserter_and_backup::( - &client, - receivers.bundle_rx, - &task_executor, - args.bundles_table_name, - builder_name.clone(), - disk_backup.clone(), - args.backup_memory_max_size_bytes, - send_timeout, - end_timeout, - TARGET_INDEXER, - ); + let bundle_inserter_join_handle = spawn_clickhouse_inserter_and_backup::< + SystemBundle, + BundleRow, + MetricsWrapper, + >( + &client, + receivers.bundle_rx, + &task_executor, + args.bundles_table_name, + builder_name.clone(), + disk_backup.clone(), + args.backup_memory_max_size_bytes, + send_timeout, + end_timeout, + TARGET_INDEXER, + ); let bundle_receipt_inserter_join_handle = spawn_clickhouse_inserter_and_backup::< BundleReceipt, BundleReceiptRow, - MetricsWrapper, + MetricsWrapper, >( &client, receivers.bundle_receipt_rx, diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 5613a93d..ff18968b 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -8,7 +8,7 @@ use crate::{ }, entity::{Entity, EntityBuilderStats, EntityData, EntityRequest, EntityScores, SpamThresholds}, forwarder::IngressForwarders, - indexer::{IndexerHandle, OrderIndexer as _}, + indexer::{IndexerHandle, OrderIndexer as _, click::CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE}, 
jsonrpc::{JsonRpcError, JsonRpcRequest, JsonRpcResponse}, metrics::{IngressMetrics, SYSTEM_METRICS}, primitives::{ @@ -79,6 +79,8 @@ pub struct OrderflowIngress { pub local_builder_url: Option, pub builder_ready_endpoint: Option, pub indexer_handle: IndexerHandle, + /// Maximum local ClickHouse backup disk size in bytes above which user RPC is rejected. + pub disk_max_size_to_accept_user_rpc: u64, // Metrics pub(crate) user_metrics: IngressMetrics, @@ -156,6 +158,11 @@ impl OrderflowIngress { headers: HeaderMap, body: axum::body::Bytes, ) -> JsonRpcResponse { + if CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().disk_size() > + ingress.disk_max_size_to_accept_user_rpc + { + return JsonRpcResponse::error(Value::Null, JsonRpcError::DiskFull); + } let received_at = UtcInstant::now(); let body = match maybe_decompress(ingress.gzip_enabled, &headers, body) { diff --git a/src/jsonrpc.rs b/src/jsonrpc.rs index deb61eb9..328fdba5 100644 --- a/src/jsonrpc.rs +++ b/src/jsonrpc.rs @@ -177,6 +177,8 @@ pub enum JsonRpcError { RateLimited, #[error("Internal error")] Internal, + #[error("Disk full")] + DiskFull, #[error("{0}")] Unknown(String), } @@ -239,9 +241,11 @@ impl JsonRpcError { Self::InvalidRequest => -32600, Self::MethodNotFound(_) => -32601, Self::InvalidParams => -32602, - Self::RateLimited | Self::Internal | Self::Unknown(_) | Self::InvalidSignature => { - -32603 - } + Self::RateLimited | + Self::Internal | + Self::Unknown(_) | + Self::InvalidSignature | + Self::DiskFull => -32603, } } @@ -254,7 +258,7 @@ impl JsonRpcError { Self::InvalidSignature => StatusCode::BAD_REQUEST, Self::MethodNotFound(_) => StatusCode::NOT_FOUND, Self::RateLimited => StatusCode::TOO_MANY_REQUESTS, - Self::Internal | Self::Unknown(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Internal | Self::Unknown(_) | Self::DiskFull => StatusCode::INTERNAL_SERVER_ERROR, } } } diff --git a/src/lib.rs b/src/lib.rs index 89ed1244..33971677 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -259,6 +259,7 @@ pub async fn run_with_listeners( local_builder_url: builder_url, builder_ready_endpoint, indexer_handle, + disk_max_size_to_accept_user_rpc: args.disk_max_size_to_accept_user_rpc_mb * 1024 * 1024, user_metrics: IngressMetrics::builder().with_label("handler", "user").build(), system_metrics: IngressMetrics::builder().with_label("handler", "system").build(), }); From 490d5621295555092a79a1bcc3682667bc5214c2 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Thu, 5 Feb 2026 16:46:43 -0300 Subject: [PATCH 11/17] new metric --- src/lib.rs | 3 +++ src/metrics.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 33971677..2a78fbd9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,9 @@ pub async fn run( // Set build info metric metrics::BUILD_INFO_METRICS.info(env!("CARGO_PKG_VERSION"), env!("GIT_HASH")).set(1); + metrics::CLICKHOUSE_METRICS + .disk_max_size_to_accept_user_rpc_bytes() + .set(args.disk_max_size_to_accept_user_rpc_mb * 1024 * 1024); } let user_listener = TcpListener::bind(&args.user_listen_addr).await?; diff --git a/src/metrics.rs b/src/metrics.rs index 13b0aaaf..15e9cede 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -189,6 +189,9 @@ pub struct ClickhouseMetrics { /// Errors encountered during Clickhouse disk backup. #[metric(labels = ["order", "error"])] backup_disk_errors: Counter, + /// Configured max disk size (bytes) above which user RPC is rejected. 
+ #[metric(rename = "disk_max_size_to_accept_user_rpc_bytes")] + disk_max_size_to_accept_user_rpc_bytes: Gauge, } #[metrics(scope = "indexer_parquet")] From 0c6841892a4856a16060403539f951243171f274 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Thu, 5 Feb 2026 18:03:03 -0300 Subject: [PATCH 12/17] test --- src/indexer/click/mod.rs | 199 ++++++++++++++++++++++++++++++++++++++- src/jsonrpc.rs | 1 + src/utils.rs | 110 +++++++++++++--------- 3 files changed, 261 insertions(+), 49 deletions(-) diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index 58e2b5b5..5832cb1d 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -215,18 +215,29 @@ pub(crate) mod tests { use std::{borrow::Cow, collections::BTreeMap, fs, time::Duration}; use crate::{ - cli::ClickhouseArgs, + cli::{ClickhouseArgs, IndexerArgs, OrderflowIngressArgs}, + consts::FLASHBOTS_SIGNATURE_HEADER, indexer::{ - BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, OrderSenders, TARGET_INDEXER, + BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, Indexer, OrderSenders, TARGET_INDEXER, click::{ ClickhouseClientConfig, ClickhouseIndexer, models::{BundleReceiptRow, BundleRow}, }, tests::{bundle_receipt_example, system_bundle_example}, }, - utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks}, + jsonrpc::{JSONRPC_VERSION_2, JsonRpcError, JsonRpcResponse, JsonRpcResponseTy}, + utils::{ + SHUTDOWN_TIMEOUT, + testutils::{Random, random_raw_bundle_with_tx_count_and_input_size}, + wait_for_critical_tasks, + }, }; + use alloy_primitives::keccak256; + use alloy_signer::Signer; + use alloy_signer_local::PrivateKeySigner; + use axum::{Json, http::StatusCode, routing::post}; use clickhouse::{Client as ClickhouseClient, error::Result as ClickhouseResult}; + use rbuilder_primitives::serialize::RawBundle; use rbuilder_utils::{ clickhouse::{ Quantities, @@ -235,6 +246,7 @@ pub(crate) mod tests { }, tasks::TaskManager, }; + use serde_json::json; use testcontainers::{ ContainerAsync, Image, core::{ @@ -242,7 +254,8 @@ pub(crate) mod tests { }, runners::AsyncRunner as _, }; - use tokio::{runtime::Handle, sync::mpsc}; + use tokio::{net::TcpListener, runtime::Handle, sync::mpsc}; + use tokio_util::sync::CancellationToken; // Uncomment to enable logging during tests. // use tracing::level_filters::LevelFilter; @@ -607,4 +620,182 @@ pub(crate) mod tests { drop(image); } } + + /// Integration test: when backup DB exceeds disk_max_size_to_accept_user_rpc_mb, user RPC + /// returns DiskFull; after fixing ClickHouse and draining backup, RPC accepts again. + /// A little long func.. consider improving this. + #[tokio::test(flavor = "multi_thread")] + async fn disk_full_rpc_rejects_then_accepts_after_drain() { + const BACKUP_DISK_MAX_SIZE_BYTES: u64 = 5; + const BUNDLE_TX_COUNT: usize = 10; + const BUNDLE_TX_INPUT_SIZE: usize = 1024; + const DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB: u64 = 1; + // We need to fill DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB with bundles. + // 2 is to play it safe. + const BUNDLE_COUNT_TO_FILL_DISK: usize = 2 * + (DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB * 1024 * 1024) as usize / + (BUNDLE_TX_COUNT * BUNDLE_TX_INPUT_SIZE); + + const FLOWPROXY_START_DELAY_MS: Duration = Duration::from_millis(800); + // Asume 100ms per bundle to clickhouse (it's a LOT) + const DRAIN_TIMEOUT: Duration = + Duration::from_millis(100 * BUNDLE_COUNT_TO_FILL_DISK as u64); + + let mut rng = rand::rng(); + let task_manager = TaskManager::new(tokio::runtime::Handle::current()); + let task_executor = task_manager.executor(); + + // 1. 
Start ClickHouse without tables so inserts fail and go to backup. + let (image, client, config) = create_test_clickhouse_client(false).await.unwrap(); + + let temp_dir = tempfile::tempdir().unwrap(); + let backup_path = temp_dir.path().join("clickhouse-backup.db"); + + let mut clickhouse_args: ClickhouseArgs = config.clone().into(); + clickhouse_args.backup_disk_database_path = backup_path.to_string_lossy().to_string(); + clickhouse_args.backup_disk_max_size_bytes = BACKUP_DISK_MAX_SIZE_BYTES * 1024 * 1024; + + // 2. Mock builder so forwarder requests complete. + let builder_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let builder_port = builder_listener.local_addr().unwrap().port(); + let builder_url = format!("http://127.0.0.1:{builder_port}"); + let app = + axum::Router::new().route("/", post(|| async { (StatusCode::OK, Json(json!({}))) })); + tokio::spawn(async move { axum::serve(builder_listener, app).await.unwrap() }); + + let mut args = OrderflowIngressArgs::default().gzip_enabled().disable_builder_hub(); + args.peer_update_interval_s = 5; + args.indexing = IndexerArgs { clickhouse: Some(clickhouse_args), parquet: None }; + args.disk_max_size_to_accept_user_rpc_mb = DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB; // 1 MiB threshold + args.builder_url = Some(builder_url.clone()); + + let user_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let user_port = user_listener.local_addr().unwrap().port(); + let user_url = format!("http://127.0.0.1:{user_port}"); + + let (indexer_handle, _indexer_join_handles) = + Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); + let cancellation_token = CancellationToken::new(); + + tokio::spawn(async move { + crate::run_with_listeners( + args, + user_listener, + None, + task_executor, + indexer_handle, + cancellation_token, + ) + .await + .unwrap(); + }); + + tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await; + + let reqwest_client = reqwest::Client::default(); + let signer = PrivateKeySigner::random(); + + // 3. Phase 1: send bundles until backup exceeds threshold and we get DiskFull. + // Delay between requests so the indexer can commit batches (which fail and go to backup). + let mut got_disk_full = false; + for _ in 0..BUNDLE_COUNT_TO_FILL_DISK { + let bundle = random_raw_bundle_with_tx_count_and_input_size( + &mut rng, + BUNDLE_TX_COUNT, + Some(BUNDLE_TX_INPUT_SIZE), + ); + let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await; + if is_disk_full(response).await { + got_disk_full = true; + break; + } + tokio::time::sleep(Duration::from_millis(30)).await; + } + assert!(got_disk_full, "expected RPC to eventually return DiskFull"); + + // 4. Phase 2: create tables so backup can drain. + create_clickhouse_bundles_table(&client).await.unwrap(); + create_clickhouse_bundle_receipts_table(&client).await.unwrap(); + + // 5. Poll until RPC accepts again (backup drained). 
+ let poll_interval = Duration::from_millis(500); + let mut elapsed = Duration::ZERO; + while elapsed < DRAIN_TIMEOUT { + tokio::time::sleep(poll_interval).await; + elapsed += poll_interval; + let bundle = RawBundle::random(&mut rng); + let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await; + if response.status().is_success() { + let body = response.bytes().await.unwrap(); + let parsed: JsonRpcResponse = + serde_json::from_slice(body.as_ref()).expect("valid json"); + if matches!(parsed.result_or_error, JsonRpcResponseTy::Result(_)) { + break; + } + } + } + assert!(elapsed < DRAIN_TIMEOUT, "RPC did not accept again within {:?}", DRAIN_TIMEOUT); + + // 6. Phase 3: one more successful eth_sendBundle. + let bundle = RawBundle::random(&mut rng); + let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await; + assert!( + response.status().is_success(), + "expected success after drain, got {}", + response.text().await.unwrap_or_default() + ); + let body = response.bytes().await.unwrap(); + let parsed: JsonRpcResponse = + serde_json::from_slice(body.as_ref()).unwrap(); + assert!( + matches!(parsed.result_or_error, JsonRpcResponseTy::Result(_)), + "expected result, got {:?}", + parsed.result_or_error + ); + + drop(image); + } + + async fn is_disk_full(response: reqwest::Response) -> bool { + let status = response.status(); + let body = response.bytes().await.unwrap(); + if !status.is_success() { + let parsed: JsonRpcResponse<()> = match serde_json::from_slice(body.as_ref()) { + Ok(p) => p, + Err(_) => return false, + }; + matches!( + parsed.result_or_error, + JsonRpcResponseTy::Error { code: -32603, message: JsonRpcError::DiskFull } + ) + } else { + false + } + } + + async fn send_bundle_req( + client: &reqwest::Client, + url: &str, + signer: &PrivateKeySigner, + bundle: &RawBundle, + ) -> reqwest::Response { + let body = json!({ + "id": 0, + "jsonrpc": JSONRPC_VERSION_2, + "method": "eth_sendBundle", + "params": [bundle] + }); + let body_bytes = serde_json::to_vec(&body).unwrap(); + let sighash = format!("{:?}", keccak256(&body_bytes)); + let sig = signer.sign_message(sighash.as_bytes()).await.unwrap(); + let signature_header = format!("{:?}:{}", signer.address(), sig); + client + .post(url) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .header(FLASHBOTS_SIGNATURE_HEADER, signature_header) + .body(body_bytes) + .send() + .await + .unwrap() + } } diff --git a/src/jsonrpc.rs b/src/jsonrpc.rs index 328fdba5..ba327e6b 100644 --- a/src/jsonrpc.rs +++ b/src/jsonrpc.rs @@ -194,6 +194,7 @@ impl FromStr for JsonRpcError { "Invalid params" => Self::InvalidParams, "Rate limited" => Self::RateLimited, "Internal error" => Self::Internal, + "Disk full" => Self::DiskFull, s => { if s.starts_with("Method not found: ") { Self::MethodNotFound( diff --git a/src/utils.rs b/src/utils.rs index 4ed7cc5a..aeb6d852 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -142,20 +142,26 @@ pub mod testutils { impl Random for TxEip1559 { fn random(rng: &mut R) -> Self { - let max_fee_per_gas = rng.random(); - let max_priority_fee_per_gas = rng.random_range(0..max_fee_per_gas); + let input_len = rng.random_range(0..=1024); + create_tx_eip1559_with_input_size(rng, input_len) + } + } - Self { - chain_id: 1, - nonce: 0, - gas_limit: 100_000, - max_fee_per_gas, - max_priority_fee_per_gas, - to: TxKind::Call(Address::random_with(rng)), - value: U256::random_with(rng), - access_list: Default::default(), - input: Bytes::random(rng), - } + pub fn 
create_tx_eip1559_with_input_size(rng: &mut R, data_size: usize) -> TxEip1559 { + let max_fee_per_gas = rng.random(); + let max_priority_fee_per_gas = rng.random_range(0..max_fee_per_gas); + let mut bytes = vec![0u8; data_size]; + rng.fill_bytes(&mut bytes); + TxEip1559 { + chain_id: 1, + nonce: 0, + gas_limit: 100_000, + max_fee_per_gas, + max_priority_fee_per_gas, + to: TxKind::Call(Address::random_with(rng)), + value: U256::random_with(rng), + access_list: Default::default(), + input: bytes.into(), } } @@ -246,42 +252,56 @@ pub mod testutils { } } + /// Create a random [`RawBundle`] with a fixed number of EIP-1559 transactions. + /// When `input_size` is `Some(n)`, each transaction's input has exactly `n` bytes (via + /// [`create_tx_eip1559_with_input_size`]). When `None`, each transaction uses random input + /// size (same as [`TxEip1559::random`]). + pub fn random_raw_bundle_with_tx_count_and_input_size( + rng: &mut R, + tx_count: usize, + input_size: Option, + ) -> RawBundle { + let txs = (0..tx_count) + .map(|_| { + let signer = PrivateKeySigner::random(); + let tx = EthereumTypedTransaction::Eip1559(match input_size { + Some(n) => create_tx_eip1559_with_input_size(rng, n), + None => TxEip1559::random(rng), + }); + let sighash = tx.signature_hash(); + let signature = signer.sign_hash_sync(&sighash).unwrap(); + TxEnvelope::new_unhashed(tx, signature).encoded_2718().into() + }) + .collect(); + + RawBundle { + txs, + metadata: RawBundleMetadata { + reverting_tx_hashes: vec![], + dropping_tx_hashes: vec![], + refund_tx_hashes: None, + signing_address: None, + version: Some("v2".to_string()), + block_number: None, + replacement_uuid: None, + refund_identity: None, + uuid: None, + min_timestamp: None, + max_timestamp: None, + replacement_nonce: Some(rng.random()), + refund_percent: Some(rng.random_range(0..100)), + refund_recipient: Some(Address::random_with(rng)), + delayed_refund: None, + bundle_hash: None, + }, + } + } + impl Random for RawBundle { /// Generate a random bundle with transactions of type Eip1559. fn random(rng: &mut R) -> Self { let txs_len = rng.random_range(1..=10); - // We only generate Eip1559 here. 
- let txs = (0..txs_len) - .map(|_| { - let signer = PrivateKeySigner::random(); - let tx = EthereumTypedTransaction::Eip1559(TxEip1559::random(rng)); - let sighash = tx.signature_hash(); - let signature = signer.sign_hash_sync(&sighash).unwrap(); - TxEnvelope::new_unhashed(tx, signature).encoded_2718().into() - }) - .collect(); - - Self { - txs, - metadata: RawBundleMetadata { - reverting_tx_hashes: vec![], - dropping_tx_hashes: vec![], - refund_tx_hashes: None, - signing_address: None, - version: Some("v2".to_string()), - block_number: None, - replacement_uuid: None, - refund_identity: None, - uuid: None, - min_timestamp: None, - max_timestamp: None, - replacement_nonce: Some(rng.random()), - refund_percent: Some(rng.random_range(0..100)), - refund_recipient: Some(Address::random_with(rng)), - delayed_refund: None, - bundle_hash: None, - }, - } + random_raw_bundle_with_tx_count_and_input_size(rng, txs_len, None) } } } From a91ebae2a842a4e71c0ee882451a7ee92e726279 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Thu, 12 Feb 2026 14:11:23 -0300 Subject: [PATCH 13/17] ready handler --- src/ingress/mod.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index ff18968b..68e7e969 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -147,6 +147,11 @@ impl OrderflowIngress { tracing::info!(entries = len_after, num_removed, "finished state maintenance"); } + fn clickhouse_backup_disk_size_is_ok(&self) -> bool { + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().disk_size() <= + self.disk_max_size_to_accept_user_rpc + } + #[tracing::instrument(skip_all, name = "ingress", fields( handler = "user", @@ -158,9 +163,7 @@ impl OrderflowIngress { headers: HeaderMap, body: axum::body::Bytes, ) -> JsonRpcResponse { - if CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().disk_size() > - ingress.disk_max_size_to_accept_user_rpc - { + if !ingress.clickhouse_backup_disk_size_is_ok() { return JsonRpcResponse::error(Value::Null, JsonRpcError::DiskFull); } let received_at = UtcInstant::now(); @@ -296,6 +299,13 @@ impl OrderflowIngress { /// returns 200 if the local builder is not configured. 
#[tracing::instrument(skip_all, name = "ingress_readyz")] pub async fn ready_handler(State(ingress): State>) -> Response { + if !ingress.clickhouse_backup_disk_size_is_ok() { + return Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .body(Body::from("clickhouse backup too big")) + .unwrap(); + } + if let Some(ref url) = ingress.builder_ready_endpoint { let client = reqwest::Client::builder() .timeout(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECS)) @@ -307,7 +317,7 @@ impl OrderflowIngress { tracing::error!(%url, "error sending readyz request"); return Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) - .body(Body::from("not ready")) + .body(Body::from("builder not answering readyz request")) .unwrap(); }; @@ -318,7 +328,7 @@ impl OrderflowIngress { tracing::error!(%url, status = %response.status(), "local builder is not ready"); return Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) - .body(Body::from("not ready")) + .body(Body::from("builder not ready")) .unwrap(); } } From af292c431a969e3863fdb9ba40361ef0d85d79ad Mon Sep 17 00:00:00 2001 From: Daniel Xifra <126988525+ZanCorDX@users.noreply.github.com> Date: Wed, 18 Feb 2026 09:38:06 -0300 Subject: [PATCH 14/17] Update src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 2a78fbd9..8f911d11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,7 +84,7 @@ pub async fn run( metrics::BUILD_INFO_METRICS.info(env!("CARGO_PKG_VERSION"), env!("GIT_HASH")).set(1); metrics::CLICKHOUSE_METRICS .disk_max_size_to_accept_user_rpc_bytes() - .set(args.disk_max_size_to_accept_user_rpc_mb * 1024 * 1024); + .set(args.disk_max_size_to_accept_user_rpc_mb.saturating_mul(1024 * 1024)); } let user_listener = TcpListener::bind(&args.user_listen_addr).await?; From ce5067e4292d796ecba4fb9727b66ddd141056f4 Mon Sep 17 00:00:00 2001 From: Daniel Xifra <126988525+ZanCorDX@users.noreply.github.com> Date: Wed, 18 Feb 2026 09:38:47 -0300 Subject: [PATCH 15/17] Update src/indexer/click/mod.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/indexer/click/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index 0c7c20ea..2bc6c34b 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -636,7 +636,7 @@ pub(crate) mod tests { (BUNDLE_TX_COUNT * BUNDLE_TX_INPUT_SIZE); const FLOWPROXY_START_DELAY_MS: Duration = Duration::from_millis(800); - // Asume 100ms per bundle to clickhouse (it's a LOT) + // Assume 100ms per bundle to clickhouse (it's a LOT) const DRAIN_TIMEOUT: Duration = Duration::from_millis(100 * BUNDLE_COUNT_TO_FILL_DISK as u64); From d568db3a5aa5ba9a510cd3f0ffc9ed53978283f7 Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 18 Feb 2026 10:04:58 -0300 Subject: [PATCH 16/17] mutex->atomic --- src/indexer/click/mod.rs | 35 +++++++++++++++++++++-------------- src/ingress/mod.rs | 3 +-- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index 2bc6c34b..494b6a09 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -1,6 +1,14 @@ //! Indexing functionality powered by Clickhouse. 
-use std::{fmt::Debug, marker::PhantomData, sync::LazyLock, time::Duration}; +use std::{ + fmt::Debug, + marker::PhantomData, + sync::{ + LazyLock, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; use crate::{ cli::ClickhouseArgs, @@ -11,7 +19,6 @@ use crate::{ metrics::CLICKHOUSE_METRICS, primitives::{BundleReceipt, SystemBundle}, }; -use parking_lot::Mutex; use rbuilder_utils::{ clickhouse::{ Quantities, @@ -38,27 +45,27 @@ fn config_from_clickhouse_args(args: &ClickhouseArgs, validation: bool) -> Click /// little global (puaj) info to easily get the current clickhouse disk size. #[derive(Default)] pub(crate) struct ClickhouseLocalBackupDiskSize { - bundles_size: u64, - bundle_receipts_size: u64, + bundles_size: AtomicU64, + bundle_receipts_size: AtomicU64, } impl ClickhouseLocalBackupDiskSize { - pub(crate) fn set_bundles_size(&mut self, size: u64) { - self.bundles_size = size; + pub(crate) fn set_bundles_size(&self, size: u64) { + self.bundles_size.store(size, Ordering::Relaxed); } - pub(crate) fn set_bundle_receipts_size(&mut self, size: u64) { - self.bundle_receipts_size = size; + pub(crate) fn set_bundle_receipts_size(&self, size: u64) { + self.bundle_receipts_size.store(size, Ordering::Relaxed); } pub(crate) fn disk_size(&self) -> u64 { - self.bundles_size + self.bundle_receipts_size + self.bundles_size.load(Ordering::Relaxed) + + self.bundle_receipts_size.load(Ordering::Relaxed) } } /// We store here the current disk size of the backup database to avoid querying the metrics since /// that would include a string map access. -pub(crate) static CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE: LazyLock< - Mutex, -> = LazyLock::new(|| Mutex::new(ClickhouseLocalBackupDiskSize::default())); +pub(crate) static CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE: LazyLock = + LazyLock::new(|| ClickhouseLocalBackupDiskSize::default()); /// Callback invoked when disk backup size is set. Implement this trait to observe size updates. 
pub(crate) trait DiskBackupSizeCallback: Send + Sync { @@ -69,7 +76,7 @@ struct UpdateBundleSizeCallback; impl DiskBackupSizeCallback for UpdateBundleSizeCallback { fn on_disk_backup_size(size_bytes: u64) { - CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().set_bundles_size(size_bytes); + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundles_size(size_bytes); } } @@ -77,7 +84,7 @@ struct UpdateBundleReceiptsSizeCallback; impl DiskBackupSizeCallback for UpdateBundleReceiptsSizeCallback { fn on_disk_backup_size(size_bytes: u64) { - CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().set_bundle_receipts_size(size_bytes); + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundle_receipts_size(size_bytes); } } diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 68e7e969..28f1187a 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -148,8 +148,7 @@ impl OrderflowIngress { } fn clickhouse_backup_disk_size_is_ok(&self) -> bool { - CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.lock().disk_size() <= - self.disk_max_size_to_accept_user_rpc + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.disk_size() <= self.disk_max_size_to_accept_user_rpc } #[tracing::instrument(skip_all, name = "ingress", From 7d784eccdec3f1e6935914f7a12a4da0fafa4cbc Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 18 Feb 2026 10:11:56 -0300 Subject: [PATCH 17/17] lint/shear --- Cargo.lock | 1 - Cargo.toml | 1 - src/indexer/click/mod.rs | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e3650f6..67377c06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2909,7 +2909,6 @@ dependencies = [ "msg-socket", "msg-transport", "openssl", - "parking_lot", "parquet", "prometheus", "prometric", diff --git a/Cargo.toml b/Cargo.toml index 105e11ef..2f44f801 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,6 @@ rand = "0.9.2" derive_more = { version = "2", features = ["deref", "from", "into"] } fdlimit = "0.3.0" rayon = "1.11.0" -parking_lot = { version = "0.12.3" } # serialization serde = { version = "1", features = ["derive"] } diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index 494b6a09..786e35db 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -65,7 +65,7 @@ impl ClickhouseLocalBackupDiskSize { /// We store here the current disk size of the backup database to avoid querying the metrics since /// that would include a string map access. pub(crate) static CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE: LazyLock = - LazyLock::new(|| ClickhouseLocalBackupDiskSize::default()); + LazyLock::new(ClickhouseLocalBackupDiskSize::default); /// Callback invoked when disk backup size is set. Implement this trait to observe size updates. pub(crate) trait DiskBackupSizeCallback: Send + Sync {
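
Note on the final shape of the size gate: the sketch below is a minimal, standalone reconstruction (illustrative names only, std-only, not the crate's real module) of the pattern the series converges on after the mutex->atomic commit: two relaxed AtomicU64 fields behind a LazyLock static, summed by the ingress check that decides whether user RPC such as eth_sendBundle is still accepted.

use std::sync::{
    LazyLock,
    atomic::{AtomicU64, Ordering},
};

#[derive(Default)]
struct BackupDiskSize {
    bundles: AtomicU64,
    bundle_receipts: AtomicU64,
}

impl BackupDiskSize {
    fn set_bundles(&self, bytes: u64) {
        self.bundles.store(bytes, Ordering::Relaxed);
    }
    fn set_bundle_receipts(&self, bytes: u64) {
        self.bundle_receipts.store(bytes, Ordering::Relaxed);
    }
    fn total(&self) -> u64 {
        self.bundles.load(Ordering::Relaxed) + self.bundle_receipts.load(Ordering::Relaxed)
    }
}

static BACKUP_DISK_SIZE: LazyLock<BackupDiskSize> = LazyLock::new(BackupDiskSize::default);

// Mirrors the ingress-side gate: user RPC is accepted only while the local
// ClickHouse backup stays at or below the configured threshold.
fn backup_size_is_ok(max_bytes: u64) -> bool {
    BACKUP_DISK_SIZE.total() <= max_bytes
}

fn main() {
    let threshold = 1024 * 1024; // 1 MiB, analogous to the threshold used in the test
    assert!(backup_size_is_ok(threshold));
    BACKUP_DISK_SIZE.set_bundles(800 * 1024);
    BACKUP_DISK_SIZE.set_bundle_receipts(300 * 1024);
    assert!(!backup_size_is_ok(threshold)); // 1100 KiB > 1 MiB -> reject with DiskFull
}

Relaxed ordering is enough here: each field has a single writer (its inserter/backup task) and the reader only needs an approximate total to apply backpressure.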
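
The per-table plumbing relies on zero-sized callback types threaded through the generic metrics wrapper as a type parameter, so the bundle and bundle-receipt inserters update different fields of the shared tracker with no runtime state. A compilable toy version of that pattern, with made-up statics standing in for the shared tracker and nothing assumed beyond std:

use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};

static BUNDLES_SIZE: AtomicU64 = AtomicU64::new(0);
static RECEIPTS_SIZE: AtomicU64 = AtomicU64::new(0);

trait DiskBackupSizeCallback {
    fn on_disk_backup_size(size_bytes: u64);
}

struct UpdateBundles;
impl DiskBackupSizeCallback for UpdateBundles {
    fn on_disk_backup_size(size_bytes: u64) {
        BUNDLES_SIZE.store(size_bytes, Ordering::Relaxed);
    }
}

struct UpdateReceipts;
impl DiskBackupSizeCallback for UpdateReceipts {
    fn on_disk_backup_size(size_bytes: u64) {
        RECEIPTS_SIZE.store(size_bytes, Ordering::Relaxed);
    }
}

// Stand-in for the generic metrics wrapper: the callback exists only as a type parameter.
struct Metrics<F: DiskBackupSizeCallback>(PhantomData<F>);

impl<F: DiskBackupSizeCallback> Metrics<F> {
    fn set_disk_backup_size(size_bytes: u64) {
        F::on_disk_backup_size(size_bytes);
        // ...the real wrapper also updates the Prometheus gauges here.
    }
}

fn main() {
    Metrics::<UpdateBundles>::set_disk_backup_size(42);
    Metrics::<UpdateReceipts>::set_disk_backup_size(7);
    assert_eq!(BUNDLES_SIZE.load(Ordering::Relaxed), 42);
    assert_eq!(RECEIPTS_SIZE.load(Ordering::Relaxed), 7);
}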
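
Finally, after the review follow-up the MiB-to-bytes conversion uses saturating_mul, so an extreme CLI value clamps instead of wrapping when the threshold is set on the gauge and used by the gate. A tiny sketch of the intended behaviour (hypothetical helper name):

fn threshold_bytes(disk_max_size_mb: u64) -> u64 {
    disk_max_size_mb.saturating_mul(1024 * 1024)
}

fn main() {
    assert_eq!(threshold_bytes(1024), 1u64 << 30); // 1 GiB default
    assert_eq!(threshold_bytes(u64::MAX), u64::MAX); // clamps, no overflow panic or wraparound
}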