Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ revm-primitives = { version = "21.0.2", default-features = false }
revm-interpreter = { version = "29.0.1", default-features = false }

# rbuilder
rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "95323c0c6b1ac742a5716aba1942f3e06b8b5241" }
rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "95323c0c6b1ac742a5716aba1942f3e06b8b5241", features = [
rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2" }
rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2", features = [
"test-utils"
] }

Expand All @@ -34,6 +34,7 @@ tokio = { version = "1", default-features = false, features = [
"macros"
] }
futures = { version = "0.3" }
tokio-util = "0.7.12"

# allocator
tikv-jemallocator = { version = "0.6", optional = true }
Expand Down
11 changes: 9 additions & 2 deletions src/builderhub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,15 @@ impl Peer {
///
/// Reference: <https://github.com/flashbots/buildernet-orderflow-proxy/blob/main/proxy/confighub.go>
pub async fn system_api(&self) -> io::Result<Option<SocketAddr>> {
// NOTE: Needed for integration tests where port is not known upfront. This is also more
// flexible in the case some instances won't run with that default port.
// If `ip` is a valid loopback socket address, use it directly. This is the case in
// integration tests.
if let Ok(addr) = self.ip.parse::<SocketAddr>() &&
addr.ip().is_loopback()
{
return Ok(Some(addr));
}

// Parse the provided port, set to DEFAULT_SYSTEM_PORT if not provided.
let port =
self.ip.split(':').nth(1).and_then(|p| p.parse().ok()).unwrap_or(DEFAULT_SYSTEM_PORT);

Expand Down
18 changes: 18 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,24 @@ pub struct ClickhouseArgs {
default_value_t = MAX_DISK_BACKUP_SIZE_BYTES
)]
pub backup_disk_max_size_bytes: u64,

/// Send timeout in milliseconds for ClickHouse HTTP requests. Defaults to 2_000.
#[arg(
long = "indexer.clickhouse.send-timeout-ms",
env = "CLICKHOUSE_SEND_TIMEOUT_MS",
id = "CLICKHOUSE_SEND_TIMEOUT_MS",
default_value_t = 2_000
)]
pub send_timeout_ms: u64,

/// End-to-end timeout in milliseconds for ClickHouse HTTP requests. Defaults to 3_000.
#[arg(
long = "indexer.clickhouse.end-timeout-ms",
env = "CLICKHOUSE_END_TIMEOUT_MS",
id = "CLICKHOUSE_END_TIMEOUT_MS",
default_value_t = 3_000
)]
pub end_timeout_ms: u64,
}

/// Arguments required to setup file-based parquet indexing.
Expand Down
46 changes: 33 additions & 13 deletions src/indexer/click/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use rbuilder_utils::{
},
tasks::TaskExecutor,
};
use tokio::task::JoinHandle;

mod models;

Expand Down Expand Up @@ -105,7 +106,7 @@ impl ClickhouseIndexer {
receivers: OrderReceivers,
task_executor: TaskExecutor,
validation: bool,
) {
) -> Vec<JoinHandle<()>> {
let client = config_from_clickhouse_args(&args, validation).into();
tracing::info!("Running with clickhouse indexer");

Expand All @@ -117,27 +118,40 @@ impl ClickhouseIndexer {
)
.expect("could not create disk backup");

spawn_clickhouse_inserter_and_backup::<SystemBundle, BundleRow, MetricsWrapper>(
&client,
receivers.bundle_rx,
&task_executor,
args.bundles_table_name,
builder_name.clone(),
disk_backup.clone(),
args.backup_memory_max_size_bytes,
TARGET_INDEXER,
);
let send_timeout = Duration::from_millis(args.send_timeout_ms);
let end_timeout = Duration::from_millis(args.end_timeout_ms);

spawn_clickhouse_inserter_and_backup::<BundleReceipt, BundleReceiptRow, MetricsWrapper>(
let bundle_inserter_join_handle =
spawn_clickhouse_inserter_and_backup::<SystemBundle, BundleRow, MetricsWrapper>(
&client,
receivers.bundle_rx,
&task_executor,
args.bundles_table_name,
builder_name.clone(),
disk_backup.clone(),
args.backup_memory_max_size_bytes,
send_timeout,
end_timeout,
TARGET_INDEXER,
);

let bundle_receipt_inserter_join_handle = spawn_clickhouse_inserter_and_backup::<
BundleReceipt,
BundleReceiptRow,
MetricsWrapper,
>(
&client,
receivers.bundle_receipt_rx,
&task_executor,
args.bundle_receipts_table_name,
builder_name.clone(),
disk_backup.clone(),
args.backup_memory_max_size_bytes,
send_timeout,
end_timeout,
TARGET_INDEXER,
);
vec![bundle_inserter_join_handle, bundle_receipt_inserter_join_handle]
}
}

Expand All @@ -155,6 +169,7 @@ pub(crate) mod tests {
},
tests::{bundle_receipt_example, system_bundle_example},
},
utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks},
};
use clickhouse::{Client as ClickhouseClient, error::Result as ClickhouseResult};
use rbuilder_utils::{
Expand Down Expand Up @@ -240,6 +255,7 @@ pub(crate) mod tests {
}
}

///Only for testing purposes.
impl From<ClickhouseClientConfig> for ClickhouseArgs {
fn from(config: ClickhouseClientConfig) -> Self {
Self {
Expand All @@ -252,6 +268,8 @@ pub(crate) mod tests {
backup_memory_max_size_bytes: 1024 * 1024 * 10, // 10MiB
backup_disk_database_path: default_disk_backup_database_path(),
backup_disk_max_size_bytes: 1024 * 1024 * 100, // 100MiB
send_timeout_ms: 2_000,
end_timeout_ms: 3_000,
}
}
}
Expand Down Expand Up @@ -368,7 +386,7 @@ pub(crate) mod tests {
let (senders, receivers) = OrderSenders::new();

let validation = false;
ClickhouseIndexer::run(
let indexer_join_handles = ClickhouseIndexer::run(
config.into(),
builder_name.clone(),
receivers,
Expand All @@ -380,6 +398,8 @@ pub(crate) mod tests {
let system_bundle = system_bundle_example();
let system_bundle_row = (system_bundle.clone(), builder_name.clone()).into();
senders.bundle_tx.send(system_bundle.clone()).await.unwrap();
drop(senders);
wait_for_critical_tasks(indexer_join_handles, SHUTDOWN_TIMEOUT).await;

// Wait a bit for bundle to be actually processed before shutting down.
tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/click/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl From<(SystemBundle, String)> for BundleRow {
.unwrap_or_default(),
// Decoded bundles always have a uuid.
internal_uuid: decoded.uuid,
replacement_uuid: decoded.replacement_data.clone().map(|r| r.key.id),
replacement_uuid: decoded.replacement_data.map(|r| r.key.id),
replacement_nonce: bundle.raw_bundle.metadata.replacement_nonce,
signer_address: Some(bundle.metadata.signer),
builder_name,
Expand Down
18 changes: 12 additions & 6 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
use std::fmt::Debug;

use rbuilder_utils::tasks::TaskExecutor;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::error;

use crate::{
cli::IndexerArgs,
Expand Down Expand Up @@ -77,33 +78,38 @@ impl OrderSenders {
pub struct Indexer;

impl Indexer {
/// Returns the IndexerHandle to send data and a vector of join handles to the indexer tasks so
/// we can wait for them to finish on shutdown.
pub fn run(
args: IndexerArgs,
builder_name: String,
task_executor: TaskExecutor,
) -> IndexerHandle {
) -> (IndexerHandle, Vec<JoinHandle<()>>) {
let (senders, receivers) = OrderSenders::new();

match (args.clickhouse, args.parquet) {
(None, None) => {
MockIndexer.run(receivers, task_executor);
IndexerHandle::new(senders)
(IndexerHandle::new(senders), vec![])
}
(Some(clickhouse), None) => {
let validation = false;
ClickhouseIndexer::run(
let join_handles = ClickhouseIndexer::run(
clickhouse,
builder_name,
receivers,
task_executor,
validation,
);
IndexerHandle::new(senders)
(IndexerHandle::new(senders), join_handles)
}
(None, Some(parquet)) => {
ParquetIndexer::run(parquet, builder_name, receivers, task_executor)
.expect("failed to start parquet indexer");
IndexerHandle::new(senders)
error!(
"Parquet indexer does not support proper shutdown, returning empty join handles"
);
(IndexerHandle::new(senders), vec![])
}
(Some(_), Some(_)) => {
unreachable!("Cannot specify both clickhouse and parquet indexer");
Expand Down
9 changes: 7 additions & 2 deletions src/ingress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use std::{
};
use time::UtcDateTime;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::*;

pub mod error;
Expand Down Expand Up @@ -132,7 +133,7 @@ impl OrderflowIngress {

/// Perform maintenance task for internal orderflow ingress state.
#[tracing::instrument(skip_all, name = "ingress_maintanance")]
pub async fn maintenance(&self) {
pub fn maintenance(&self) {
let len_before = self.entities.len();
tracing::info!(entries = len_before, "starting state maintenance");

Expand Down Expand Up @@ -790,7 +791,7 @@ impl IngressSocket {
Self { reply_socket: socket, ingress_state, task_executor, certs_rx, acceptor_builder }
}

pub async fn listen(mut self) {
pub async fn listen(mut self, cancellation_token: CancellationToken) {
loop {
tokio::select! {
Some(certs) = self.certs_rx.recv() => {
Expand Down Expand Up @@ -826,6 +827,10 @@ impl IngressSocket {
}
});
}
_ = cancellation_token.cancelled() => {
info!("Cancellation token cancelled, stopping ingress socket listener");
break;
}

}
}
Expand Down
Loading