diff --git a/Cargo.lock b/Cargo.lock
index 96862a40..67377c06 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2931,6 +2931,7 @@ dependencies = [
  "tikv-jemallocator",
  "time",
  "tokio",
+ "tokio-util",
  "tracing",
  "tracing-subscriber 0.3.22",
  "uuid",
@@ -5467,7 +5468,7 @@ dependencies = [
 [[package]]
 name = "rbuilder-primitives"
 version = "0.1.0"
-source = "git+https://github.com/flashbots/rbuilder?rev=95323c0c6b1ac742a5716aba1942f3e06b8b5241#95323c0c6b1ac742a5716aba1942f3e06b8b5241"
+source = "git+https://github.com/flashbots/rbuilder?rev=e3b49692d5b4353c62abe828245a44c390f7bec2#e3b49692d5b4353c62abe828245a44c390f7bec2"
 dependencies = [
  "ahash",
  "alloy-consensus",
@@ -5499,7 +5500,6 @@ dependencies = [
  "ssz_types",
  "thiserror 1.0.69",
  "time",
- "tracing",
  "tree_hash 0.8.0",
  "tree_hash_derive 0.8.0",
  "typenum",
@@ -5509,7 +5509,7 @@ dependencies = [
 [[package]]
 name = "rbuilder-utils"
 version = "0.1.0"
-source = "git+https://github.com/flashbots/rbuilder?rev=95323c0c6b1ac742a5716aba1942f3e06b8b5241#95323c0c6b1ac742a5716aba1942f3e06b8b5241"
+source = "git+https://github.com/flashbots/rbuilder?rev=e3b49692d5b4353c62abe828245a44c390f7bec2#e3b49692d5b4353c62abe828245a44c390f7bec2"
 dependencies = [
  "alloy-primitives 1.5.1",
  "clickhouse",
diff --git a/Cargo.toml b/Cargo.toml
index cdf8d767..2f44f801 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,8 +20,8 @@ revm-primitives = { version = "21.0.2", default-features = false }
 revm-interpreter = { version = "29.0.1", default-features = false }
 
 # rbuilder
-rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "95323c0c6b1ac742a5716aba1942f3e06b8b5241" }
-rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "95323c0c6b1ac742a5716aba1942f3e06b8b5241", features = [
+rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2" }
+rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2", features = [
     "test-utils"
 ] }
 
@@ -34,6 +34,7 @@ tokio = { version = "1", default-features = false, features = [
     "macros"
 ] }
 futures = { version = "0.3" }
+tokio-util = "0.7.12"
 
 # allocator
 tikv-jemallocator = { version = "0.6", optional = true }
diff --git a/src/builderhub/mod.rs b/src/builderhub/mod.rs
index e193eb97..dfec9fd5 100644
--- a/src/builderhub/mod.rs
+++ b/src/builderhub/mod.rs
@@ -81,8 +81,15 @@ impl Peer {
     ///
     /// Reference:
     pub async fn system_api(&self) -> io::Result<Option<SocketAddr>> {
-        // NOTE: Needed for integration tests where port is not known upfront. This is also more
-        // flexible in the case some instances won't run with that default port.
+        // If `ip` is a valid loopback socket address, use it directly. This is the case in
+        // integration tests.
+        if let Ok(addr) = self.ip.parse::<SocketAddr>() &&
+            addr.ip().is_loopback()
+        {
+            return Ok(Some(addr));
+        }
+
+        // Parse the provided port, defaulting to DEFAULT_SYSTEM_PORT if not provided.
         let port =
             self.ip.split(':').nth(1).and_then(|p| p.parse().ok()).unwrap_or(DEFAULT_SYSTEM_PORT);
diff --git a/src/cli.rs b/src/cli.rs
index cb436a8f..32e6deca 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -98,6 +98,24 @@ pub struct ClickhouseArgs {
         default_value_t = MAX_DISK_BACKUP_SIZE_BYTES
     )]
     pub backup_disk_max_size_bytes: u64,
+
+    /// Send timeout in milliseconds for ClickHouse HTTP requests. Defaults to 2_000.
+    #[arg(
+        long = "indexer.clickhouse.send-timeout-ms",
+        env = "CLICKHOUSE_SEND_TIMEOUT_MS",
+        id = "CLICKHOUSE_SEND_TIMEOUT_MS",
+        default_value_t = 2_000
+    )]
+    pub send_timeout_ms: u64,
+
+    /// End-to-end timeout in milliseconds for ClickHouse HTTP requests. Defaults to 3_000.
+    #[arg(
+        long = "indexer.clickhouse.end-timeout-ms",
+        env = "CLICKHOUSE_END_TIMEOUT_MS",
+        id = "CLICKHOUSE_END_TIMEOUT_MS",
+        default_value_t = 3_000
+    )]
+    pub end_timeout_ms: u64,
 }
 
 /// Arguments required to setup file-based parquet indexing.
diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs
index 4ffa8c69..a8348f11 100644
--- a/src/indexer/click/mod.rs
+++ b/src/indexer/click/mod.rs
@@ -20,6 +20,7 @@ use rbuilder_utils::{
     },
     tasks::TaskExecutor,
 };
+use tokio::task::JoinHandle;
 
 mod models;
 
@@ -105,7 +106,7 @@ impl ClickhouseIndexer {
         receivers: OrderReceivers,
         task_executor: TaskExecutor,
         validation: bool,
-    ) {
+    ) -> Vec<JoinHandle<()>> {
         let client = config_from_clickhouse_args(&args, validation).into();
 
         tracing::info!("Running with clickhouse indexer");
@@ -117,18 +118,28 @@
         )
         .expect("could not create disk backup");
 
-        spawn_clickhouse_inserter_and_backup::<SystemBundle, BundleRow, MetricsWrapper>(
-            &client,
-            receivers.bundle_rx,
-            &task_executor,
-            args.bundles_table_name,
-            builder_name.clone(),
-            disk_backup.clone(),
-            args.backup_memory_max_size_bytes,
-            TARGET_INDEXER,
-        );
+        let send_timeout = Duration::from_millis(args.send_timeout_ms);
+        let end_timeout = Duration::from_millis(args.end_timeout_ms);
 
-        spawn_clickhouse_inserter_and_backup::<BundleReceipt, BundleReceiptRow, MetricsWrapper>(
+        let bundle_inserter_join_handle =
+            spawn_clickhouse_inserter_and_backup::<SystemBundle, BundleRow, MetricsWrapper>(
+                &client,
+                receivers.bundle_rx,
+                &task_executor,
+                args.bundles_table_name,
+                builder_name.clone(),
+                disk_backup.clone(),
+                args.backup_memory_max_size_bytes,
+                send_timeout,
+                end_timeout,
+                TARGET_INDEXER,
+            );
+
+        let bundle_receipt_inserter_join_handle = spawn_clickhouse_inserter_and_backup::<
+            BundleReceipt,
+            BundleReceiptRow,
+            MetricsWrapper,
+        >(
             &client,
             receivers.bundle_receipt_rx,
             &task_executor,
@@ -136,8 +147,11 @@
             builder_name.clone(),
             disk_backup.clone(),
             args.backup_memory_max_size_bytes,
+            send_timeout,
+            end_timeout,
             TARGET_INDEXER,
         );
+        vec![bundle_inserter_join_handle, bundle_receipt_inserter_join_handle]
 
     }
 }
@@ -155,6 +169,7 @@ pub(crate) mod tests {
         },
         tests::{bundle_receipt_example, system_bundle_example},
     },
+    utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks},
 };
 use clickhouse::{Client as ClickhouseClient, error::Result as ClickhouseResult};
 use rbuilder_utils::{
@@ -240,6 +255,7 @@
         }
     }
 
+    /// Only for testing purposes.
     impl From<ClickhouseClientConfig> for ClickhouseArgs {
         fn from(config: ClickhouseClientConfig) -> Self {
             Self {
@@ -252,6 +268,8 @@
                 backup_memory_max_size_bytes: 1024 * 1024 * 10, // 10MiB
                 backup_disk_database_path: default_disk_backup_database_path(),
                 backup_disk_max_size_bytes: 1024 * 1024 * 100, // 100MiB
+                send_timeout_ms: 2_000,
+                end_timeout_ms: 3_000,
             }
         }
     }
@@ -368,7 +386,7 @@
         let (senders, receivers) = OrderSenders::new();
 
         let validation = false;
-        ClickhouseIndexer::run(
+        let indexer_join_handles = ClickhouseIndexer::run(
            config.into(),
            builder_name.clone(),
            receivers,
@@ -380,6 +398,8 @@
         let system_bundle = system_bundle_example();
         let system_bundle_row = (system_bundle.clone(), builder_name.clone()).into();
         senders.bundle_tx.send(system_bundle.clone()).await.unwrap();
+        drop(senders);
+        wait_for_critical_tasks(indexer_join_handles, SHUTDOWN_TIMEOUT).await;
 
         // Wait a bit for bundle to be actually processed before shutting down.
         tokio::time::sleep(Duration::from_secs(1)).await;
diff --git a/src/indexer/click/models.rs b/src/indexer/click/models.rs
index 5f9b363e..1755705f 100644
--- a/src/indexer/click/models.rs
+++ b/src/indexer/click/models.rs
@@ -255,7 +255,7 @@ impl From<(SystemBundle, String)> for BundleRow {
                 .unwrap_or_default(),
             // Decoded bundles always have a uuid.
             internal_uuid: decoded.uuid,
-            replacement_uuid: decoded.replacement_data.clone().map(|r| r.key.id),
+            replacement_uuid: decoded.replacement_data.map(|r| r.key.id),
             replacement_nonce: bundle.raw_bundle.metadata.replacement_nonce,
             signer_address: Some(bundle.metadata.signer),
             builder_name,
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index b95132af..0fea37e5 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -4,7 +4,8 @@
 use std::fmt::Debug;
 
 use rbuilder_utils::tasks::TaskExecutor;
-use tokio::sync::mpsc;
+use tokio::{sync::mpsc, task::JoinHandle};
+use tracing::error;
 
 use crate::{
     cli::IndexerArgs,
@@ -77,33 +78,38 @@ impl OrderSenders {
 pub struct Indexer;
 
 impl Indexer {
+    /// Returns the `IndexerHandle` used to send data and a vector of join handles for the
+    /// indexer tasks so we can wait for them to finish on shutdown.
     pub fn run(
         args: IndexerArgs,
         builder_name: String,
         task_executor: TaskExecutor,
-    ) -> IndexerHandle {
+    ) -> (IndexerHandle, Vec<JoinHandle<()>>) {
         let (senders, receivers) = OrderSenders::new();
 
         match (args.clickhouse, args.parquet) {
             (None, None) => {
                 MockIndexer.run(receivers, task_executor);
-                IndexerHandle::new(senders)
+                (IndexerHandle::new(senders), vec![])
             }
             (Some(clickhouse), None) => {
                 let validation = false;
-                ClickhouseIndexer::run(
+                let join_handles = ClickhouseIndexer::run(
                     clickhouse,
                     builder_name,
                     receivers,
                     task_executor,
                     validation,
                 );
-                IndexerHandle::new(senders)
+                (IndexerHandle::new(senders), join_handles)
             }
             (None, Some(parquet)) => {
                 ParquetIndexer::run(parquet, builder_name, receivers, task_executor)
                     .expect("failed to start parquet indexer");
-                IndexerHandle::new(senders)
+                error!(
+                    "Parquet indexer does not support proper shutdown, returning empty join handles"
+                );
+                (IndexerHandle::new(senders), vec![])
             }
             (Some(_), Some(_)) => {
                 unreachable!("Cannot specify both clickhouse and parquet indexer");
diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs
index 24037a36..5613a93d 100644
--- a/src/ingress/mod.rs
+++ b/src/ingress/mod.rs
@@ -52,6 +52,7 @@ use std::{
 };
 use time::UtcDateTime;
 use tokio::sync::mpsc;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 
 pub mod error;
@@ -132,7 +133,7 @@ impl OrderflowIngress {
     /// Perform maintenance task for internal orderflow ingress state.
     #[tracing::instrument(skip_all, name = "ingress_maintanance")]
-    pub async fn maintenance(&self) {
+    pub fn maintenance(&self) {
         let len_before = self.entities.len();
 
         tracing::info!(entries = len_before, "starting state maintenance");
@@ -790,7 +791,7 @@ impl IngressSocket {
         Self { reply_socket: socket, ingress_state, task_executor, certs_rx, acceptor_builder }
     }
 
-    pub async fn listen(mut self) {
+    pub async fn listen(mut self, cancellation_token: CancellationToken) {
         loop {
             tokio::select! {
                Some(certs) = self.certs_rx.recv() => {
@@ -826,6 +827,10 @@
                        }
                    });
                }
+                _ = cancellation_token.cancelled() => {
+                    info!("Cancellation token cancelled, stopping ingress socket listener");
+                    break;
+                }
            }
        }
 
diff --git a/src/lib.rs b/src/lib.rs
index 12f919b3..89ed1244 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,11 +9,11 @@ use crate::{
         client::{HttpClientPool, default_http_builder},
         http::spawn_http_forwarder,
     },
+    indexer::IndexerHandle,
     ingress::IngressSocket,
     metrics::IngressMetrics,
     primitives::{AcceptorBuilder, SslAcceptorBuilderExt, SystemBundleDecoder},
     priority::workers::PriorityWorkers,
-    runner::CliContext,
     statics::LOCAL_PEER_STORE,
 };
 use alloy_signer_local::PrivateKeySigner;
@@ -30,6 +30,7 @@ use forwarder::{IngressForwarders, PeerHandle};
 use msg_socket::RepSocket;
 use msg_transport::tcp_tls::{self, TcpTls};
 use prometric::exporter::ExporterBuilder;
+use rbuilder_utils::tasks::TaskExecutor;
 use reqwest::Url;
 use std::{
     fs,
@@ -38,8 +39,9 @@ use std::{
     sync::Arc,
     time::{Duration, Instant},
 };
-use tokio::net::TcpListener;
-use tracing::level_filters::LevelFilter;
+use tokio::{net::TcpListener, select};
+use tokio_util::sync::CancellationToken;
+use tracing::{info, level_filters::LevelFilter};
 use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt};
 
 pub mod cli;
@@ -48,7 +50,7 @@ use cli::OrderflowIngressArgs;
 pub mod ingress;
 use ingress::OrderflowIngress;
 
-use crate::{cache::OrderCache, indexer::Indexer};
+use crate::cache::OrderCache;
 
 pub mod builderhub;
 mod cache;
@@ -61,13 +63,17 @@ pub mod metrics;
 pub mod primitives;
 pub mod priority;
 pub mod rate_limit;
-pub mod runner;
 pub mod statics;
 pub mod trace;
 pub mod utils;
 pub mod validation;
 
-pub async fn run(args: OrderflowIngressArgs, ctx: CliContext) -> eyre::Result<()> {
+pub async fn run(
+    args: OrderflowIngressArgs,
+    task_executor: TaskExecutor,
+    indexer_handle: IndexerHandle,
+    cancellation_token: CancellationToken,
+) -> eyre::Result<()> {
     fdlimit::raise_fd_limit()?;
 
     if let Some(ref metrics_addr) = args.metrics {
@@ -85,14 +91,26 @@
     } else {
         None
     };
-    run_with_listeners(args, user_listener, builder_listener, ctx).await
+    run_with_listeners(
+        args,
+        user_listener,
+        builder_listener,
+        task_executor,
+        indexer_handle,
+        cancellation_token,
+    )
+    .await
 }
 
+/// Cancellation handling is a little rough: it is just enough to ensure `indexer_handle` gets
+/// dropped, and it mixes `cancellation_token` with `task_executor`, which also has its own shutdown method.
 pub async fn run_with_listeners(
     args: OrderflowIngressArgs,
     user_listener: TcpListener,
     builder_listener: Option<TcpListener>,
-    ctx: CliContext,
+    task_executor: TaskExecutor,
+    indexer_handle: IndexerHandle,
+    cancellation_token: CancellationToken,
 ) -> eyre::Result<()> {
     // Initialize tracing.
     let registry = tracing_subscriber::registry().with(
@@ -104,8 +122,6 @@
         let _ = registry.with(tracing_subscriber::fmt::layer()).try_init();
     }
 
-    let indexer_handle = Indexer::run(args.indexing, args.builder_name, ctx.task_executor.clone());
-
     let orderflow_signer = match args.orderflow_signer {
         Some(signer) => {
             tracing::warn!(
@@ -154,10 +170,10 @@
             peer_update_config,
             builder_hub,
             peers.clone(),
-            ctx.task_executor.clone(),
+            task_executor.clone(),
         );
 
-        ctx.task_executor
+        task_executor
             .spawn_critical("run_update_peers", peer_updater.run(args.peer_update_interval_s));
         (socket, certs_rx)
     } else {
@@ -176,7 +192,7 @@
         let tls = tcp_tls::Server::new(acceptor.into());
         let mut socket = RepSocket::new(TcpTls::Server(tls));
         socket.bind(args.system_listen_addr).await.expect("to bind system listener");
-        let port = socket.local_addr().expect("bound").port();
+        let local = socket.local_addr().unwrap();
 
         let peer = Peer {
             name: local_signer.to_string(),
@@ -184,20 +200,16 @@
                 ecdsa_pubkey_address: local_signer,
                 ..Default::default()
             },
-            ip: format!("127.0.0.1:{port}"),
-            dns_name: "localhost".to_owned(),
+            ip: format!("{}:{}", local.ip(), local.port()),
+            dns_name: "localhost".to_string(),
             instance: InstanceData { tls_cert: peer_cert },
         };
         let peer_store = peer_store.register(peer);
 
-        let (peer_updater, certs_rx) = PeersUpdater::new(
-            peer_update_config,
-            peer_store,
-            peers.clone(),
-            ctx.task_executor.clone(),
-        );
+        let (peer_updater, certs_rx) =
+            PeersUpdater::new(peer_update_config, peer_store, peers.clone(), task_executor.clone());
 
-        ctx.task_executor
+        task_executor
            .spawn_critical("run_update_peers", peer_updater.run(args.peer_update_interval_s));
        (socket, certs_rx)
    };
@@ -213,7 +225,7 @@
         builder_url.to_string(),
         // Use 1 client here, this is still using HTTP/1.1 with internal connection pooling.
         HttpClientPool::new(NonZero::new(1).unwrap(), || client.clone()),
-        &ctx.task_executor,
+        &task_executor,
     )?;
 
     IngressForwarders::new(local_sender, peers, orderflow_signer, workers.clone())
@@ -252,12 +264,21 @@
     });
 
     // Spawn a state maintenance task.
-    tokio::spawn({
+    let cancellation_token_clone = cancellation_token.clone();
+    task_executor.spawn({
         let ingress = ingress.clone();
         async move {
             loop {
-                tokio::time::sleep(Duration::from_secs(60)).await;
-                ingress.maintenance().await;
+                info!("starting state maintenance!!");
+                select! {
+                    _ = cancellation_token_clone.cancelled() => {
+                        info!("Cancellation token cancelled, stopping state maintenance");
+                        break;
+                    }
+                    _ = tokio::time::sleep(Duration::from_secs(60)) => {
+                        ingress.maintenance();
+                    }
+                }
             }
         }
     });
@@ -268,9 +289,9 @@
         certs_rx,
         ingress.clone(),
         acceptor_builder,
-        ctx.task_executor.clone(),
+        task_executor.clone(),
     );
-    ctx.task_executor.spawn(ingress_socket.listen());
+    task_executor.spawn(ingress_socket.listen(cancellation_token));
 
     // Spawn user facing HTTP server for accepting bundles and raw transactions.
     let user_router = Router::new()
diff --git a/src/main.rs b/src/main.rs
index 8f5b85bb..a27dd414 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,9 +1,15 @@
+use std::time::Duration;
+
 use clap::Parser;
 use flowproxy::{
     cli::OrderflowIngressArgs,
-    runner::{CliContext, CliRunner},
+    indexer::Indexer,
     trace::init_tracing,
+    utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks},
 };
+use rbuilder_utils::tasks::{PanickedTaskError, TaskManager};
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info};
 
 #[cfg(all(feature = "jemalloc", unix))]
 type AllocatorInner = tikv_jemallocator::Jemalloc;
@@ -33,12 +39,115 @@ fn main() {
         .enable_all()
         .build()
         .expect("failed to create runtime");
+    let task_manager = TaskManager::new(tokio_runtime.handle().clone());
+    info!("Main task started");
+
+    // Executes the main task command until it finishes or ctrl-c is fired.
+    // IMPORTANT: flowproxy::run has no graceful cancellation and will abruptly stop being polled,
+    // so it must not contain any critical tasks that need a proper shutdown.
+    tokio_runtime.block_on(run_with_shutdown(args, task_manager));
+
+    info!("Main task finished. Shutting down tokio runtime");
+
+    if let Err(error) = wait_tokio_runtime_shutdown(tokio_runtime, Duration::from_secs(5)) {
+        error!(?error, "Flow proxy terminated with error");
+    }
+}
+
+async fn run_with_shutdown(args: OrderflowIngressArgs, mut task_manager: TaskManager) {
+    let task_executor = task_manager.executor();
+    let (indexer_handle, indexer_join_handles) =
+        Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone());
+    let cancellation_token = CancellationToken::new();
+    let main_task = flowproxy::run(args, task_executor, indexer_handle, cancellation_token.clone());
+    match run_to_completion_or_panic(&mut task_manager, run_until_ctrl_c(main_task)).await {
+        Ok(()) => {
+            tracing::warn!(target: "cli", "shutting down gracefully");
+        }
+        Err(err) => {
+            tracing::error!(?err, target: "cli", "shutting down due to error");
+        }
+    }
+    // This cancels the tasks launched by flowproxy::run that hold the last references to the
+    // indexer_handle.
+    cancellation_token.cancel();
+    // At this point the RPC machinery has been dropped abruptly, which drops the indexer_handle
+    // and lets the indexer core process all pending data and start shutting down.
+    wait_for_critical_tasks(indexer_join_handles, SHUTDOWN_TIMEOUT).await;
+    // We already gave the critical tasks a chance to finish by themselves, so we can now run the
+    // graceful shutdown.
+    task_manager.graceful_shutdown_with_timeout(SHUTDOWN_TIMEOUT);
+}
+
+fn wait_tokio_runtime_shutdown(
+    tokio_runtime: tokio::runtime::Runtime,
+    timeout: Duration,
+) -> Result<(), std::sync::mpsc::RecvTimeoutError> {
+    // `drop(tokio_runtime)` would block the current thread until its pools
+    // (including the blocking pool) are shut down. Since we want to exit as soon as possible,
+    // drop it on a separate thread and wait for up to `timeout` for the operation to complete.
+    let (tx, rx) = std::sync::mpsc::channel();
+    std::thread::Builder::new()
+        .name("tokio-runtime-shutdown".to_string())
+        .spawn(move || {
+            drop(tokio_runtime);
+            let _ = tx.send(());
+        })
+        .unwrap();
+
+    rx.recv_timeout(timeout).inspect_err(|err| {
+        tracing::debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
+    })
+}
 
-    let runner = CliRunner::from_runtime(tokio_runtime);
+/// Runs the future to completion or until:
+/// - `ctrl-c` is received.
+/// - `SIGTERM` is received (unix only).
+async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
+where
+    F: Future<Output = Result<(), E>>,
+    E: Send + Sync + 'static + From<std::io::Error>,
+{
+    let ctrl_c = tokio::signal::ctrl_c();
 
-    let command = |ctx: CliContext| flowproxy::run(args, ctx);
+    let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
+    let sigterm = stream.recv();
+    let sigterm = Box::pin(sigterm);
+    let ctrl_c = Box::pin(ctrl_c);
+    let fut = Box::pin(fut);
+
+    tokio::select! {
+        _ = ctrl_c => {
+            tracing::info!("Received ctrl-c");
+        },
+        _ = sigterm => {
+            tracing::info!("Received SIGTERM");
+        },
+        res = fut => res?,
+    }
+
+    Ok(())
+}
 
-    if let Err(e) = runner.run_command_until_exit(command) {
-        eprintln!("Orderflow proxy terminated with error: {e}");
+/// Runs the given future to completion or until a critical task panicked.
+///
+/// Returns the error if a task panicked, or the given future returned an error.
+async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
+where
+    F: Future<Output = Result<(), E>>,
+    E: Send + Sync + From<PanickedTaskError> + 'static,
+{
+    {
+        let fut = Box::pin(fut);
+        tokio::select! {
+            task_manager_result = tasks => {
+                if let Err(panicked_error) = task_manager_result {
+                    return Err(panicked_error.into());
+                }
+            },
+            res = fut => res?,
+        }
     }
+    Ok(())
 }
diff --git a/src/runner/mod.rs b/src/runner/mod.rs
deleted file mode 100644
index e1c91373..00000000
--- a/src/runner/mod.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-//! Asynchronous task runner utilities.
-
-use std::{future::Future, time::Duration};
-
-use rbuilder_utils::tasks::{PanickedTaskError, TaskExecutor, TaskManager};
-
-#[derive(Debug, Clone)]
-pub struct CliContext {
-    pub task_executor: TaskExecutor,
-}
-
-/// Executes CLI commands.
-///
-/// Provides utilities for running a cli command to completion.
-#[derive(Debug)]
-#[non_exhaustive]
-pub struct CliRunner {
-    tokio_runtime: tokio::runtime::Runtime,
-}
-
-impl CliRunner {
-    /// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
-    pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
-        Self { tokio_runtime }
-    }
-}
-
-// === impl CliRunner ===
-
-impl CliRunner {
-    /// Executes the given _async_ command on the tokio runtime until the command future resolves or
-    /// until the process receives a `SIGINT` or `SIGTERM` signal.
-    ///
-    /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
-    /// to drive their shutdown to completion after the command has finished.
-    pub fn run_command_until_exit<F, E>(
-        self,
-        command: impl FnOnce(CliContext) -> F,
-    ) -> Result<(), E>
-    where
-        F: Future<Output = Result<(), E>>,
-        E: Send + Sync + From<std::io::Error> + From<PanickedTaskError> + 'static,
-    {
-        let tokio_runtime = self.tokio_runtime;
-        let mut task_manager = TaskManager::new(tokio_runtime.handle().clone());
-        let task_executor = task_manager.executor();
-        let context = CliContext { task_executor };
-
-        // Executes the command until it finished or ctrl-c was fired
-        let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
-            &mut task_manager,
-            run_until_ctrl_c(command(context)),
-        ));
-
-        if command_res.is_err() {
-            tracing::error!(target: "cli", "shutting down due to error");
-        } else {
-            tracing::debug!(target: "cli", "shutting down gracefully");
-            // after the command has finished or exit signal was received we shutdown the task
-            // manager which fires the shutdown signal to all tasks spawned via the task
-            // executor and awaiting on tasks spawned with graceful shutdown
-            task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
-        }
-
-        // `drop(tokio_runtime)` would block the current thread until its pools
-        // (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
-        // it on a separate thread and wait for up to 5 seconds for this operation to
-        // complete.
-        let (tx, rx) = std::sync::mpsc::channel();
-        std::thread::Builder::new()
-            .name("tokio-runtime-shutdown".to_string())
-            .spawn(move || {
-                drop(tokio_runtime);
-                let _ = tx.send(());
-            })
-            .unwrap();
-
-        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
-            tracing::debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
-        });
-
-        command_res
-    }
-}
-
-/// Runs the future to completion or until:
-/// - `ctrl-c` is received.
-/// - `SIGTERM` is received (unix only).
-async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
-where
-    F: Future<Output = Result<(), E>>,
-    E: Send + Sync + 'static + From<std::io::Error>,
-{
-    let ctrl_c = tokio::signal::ctrl_c();
-
-    let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
-    let sigterm = stream.recv();
-    let sigterm = Box::pin(sigterm);
-    let ctrl_c = Box::pin(ctrl_c);
-    let fut = Box::pin(fut);
-
-    tokio::select! {
-        _ = ctrl_c => {
-            tracing::info!("Received ctrl-c");
-        },
-        _ = sigterm => {
-            tracing::info!("Received SIGTERM");
-        },
-        res = fut => res?,
-    }
-
-    Ok(())
-}
-
-/// Runs the given future to completion or until a critical task panicked.
-///
-/// Returns the error if a task panicked, or the given future returned an error.
-async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
-where
-    F: Future<Output = Result<(), E>>,
-    E: Send + Sync + From<PanickedTaskError> + 'static,
-{
-    {
-        let fut = Box::pin(fut);
-        tokio::select! {
-            task_manager_result = tasks => {
-                if let Err(panicked_error) = task_manager_result {
-                    return Err(panicked_error.into());
-                }
-            },
-            res = fut => res?,
-        }
-    }
-    Ok(())
-}
diff --git a/src/utils.rs b/src/utils.rs
index 846515cf..4ed7cc5a 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,8 +1,11 @@
 use alloy_eips::eip2718::EIP4844_TX_TYPE_ID;
 use alloy_primitives::Bytes;
 use alloy_rlp::{Buf as _, Header};
+use futures::{StreamExt, stream::FuturesUnordered};
 use std::time::{Duration, Instant};
 use time::UtcDateTime;
+use tokio::task::JoinHandle;
+use tracing::{error, info};
 use uuid::Uuid;
 
 use crate::{statics::START, validation::MAINNET_CHAIN_ID};
@@ -282,3 +285,36 @@
         }
     }
 }
+
+/// This timeout should be enough for the inserter to flush all pending ClickHouse data (the
+/// ClickHouse timeout is usually a few secs) and local DB data (disk flush time).
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20);
+
+/// Consider moving this to rbuilder-utils.
+/// Waits for `critical_tasks` to finish by themselves for up to `graceful_timeout`.
+pub async fn wait_for_critical_tasks(
+    critical_tasks: Vec<JoinHandle<()>>,
+    graceful_timeout: Duration,
+) {
+    let mut critical_tasks: FuturesUnordered<_> = critical_tasks.into_iter().collect();
+    let critical_deadline = tokio::time::Instant::now() + graceful_timeout;
+    loop {
+        tokio::select! {
+            biased;
+            result = critical_tasks.next() => {
+                match result {
+                    Some(Err(err)) => error!(?err, "Critical task handle await error"),
+                    Some(Ok(())) => {}
+                    None => {
+                        info!("All critical tasks finished ok");
+                        break;
+                    }
+                }
+            }
+            _ = tokio::time::sleep_until(critical_deadline) => {
+                error!(pending_task_count = critical_tasks.len(), "Critical tasks shutdown timeout reached");
+                break;
+            }
+        }
+    }
+}
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 7adf4587..3e153b1e 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -10,16 +10,18 @@ use axum::{Router, extract::State, routing::post};
 use flowproxy::{
     cli::OrderflowIngressArgs,
     consts::FLASHBOTS_SIGNATURE_HEADER,
+    indexer::Indexer,
     ingress::maybe_decompress,
     jsonrpc::{JSONRPC_VERSION_2, JsonRpcError, JsonRpcRequest, JsonRpcResponse},
-    runner::CliContext,
 };
 use hyper::{HeaderMap, header};
 use rbuilder_primitives::serialize::RawBundle;
+use rbuilder_utils::tasks::TaskManager;
 use revm_primitives::keccak256;
 use serde::de::DeserializeOwned;
 use serde_json::{Value, json};
 use tokio::{net::TcpListener, sync::mpsc};
+use tokio_util::sync::CancellationToken;
 
 pub(crate) struct IngressClient {
     pub(crate) url: String,
@@ -29,19 +31,24 @@
 
 pub(crate) async fn spawn_ingress_with_args(
     args: OrderflowIngressArgs,
+    task_manager: &TaskManager,
 ) -> IngressClient {
     let user_listener = TcpListener::bind(&args.user_listen_addr).await.unwrap();
     let builder_listener = None;
     let address = user_listener.local_addr().unwrap();
-
-    let task_manager = rbuilder_utils::tasks::TaskManager::current();
-
+    let task_executor = task_manager.executor();
     tokio::spawn(async move {
+        let (indexer_handle, _indexer_join_handles) =
+            Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone());
+        let cancellation_token = CancellationToken::new();
+
         flowproxy::run_with_listeners(
             args,
             user_listener,
             builder_listener,
-            CliContext { task_executor: task_manager.executor() },
+            task_executor,
+            indexer_handle,
+            cancellation_token,
         )
         .await
         .unwrap();
@@ -55,11 +62,14 @@
 }
 
 #[allow(dead_code)]
-pub(crate) async fn spawn_ingress(builder_url: Option<String>) -> IngressClient {
+pub(crate) async fn spawn_ingress(
+    builder_url: Option<String>,
+    task_manager: &TaskManager,
+) -> IngressClient {
     let mut args = OrderflowIngressArgs::default().gzip_enabled().disable_builder_hub();
     args.peer_update_interval_s = 5;
     args.builder_url = builder_url;
-    spawn_ingress_with_args(args).await
+    spawn_ingress_with_args(args, task_manager).await
 }
 
 impl IngressClient {
diff --git a/tests/ingress.rs b/tests/ingress.rs
index 236f291a..5aa2f6b6 100644
--- a/tests/ingress.rs
+++ b/tests/ingress.rs
@@ -34,7 +34,8 @@ async fn ingress_http_e2e() {
     let mut rng = rand::rng();
     let mut builder = BuilderReceiver::spawn().await;
-    let client = spawn_ingress(Some(builder.url())).await;
+    let task_manager = rbuilder_utils::tasks::TaskManager::current();
+    let client = spawn_ingress(Some(builder.url()), &task_manager).await;
 
     let empty = json!({});
     let response =
diff --git a/tests/network.rs b/tests/network.rs
index c3c9c4a2..5f878a59 100644
--- a/tests/network.rs
+++ b/tests/network.rs
@@ -8,7 +8,7 @@
 use common::BuilderReceiver;
 mod common;
 use flowproxy::{cli::OrderflowIngressArgs, utils::testutils::Random as _};
 use rbuilder_primitives::serialize::RawBundle;
-use tracing::{debug, info};
+use tracing::info;
 
 use crate::common::spawn_ingress_with_args;
 
@@ -17,7 +17,7 @@ async fn network_e2e_bundle_tx_works() {
     let _ = tracing_subscriber::fmt::try_init();
     info!("starting network e2e tcp test");
-
+    let task_manager = rbuilder_utils::tasks::TaskManager::current();
     let mut rng = rand::rng();
     let builder1 = BuilderReceiver::spawn().await;
     let builder2 = BuilderReceiver::spawn().await;
@@ -31,24 +31,24 @@ async fn network_e2e_bundle_tx_works() {
         PathBuf::from_str("./tests/testdata/certificates/cert_1.pem").unwrap();
     args.client_certificate_pem_file =
         PathBuf::from_str("./tests/testdata/certificates/cert_1.pem").unwrap();
-    args.system_listen_addr = "[::1]:0".parse().unwrap();
-    let client1 = spawn_ingress_with_args(args.clone()).await;
+    args.system_listen_addr = "127.0.0.1:0".parse().unwrap();
+    let client1 = spawn_ingress_with_args(args.clone(), &task_manager).await;
 
     args.builder_url = Some(builder2.url());
     args.server_certificate_pem_file =
         PathBuf::from_str("./tests/testdata/certificates/cert_2.pem").unwrap();
     args.client_certificate_pem_file =
         PathBuf::from_str("./tests/testdata/certificates/cert_2.pem").unwrap();
-    args.system_listen_addr = "[::1]:0".parse().unwrap();
-    let client2 = spawn_ingress_with_args(args.clone()).await;
+    args.system_listen_addr = "127.0.0.1:0".parse().unwrap();
+    let client2 = spawn_ingress_with_args(args.clone(), &task_manager).await;
 
     args.builder_url = Some(builder3.url());
     args.server_certificate_pem_file =
         PathBuf::from_str("./tests/testdata/certificates/cert_3.pem").unwrap();
     args.client_certificate_pem_file =
         PathBuf::from_str("./tests/testdata/certificates/cert_3.pem").unwrap();
-    args.system_listen_addr = "[::1]:0".parse().unwrap();
-    let _client3 = spawn_ingress_with_args(args.clone()).await;
+    args.system_listen_addr = "127.0.0.1:0".parse().unwrap();
+    let _client3 = spawn_ingress_with_args(args.clone(), &task_manager).await;
 
     // args.builder_url = Some(builder4.url());
     // args.server_certificate_pem_file =
@@ -71,7 +71,7 @@ async fn network_e2e_bundle_tx_works() {
 
     for (i, b) in builders.iter_mut().enumerate() {
         let received = b.recv::().await.unwrap();
         assert_eq!(received, raw_tx);
-        debug!(?i, "builder received tx from client1");
+        info!(?i, "builder received tx from client1");
     }
 
     let bundle = RawBundle::random(&mut rng);
@@ -81,7 +81,7 @@ async fn network_e2e_bundle_tx_works() {
 
     for (i, b) in builders.iter_mut().enumerate() {
         let mut received = b.recv::<RawBundle>().await.unwrap();
-        debug!(?i, "builder received raw bundle from client2");
+        info!(?i, "builder received raw bundle from client2");
 
         assert!(received.metadata.signing_address.is_some());
         assert!(received.metadata.bundle_hash.is_some());
diff --git a/tests/testdata/openssl.cnf b/tests/testdata/openssl.cnf
index a8142f1f..fc30d3cb 100644
--- a/tests/testdata/openssl.cnf
+++ b/tests/testdata/openssl.cnf
@@ -12,4 +12,5 @@ subjectAltName = @alt_names
 
 [alt_names]
 DNS.1 = localhost
-IP.1 = 127.0.0.1
\ No newline at end of file
+IP.1 = 127.0.0.1
+IP.2 = ::1
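
Note (illustrative sketch, not part of the patch): the shutdown sequence wired up above — cancel the ingress-side tasks via a CancellationToken so the last IndexerHandle reference is dropped, then give the indexer's critical tasks a bounded window to drain before the task manager's graceful shutdown — boils down to the pattern below. The sketch uses only tokio, tokio-util and futures (dependencies this change adds or already uses); the worker/wait_with_deadline names and timings are made up for illustration and do not exist in this codebase.

// shutdown_sketch.rs — minimal, self-contained illustration of the pattern.
use std::time::Duration;

use futures::{StreamExt, stream::FuturesUnordered};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

async fn worker(id: u32, cancel: CancellationToken) {
    // Pretend to process data until cancellation, then "flush" before returning,
    // like the ClickHouse inserters flushing pending rows on shutdown.
    cancel.cancelled().await;
    tokio::time::sleep(Duration::from_millis(50)).await; // simulated flush
    println!("worker {id} drained and stopped");
}

// Wait for all handles to finish on their own, but never longer than `timeout`
// (the role played by wait_for_critical_tasks / SHUTDOWN_TIMEOUT above).
async fn wait_with_deadline(handles: Vec<JoinHandle<()>>, timeout: Duration) {
    let mut pending: FuturesUnordered<_> = handles.into_iter().collect();
    let deadline = tokio::time::Instant::now() + timeout;
    loop {
        tokio::select! {
            res = pending.next() => match res {
                Some(Err(err)) => eprintln!("task failed: {err}"),
                Some(Ok(())) => {}
                None => break, // all tasks finished
            },
            _ = tokio::time::sleep_until(deadline) => {
                eprintln!("{} task(s) still pending at deadline", pending.len());
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let handles: Vec<_> = (0..3).map(|i| tokio::spawn(worker(i, cancel.clone()))).collect();

    // ... run until ctrl-c / SIGTERM in the real binary ...
    cancel.cancel();
    wait_with_deadline(handles, Duration::from_secs(20)).await;
}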