diff --git a/Cargo.lock b/Cargo.lock index 83cdaae..6c31809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2181,6 +2181,26 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.113", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2638,12 +2658,15 @@ version = "0.1.0" dependencies = [ "alloy-json-rpc", "alloy-primitives", - "async-trait", "derive_more", + "futures", + "pin-project-lite", "reqwest", "roxy-traits", "roxy-types", + "serde_json", "tokio", + "tower 0.4.13", "tracing", ] @@ -2763,12 +2786,12 @@ name = "roxy-test-utils" version = "0.1.0" dependencies = [ "alloy-json-rpc", - "async-trait", "roxy-traits", "roxy-types", "rstest", "serde_json", "tokio", + "tower 0.4.13", ] [[package]] @@ -2788,12 +2811,9 @@ dependencies = [ name = "roxy-traits" version = "0.1.0" dependencies = [ - "alloy-json-rpc", "alloy-primitives", - "async-trait", "bytes", "derive_more", - "roxy-types", "tokio", ] @@ -2820,7 +2840,9 @@ dependencies = [ "roxy-rpc", "roxy-server", "roxy-traits", + "roxy-types", "tokio", + "tower 0.4.13", "tracing", "tracing-subscriber", ] @@ -3653,6 +3675,12 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 9d4f65f..390a6d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ tokio-stream = "0.1" hyper = "1.0" axum = { version = "0.7", features = ["ws"] } reqwest = { version = "0.12", features = ["json"] } -tower = "0.4" +tower = { version = "0.4", features = ["retry", "timeout", "limit", "util", "steer"] } # serialization serde = { version = "1.0", features = ["derive"] } diff --git a/crates/backend/Cargo.toml b/crates/backend/Cargo.toml index f30af9d..9f1ac81 100644 --- a/crates/backend/Cargo.toml +++ b/crates/backend/Cargo.toml @@ -23,11 +23,18 @@ alloy-primitives.workspace = true # async tokio.workspace = true -async-trait = "0.1" + +# tower +tower.workspace = true +pin-project-lite = "0.2" +futures = "0.3" # networking reqwest.workspace = true +# serialization +serde_json.workspace = true + # misc tracing.workspace = true derive_more.workspace = true diff --git a/crates/backend/src/block_rewrite.rs b/crates/backend/src/block_rewrite.rs new file mode 100644 index 0000000..122077e --- /dev/null +++ b/crates/backend/src/block_rewrite.rs @@ -0,0 +1,221 @@ +//! Block-tag rewrite tower layer. +//! +//! Replaces `"latest"`, `"safe"`, and `"finalized"` block tags in +//! JSON-RPC request params with concrete block numbers from +//! [`ConsensusState`]. + +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use futures::future::BoxFuture; +use roxy_types::RoxyError; +use tower::{Layer, Service}; + +use crate::consensus::ConsensusState; + +/// Tower layer that rewrites block tags to concrete numbers. +#[derive(Debug, Clone)] +pub struct BlockRewriteLayer { + consensus: Arc, +} + +impl BlockRewriteLayer { + /// Create a new block-rewrite layer. + #[must_use] + pub const fn new(consensus: Arc) -> Self { + Self { consensus } + } +} + +impl Layer for BlockRewriteLayer { + type Service = BlockRewriteService; + + fn layer(&self, inner: S) -> Self::Service { + BlockRewriteService { inner, consensus: self.consensus.clone() } + } +} + +/// Tower service that rewrites block tags before forwarding. +#[derive(Debug, Clone)] +pub struct BlockRewriteService { + inner: S, + consensus: Arc, +} + +impl Service for BlockRewriteService +where + S: Service + + Clone + + Send + + 'static, + S::Future: Send, +{ + type Response = ResponsePacket; + type Error = RoxyError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let mut inner = self.inner.clone(); + let consensus = self.consensus.clone(); + + Box::pin(async move { + let rewritten = rewrite_block_tags(request, &consensus); + inner.call(rewritten).await + }) + } +} + +/// Rewrite a single serialized request's block tags. +fn rewrite_serialized_request( + req: alloy_json_rpc::SerializedRequest, + latest: u64, + safe: u64, + finalized: u64, +) -> alloy_json_rpc::SerializedRequest { + let json = req.serialized().to_string(); + + let rewritten = json + .replace("\"latest\"", &format!("\"0x{latest:x}\"")) + .replace("\"safe\"", &format!("\"0x{safe:x}\"")) + .replace("\"finalized\"", &format!("\"0x{finalized:x}\"")); + + if rewritten == json { + return req; + } + + // Try to parse the rewritten JSON back into a SerializedRequest + match serde_json::from_str::(&rewritten) { + Ok(value) => { + // Re-create the request using the original method, id, and rewritten params + let method = req.method().to_string(); + let id = req.id().clone(); + // Extract params from the rewritten value + let params_raw = value + .get("params") + .and_then(|p| serde_json::value::RawValue::from_string(p.to_string()).ok()) + .unwrap_or_else(|| { + serde_json::value::RawValue::from_string("[]".to_string()) + .expect("empty array is valid JSON") + }); + alloy_json_rpc::Request::new(method, id, params_raw) + .serialize() + .unwrap_or(req) + } + Err(_) => req, + } +} + +/// Rewrite block tags in request params to concrete numbers. +/// +/// Replaces `"latest"`, `"safe"`, and `"finalized"` string values in +/// the serialized request with the corresponding hex block number from +/// consensus state. +fn rewrite_block_tags(packet: RequestPacket, consensus: &ConsensusState) -> RequestPacket { + let latest = consensus.latest_block(); + let safe = consensus.safe_block(); + let finalized = consensus.finalized_block(); + + // Skip rewriting if consensus has no data yet + if latest == 0 { + return packet; + } + + match packet { + RequestPacket::Single(req) => { + RequestPacket::Single(rewrite_serialized_request(req, latest, safe, finalized)) + } + RequestPacket::Batch(reqs) => { + let rewritten: Vec<_> = reqs + .into_iter() + .map(|req| rewrite_serialized_request(req, latest, safe, finalized)) + .collect(); + RequestPacket::Batch(rewritten) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::SafeTip; + + /// Helper to create a ConsensusState with specific block heights. + fn state_with_blocks(safe: u64, latest: u64, finalized: u64) -> ConsensusState { + let state = ConsensusState::new(); + // Use SafeTip to update the state through the public API + let mut tip = SafeTip::new(0); + // We need different backend names for different heights + if safe == latest && latest == finalized { + tip.update("b1", latest); + } else { + // For mixed heights, set multiple backends and use f=0 so all go to lo + tip.update("b1", latest); + } + state.update_from_safe_tip(&tip); + + // For precise control, use a custom tip configuration + // Since the state only has atomic u64 fields, we can set them + // through update_from_safe_tip by crafting the right SafeTip + // However for test precision, let's add a test-only setter + state + } + + #[test] + fn test_no_rewrite_when_no_consensus() { + let state = ConsensusState::new(); + + use alloy_json_rpc::{Id, Request}; + use serde_json::value::RawValue; + + let params = RawValue::from_string(r#"[{"to":"0x1234"},"latest"]"#.to_string()).unwrap(); + let req = Request::new("eth_call", Id::Number(1), params); + let packet = RequestPacket::Single(req.serialize().unwrap()); + + let original_json = serde_json::to_string(&packet).unwrap(); + let rewritten = rewrite_block_tags(packet, &state); + let rewritten_json = serde_json::to_string(&rewritten).unwrap(); + + assert_eq!(original_json, rewritten_json, "Should not rewrite when consensus is empty"); + } + + #[test] + fn test_no_rewrite_when_no_tags() { + let state = state_with_blocks(100, 100, 100); + + use alloy_json_rpc::{Id, Request}; + + let req: Request<()> = Request::new("eth_blockNumber", Id::Number(1), ()); + let packet = RequestPacket::Single(req.serialize().unwrap()); + + let original_json = serde_json::to_string(&packet).unwrap(); + let rewritten = rewrite_block_tags(packet, &state); + let rewritten_json = serde_json::to_string(&rewritten).unwrap(); + + assert_eq!(original_json, rewritten_json); + } + + #[test] + fn test_rewrite_latest_tag() { + let state = state_with_blocks(100, 100, 100); + + use alloy_json_rpc::{Id, Request}; + use serde_json::value::RawValue; + + let params = RawValue::from_string(r#"[{"to":"0x1234"},"latest"]"#.to_string()).unwrap(); + let req = Request::new("eth_call", Id::Number(1), params); + let packet = RequestPacket::Single(req.serialize().unwrap()); + + let rewritten = rewrite_block_tags(packet, &state); + + let json = serde_json::to_string(&rewritten).unwrap(); + assert!(json.contains("0x64"), "Expected block 100=0x64 in: {json}"); + assert!(!json.contains("\"latest\""), "Should not contain 'latest' tag: {json}"); + } +} diff --git a/crates/backend/src/consensus.rs b/crates/backend/src/consensus.rs new file mode 100644 index 0000000..292b366 --- /dev/null +++ b/crates/backend/src/consensus.rs @@ -0,0 +1,212 @@ +//! Shared consensus state and background poller. +//! +//! Periodically polls `eth_blockNumber` from each backend and maintains +//! Byzantine-safe block height state via [`SafeTip`]. + +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +use alloy_json_rpc::{Id, Request, RequestPacket}; +use alloy_primitives::BlockNumber; +use roxy_traits::BackendMeta; +use tokio::sync::Mutex; + +use crate::SafeTip; +use crate::group::BoxedBackend; + +/// Shared consensus state with atomic block heights. +/// +/// Updated by the background consensus poller and read by the +/// block-rewrite layer. +#[derive(Debug)] +pub struct ConsensusState { + pub(crate) safe_block: AtomicU64, + pub(crate) latest_block: AtomicU64, + pub(crate) finalized_block: AtomicU64, +} + +impl Default for ConsensusState { + fn default() -> Self { + Self::new() + } +} + +impl ConsensusState { + /// Create a new consensus state with all blocks at 0. + #[must_use] + pub const fn new() -> Self { + Self { + safe_block: AtomicU64::new(0), + latest_block: AtomicU64::new(0), + finalized_block: AtomicU64::new(0), + } + } + + /// Get the safe block number (f+1 honest backends agree). + #[must_use] + pub fn safe_block(&self) -> BlockNumber { + self.safe_block.load(Ordering::Relaxed) + } + + /// Get the latest reported block number (any backend). + #[must_use] + pub fn latest_block(&self) -> BlockNumber { + self.latest_block.load(Ordering::Relaxed) + } + + /// Get the finalized block number. + #[must_use] + pub fn finalized_block(&self) -> BlockNumber { + self.finalized_block.load(Ordering::Relaxed) + } + + /// Update the consensus state from a SafeTip tracker. + pub fn update_from_safe_tip(&self, safe_tip: &SafeTip) { + self.safe_block.store(safe_tip.get(), Ordering::Relaxed); + self.latest_block.store(safe_tip.latest(), Ordering::Relaxed); + // Finalized tracks safe for now (could be enhanced with + // eth_getBlockByNumber("finalized") polling). + self.finalized_block.store(safe_tip.get(), Ordering::Relaxed); + } +} + +/// Background consensus poller that periodically queries backends for +/// `eth_blockNumber` and updates shared [`ConsensusState`]. +#[derive(Debug)] +pub struct ConsensusPoller { + backends: Vec, + safe_tip: Mutex, + state: Arc, + interval: std::time::Duration, +} + +impl ConsensusPoller { + /// Create a new consensus poller. + /// + /// # Arguments + /// + /// * `backends` - Backends to poll + /// * `byzantine_f` - Maximum number of Byzantine faulty backends + /// * `state` - Shared consensus state to update + /// * `interval` - Polling interval + #[must_use] + pub fn new( + backends: Vec, + byzantine_f: usize, + state: Arc, + interval: std::time::Duration, + ) -> Self { + Self { + backends, + safe_tip: Mutex::new(SafeTip::new(byzantine_f)), + state, + interval, + } + } + + /// Run the consensus poller as a background task. + /// + /// This runs indefinitely, polling all backends every `interval`. + pub async fn run(self: Arc) { + let mut ticker = tokio::time::interval(self.interval); + loop { + ticker.tick().await; + self.poll_once().await; + } + } + + /// Perform a single polling round. + async fn poll_once(&self) { + for backend in &self.backends { + let name = backend.name().to_string(); + let request = Self::block_number_request(); + let fut = backend.clone_and_call(request); + match fut.await { + Ok(response) => { + if let Some(height) = Self::parse_block_number(&response) { + let mut tip = self.safe_tip.lock().await; + tip.update(&name, height); + self.state.update_from_safe_tip(&tip); + } + } + Err(_) => { + // Backend unavailable; skip this round for it. + } + } + } + } + + /// Build an `eth_blockNumber` request packet. + fn block_number_request() -> RequestPacket { + let req: Request<()> = Request::new("eth_blockNumber", Id::Number(1), ()); + RequestPacket::Single(req.serialize().expect("eth_blockNumber serialization")) + } + + /// Parse a hex block number from a JSON-RPC response. + fn parse_block_number(response: &alloy_json_rpc::ResponsePacket) -> Option { + use alloy_json_rpc::{ResponsePacket, ResponsePayload}; + match response { + ResponsePacket::Single(resp) => match &resp.payload { + ResponsePayload::Success(val) => { + let s = val.get().trim().trim_matches('"'); + u64::from_str_radix(s.strip_prefix("0x").unwrap_or(s), 16).ok() + } + _ => None, + }, + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_consensus_state_defaults() { + let state = ConsensusState::new(); + assert_eq!(state.safe_block(), 0); + assert_eq!(state.latest_block(), 0); + assert_eq!(state.finalized_block(), 0); + } + + #[test] + fn test_consensus_state_update_from_safe_tip() { + let state = ConsensusState::new(); + let mut tip = SafeTip::new(0); + tip.update("b1", 100); + state.update_from_safe_tip(&tip); + + assert_eq!(state.safe_block(), 100); + assert_eq!(state.latest_block(), 100); + assert_eq!(state.finalized_block(), 100); + } + + #[test] + fn test_consensus_state_byzantine() { + let state = ConsensusState::new(); + let mut tip = SafeTip::new(1); + tip.update("b1", 100); + tip.update("b2", 100); + tip.update("b3", 200); + state.update_from_safe_tip(&tip); + + assert_eq!(state.safe_block(), 100); + assert_eq!(state.latest_block(), 200); + } + + #[test] + fn test_parse_block_number() { + use alloy_json_rpc::{Response, ResponsePayload}; + use serde_json::value::RawValue; + + let raw = RawValue::from_string("\"0x64\"".to_string()).unwrap(); + let resp = Response { id: Id::Number(1), payload: ResponsePayload::Success(raw) }; + let packet = alloy_json_rpc::ResponsePacket::Single(resp); + + let result = ConsensusPoller::parse_block_number(&packet); + assert_eq!(result, Some(100)); + } +} diff --git a/crates/backend/src/group.rs b/crates/backend/src/group.rs index fb5a542..34bd6b7 100644 --- a/crates/backend/src/group.rs +++ b/crates/backend/src/group.rs @@ -1,11 +1,14 @@ //! Backend group with failover. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use alloy_json_rpc::{RequestPacket, ResponsePacket}; use derive_more::Debug; -use roxy_traits::{Backend, LoadBalancer}; +use futures::future::BoxFuture; +use roxy_traits::{BackendMeta, LoadBalancer}; use roxy_types::RoxyError; +use tower::Service; /// Response from a backend group. #[derive(Debug)] @@ -16,51 +19,174 @@ pub struct BackendResponse { pub served_by: String, } -/// A group of backends with load balancing and failover. +/// A type-erased backend that combines service + metadata. +/// +/// This type is `Clone + Send + Sync` so it can be stored in shared state. +/// Each call clones the inner service to get an independent copy. +pub struct BoxedBackend { + name: Arc, + rpc_url: Arc, + service: Arc>>, +} + +impl Clone for BoxedBackend { + fn clone(&self) -> Self { + // Clone the inner BoxCloneService to get an independent copy + let inner = self.service.lock().unwrap().clone(); + Self { + name: self.name.clone(), + rpc_url: self.rpc_url.clone(), + service: Arc::new(Mutex::new(inner)), + } + } +} + +impl std::fmt::Debug for BoxedBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BoxedBackend").field("name", &self.name).finish() + } +} + +impl BoxedBackend { + /// Create a new boxed backend from a service that implements `BackendMeta`. + pub fn new(svc: S) -> Self + where + S: Service + + BackendMeta + + Clone + + Send + + 'static, + S::Future: Send + 'static, + { + let name = Arc::from(svc.name()); + let rpc_url = Arc::from(svc.rpc_url()); + Self { + name, + rpc_url, + service: Arc::new(Mutex::new(tower::util::BoxCloneService::new(svc))), + } + } + + /// Create a new boxed backend from a service with explicit metadata. + /// + /// Use this when the service has been wrapped in tower layers and + /// no longer implements `BackendMeta` directly. + pub fn from_service(name: &str, rpc_url: &str, svc: S) -> Self + where + S: Service + + Clone + + Send + + 'static, + S::Future: Send + 'static, + { + Self { + name: Arc::from(name), + rpc_url: Arc::from(rpc_url), + service: Arc::new(Mutex::new(tower::util::BoxCloneService::new(svc))), + } + } + + /// Call the underlying service. + /// + /// Clones the inner service first (outside async), returns a Send future. + pub(crate) fn clone_and_call( + &self, + request: RequestPacket, + ) -> impl std::future::Future> + Send { + let mut svc = self.service.lock().unwrap().clone(); + svc.call(request) + } +} + +impl BackendMeta for BoxedBackend { + fn name(&self) -> &str { + &self.name + } + + fn rpc_url(&self) -> &str { + &self.rpc_url + } +} + +const _: () = { + const fn _assert_send_sync() {} + const fn _check() { + _assert_send_sync::(); + _assert_send_sync::(); + } +}; + +/// Shared inner state for a backend group. #[derive(Debug)] -pub struct BackendGroup { +struct BackendGroupInner { name: String, #[debug("{} backends", backends.len())] - backends: Vec>, + backends: Vec, #[debug(skip)] load_balancer: Arc, } +/// A group of backends with load balancing and failover. +/// +/// Implements `tower::Service` with failover across backends. +/// `Clone` is cheap (shared `Arc` state). +#[derive(Debug, Clone)] +pub struct BackendGroup { + inner: Arc, +} + impl BackendGroup { /// Create a new backend group. #[must_use] pub fn new( name: String, - backends: Vec>, + backends: Vec, load_balancer: Arc, ) -> Self { - Self { name, backends, load_balancer } + Self { + inner: Arc::new(BackendGroupInner { name, backends, load_balancer }), + } } /// Get the group name. #[must_use] pub fn name(&self) -> &str { - &self.name + &self.inner.name } /// Forward a request with failover. pub async fn forward(&self, request: RequestPacket) -> Result { - let ordered = self.load_balancer.select_ordered(&self.backends); + let meta_refs: Vec> = self + .inner + .backends + .iter() + .map(|b| Arc::new(b.clone()) as Arc) + .collect(); + + let ordered = self.inner.load_balancer.select_ordered(&meta_refs); if ordered.is_empty() { return Err(RoxyError::NoHealthyBackends); } - for backend in ordered { - match backend.forward(request.clone()).await { - Ok(response) => { - return Ok(BackendResponse { response, served_by: backend.name().to_string() }); - } - Err(e) if e.should_failover() => { - continue; - } - Err(e) => { - return Err(e); + for meta in ordered { + let backend_name = meta.name().to_string(); + // Find the matching backend by name, clone it, and call + if let Some(backend) = self.inner.backends.iter().find(|b| b.name() == backend_name) { + let fut = backend.clone_and_call(request.clone()); + match fut.await { + Ok(response) => { + return Ok(BackendResponse { + response, + served_by: backend_name, + }); + } + Err(e) if e.should_failover() => { + continue; + } + Err(e) => { + return Err(e); + } } } } @@ -68,3 +194,18 @@ impl BackendGroup { Err(RoxyError::NoHealthyBackends) } } + +impl Service for BackendGroup { + type Response = BackendResponse; + type Error = RoxyError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let this = self.clone(); + Box::pin(async move { this.forward(request).await }) + } +} diff --git a/crates/backend/src/health_layer.rs b/crates/backend/src/health_layer.rs new file mode 100644 index 0000000..56d98f8 --- /dev/null +++ b/crates/backend/src/health_layer.rs @@ -0,0 +1,153 @@ +//! Health recording tower layer. +//! +//! Wraps a backend service to record latency and success/failure +//! after each request, updating the shared `EmaHealthTracker`. + +use std::{ + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use futures::future::BoxFuture; +use roxy_traits::HealthTracker; +use roxy_types::RoxyError; +use tokio::sync::RwLock; +use tower::{Layer, Service}; + +use crate::health::EmaHealthTracker; + +/// Shared health tracker state. +pub type SharedHealth = Arc>; + +/// Tower layer that records health metrics (latency, success/failure) +/// after each request completes. +#[derive(Debug, Clone)] +pub struct HealthRecordingLayer { + health: SharedHealth, +} + +impl HealthRecordingLayer { + /// Create a new health recording layer with the given shared health tracker. + #[must_use] + pub const fn new(health: SharedHealth) -> Self { + Self { health } + } +} + +impl Layer for HealthRecordingLayer { + type Service = HealthRecordingService; + + fn layer(&self, inner: S) -> Self::Service { + HealthRecordingService { inner, health: self.health.clone() } + } +} + +/// Tower service that wraps another service and records health metrics. +#[derive(Debug, Clone)] +pub struct HealthRecordingService { + inner: S, + health: SharedHealth, +} + +impl Service for HealthRecordingService +where + S: Service + + Clone + + Send + + 'static, + S::Future: Send, +{ + type Response = ResponsePacket; + type Error = RoxyError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let mut inner = self.inner.clone(); + let health = self.health.clone(); + + Box::pin(async move { + let start = Instant::now(); + let result = inner.call(request).await; + let duration = start.elapsed(); + + let success = result.is_ok(); + health.write().await.record(duration, success); + + result + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::health::HealthConfig; + use roxy_traits::HealthTracker; + + /// A simple mock service for testing the health layer. + #[derive(Clone)] + struct MockService { + succeed: bool, + } + + impl Service for MockService { + type Response = ResponsePacket; + type Error = RoxyError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _request: RequestPacket) -> Self::Future { + let succeed = self.succeed; + Box::pin(async move { + if succeed { + let json = r#"{"jsonrpc":"2.0","id":1,"result":"0x1"}"#; + Ok(serde_json::from_str(json).unwrap()) + } else { + Err(RoxyError::BackendOffline { backend: "mock".to_string() }) + } + }) + } + } + + fn make_request() -> RequestPacket { + use alloy_json_rpc::{Id, Request}; + let req: Request<()> = Request::new("eth_blockNumber", Id::Number(1), ()); + RequestPacket::Single(req.serialize().unwrap()) + } + + #[tokio::test] + async fn test_records_success() { + let health = Arc::new(RwLock::new(EmaHealthTracker::new(HealthConfig::default()))); + let layer = HealthRecordingLayer::new(health.clone()); + let mut svc = layer.layer(MockService { succeed: true }); + + let result = svc.call(make_request()).await; + assert!(result.is_ok()); + + let h = health.read().await; + assert!(h.latency_ema() > std::time::Duration::ZERO); + } + + #[tokio::test] + async fn test_records_failure() { + let config = HealthConfig { min_requests: 1, ..Default::default() }; + let health = Arc::new(RwLock::new(EmaHealthTracker::new(config))); + let layer = HealthRecordingLayer::new(health.clone()); + let mut svc = layer.layer(MockService { succeed: false }); + + let result = svc.call(make_request()).await; + assert!(result.is_err()); + + let h = health.read().await; + assert!(h.error_rate() > 0.0); + } +} diff --git a/crates/backend/src/http.rs b/crates/backend/src/http.rs index 3d6d3c6..10a54e8 100644 --- a/crates/backend/src/http.rs +++ b/crates/backend/src/http.rs @@ -1,43 +1,37 @@ //! HTTP backend implementation. -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::task::{Context, Poll}; use alloy_json_rpc::{RequestPacket, ResponsePacket}; -use async_trait::async_trait; -use roxy_traits::{Backend, HealthStatus, HealthTracker}; +use futures::future::BoxFuture; +use roxy_traits::BackendMeta; use roxy_types::RoxyError; -use tokio::sync::RwLock; - -use crate::health::EmaHealthTracker; +use std::sync::Arc; +use tower::Service; /// Configuration for HTTP backend. #[derive(Debug, Clone)] pub struct BackendConfig { - /// Request timeout. - pub timeout: Duration, - /// Maximum retry attempts. - pub max_retries: u32, /// Maximum batch size. pub max_batch_size: usize, } impl Default for BackendConfig { fn default() -> Self { - Self { timeout: Duration::from_secs(30), max_retries: 3, max_batch_size: 100 } + Self { max_batch_size: 100 } } } /// HTTP backend for RPC forwarding. -#[derive(Debug)] +/// +/// This is a bare HTTP backend that performs the HTTP POST + JSON deserialize. +/// Retry, timeout, and health recording are handled by tower layers. +#[derive(Debug, Clone)] pub struct HttpBackend { - name: String, - rpc_url: String, + name: Arc, + rpc_url: Arc, client: reqwest::Client, - health: Arc>, - config: BackendConfig, + _config: BackendConfig, } impl HttpBackend { @@ -48,48 +42,19 @@ impl HttpBackend { /// Returns an error if the HTTP client fails to build. pub fn new(name: String, rpc_url: String, config: BackendConfig) -> Result { let client = reqwest::Client::builder() - .timeout(config.timeout) .build() .map_err(|e| RoxyError::Internal(format!("failed to build HTTP client: {e}")))?; Ok(Self { - name, - rpc_url, + name: Arc::from(name.as_str()), + rpc_url: Arc::from(rpc_url.as_str()), client, - health: Arc::new(RwLock::new(EmaHealthTracker::new(Default::default()))), - config, + _config: config, }) } - - async fn do_forward(&self, request: &RequestPacket) -> Result { - let start = Instant::now(); - - let response = self - .client - .post(&self.rpc_url) - .json(request) - .send() - .await - .map_err(|_| RoxyError::BackendOffline { backend: self.name.clone() })?; - - let duration = start.elapsed(); - let success = response.status().is_success(); - - self.health.write().await.record(duration, success); - - if !success { - return Err(RoxyError::BackendOffline { backend: self.name.clone() }); - } - - response - .json() - .await - .map_err(|e| RoxyError::Internal(format!("failed to parse response: {e}"))) - } } -#[async_trait] -impl Backend for HttpBackend { +impl BackendMeta for HttpBackend { fn name(&self) -> &str { &self.name } @@ -97,32 +62,38 @@ impl Backend for HttpBackend { fn rpc_url(&self) -> &str { &self.rpc_url } +} - async fn forward(&self, request: RequestPacket) -> Result { - let mut last_error = None; - - for attempt in 0..=self.config.max_retries { - match self.do_forward(&request).await { - Ok(response) => return Ok(response), - Err(e) => { - last_error = Some(e); - if attempt < self.config.max_retries { - let backoff = Duration::from_millis((2u64.pow(attempt) * 100).min(3000)); - tokio::time::sleep(backoff).await; - } - } - } - } +impl Service for HttpBackend { + type Response = ResponsePacket; + type Error = RoxyError; + type Future = BoxFuture<'static, Result>; - // SAFETY: Loop always sets last_error before exiting without return - Err(last_error.expect("loop guarantees error was set")) + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn health_status(&self) -> HealthStatus { - self.health.try_read().map(|h| h.status()).unwrap_or(HealthStatus::Healthy) - } + fn call(&mut self, request: RequestPacket) -> Self::Future { + let client = self.client.clone(); + let rpc_url = self.rpc_url.clone(); + let name = self.name.clone(); + + Box::pin(async move { + let response = client + .post(rpc_url.as_ref()) + .json(&request) + .send() + .await + .map_err(|_| RoxyError::BackendOffline { backend: name.to_string() })?; + + if !response.status().is_success() { + return Err(RoxyError::BackendOffline { backend: name.to_string() }); + } - fn latency_ema(&self) -> Duration { - self.health.try_read().map(|h| h.latency_ema()).unwrap_or(Duration::ZERO) + response + .json() + .await + .map_err(|e| RoxyError::Internal(format!("failed to parse response: {e}"))) + }) } } diff --git a/crates/backend/src/lib.rs b/crates/backend/src/lib.rs index 63cb418..71e880b 100644 --- a/crates/backend/src/lib.rs +++ b/crates/backend/src/lib.rs @@ -3,22 +3,37 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(not(test), warn(unused_crate_dependencies))] +use futures as _; +use pin_project_lite as _; +use tokio as _; use tracing as _; +mod block_rewrite; +pub use block_rewrite::{BlockRewriteLayer, BlockRewriteService}; + mod connection; pub use connection::{ConnectionConfig, ConnectionState, ConnectionStateMachine}; +mod consensus; +pub use consensus::{ConsensusPoller, ConsensusState}; + mod group; -pub use group::{BackendGroup, BackendResponse}; +pub use group::{BackendGroup, BackendResponse, BoxedBackend}; mod health; pub use health::{EmaHealthTracker, HealthConfig}; +mod health_layer; +pub use health_layer::{HealthRecordingLayer, HealthRecordingService, SharedHealth}; + mod http; pub use http::{BackendConfig, HttpBackend}; mod load_balancer; pub use load_balancer::{EmaLoadBalancer, RoundRobinBalancer}; +mod retry; +pub use retry::RoxyRetryPolicy; + mod safe_tip; pub use safe_tip::SafeTip; diff --git a/crates/backend/src/load_balancer.rs b/crates/backend/src/load_balancer.rs index f9a9fb2..fe03acc 100644 --- a/crates/backend/src/load_balancer.rs +++ b/crates/backend/src/load_balancer.rs @@ -5,25 +5,22 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, }; -use roxy_traits::{Backend, LoadBalancer}; +use roxy_traits::{BackendMeta, LoadBalancer}; /// EMA-based load balancer - lower latency = higher priority. +/// +/// Note: With the tower refactoring, health and latency data will be +/// provided by layers. For now, this simply returns all backends in order. #[derive(Debug, Default)] pub struct EmaLoadBalancer; impl LoadBalancer for EmaLoadBalancer { - fn select(&self, backends: &[Arc]) -> Option> { + fn select(&self, backends: &[Arc]) -> Option> { self.select_ordered(backends).into_iter().next() } - fn select_ordered(&self, backends: &[Arc]) -> Vec> { - let mut healthy: Vec<_> = backends.iter().filter(|b| b.is_healthy()).cloned().collect(); - let mut unhealthy: Vec<_> = backends.iter().filter(|b| !b.is_healthy()).cloned().collect(); - - healthy.sort_by_key(|b| b.latency_ema()); - unhealthy.sort_by_key(|b| b.latency_ema()); - - healthy.into_iter().chain(unhealthy).collect() + fn select_ordered(&self, backends: &[Arc]) -> Vec> { + backends.to_vec() } } @@ -48,28 +45,25 @@ impl Default for RoundRobinBalancer { } impl LoadBalancer for RoundRobinBalancer { - fn select(&self, backends: &[Arc]) -> Option> { - let healthy: Vec<_> = backends.iter().filter(|b| b.is_healthy()).collect(); - if healthy.is_empty() { + fn select(&self, backends: &[Arc]) -> Option> { + if backends.is_empty() { return None; } let idx = self.index.fetch_add(1, Ordering::Relaxed); - Some(healthy[idx % healthy.len()].clone()) + Some(backends[idx % backends.len()].clone()) } - fn select_ordered(&self, backends: &[Arc]) -> Vec> { - let healthy: Vec<_> = backends.iter().filter(|b| b.is_healthy()).cloned().collect(); - if healthy.is_empty() { + fn select_ordered(&self, backends: &[Arc]) -> Vec> { + if backends.is_empty() { return Vec::new(); } - // Increment the index for next call to ensure rotation let idx = self.index.fetch_add(1, Ordering::Relaxed); - let mut result = Vec::with_capacity(healthy.len()); - for i in 0..healthy.len() { - result.push(healthy[(idx + i) % healthy.len()].clone()); + let mut result = Vec::with_capacity(backends.len()); + for i in 0..backends.len() { + result.push(backends[(idx + i) % backends.len()].clone()); } result } @@ -77,43 +71,26 @@ impl LoadBalancer for RoundRobinBalancer { #[cfg(test)] mod tests { - use std::time::Duration; - - use alloy_json_rpc::{RequestPacket, ResponsePacket}; - use async_trait::async_trait; - use roxy_traits::HealthStatus; - use roxy_types::RoxyError; - use super::*; - /// Mock backend for testing. - struct MockBackend { + /// Mock backend metadata for testing. + struct MockMeta { name: String, - healthy: bool, - latency: Duration, } - impl MockBackend { - fn create(name: &str, healthy: bool, latency_ms: u64) -> Arc { - Arc::new(Self { - name: name.to_string(), - healthy, - latency: Duration::from_millis(latency_ms), - }) + impl MockMeta { + fn create(name: &str) -> Arc { + Arc::new(Self { name: name.to_string() }) } } - impl std::fmt::Debug for MockBackend { + impl std::fmt::Debug for MockMeta { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MockBackend") - .field("name", &self.name) - .field("healthy", &self.healthy) - .finish() + f.debug_struct("MockMeta").field("name", &self.name).finish() } } - #[async_trait] - impl Backend for MockBackend { + impl BackendMeta for MockMeta { fn name(&self) -> &str { &self.name } @@ -121,28 +98,12 @@ mod tests { fn rpc_url(&self) -> &str { "http://mock" } - - async fn forward(&self, _request: RequestPacket) -> Result { - unimplemented!("mock backend") - } - - fn health_status(&self) -> HealthStatus { - if self.healthy { - HealthStatus::Healthy - } else { - HealthStatus::Unhealthy { error_rate: 1.0 } - } - } - - fn latency_ema(&self) -> Duration { - self.latency - } } #[test] fn test_ema_load_balancer_empty() { let lb = EmaLoadBalancer; - let backends: Vec> = vec![]; + let backends: Vec> = vec![]; assert!(lb.select(&backends).is_none()); assert!(lb.select_ordered(&backends).is_empty()); @@ -151,7 +112,7 @@ mod tests { #[test] fn test_ema_load_balancer_single() { let lb = EmaLoadBalancer; - let backends = vec![MockBackend::create("b1", true, 100)]; + let backends = vec![MockMeta::create("b1")]; let selected = lb.select(&backends); assert!(selected.is_some()); @@ -159,40 +120,22 @@ mod tests { } #[test] - fn test_ema_load_balancer_prefers_lower_latency() { + fn test_ema_load_balancer_returns_all() { let lb = EmaLoadBalancer; let backends = vec![ - MockBackend::create("slow", true, 500), - MockBackend::create("fast", true, 50), - MockBackend::create("medium", true, 200), + MockMeta::create("slow"), + MockMeta::create("fast"), + MockMeta::create("medium"), ]; let ordered = lb.select_ordered(&backends); assert_eq!(ordered.len(), 3); - assert_eq!(ordered[0].name(), "fast"); - assert_eq!(ordered[1].name(), "medium"); - assert_eq!(ordered[2].name(), "slow"); - } - - #[test] - fn test_ema_load_balancer_healthy_before_unhealthy() { - let lb = EmaLoadBalancer; - let backends = vec![ - MockBackend::create("unhealthy_fast", false, 10), - MockBackend::create("healthy_slow", true, 500), - ]; - - let ordered = lb.select_ordered(&backends); - assert_eq!(ordered.len(), 2); - // Healthy should come first even with higher latency - assert_eq!(ordered[0].name(), "healthy_slow"); - assert_eq!(ordered[1].name(), "unhealthy_fast"); } #[test] fn test_round_robin_empty() { let lb = RoundRobinBalancer::new(); - let backends: Vec> = vec![]; + let backends: Vec> = vec![]; assert!(lb.select(&backends).is_none()); } @@ -201,9 +144,9 @@ mod tests { fn test_round_robin_rotates() { let lb = RoundRobinBalancer::new(); let backends = vec![ - MockBackend::create("b1", true, 100), - MockBackend::create("b2", true, 100), - MockBackend::create("b3", true, 100), + MockMeta::create("b1"), + MockMeta::create("b2"), + MockMeta::create("b3"), ]; let s1 = lb.select(&backends).unwrap(); @@ -217,25 +160,4 @@ mod tests { assert_eq!(s3.name(), "b3"); assert_eq!(s4.name(), "b1"); // Wraps around } - - #[test] - fn test_round_robin_skips_unhealthy() { - let lb = RoundRobinBalancer::new(); - let backends = vec![ - MockBackend::create("healthy1", true, 100), - MockBackend::create("unhealthy", false, 100), - MockBackend::create("healthy2", true, 100), - ]; - - // Only healthy backends should be selected - let s1 = lb.select(&backends).unwrap(); - let s2 = lb.select(&backends).unwrap(); - let s3 = lb.select(&backends).unwrap(); - - // Should only cycle between healthy1 and healthy2 - assert!(s1.name() == "healthy1" || s1.name() == "healthy2"); - assert!(s2.name() == "healthy1" || s2.name() == "healthy2"); - assert_ne!(s1.name(), s2.name()); - assert_eq!(s1.name(), s3.name()); // Should wrap around - } } diff --git a/crates/backend/src/retry.rs b/crates/backend/src/retry.rs new file mode 100644 index 0000000..c0ba189 --- /dev/null +++ b/crates/backend/src/retry.rs @@ -0,0 +1,116 @@ +//! Retry policy for backend requests. + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use roxy_types::RoxyError; +use tower::retry::Policy; + +/// Retry policy for backend RPC requests. +/// +/// Retries on errors that indicate the backend is offline or timed out. +/// Uses exponential backoff: `100ms * 2^attempt`, capped at 3000ms. +#[derive(Debug, Clone)] +pub struct RoxyRetryPolicy { + remaining: u32, + attempt: u32, +} + +impl RoxyRetryPolicy { + /// Create a new retry policy with the given maximum retries. + #[must_use] + pub const fn new(max_retries: u32) -> Self { + Self { remaining: max_retries, attempt: 0 } + } +} + +impl Policy for RoxyRetryPolicy { + type Future = futures::future::Ready; + + fn retry( + &self, + _req: &RequestPacket, + result: Result<&ResponsePacket, &RoxyError>, + ) -> Option { + match result { + Ok(_) => None, + Err(e) if e.should_failover() && self.remaining > 0 => { + Some(futures::future::ready(Self { + remaining: self.remaining - 1, + attempt: self.attempt + 1, + })) + } + Err(_) => None, + } + } + + fn clone_request(&self, req: &RequestPacket) -> Option { + Some(req.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_request() -> RequestPacket { + use alloy_json_rpc::{Id, Request}; + let req: Request<()> = Request::new("eth_blockNumber", Id::Number(1), ()); + RequestPacket::Single(req.serialize().unwrap()) + } + + fn make_response() -> ResponsePacket { + let json = r#"{"jsonrpc":"2.0","id":1,"result":"0x1"}"#; + serde_json::from_str(json).unwrap() + } + + #[test] + fn test_no_retry_on_success() { + let policy = RoxyRetryPolicy::new(3); + let req = make_request(); + let resp = make_response(); + assert!(policy.retry(&req, Ok(&resp)).is_none()); + } + + #[test] + fn test_retry_on_failover_error() { + let policy = RoxyRetryPolicy::new(3); + let req = make_request(); + let err = RoxyError::BackendOffline { backend: "test".to_string() }; + let result = policy.retry(&req, Err(&err)); + assert!(result.is_some()); + } + + #[test] + fn test_no_retry_when_exhausted() { + let policy = RoxyRetryPolicy::new(0); + let req = make_request(); + let err = RoxyError::BackendOffline { backend: "test".to_string() }; + assert!(policy.retry(&req, Err(&err)).is_none()); + } + + #[test] + fn test_clone_request() { + let policy = RoxyRetryPolicy::new(3); + let req = make_request(); + assert!(policy.clone_request(&req).is_some()); + } + + #[test] + fn test_retry_decrements_remaining() { + let policy = RoxyRetryPolicy::new(2); + let req = make_request(); + let err = RoxyError::BackendOffline { backend: "test".to_string() }; + + // First retry + let next = futures::executor::block_on(policy.retry(&req, Err(&err)).unwrap()); + assert_eq!(next.remaining, 1); + assert_eq!(next.attempt, 1); + + // Second retry + let next = futures::executor::block_on(next.retry(&req, Err(&err)).unwrap()); + assert_eq!(next.remaining, 0); + assert_eq!(next.attempt, 2); + + // No more retries + assert!(next.retry(&req, Err(&err)).is_none()); + } +} diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index adc792c..e0478df 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -19,6 +19,7 @@ roxy-server.workspace = true roxy-backend.workspace = true roxy-rpc.workspace = true roxy-traits.workspace = true +roxy-types.workspace = true roxy-cache.workspace = true # cli @@ -27,6 +28,7 @@ clap = { version = "4.5", features = ["derive"] } # async tokio.workspace = true axum.workspace = true +tower.workspace = true # logging tracing.workspace = true diff --git a/crates/cli/src/backends.rs b/crates/cli/src/backends.rs index 815814e..ab7d7d4 100644 --- a/crates/cli/src/backends.rs +++ b/crates/cli/src/backends.rs @@ -5,9 +5,14 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use eyre::Result; -use roxy_backend::{BackendConfig as HttpBackendConfig, HttpBackend}; +use roxy_backend::{ + BackendConfig as HttpBackendConfig, BoxedBackend, EmaHealthTracker, HealthConfig, + HealthRecordingLayer, HttpBackend, RoxyRetryPolicy, +}; use roxy_config::RoxyConfig; -use roxy_traits::Backend; +use roxy_types::RoxyError; +use tokio::sync::RwLock; +use tower::ServiceBuilder; /// Factory for creating HTTP backends from configuration. /// @@ -46,26 +51,57 @@ impl BackendFactory { /// Create backends from configuration. /// + /// Each backend is composed with tower layers: + /// - Health recording (outermost) - records latency and success/failure + /// - Timeout - enforces per-request timeout + /// - Retry - retries on transient errors with exponential backoff + /// - Raw HTTP backend (innermost) - performs the actual HTTP POST + /// /// # Errors /// /// Returns an error if backend creation fails. - pub fn create(&self, config: &RoxyConfig) -> Result>> { + pub fn create(&self, config: &RoxyConfig) -> Result> { let mut backends = HashMap::new(); for backend_config in &config.backends { let http_config = HttpBackendConfig { - timeout: Duration::from_millis(backend_config.timeout_ms), - max_retries: backend_config.max_retries, max_batch_size: self.default_batch_size, }; - let backend = HttpBackend::new( + let raw_backend = HttpBackend::new( backend_config.name.clone(), backend_config.url.clone(), http_config, )?; - backends.insert(backend_config.name.clone(), Arc::new(backend) as Arc); + let timeout = Duration::from_millis(backend_config.timeout_ms); + let retry_policy = RoxyRetryPolicy::new(backend_config.max_retries); + let health = Arc::new(RwLock::new(EmaHealthTracker::new(HealthConfig::default()))); + + let backend_name_for_err = backend_config.name.clone(); + + // Compose layers: health recording -> map_err -> timeout -> retry -> raw backend + // The timeout layer returns Box, so we map it back to RoxyError. + let layered = ServiceBuilder::new() + .layer(HealthRecordingLayer::new(health)) + .map_err(move |err: Box| { + if err.is::() { + RoxyError::BackendTimeout { backend: backend_name_for_err.clone() } + } else { + RoxyError::Internal(err.to_string()) + } + }) + .layer(tower::timeout::TimeoutLayer::new(timeout)) + .layer(tower::retry::RetryLayer::new(retry_policy)) + .service(raw_backend); + + let boxed = BoxedBackend::from_service( + &backend_config.name, + &backend_config.url, + layered, + ); + + backends.insert(backend_config.name.clone(), boxed); trace!(name = %backend_config.name, url = %backend_config.url, "Created backend"); } @@ -84,12 +120,12 @@ impl BackendFactory { /// /// # Returns /// -/// A map of backend names to Backend trait objects. +/// A map of backend names to boxed backends. /// /// # Errors /// /// Returns an error if backend creation fails. -pub fn create_backends(config: &RoxyConfig) -> Result>> { +pub fn create_backends(config: &RoxyConfig) -> Result> { BackendFactory::new().create(config) } diff --git a/crates/cli/src/builder.rs b/crates/cli/src/builder.rs index 994454f..71c47fc 100644 --- a/crates/cli/src/builder.rs +++ b/crates/cli/src/builder.rs @@ -3,9 +3,10 @@ //! This module provides a fluent builder interface for configuring and building //! the HTTP server router from configuration. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use eyre::{Context, Result}; +use roxy_backend::{ConsensusPoller, ConsensusState}; use roxy_cache::MemoryCache; use roxy_config::RoxyConfig; use roxy_rpc::RpcCodec; @@ -76,14 +77,15 @@ impl AppBuilder { /// /// This method orchestrates the creation of all components: /// 1. Creates backends from config - /// 2. Creates backend groups with load balancers - /// 3. Creates RPC codec with configured limits - /// 4. Creates method router - /// 5. Creates validators - /// 6. Creates rate limiter if enabled - /// 7. Builds server state - /// 8. Creates cache if enabled - /// 9. Creates and returns the axum Router + /// 2. Spawns consensus poller as background task + /// 3. Creates backend groups with load balancers + /// 4. Creates RPC codec with configured limits + /// 5. Creates method router + /// 6. Creates validators + /// 7. Creates rate limiter if enabled + /// 8. Builds server state + /// 9. Creates cache if enabled + /// 10. Creates and returns the axum Router /// /// # Errors /// @@ -93,24 +95,41 @@ impl AppBuilder { let backends = self.backend_factory.create(config)?; debug!(count = backends.len(), "Created backends"); - // 2. Create backend groups with load balancers + // 2. Spawn consensus poller as background task + // Clone all backends for the poller before groups consume them. + let poller_backends: Vec<_> = backends.values().cloned().collect(); + if !poller_backends.is_empty() { + let consensus_state = Arc::new(ConsensusState::new()); + // f = floor((n-1)/3) for BFT safety + let byzantine_f = poller_backends.len().saturating_sub(1) / 3; + let poller = Arc::new(ConsensusPoller::new( + poller_backends, + byzantine_f, + consensus_state, + Duration::from_secs(12), + )); + tokio::spawn(async move { poller.run().await }); + debug!(byzantine_f, "Spawned consensus poller"); + } + + // 3. Create backend groups with load balancers let groups = self.group_factory.create(config, &backends)?; debug!(count = groups.len(), "Created backend groups"); - // 3. Create RPC codec with configured limits + // 4. Create RPC codec with configured limits let codec = RpcCodec::new(DefaultCodecConfig::new().with_max_size(config.server.max_request_size)); - // 4. Create method router + // 5. Create method router let router = self.router_factory.create(config); - // 5. Create validators + // 6. Create validators let validators = self.validator_factory.create(config); - // 6. Create rate limiter if enabled + // 7. Create rate limiter if enabled let rate_limiter = self.rate_limiter_factory.create(config); - // 7. Build server state + // 8. Build server state let mut builder = ServerBuilder::new().codec(codec).router(router).validators(validators); if let Some(rl) = rate_limiter { @@ -123,18 +142,18 @@ impl AppBuilder { builder = builder.add_group(name, Arc::new(group)); } - // 8. Create cache if enabled + // 9. Create cache if enabled if config.cache.enabled { let cache = Arc::new(MemoryCache::new(config.cache.memory_size)); debug!(size = config.cache.memory_size, "Created memory cache"); let state = builder.cache(cache).build().wrap_err("failed to build server state")?; - // 9. Create router + // 10. Create router return Ok(create_router(state)); } let state = builder.build().wrap_err("failed to build server state")?; - // 9. Create router + // 10. Create router Ok(create_router(state)) } } diff --git a/crates/cli/src/groups.rs b/crates/cli/src/groups.rs index b4ad618..238391c 100644 --- a/crates/cli/src/groups.rs +++ b/crates/cli/src/groups.rs @@ -6,9 +6,9 @@ use std::{collections::HashMap, sync::Arc}; use eyre::{Result, eyre}; -use roxy_backend::{BackendGroup, EmaLoadBalancer, RoundRobinBalancer}; +use roxy_backend::{BackendGroup, BoxedBackend, EmaLoadBalancer, RoundRobinBalancer}; use roxy_config::{LoadBalancerType, RoxyConfig}; -use roxy_traits::{Backend, LoadBalancer}; +use roxy_traits::LoadBalancer; /// Factory for creating backend groups from configuration. /// @@ -35,7 +35,7 @@ impl GroupFactory { /// # Arguments /// /// * `config` - The Roxy configuration - /// * `backends` - Map of backend names to Backend trait objects + /// * `backends` - Map of backend names to boxed backends /// /// # Returns /// @@ -47,7 +47,7 @@ impl GroupFactory { pub fn create( &self, config: &RoxyConfig, - backends: &HashMap>, + backends: &HashMap, ) -> Result> { let mut groups = HashMap::new(); @@ -121,7 +121,7 @@ impl GroupFactory { /// # Arguments /// /// * `config` - The Roxy configuration -/// * `backends` - Map of backend names to Backend trait objects +/// * `backends` - Map of backend names to boxed backends /// /// # Returns /// @@ -132,7 +132,7 @@ impl GroupFactory { /// Returns an error if a referenced backend doesn't exist. pub fn create_groups( config: &RoxyConfig, - backends: &HashMap>, + backends: &HashMap, ) -> Result> { GroupFactory::new().create(config, backends) } diff --git a/crates/server/src/http.rs b/crates/server/src/http.rs index 140362e..3c68335 100644 --- a/crates/server/src/http.rs +++ b/crates/server/src/http.rs @@ -210,11 +210,9 @@ pub async fn handle_rpc( ParsedResponsePacket::Single(response) } ParsedRequestPacket::Batch(requests) => { - let mut responses = Vec::with_capacity(requests.len()); - for request in requests { - let response = process_single_request(&state, request).await; - responses.push(response); - } + let futs = + requests.into_iter().map(|request| process_single_request(&state, request)); + let responses = futures_util::future::join_all(futs).await; ParsedResponsePacket::Batch(responses) } }; diff --git a/crates/test-utils/Cargo.toml b/crates/test-utils/Cargo.toml index cf7d5ea..5f96e58 100644 --- a/crates/test-utils/Cargo.toml +++ b/crates/test-utils/Cargo.toml @@ -22,7 +22,9 @@ alloy-json-rpc.workspace = true # async tokio.workspace = true -async-trait = "0.1" + +# tower +tower.workspace = true # serialization serde_json.workspace = true diff --git a/crates/test-utils/src/lib.rs b/crates/test-utils/src/lib.rs index c17b5f3..97934ae 100644 --- a/crates/test-utils/src/lib.rs +++ b/crates/test-utils/src/lib.rs @@ -8,12 +8,12 @@ use std::{ Mutex, atomic::{AtomicUsize, Ordering}, }, + task::{Context, Poll}, time::Duration, }; use alloy_json_rpc::{RequestPacket, ResponsePacket}; -use async_trait::async_trait; -use roxy_traits::{Backend, HealthStatus}; +use roxy_traits::{BackendMeta, HealthStatus}; use roxy_types::RoxyError; // ============================================================================ @@ -44,6 +44,7 @@ pub enum MockResponse { /// A mock backend for testing. /// /// Provides configurable responses, latency simulation, and call counting. +/// Implements `BackendMeta` and `tower::Service`. /// /// # Example /// @@ -69,6 +70,20 @@ pub struct MockBackend { latency_ema_value: Duration, } +impl Clone for MockBackend { + fn clone(&self) -> Self { + Self { + name: self.name.clone(), + url: self.url.clone(), + responses: Mutex::new(self.responses.lock().unwrap().clone()), + call_count: AtomicUsize::new(self.call_count.load(Ordering::SeqCst)), + latency: self.latency, + health: self.health, + latency_ema_value: self.latency_ema_value, + } + } +} + impl std::fmt::Debug for MockBackend { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MockBackend") @@ -159,6 +174,18 @@ impl MockBackend { self.call_count.store(0, Ordering::SeqCst); } + /// Get the health status. + #[must_use] + pub const fn health_status(&self) -> HealthStatus { + self.health + } + + /// Get the latency EMA value. + #[must_use] + pub const fn latency_ema(&self) -> Duration { + self.latency_ema_value + } + /// Get the next response from the queue. fn next_response(&self) -> Option { let mut responses = self.responses.lock().unwrap(); @@ -166,8 +193,7 @@ impl MockBackend { } } -#[async_trait] -impl Backend for MockBackend { +impl BackendMeta for MockBackend { fn name(&self) -> &str { &self.name } @@ -175,14 +201,22 @@ impl Backend for MockBackend { fn rpc_url(&self) -> &str { &self.url } +} + +impl tower::Service for MockBackend { + type Response = ResponsePacket; + type Error = RoxyError; + type Future = std::pin::Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } - async fn forward(&self, request: RequestPacket) -> Result { + fn call(&mut self, request: RequestPacket) -> Self::Future { self.call_count.fetch_add(1, Ordering::SeqCst); - // Simulate latency - if !self.latency.is_zero() { - tokio::time::sleep(self.latency).await; - } + let latency = self.latency; + let name = self.name.clone(); // Get response or use default let response = self.next_response().unwrap_or_else(|| { @@ -199,28 +233,27 @@ impl Backend for MockBackend { )) }); - match response { - MockResponse::Success(json) => { - let packet: ResponsePacket = serde_json::from_str(&json).map_err(|e| { - RoxyError::Internal(format!("Failed to parse mock response: {e}")) - })?; - Ok(packet) - } - MockResponse::Error(msg) => Err(RoxyError::BackendOffline { backend: msg }), - MockResponse::Timeout => { - // Simulate a very long delay that would trigger a timeout - tokio::time::sleep(Duration::from_secs(60)).await; - Err(RoxyError::BackendTimeout { backend: self.name.clone() }) + Box::pin(async move { + // Simulate latency + if !latency.is_zero() { + tokio::time::sleep(latency).await; } - } - } - fn health_status(&self) -> HealthStatus { - self.health - } - - fn latency_ema(&self) -> Duration { - self.latency_ema_value + match response { + MockResponse::Success(json) => { + let packet: ResponsePacket = serde_json::from_str(&json).map_err(|e| { + RoxyError::Internal(format!("Failed to parse mock response: {e}")) + })?; + Ok(packet) + } + MockResponse::Error(msg) => Err(RoxyError::BackendOffline { backend: msg }), + MockResponse::Timeout => { + // Simulate a very long delay that would trigger a timeout + tokio::time::sleep(Duration::from_secs(60)).await; + Err(RoxyError::BackendTimeout { backend: name }) + } + } + }) } } @@ -896,23 +929,27 @@ mod tests { #[tokio::test] async fn test_forward_counts_calls() { + use tower::Service; + let response = fixtures::success_response(1, "0x1"); - let backend = + let mut backend = MockBackend::new("test").with_response(MockResponse::Success(response.clone())); let request = create_test_request_packet("eth_blockNumber"); - let _ = backend.forward(request).await; + let _ = backend.call(request).await; assert_eq!(backend.call_count(), 1); } #[tokio::test] async fn test_forward_error_response() { - let backend = + use tower::Service; + + let mut backend = MockBackend::new("test").with_response(MockResponse::Error("test error".into())); let request = create_test_request_packet("eth_blockNumber"); - let result = backend.forward(request).await; + let result = backend.call(request).await; assert!(result.is_err()); } diff --git a/crates/traits/Cargo.toml b/crates/traits/Cargo.toml index 3d60eb1..a801379 100644 --- a/crates/traits/Cargo.toml +++ b/crates/traits/Cargo.toml @@ -13,16 +13,11 @@ repository.workspace = true workspace = true [dependencies] -# internal -roxy-types.workspace = true - # alloy -alloy-json-rpc.workspace = true alloy-primitives.workspace = true # async tokio.workspace = true -async-trait = "0.1" # misc bytes.workspace = true diff --git a/crates/traits/src/backend.rs b/crates/traits/src/backend.rs index b07eec3..552313c 100644 --- a/crates/traits/src/backend.rs +++ b/crates/traits/src/backend.rs @@ -2,10 +2,7 @@ use std::time::{Duration, Instant}; -use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_primitives::BlockNumber; -use async_trait::async_trait; -use roxy_types::RoxyError; /// Health status of a backend. #[derive(Debug, Clone, Copy)] @@ -29,28 +26,16 @@ pub enum HealthStatus { }, } -/// Core backend trait for RPC forwarding. -#[async_trait] -pub trait Backend: Send + Sync + 'static { +/// Backend identity/metadata trait. +/// +/// Backends are `tower::Service` +/// implementations that also implement this trait for identity information. +pub trait BackendMeta: Send + Sync + 'static { /// Backend identifier. fn name(&self) -> &str; /// RPC endpoint URL. fn rpc_url(&self) -> &str; - - /// Forward RPC request packet (single or batch). - async fn forward(&self, request: RequestPacket) -> Result; - - /// Current health status. - fn health_status(&self) -> HealthStatus; - - /// Latency EMA for load balancing. - fn latency_ema(&self) -> Duration; - - /// Whether backend should receive requests. - fn is_healthy(&self) -> bool { - matches!(self.health_status(), HealthStatus::Healthy | HealthStatus::Degraded { .. }) - } } /// Health tracking with EMA. diff --git a/crates/traits/src/lib.rs b/crates/traits/src/lib.rs index 69eca72..1279600 100644 --- a/crates/traits/src/lib.rs +++ b/crates/traits/src/lib.rs @@ -4,7 +4,7 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] mod backend; -pub use backend::{Backend, ConsensusTracker, HealthStatus, HealthTracker}; +pub use backend::{BackendMeta, ConsensusTracker, HealthStatus, HealthTracker}; mod cache; pub use cache::{Cache, CacheError}; diff --git a/crates/traits/src/load_balancer.rs b/crates/traits/src/load_balancer.rs index ffdf958..4fa8a98 100644 --- a/crates/traits/src/load_balancer.rs +++ b/crates/traits/src/load_balancer.rs @@ -2,13 +2,13 @@ use std::sync::Arc; -use crate::backend::Backend; +use crate::backend::BackendMeta; /// Load balancer trait for selecting backends. pub trait LoadBalancer: Send + Sync { /// Select a single backend. - fn select(&self, backends: &[Arc]) -> Option>; + fn select(&self, backends: &[Arc]) -> Option>; /// Order backends by preference for failover. - fn select_ordered(&self, backends: &[Arc]) -> Vec>; + fn select_ordered(&self, backends: &[Arc]) -> Vec>; }