diff --git a/src/forwarder/mod.rs b/src/forwarder/mod.rs index acff9f71..2dec3ff7 100644 --- a/src/forwarder/mod.rs +++ b/src/forwarder/mod.rs @@ -127,12 +127,16 @@ impl IngressForwarders { /// Broadcast request to all peers. fn broadcast_inner(&self, forward: Arc) { - for entry in self.peers.iter() { - let handle = entry.value(); + self.peers.retain(|peer, handle| { if let Err(e) = handle.sender.send(forward.priority(), forward.clone()) { - error!(?e, peer = %handle.info.name, "failed to send forwarding request to peer"); + error!(?e, %peer, "peer channel closed, removing peer"); + + // Remove the peer from the list if sending fails + false + } else { + true } - } + }); } /// Send request only to local forwarder. diff --git a/src/forwarder/tcp.rs b/src/forwarder/tcp.rs index 664e572a..7776dfc6 100644 --- a/src/forwarder/tcp.rs +++ b/src/forwarder/tcp.rs @@ -25,6 +25,9 @@ use std::{ use tokio::sync::mpsc; use tracing::Instrument as _; +/// Default timeout for TCP calls. +const TCP_CALL_TIMEOUT: Duration = Duration::from_secs(2); + pub fn spawn_tcp_forwarder( name: String, address: SocketAddr, @@ -59,6 +62,15 @@ struct ForwarderResponse { span: tracing::Span, } +impl ForwarderResponse { + /// Returns true if the response error is fatal, i.e. [`ReqError::SocketClosed`] + fn is_fatal(&self) -> bool { + let Err(ref err) = self.response else { return false }; + + matches!(err, ReqError::SocketClosed) + } +} + type RequestFut = Pin> + Send>>; /// An TCP forwarder that forwards requests to a peer. @@ -128,7 +140,13 @@ impl TcpForwarder { let start_time = Instant::now(); let socket = client_pool.socket(size); - let response = socket.request(bytes.into()).await; + + // NOTE: Add timeout here so we don't EVER block indefinitely. + let response = tokio::time::timeout(TCP_CALL_TIMEOUT, socket.request(bytes.into())) + .await + .map_err(|_| ReqError::Timeout) + .flatten(); + let elapsed = start_time.elapsed(); let stats = socket.transport_stats(); @@ -199,7 +217,16 @@ impl Future for TcpForwarder { loop { // First poll for completed work. if let Poll::Ready(Some(response)) = this.pending.poll_next_unpin(cx) { + // Check if the response is fatal. If so, exit the forwarder. + let fatal = response.is_fatal(); + this.on_response(response); + + if fatal { + tracing::error!(peer_name = %this.peer_name, peer_addr = %this.peer_address, "fatal response, terminating forwarder"); + return Poll::Ready(()); + } + continue; }