diff --git a/src/cli.rs b/src/cli.rs index 32e6deca..7aeb7772 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -266,6 +266,16 @@ pub struct OrderflowIngressArgs { #[clap(long = "http.enable-gzip", default_value_t = false)] pub gzip_enabled: bool, + /// Maximum local ClickHouse backup disk size in MB above which user RPC (e.g. eth_sendBundle) + /// is rejected with disk full. Defaults to 1024 MB (1 GiB). + #[clap( + long = "disk-max-size-to-accept-user-rpc-mb", + default_value_t = 1024, + env = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC", + id = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC" + )] + pub disk_max_size_to_accept_user_rpc_mb: u64, + /// The interval in seconds to update the peer list from BuilderHub. #[clap( long = "peer.update-interval-s", @@ -342,6 +352,7 @@ impl Default for OrderflowIngressArgs { score_bucket_s: 4, log_json: false, gzip_enabled: false, + disk_max_size_to_accept_user_rpc_mb: 1024, tcp_small_clients: NonZero::new(1).expect("non-zero"), tcp_big_clients: 0, io_threads: 4, diff --git a/src/indexer/click/mod.rs b/src/indexer/click/mod.rs index a8348f11..786e35db 100644 --- a/src/indexer/click/mod.rs +++ b/src/indexer/click/mod.rs @@ -1,6 +1,14 @@ //! Indexing functionality powered by Clickhouse. -use std::{fmt::Debug, time::Duration}; +use std::{ + fmt::Debug, + marker::PhantomData, + sync::{ + LazyLock, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; use crate::{ cli::ClickhouseArgs, @@ -34,9 +42,59 @@ fn config_from_clickhouse_args(args: &ClickhouseArgs, validation: bool) -> Click } } -struct MetricsWrapper; +/// little global (puaj) info to easily get the current clickhouse disk size. +#[derive(Default)] +pub(crate) struct ClickhouseLocalBackupDiskSize { + bundles_size: AtomicU64, + bundle_receipts_size: AtomicU64, +} + +impl ClickhouseLocalBackupDiskSize { + pub(crate) fn set_bundles_size(&self, size: u64) { + self.bundles_size.store(size, Ordering::Relaxed); + } + pub(crate) fn set_bundle_receipts_size(&self, size: u64) { + self.bundle_receipts_size.store(size, Ordering::Relaxed); + } + pub(crate) fn disk_size(&self) -> u64 { + self.bundles_size.load(Ordering::Relaxed) + + self.bundle_receipts_size.load(Ordering::Relaxed) + } +} + +/// We store here the current disk size of the backup database to avoid querying the metrics since +/// that would include a string map access. +pub(crate) static CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE: LazyLock = + LazyLock::new(ClickhouseLocalBackupDiskSize::default); + +/// Callback invoked when disk backup size is set. Implement this trait to observe size updates. +pub(crate) trait DiskBackupSizeCallback: Send + Sync { + fn on_disk_backup_size(size_bytes: u64); +} + +struct UpdateBundleSizeCallback; + +impl DiskBackupSizeCallback for UpdateBundleSizeCallback { + fn on_disk_backup_size(size_bytes: u64) { + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundles_size(size_bytes); + } +} + +struct UpdateBundleReceiptsSizeCallback; + +impl DiskBackupSizeCallback for UpdateBundleReceiptsSizeCallback { + fn on_disk_backup_size(size_bytes: u64) { + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundle_receipts_size(size_bytes); + } +} + +struct MetricsWrapper(PhantomData) +where + F: DiskBackupSizeCallback; -impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper { +impl rbuilder_utils::clickhouse::backup::metrics::Metrics + for MetricsWrapper +{ fn increment_write_failures(err: String) { CLICKHOUSE_METRICS.write_failures(err).inc(); } @@ -60,6 +118,7 @@ impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper { } fn set_disk_backup_size(size_bytes: u64, batches: usize, order: &'static str) { + F::on_disk_backup_size(size_bytes); CLICKHOUSE_METRICS.backup_size_bytes(order, "disk").set(size_bytes); CLICKHOUSE_METRICS.backup_size_batches(order, "disk").set(batches); } @@ -120,25 +179,27 @@ impl ClickhouseIndexer { let send_timeout = Duration::from_millis(args.send_timeout_ms); let end_timeout = Duration::from_millis(args.end_timeout_ms); - - let bundle_inserter_join_handle = - spawn_clickhouse_inserter_and_backup::( - &client, - receivers.bundle_rx, - &task_executor, - args.bundles_table_name, - builder_name.clone(), - disk_backup.clone(), - args.backup_memory_max_size_bytes, - send_timeout, - end_timeout, - TARGET_INDEXER, - ); + let bundle_inserter_join_handle = spawn_clickhouse_inserter_and_backup::< + SystemBundle, + BundleRow, + MetricsWrapper, + >( + &client, + receivers.bundle_rx, + &task_executor, + args.bundles_table_name, + builder_name.clone(), + disk_backup.clone(), + args.backup_memory_max_size_bytes, + send_timeout, + end_timeout, + TARGET_INDEXER, + ); let bundle_receipt_inserter_join_handle = spawn_clickhouse_inserter_and_backup::< BundleReceipt, BundleReceiptRow, - MetricsWrapper, + MetricsWrapper, >( &client, receivers.bundle_receipt_rx, @@ -160,18 +221,29 @@ pub(crate) mod tests { use std::{borrow::Cow, collections::BTreeMap, fs, time::Duration}; use crate::{ - cli::ClickhouseArgs, + cli::{ClickhouseArgs, IndexerArgs, OrderflowIngressArgs}, + consts::FLASHBOTS_SIGNATURE_HEADER, indexer::{ - BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, OrderSenders, TARGET_INDEXER, + BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, Indexer, OrderSenders, TARGET_INDEXER, click::{ ClickhouseClientConfig, ClickhouseIndexer, models::{BundleReceiptRow, BundleRow}, }, tests::{bundle_receipt_example, system_bundle_example}, }, - utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks}, + jsonrpc::{JSONRPC_VERSION_2, JsonRpcError, JsonRpcResponse, JsonRpcResponseTy}, + utils::{ + SHUTDOWN_TIMEOUT, + testutils::{Random, random_raw_bundle_with_tx_count_and_input_size}, + wait_for_critical_tasks, + }, }; + use alloy_primitives::keccak256; + use alloy_signer::Signer; + use alloy_signer_local::PrivateKeySigner; + use axum::{Json, http::StatusCode, routing::post}; use clickhouse::{Client as ClickhouseClient, error::Result as ClickhouseResult}; + use rbuilder_primitives::serialize::RawBundle; use rbuilder_utils::{ clickhouse::{ Quantities, @@ -180,6 +252,7 @@ pub(crate) mod tests { }, tasks::TaskManager, }; + use serde_json::json; use testcontainers::{ ContainerAsync, Image, core::{ @@ -187,7 +260,8 @@ pub(crate) mod tests { }, runners::AsyncRunner as _, }; - use tokio::{runtime::Handle, sync::mpsc}; + use tokio::{net::TcpListener, runtime::Handle, sync::mpsc}; + use tokio_util::sync::CancellationToken; // Uncomment to enable logging during tests. // use tracing::level_filters::LevelFilter; @@ -552,4 +626,182 @@ pub(crate) mod tests { drop(image); } } + + /// Integration test: when backup DB exceeds disk_max_size_to_accept_user_rpc_mb, user RPC + /// returns DiskFull; after fixing ClickHouse and draining backup, RPC accepts again. + /// A little long func.. consider improving this. + #[tokio::test(flavor = "multi_thread")] + async fn disk_full_rpc_rejects_then_accepts_after_drain() { + const BACKUP_DISK_MAX_SIZE_BYTES: u64 = 5; + const BUNDLE_TX_COUNT: usize = 10; + const BUNDLE_TX_INPUT_SIZE: usize = 1024; + const DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB: u64 = 1; + // We need to fill DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB with bundles. + // 2 is to play it safe. + const BUNDLE_COUNT_TO_FILL_DISK: usize = 2 * + (DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB * 1024 * 1024) as usize / + (BUNDLE_TX_COUNT * BUNDLE_TX_INPUT_SIZE); + + const FLOWPROXY_START_DELAY_MS: Duration = Duration::from_millis(800); + // Assume 100ms per bundle to clickhouse (it's a LOT) + const DRAIN_TIMEOUT: Duration = + Duration::from_millis(100 * BUNDLE_COUNT_TO_FILL_DISK as u64); + + let mut rng = rand::rng(); + let task_manager = TaskManager::new(tokio::runtime::Handle::current()); + let task_executor = task_manager.executor(); + + // 1. Start ClickHouse without tables so inserts fail and go to backup. + let (image, client, config) = create_test_clickhouse_client(false).await.unwrap(); + + let temp_dir = tempfile::tempdir().unwrap(); + let backup_path = temp_dir.path().join("clickhouse-backup.db"); + + let mut clickhouse_args: ClickhouseArgs = config.clone().into(); + clickhouse_args.backup_disk_database_path = backup_path.to_string_lossy().to_string(); + clickhouse_args.backup_disk_max_size_bytes = BACKUP_DISK_MAX_SIZE_BYTES * 1024 * 1024; + + // 2. Mock builder so forwarder requests complete. + let builder_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let builder_port = builder_listener.local_addr().unwrap().port(); + let builder_url = format!("http://127.0.0.1:{builder_port}"); + let app = + axum::Router::new().route("/", post(|| async { (StatusCode::OK, Json(json!({}))) })); + tokio::spawn(async move { axum::serve(builder_listener, app).await.unwrap() }); + + let mut args = OrderflowIngressArgs::default().gzip_enabled().disable_builder_hub(); + args.peer_update_interval_s = 5; + args.indexing = IndexerArgs { clickhouse: Some(clickhouse_args), parquet: None }; + args.disk_max_size_to_accept_user_rpc_mb = DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB; // 1 MiB threshold + args.builder_url = Some(builder_url.clone()); + + let user_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let user_port = user_listener.local_addr().unwrap().port(); + let user_url = format!("http://127.0.0.1:{user_port}"); + + let (indexer_handle, _indexer_join_handles) = + Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); + let cancellation_token = CancellationToken::new(); + + tokio::spawn(async move { + crate::run_with_listeners( + args, + user_listener, + None, + task_executor, + indexer_handle, + cancellation_token, + ) + .await + .unwrap(); + }); + + tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await; + + let reqwest_client = reqwest::Client::default(); + let signer = PrivateKeySigner::random(); + + // 3. Phase 1: send bundles until backup exceeds threshold and we get DiskFull. + // Delay between requests so the indexer can commit batches (which fail and go to backup). + let mut got_disk_full = false; + for _ in 0..BUNDLE_COUNT_TO_FILL_DISK { + let bundle = random_raw_bundle_with_tx_count_and_input_size( + &mut rng, + BUNDLE_TX_COUNT, + Some(BUNDLE_TX_INPUT_SIZE), + ); + let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await; + if is_disk_full(response).await { + got_disk_full = true; + break; + } + tokio::time::sleep(Duration::from_millis(30)).await; + } + assert!(got_disk_full, "expected RPC to eventually return DiskFull"); + + // 4. Phase 2: create tables so backup can drain. + create_clickhouse_bundles_table(&client).await.unwrap(); + create_clickhouse_bundle_receipts_table(&client).await.unwrap(); + + // 5. Poll until RPC accepts again (backup drained). + let poll_interval = Duration::from_millis(500); + let mut elapsed = Duration::ZERO; + while elapsed < DRAIN_TIMEOUT { + tokio::time::sleep(poll_interval).await; + elapsed += poll_interval; + let bundle = RawBundle::random(&mut rng); + let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await; + if response.status().is_success() { + let body = response.bytes().await.unwrap(); + let parsed: JsonRpcResponse = + serde_json::from_slice(body.as_ref()).expect("valid json"); + if matches!(parsed.result_or_error, JsonRpcResponseTy::Result(_)) { + break; + } + } + } + assert!(elapsed < DRAIN_TIMEOUT, "RPC did not accept again within {:?}", DRAIN_TIMEOUT); + + // 6. Phase 3: one more successful eth_sendBundle. + let bundle = RawBundle::random(&mut rng); + let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await; + assert!( + response.status().is_success(), + "expected success after drain, got {}", + response.text().await.unwrap_or_default() + ); + let body = response.bytes().await.unwrap(); + let parsed: JsonRpcResponse = + serde_json::from_slice(body.as_ref()).unwrap(); + assert!( + matches!(parsed.result_or_error, JsonRpcResponseTy::Result(_)), + "expected result, got {:?}", + parsed.result_or_error + ); + + drop(image); + } + + async fn is_disk_full(response: reqwest::Response) -> bool { + let status = response.status(); + let body = response.bytes().await.unwrap(); + if !status.is_success() { + let parsed: JsonRpcResponse<()> = match serde_json::from_slice(body.as_ref()) { + Ok(p) => p, + Err(_) => return false, + }; + matches!( + parsed.result_or_error, + JsonRpcResponseTy::Error { code: -32603, message: JsonRpcError::DiskFull } + ) + } else { + false + } + } + + async fn send_bundle_req( + client: &reqwest::Client, + url: &str, + signer: &PrivateKeySigner, + bundle: &RawBundle, + ) -> reqwest::Response { + let body = json!({ + "id": 0, + "jsonrpc": JSONRPC_VERSION_2, + "method": "eth_sendBundle", + "params": [bundle] + }); + let body_bytes = serde_json::to_vec(&body).unwrap(); + let sighash = format!("{:?}", keccak256(&body_bytes)); + let sig = signer.sign_message(sighash.as_bytes()).await.unwrap(); + let signature_header = format!("{:?}:{}", signer.address(), sig); + client + .post(url) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .header(FLASHBOTS_SIGNATURE_HEADER, signature_header) + .body(body_bytes) + .send() + .await + .unwrap() + } } diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 5613a93d..28f1187a 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -8,7 +8,7 @@ use crate::{ }, entity::{Entity, EntityBuilderStats, EntityData, EntityRequest, EntityScores, SpamThresholds}, forwarder::IngressForwarders, - indexer::{IndexerHandle, OrderIndexer as _}, + indexer::{IndexerHandle, OrderIndexer as _, click::CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE}, jsonrpc::{JsonRpcError, JsonRpcRequest, JsonRpcResponse}, metrics::{IngressMetrics, SYSTEM_METRICS}, primitives::{ @@ -79,6 +79,8 @@ pub struct OrderflowIngress { pub local_builder_url: Option, pub builder_ready_endpoint: Option, pub indexer_handle: IndexerHandle, + /// Maximum local ClickHouse backup disk size in bytes above which user RPC is rejected. + pub disk_max_size_to_accept_user_rpc: u64, // Metrics pub(crate) user_metrics: IngressMetrics, @@ -145,6 +147,10 @@ impl OrderflowIngress { tracing::info!(entries = len_after, num_removed, "finished state maintenance"); } + fn clickhouse_backup_disk_size_is_ok(&self) -> bool { + CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.disk_size() <= self.disk_max_size_to_accept_user_rpc + } + #[tracing::instrument(skip_all, name = "ingress", fields( handler = "user", @@ -156,6 +162,9 @@ impl OrderflowIngress { headers: HeaderMap, body: axum::body::Bytes, ) -> JsonRpcResponse { + if !ingress.clickhouse_backup_disk_size_is_ok() { + return JsonRpcResponse::error(Value::Null, JsonRpcError::DiskFull); + } let received_at = UtcInstant::now(); let body = match maybe_decompress(ingress.gzip_enabled, &headers, body) { @@ -289,6 +298,13 @@ impl OrderflowIngress { /// returns 200 if the local builder is not configured. #[tracing::instrument(skip_all, name = "ingress_readyz")] pub async fn ready_handler(State(ingress): State>) -> Response { + if !ingress.clickhouse_backup_disk_size_is_ok() { + return Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .body(Body::from("clickhouse backup too big")) + .unwrap(); + } + if let Some(ref url) = ingress.builder_ready_endpoint { let client = reqwest::Client::builder() .timeout(Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECS)) @@ -300,7 +316,7 @@ impl OrderflowIngress { tracing::error!(%url, "error sending readyz request"); return Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) - .body(Body::from("not ready")) + .body(Body::from("builder not answering readyz request")) .unwrap(); }; @@ -311,7 +327,7 @@ impl OrderflowIngress { tracing::error!(%url, status = %response.status(), "local builder is not ready"); return Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) - .body(Body::from("not ready")) + .body(Body::from("builder not ready")) .unwrap(); } } diff --git a/src/jsonrpc.rs b/src/jsonrpc.rs index deb61eb9..ba327e6b 100644 --- a/src/jsonrpc.rs +++ b/src/jsonrpc.rs @@ -177,6 +177,8 @@ pub enum JsonRpcError { RateLimited, #[error("Internal error")] Internal, + #[error("Disk full")] + DiskFull, #[error("{0}")] Unknown(String), } @@ -192,6 +194,7 @@ impl FromStr for JsonRpcError { "Invalid params" => Self::InvalidParams, "Rate limited" => Self::RateLimited, "Internal error" => Self::Internal, + "Disk full" => Self::DiskFull, s => { if s.starts_with("Method not found: ") { Self::MethodNotFound( @@ -239,9 +242,11 @@ impl JsonRpcError { Self::InvalidRequest => -32600, Self::MethodNotFound(_) => -32601, Self::InvalidParams => -32602, - Self::RateLimited | Self::Internal | Self::Unknown(_) | Self::InvalidSignature => { - -32603 - } + Self::RateLimited | + Self::Internal | + Self::Unknown(_) | + Self::InvalidSignature | + Self::DiskFull => -32603, } } @@ -254,7 +259,7 @@ impl JsonRpcError { Self::InvalidSignature => StatusCode::BAD_REQUEST, Self::MethodNotFound(_) => StatusCode::NOT_FOUND, Self::RateLimited => StatusCode::TOO_MANY_REQUESTS, - Self::Internal | Self::Unknown(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Internal | Self::Unknown(_) | Self::DiskFull => StatusCode::INTERNAL_SERVER_ERROR, } } } diff --git a/src/lib.rs b/src/lib.rs index 89ed1244..8f911d11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,9 @@ pub async fn run( // Set build info metric metrics::BUILD_INFO_METRICS.info(env!("CARGO_PKG_VERSION"), env!("GIT_HASH")).set(1); + metrics::CLICKHOUSE_METRICS + .disk_max_size_to_accept_user_rpc_bytes() + .set(args.disk_max_size_to_accept_user_rpc_mb.saturating_mul(1024 * 1024)); } let user_listener = TcpListener::bind(&args.user_listen_addr).await?; @@ -259,6 +262,7 @@ pub async fn run_with_listeners( local_builder_url: builder_url, builder_ready_endpoint, indexer_handle, + disk_max_size_to_accept_user_rpc: args.disk_max_size_to_accept_user_rpc_mb * 1024 * 1024, user_metrics: IngressMetrics::builder().with_label("handler", "user").build(), system_metrics: IngressMetrics::builder().with_label("handler", "system").build(), }); diff --git a/src/metrics.rs b/src/metrics.rs index 13b0aaaf..15e9cede 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -189,6 +189,9 @@ pub struct ClickhouseMetrics { /// Errors encountered during Clickhouse disk backup. #[metric(labels = ["order", "error"])] backup_disk_errors: Counter, + /// Configured max disk size (bytes) above which user RPC is rejected. + #[metric(rename = "disk_max_size_to_accept_user_rpc_bytes")] + disk_max_size_to_accept_user_rpc_bytes: Gauge, } #[metrics(scope = "indexer_parquet")] diff --git a/src/utils.rs b/src/utils.rs index 4ed7cc5a..aeb6d852 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -142,20 +142,26 @@ pub mod testutils { impl Random for TxEip1559 { fn random(rng: &mut R) -> Self { - let max_fee_per_gas = rng.random(); - let max_priority_fee_per_gas = rng.random_range(0..max_fee_per_gas); + let input_len = rng.random_range(0..=1024); + create_tx_eip1559_with_input_size(rng, input_len) + } + } - Self { - chain_id: 1, - nonce: 0, - gas_limit: 100_000, - max_fee_per_gas, - max_priority_fee_per_gas, - to: TxKind::Call(Address::random_with(rng)), - value: U256::random_with(rng), - access_list: Default::default(), - input: Bytes::random(rng), - } + pub fn create_tx_eip1559_with_input_size(rng: &mut R, data_size: usize) -> TxEip1559 { + let max_fee_per_gas = rng.random(); + let max_priority_fee_per_gas = rng.random_range(0..max_fee_per_gas); + let mut bytes = vec![0u8; data_size]; + rng.fill_bytes(&mut bytes); + TxEip1559 { + chain_id: 1, + nonce: 0, + gas_limit: 100_000, + max_fee_per_gas, + max_priority_fee_per_gas, + to: TxKind::Call(Address::random_with(rng)), + value: U256::random_with(rng), + access_list: Default::default(), + input: bytes.into(), } } @@ -246,42 +252,56 @@ pub mod testutils { } } + /// Create a random [`RawBundle`] with a fixed number of EIP-1559 transactions. + /// When `input_size` is `Some(n)`, each transaction's input has exactly `n` bytes (via + /// [`create_tx_eip1559_with_input_size`]). When `None`, each transaction uses random input + /// size (same as [`TxEip1559::random`]). + pub fn random_raw_bundle_with_tx_count_and_input_size( + rng: &mut R, + tx_count: usize, + input_size: Option, + ) -> RawBundle { + let txs = (0..tx_count) + .map(|_| { + let signer = PrivateKeySigner::random(); + let tx = EthereumTypedTransaction::Eip1559(match input_size { + Some(n) => create_tx_eip1559_with_input_size(rng, n), + None => TxEip1559::random(rng), + }); + let sighash = tx.signature_hash(); + let signature = signer.sign_hash_sync(&sighash).unwrap(); + TxEnvelope::new_unhashed(tx, signature).encoded_2718().into() + }) + .collect(); + + RawBundle { + txs, + metadata: RawBundleMetadata { + reverting_tx_hashes: vec![], + dropping_tx_hashes: vec![], + refund_tx_hashes: None, + signing_address: None, + version: Some("v2".to_string()), + block_number: None, + replacement_uuid: None, + refund_identity: None, + uuid: None, + min_timestamp: None, + max_timestamp: None, + replacement_nonce: Some(rng.random()), + refund_percent: Some(rng.random_range(0..100)), + refund_recipient: Some(Address::random_with(rng)), + delayed_refund: None, + bundle_hash: None, + }, + } + } + impl Random for RawBundle { /// Generate a random bundle with transactions of type Eip1559. fn random(rng: &mut R) -> Self { let txs_len = rng.random_range(1..=10); - // We only generate Eip1559 here. - let txs = (0..txs_len) - .map(|_| { - let signer = PrivateKeySigner::random(); - let tx = EthereumTypedTransaction::Eip1559(TxEip1559::random(rng)); - let sighash = tx.signature_hash(); - let signature = signer.sign_hash_sync(&sighash).unwrap(); - TxEnvelope::new_unhashed(tx, signature).encoded_2718().into() - }) - .collect(); - - Self { - txs, - metadata: RawBundleMetadata { - reverting_tx_hashes: vec![], - dropping_tx_hashes: vec![], - refund_tx_hashes: None, - signing_address: None, - version: Some("v2".to_string()), - block_number: None, - replacement_uuid: None, - refund_identity: None, - uuid: None, - min_timestamp: None, - max_timestamp: None, - replacement_nonce: Some(rng.random()), - refund_percent: Some(rng.random_range(0..100)), - refund_recipient: Some(Address::random_with(rng)), - delayed_refund: None, - bundle_hash: None, - }, - } + random_raw_bundle_with_tx_count_and_input_size(rng, txs_len, None) } } }