From bc7c36ac5e933f5cd19edb4a133a05deec11c852 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 24 Feb 2026 13:38:31 -0500 Subject: [PATCH 1/4] use cancel token instead of broadcast channel --- crates/rproxy/src/server.rs | 110 +++++------- .../src/server/proxy/circuit_breaker.rs | 37 +---- crates/rproxy/src/server/proxy/http/proxy.rs | 49 ++---- crates/rproxy/src/server/proxy/ws/proxy.rs | 156 +++++------------- 4 files changed, 108 insertions(+), 244 deletions(-) diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index 59108cc..f00a7d7 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -8,7 +8,6 @@ use std::{error::Error, sync::Arc}; use tokio::{ signal::unix::{SignalKind, signal}, - sync::broadcast, task::JoinHandle, }; use tokio_util::sync::CancellationToken; @@ -34,16 +33,16 @@ pub struct Server {} impl Server { pub async fn run(config: Config) -> Result<(), Box> { - let canceller = Server::wait_for_shutdown_signal(); - let resetter = Server::wait_for_reset_signal(canceller.clone()); + let shutdown_signal = Server::wait_for_shutdown_signal(); + let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone()); - Self::_run(config, canceller, resetter).await + Self::_run(config, shutdown_signal, reset_signal).await } async fn _run( config: Config, - canceller: CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) -> Result<(), Box> { // try to set system limits match rlimit::getrlimit(rlimit::Resource::NOFILE) { @@ -70,7 +69,7 @@ impl Server { // spawn metrics service let metrics = Arc::new(Metrics::new(config.metrics.clone())); { - let canceller = canceller.clone(); + let canceller = shutdown_signal.clone(); let metrics = metrics.clone(); tokio::spawn(async move { @@ -87,8 +86,8 @@ impl Server { // spawn circuit-breaker if !config.circuit_breaker.url.is_empty() { - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let shutdown_signal = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); let _ = std::thread::spawn(move || { let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() { @@ -101,12 +100,13 @@ impl Server { let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone()); - tokio::task::LocalSet::new() - .block_on(&rt, async move { circuit_breaker.run(canceller, resetter).await }) + tokio::task::LocalSet::new().block_on(&rt, async move { + circuit_breaker.run(shutdown_signal, reset_signal).await + }) }); } - while !canceller.is_cancelled() { + while !shutdown_signal.is_cancelled() { if config.tls.enabled() { let metrics = metrics.clone(); let (not_before, not_after) = @@ -122,8 +122,8 @@ impl Server { let tls = config.tls.clone(); let config = config.authrpc.clone(); let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let canceller = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { ProxyHttp::::run( @@ -131,7 +131,7 @@ impl Server { tls, metrics, canceller.clone(), - resetter, + reset_signal, ) .await .inspect_err(|err| { @@ -150,8 +150,8 @@ impl Server { let tls = config.tls.clone(); let config = config.rpc.clone(); let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let canceller = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { ProxyHttp::::run( @@ -159,7 +159,7 @@ impl Server { tls, metrics, canceller.clone(), - resetter, + reset_signal, ) .await .inspect_err(|err| { @@ -178,8 +178,8 @@ impl Server { let tls = config.tls.clone(); let config = config.flashblocks.clone(); let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let canceller = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { ProxyWs::::run( @@ -187,7 +187,7 @@ impl Server { tls, metrics, canceller.clone(), - resetter, + reset_signal, ) .await .inspect_err(|err| { @@ -246,36 +246,29 @@ impl Server { canceller } - fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> { - let (resetter, _) = broadcast::channel::<()>(1); + fn wait_for_reset_signal(shutdown_signal: CancellationToken) -> CancellationToken { + let reset_signal = shutdown_signal.child_token(); - { - let resetter = resetter.clone(); - - tokio::spawn(async move { + tokio::spawn({ + let reset_signal = reset_signal.clone(); + let shutdown_signal = shutdown_signal.clone(); + async move { let mut hangup = signal(SignalKind::hangup()).expect("failed to install sighup handler"); loop { tokio::select! { + _ = shutdown_signal.cancelled() => break, _ = hangup.recv() => { info!("Hangup signal received, resetting..."); - - if let Err(err) = resetter.send(()) { - error!(from = "sighup", error = ?err, "Failed to broadcast reset signal, shutting down whole proxy..."); - canceller.cancel(); - } + reset_signal.cancel(); } - - _ = canceller.cancelled() => { - return - }, } } - }); - } + } + }); - resetter + reset_signal } } @@ -291,7 +284,7 @@ mod tests { RpcModule, server::{ServerBuilder, ServerHandle}, }; - use tracing::{debug, info}; + use tracing::debug; use super::*; use crate::config::Config; @@ -340,19 +333,19 @@ mod tests { let proxy_addr_authrpc = cfg.clone().authrpc.listen_address; let proxy_addr_rpc = cfg.clone().rpc.listen_address; - let canceller = tokio_util::sync::CancellationToken::new(); - let resetter = Server::wait_for_reset_signal(canceller.clone()); + let shutdown_signal = CancellationToken::new(); + let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone()); let server = { - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let shutdown_signal = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); - actix_rt::spawn(async move { Server::_run(cfg, canceller, resetter).await }) + actix_rt::spawn(async move { Server::_run(cfg, shutdown_signal, reset_signal).await }) }; actix_rt::time::sleep(std::time::Duration::from_millis(100)).await; { - let canceller = canceller.clone(); + let shutdown_signal = shutdown_signal.clone(); let client = Client::builder().timeout(Duration::from_millis(10)).finish(); actix_rt::spawn(async move { @@ -379,7 +372,7 @@ mod tests { } } - _ = canceller.cancelled() => { + _ = shutdown_signal.cancelled() => { break } } @@ -388,7 +381,7 @@ mod tests { } { - let canceller = canceller.clone(); + let shutdown_signal = shutdown_signal.clone(); let client = Client::builder().timeout(Duration::from_millis(10)).finish(); actix_rt::spawn(async move { @@ -415,7 +408,7 @@ mod tests { } } - _ = canceller.cancelled() => { + _ = shutdown_signal.cancelled() => { break } } @@ -423,25 +416,12 @@ mod tests { }); } - for i in 0..10 { + for _ in 0..10 { actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await; - - match resetter.send(()) { - Err(err) => { - debug!(iteration = i, error = ?err, "Failed to send a reset"); - } - - Ok(proxies_count) => { - info!(iteration = i, proxies_count = proxies_count, "Sent a reset"); - assert_eq!( - proxies_count, 2, - "sent reset wrong count of proxies: {proxies_count} != 2" - ); - } - } + reset_signal.cancel(); } - canceller.cancel(); + shutdown_signal.cancel(); tokio::time::timeout(tokio::time::Duration::from_secs(5), server).await.ok(); } diff --git a/crates/rproxy/src/server/proxy/circuit_breaker.rs b/crates/rproxy/src/server/proxy/circuit_breaker.rs index d0959f7..df7b801 100644 --- a/crates/rproxy/src/server/proxy/circuit_breaker.rs +++ b/crates/rproxy/src/server/proxy/circuit_breaker.rs @@ -6,8 +6,8 @@ use awc::{ http::{self, Method, header}, }; use parking_lot::Mutex; -use tokio::sync::broadcast; -use tracing::{debug, error, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, warn}; use crate::server::proxy::config::ConfigCircuitBreaker; @@ -77,26 +77,21 @@ impl CircuitBreaker { pub(crate) async fn run( self, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) { - let canceller = canceller.clone(); - let resetter = resetter.clone(); - let mut ticker = tokio::time::interval(self.config.poll_interval); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); // spawning locally b/c actix client is thread-local by design if let Err(err) = tokio::task::spawn_local(async move { loop { - let resetter = resetter.clone(); - tokio::select! { _ = ticker.tick() => { - self.poll(resetter).await; + self.poll(reset_signal.clone()).await; } - _ = canceller.cancelled() => { + _ = shutdown_signal.cancelled() => { break } } @@ -112,7 +107,7 @@ impl CircuitBreaker { } } - async fn poll(&self, resetter: broadcast::Sender<()>) { + async fn poll(&self, reset_signal: CancellationToken) { let req = self .client .request(Method::GET, self.config.url.clone()) @@ -162,26 +157,12 @@ impl CircuitBreaker { this.curr_status = Status::Unhealthy; warn!(service = Self::name(), "Backend became unhealthy, resetting..."); - - if let Err(err) = resetter.send(()) { - error!( - from = Self::name(), - error = ?err, - "Failed to broadcast reset signal", - ); - } + reset_signal.cancel(); } (Status::Unhealthy, Status::Unhealthy) => { warn!(service = Self::name(), "Backend is still unhealthy, resetting..."); - - if let Err(err) = resetter.send(()) { - error!( - from = Self::name(), - error = ?err, - "Failed to broadcast reset signal", - ); - } + reset_signal.cancel(); } (Status::Unhealthy, Status::Healthy) => { diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..42dd4e1 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -42,7 +42,7 @@ use futures_core::Stream; use pin_project::pin_project; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; -use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use url::Url; use uuid::Uuid; @@ -143,7 +143,7 @@ where tls: ConfigTls, metrics: Arc, canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + reset_signal: CancellationToken, ) -> Result<(), Box> { let listen_address = config.listen_address(); @@ -272,43 +272,20 @@ where let server = server.run(); let handler = server.handle(); - let mut resetter = resetter.subscribe(); tokio::spawn(async move { - loop { - match resetter.recv().await { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } + reset_signal.cancelled().await; - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - "Resetter channel is closed, stopping http-proxy..." - ); - } + info!(proxy = P::name(), "Reset signal received, stopping http-proxy..."); - Ok(()) => { - info!(proxy = P::name(), "Reset signal received, stopping http-proxy..."); - } - } - - if let Err(err) = - tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await - { - error!( - proxy = P::name(), - error = ?err, - "Graceful shutdown of http-proxy failed after 1 minute, forcefully shutting down..." - ); - std::process::exit(1); - } - - break; + if let Err(err) = + tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await + { + error!( + proxy = P::name(), + error = ?err, + "Graceful shutdown of http-proxy failed after 1 minute, forcefully shutting down..." + ); + std::process::exit(1); } }); diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 53be231..cbe9b0d 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -29,8 +29,9 @@ use futures::{ use prometheus_client::metrics::gauge::Atomic; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; -use tokio::{net::TcpStream, sync::broadcast}; +use tokio::net::TcpStream; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use uuid::Uuid; use x509_parser::asn1_rs::ToStatic; @@ -72,8 +73,8 @@ where shared: ProxyWsSharedState, postprocessor: actix::Addr>, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, backend: ProxyWsBackendEndpoint, } @@ -85,8 +86,8 @@ where { fn new( shared: ProxyWsSharedState, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) -> Self { let id = Uuid::now_v7(); @@ -102,7 +103,7 @@ where } .start(); - Self { id, shared, postprocessor, canceller, resetter, backend } + Self { id, shared, postprocessor, shutdown_signal, reset_signal, backend } } fn config(&self) -> &C { @@ -113,8 +114,8 @@ where config: C, tls: ConfigTls, metrics: Arc, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) -> Result<(), Box> { let listen_address = config.listen_address(); @@ -135,8 +136,8 @@ where let shared = ProxyWsSharedState::::new(config.clone(), &metrics); let client_connections_count = shared.client_connections_count.clone(); - let worker_canceller = canceller.clone(); - let worker_resetter = resetter.clone(); + let worker_canceller = shutdown_signal.clone(); + let worker_resetter = reset_signal.clone(); let shutdown_timeout_sec = shared.config().shutdown_timeout_sec(); info!( @@ -165,7 +166,7 @@ where client_connections_count, config.keepalive_interval(), )) - .shutdown_signal(canceller.cancelled_owned()) + .shutdown_signal(shutdown_signal.cancelled_owned()) .shutdown_timeout(shutdown_timeout_sec) .workers(workers_count); @@ -192,46 +193,20 @@ where .run(); let handler = proxy.handle(); - let mut resetter = resetter.subscribe(); tokio::spawn(async move { - loop { - match resetter.recv().await { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } + reset_signal.cancelled().await; - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - "Resetter channel is closed, stopping websocket-proxy..." - ); - } - - Ok(()) => { - info!( - proxy = P::name(), - "Reset signal received, stopping websocket-proxy..." - ); - } - } + info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); - if let Err(err) = - tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await - { - error!( - proxy = P::name(), - error = ?err, - "Graceful shutdown of websocket-proxy failed after 1 minute, forcefully shutting down..." - ); - std::process::exit(1); - } - - break; + if let Err(err) = + tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await + { + error!( + proxy = P::name(), + error = ?err, + "Graceful shutdown of websocket-proxy failed after 1 minute, forcefully shutting down..." + ); + std::process::exit(1); } }); @@ -405,8 +380,8 @@ where worker_id: this.id, shared: this.shared.clone(), postprocessor: this.postprocessor.clone(), - canceller: this.canceller.clone(), - resetter: this.resetter.clone(), + shutdown_signal: this.shutdown_signal.clone(), + reset_signal: this.reset_signal.clone(), clnt_tx, clnt_rx, bknd_tx: Some(bknd_tx), @@ -694,8 +669,8 @@ where shared: ProxyWsSharedState, postprocessor: actix::Addr>, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, clnt_tx: Session, clnt_rx: MessageStream, @@ -728,45 +703,21 @@ where let mut pumping: Result<(), &str> = Ok(()); - let mut resetter = self.resetter.subscribe(); - - while pumping.is_ok() && !self.canceller.is_cancelled() && !resetter.is_closed() { + while pumping.is_ok() && + !self.shutdown_signal.is_cancelled() && + !self.reset_signal.is_cancelled() + { #[cfg(feature = "chaos")] if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { tokio::select! { - _ = self.canceller.cancelled() => { + _ = self.shutdown_signal.cancelled() => { break; } - reset = resetter.recv() => { - match reset { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } - - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - "Resetter channel is closed, stopping websocket-proxy..." - ); - break; - } - - Ok(()) => { - info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); - break; - } - } - } + _ = self.reset_signal.cancelled() => { + info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); + break; + }, // client => backend clnt_msg = self.clnt_rx.next() => { @@ -777,39 +728,14 @@ where } tokio::select! { - _ = self.canceller.cancelled() => { + _ = self.shutdown_signal.cancelled() => { break; } - reset = resetter.recv() => { - match reset { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } - - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - "Resetter channel is closed, stopping websocket-proxy..." - ); - break; - } - - Ok(()) => { - info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); - break; - } - } - } + _ = self.reset_signal.cancelled() => { + info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); + break; + }, // ping both sides _ = heartbeat.tick() => { From 4dc2f289eb3d2f290f0b6d20bff0a8e24397c12f Mon Sep 17 00:00:00 2001 From: 0x416e746f6e Date: Fri, 27 Feb 2026 18:13:50 +0100 Subject: [PATCH 2/4] chore: rename variables for consistency --- crates/rproxy/src/server.rs | 30 ++++++++++---------- crates/rproxy/src/server/metrics.rs | 5 ++-- crates/rproxy/src/server/proxy/http/proxy.rs | 4 +-- crates/rproxy/src/server/proxy/ws/proxy.rs | 8 +++--- 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index f00a7d7..203574f 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -69,11 +69,11 @@ impl Server { // spawn metrics service let metrics = Arc::new(Metrics::new(config.metrics.clone())); { - let canceller = shutdown_signal.clone(); + let shutdown_signal = shutdown_signal.clone(); let metrics = metrics.clone(); tokio::spawn(async move { - metrics.run(canceller).await.inspect_err(|err| { + metrics.run(shutdown_signal).await.inspect_err(|err| { error!( service = Metrics::name(), error = ?err, @@ -122,7 +122,7 @@ impl Server { let tls = config.tls.clone(); let config = config.authrpc.clone(); let metrics = metrics.clone(); - let canceller = shutdown_signal.clone(); + let shutdown_signal = shutdown_signal.clone(); let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { @@ -130,7 +130,7 @@ impl Server { config, tls, metrics, - canceller.clone(), + shutdown_signal.clone(), reset_signal, ) .await @@ -140,7 +140,7 @@ impl Server { error = ?err, "Failed to start http-proxy, terminating...", ); - canceller.cancel(); + shutdown_signal.cancel(); }) })); } @@ -150,7 +150,7 @@ impl Server { let tls = config.tls.clone(); let config = config.rpc.clone(); let metrics = metrics.clone(); - let canceller = shutdown_signal.clone(); + let shutdown_signal = shutdown_signal.clone(); let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { @@ -158,7 +158,7 @@ impl Server { config, tls, metrics, - canceller.clone(), + shutdown_signal.clone(), reset_signal, ) .await @@ -168,7 +168,7 @@ impl Server { error = ?err, "Failed to start http-proxy, terminating...", ); - canceller.cancel(); + shutdown_signal.cancel(); }) })); } @@ -178,7 +178,7 @@ impl Server { let tls = config.tls.clone(); let config = config.flashblocks.clone(); let metrics = metrics.clone(); - let canceller = shutdown_signal.clone(); + let shutdown_signal = shutdown_signal.clone(); let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { @@ -186,7 +186,7 @@ impl Server { config, tls, metrics, - canceller.clone(), + shutdown_signal.clone(), reset_signal, ) .await @@ -196,7 +196,7 @@ impl Server { error = ?err, "Failed to start websocket-proxy, terminating...", ); - canceller.cancel(); + shutdown_signal.cancel(); }) })); } @@ -212,10 +212,10 @@ impl Server { } fn wait_for_shutdown_signal() -> CancellationToken { - let canceller = tokio_util::sync::CancellationToken::new(); + let shutdown_signal = CancellationToken::new(); { - let canceller = canceller.clone(); + let shutdown_signal = shutdown_signal.clone(); tokio::spawn(async move { let sigint = async { @@ -239,11 +239,11 @@ impl Server { info!("Shutdown signal received, stopping..."); - canceller.cancel(); + shutdown_signal.cancel(); }); } - canceller + shutdown_signal } fn wait_for_reset_signal(shutdown_signal: CancellationToken) -> CancellationToken { diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index 638846d..61cd78b 100644 --- a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -19,6 +19,7 @@ use prometheus_client::{ registry::{Registry, Unit}, }; use socket2::{SockAddr, Socket, TcpKeepalive}; +use tokio_util::sync::CancellationToken; use tracing::{error, info}; use crate::server::{config::ConfigMetrics, metrics::candlestick::Candlestick}; @@ -281,7 +282,7 @@ impl Metrics { pub(crate) async fn run( self: Arc, - canceller: tokio_util::sync::CancellationToken, + shutdown_signal: CancellationToken, ) -> Result<(), Box> { let listen_address = self.config.listen_address(); @@ -305,7 +306,7 @@ impl Metrics { .default_service(web::route().to(Self::receive)) }) .workers(1) - .shutdown_signal(canceller.cancelled_owned()) + .shutdown_signal(shutdown_signal.cancelled_owned()) .listen(listener) { Ok(metrics) => metrics, diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 42dd4e1..28f1e93 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -142,7 +142,7 @@ where config: C, tls: ConfigTls, metrics: Arc, - canceller: tokio_util::sync::CancellationToken, + shutdown_signal: CancellationToken, reset_signal: CancellationToken, ) -> Result<(), Box> { let listen_address = config.listen_address(); @@ -187,7 +187,7 @@ where ); let server = actix_server::Server::build() - .shutdown_signal(canceller.cancelled_owned()) + .shutdown_signal(shutdown_signal.cancelled_owned()) .shutdown_timeout(shared.config().shutdown_timeout_sec()) .workers(workers_count); diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index cbe9b0d..5d76358 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -136,8 +136,8 @@ where let shared = ProxyWsSharedState::::new(config.clone(), &metrics); let client_connections_count = shared.client_connections_count.clone(); - let worker_canceller = shutdown_signal.clone(); - let worker_resetter = reset_signal.clone(); + let worker_shutdown_signal = shutdown_signal.clone(); + let worker_reset_signal = reset_signal.clone(); let shutdown_timeout_sec = shared.config().shutdown_timeout_sec(); info!( @@ -150,8 +150,8 @@ where let server = HttpServer::new(move || { let this = web::Data::new(Self::new( shared.clone(), - worker_canceller.clone(), - worker_resetter.clone(), + worker_shutdown_signal.clone(), + worker_reset_signal.clone(), )); App::new() From d00d269a801f4f1b4aef7b22bf51bf166d9cd599 Mon Sep 17 00:00:00 2001 From: 0x416e746f6e Date: Fri, 27 Feb 2026 18:29:37 +0100 Subject: [PATCH 3/4] chore: test connectivity after reset --- crates/rproxy/src/server.rs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index 203574f..a86f97d 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -347,6 +347,7 @@ mod tests { { let shutdown_signal = shutdown_signal.clone(); let client = Client::builder().timeout(Duration::from_millis(10)).finish(); + let proxy_addr_authrpc = proxy_addr_authrpc.clone(); actix_rt::spawn(async move { loop { @@ -416,9 +417,43 @@ mod tests { }); } + let client = Client::builder().timeout(Duration::from_millis(10)).finish(); + for _ in 0..10 { - actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await; reset_signal.cancel(); + + actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await; + + let req = client + .post(format!("http://{proxy_addr_authrpc}")) + .insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON)) + .send_body(r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#); + + tokio::select! { + res = req => { + match res { + Ok(mut res) => { + match res.body().await { + Err(err) => { + panic!("Failed to send a request: {err}"); + } + Ok(body) => { + let body = String::from_utf8_lossy(&body).to_string(); + info!("Sent a request and got a response: {body}"); + } + } + } + + Err(err) => { + panic!("Failed to send a request: {err}"); + } + } + } + + _ = shutdown_signal.cancelled() => { + break + } + } } shutdown_signal.cancel(); From 0ea634790cdfeae9bd7453f9075c73ecacf683a3 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:58:12 -0500 Subject: [PATCH 4/4] respawn circuit breaker after it cancels everything --- crates/rproxy/src/server.rs | 52 ++++++++++--------- .../src/server/proxy/circuit_breaker.rs | 4 ++ 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index a86f97d..1945bef 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -34,15 +34,13 @@ pub struct Server {} impl Server { pub async fn run(config: Config) -> Result<(), Box> { let shutdown_signal = Server::wait_for_shutdown_signal(); - let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone()); - Self::_run(config, shutdown_signal, reset_signal).await + Self::_run(config, shutdown_signal).await } async fn _run( config: Config, shutdown_signal: CancellationToken, - reset_signal: CancellationToken, ) -> Result<(), Box> { // try to set system limits match rlimit::getrlimit(rlimit::Resource::NOFILE) { @@ -84,29 +82,36 @@ impl Server { }); } - // spawn circuit-breaker - if !config.circuit_breaker.url.is_empty() { - let shutdown_signal = shutdown_signal.clone(); - let reset_signal = reset_signal.clone(); + while !shutdown_signal.is_cancelled() { + let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone()); - let _ = std::thread::spawn(move || { - let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() { - Ok(rt) => rt, - Err(err) => { - error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker"); - std::process::exit(-1); - } - }; + // spawn circuit-breaker + if !config.circuit_breaker.url.is_empty() { + let shutdown_signal = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); - let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone()); + let circuit_breaker_config = config.circuit_breaker.clone(); - tokio::task::LocalSet::new().block_on(&rt, async move { - circuit_breaker.run(shutdown_signal, reset_signal).await - }) - }); - } + let _ = std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => { + error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker"); + std::process::exit(-1); + } + }; + + let circuit_breaker = CircuitBreaker::new(circuit_breaker_config); + + tokio::task::LocalSet::new().block_on(&rt, async move { + circuit_breaker.run(shutdown_signal, reset_signal).await + }) + }); + } - while !shutdown_signal.is_cancelled() { if config.tls.enabled() { let metrics = metrics.clone(); let (not_before, not_after) = @@ -338,9 +343,8 @@ mod tests { let server = { let shutdown_signal = shutdown_signal.clone(); - let reset_signal = reset_signal.clone(); - actix_rt::spawn(async move { Server::_run(cfg, shutdown_signal, reset_signal).await }) + actix_rt::spawn(async move { Server::_run(cfg, shutdown_signal).await }) }; actix_rt::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/crates/rproxy/src/server/proxy/circuit_breaker.rs b/crates/rproxy/src/server/proxy/circuit_breaker.rs index df7b801..1b377ed 100644 --- a/crates/rproxy/src/server/proxy/circuit_breaker.rs +++ b/crates/rproxy/src/server/proxy/circuit_breaker.rs @@ -94,6 +94,10 @@ impl CircuitBreaker { _ = shutdown_signal.cancelled() => { break } + + _ = reset_signal.cancelled() => { + break + } } } })