From f6dd88b7ad828bccc7665a8436da43bd9ade0c5b Mon Sep 17 00:00:00 2001 From: Orvaxis <198349614+Orvaxis@users.noreply.github.com> Date: Wed, 8 Oct 2025 13:54:26 +0800 Subject: [PATCH 1/3] refactor: Avoid using tokio locks to reduce performance loss --- src/lib.rs | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ba01b71..5a761f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ use tokio::{ pub(crate) type PacketSender = UnboundedSender; pub(crate) type PacketReceiver = UnboundedReceiver; -pub(crate) type SessionCollection = std::sync::Arc>>; +pub(crate) type SessionCollection = AHashMap; mod error; mod packet; @@ -105,7 +105,8 @@ fn run( mut device: Device, accept_sender: UnboundedSender, ) -> JoinHandle> { - let sessions: SessionCollection = std::sync::Arc::new(tokio::sync::Mutex::new(AHashMap::new())); + let mut sessions: SessionCollection = AHashMap::new(); + let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::(); let pi = config.packet_information; let offset = if pi && cfg!(unix) { 4 } else { 0 }; let mut buffer = vec![0_u8; u16::MAX as usize + offset]; @@ -115,8 +116,7 @@ fn run( loop { select! { Ok(n) = device.read(&mut buffer) => { - let u = up_pkt_sender.clone(); - if let Err(e) = process_device_read(&buffer[offset..n], sessions.clone(), u, &config, &accept_sender).await { + if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions,&session_remove_tx, &up_pkt_sender, &config, &accept_sender).await { let io_err: std::io::Error = e.into(); if io_err.kind() == std::io::ErrorKind::ConnectionRefused { log::trace!("Received junk data: {io_err}"); @@ -125,6 +125,12 @@ fn run( } } } + network_tuple = session_remove_rx.recv() => { + if let Some(network_tuple) = network_tuple { + sessions.remove(&network_tuple); + log::debug!("session destroyed: {network_tuple}"); + } + } Some(packet) = up_pkt_receiver.recv() => { process_upstream_recv(packet, &mut device, #[cfg(unix)]pi).await?; } @@ -135,8 +141,9 @@ fn run( async fn process_device_read( data: &[u8], - sessions: SessionCollection, - up_pkt_sender: PacketSender, + sessions: &mut SessionCollection, + session_remove_tx: &UnboundedSender, + up_pkt_sender: &PacketSender, config: &IpStackConfig, accept_sender: &UnboundedSender, ) -> Result<()> { @@ -153,15 +160,14 @@ async fn process_device_read( packet.payload.unwrap_or_default(), &packet.ip, config.mtu, - up_pkt_sender, + up_pkt_sender.clone(), )); accept_sender.send(stream)?; return Ok(()); } - let sessions_clone = sessions.clone(); let network_tuple = packet.network_tuple(); - match sessions.lock().await.entry(network_tuple) { + match sessions.entry(network_tuple) { std::collections::hash_map::Entry::Occupied(entry) => { let len = packet.payload.as_ref().map(|p| p.len()).unwrap_or(0); log::trace!("packet sent to stream: {network_tuple} len {len}"); @@ -169,11 +175,11 @@ async fn process_device_read( } std::collections::hash_map::Entry::Vacant(entry) => { let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - let ip_stack_stream = create_stream(packet, config, up_pkt_sender, Some(tx))?; + let ip_stack_stream = create_stream(packet, config, up_pkt_sender.clone(), Some(tx))?; + let session_remove_tx = session_remove_tx.clone(); tokio::spawn(async move { rx.await.ok(); - sessions_clone.lock().await.remove(&network_tuple); - log::debug!("session destroyed: {network_tuple}"); + session_remove_tx.send(network_tuple).ok(); }); let packet_sender = ip_stack_stream.stream_sender()?; accept_sender.send(ip_stack_stream)?; From 2b3d8e1ae00255b51888bcfbcc22828a9284eff1 Mon Sep 17 00:00:00 2001 From: Orvaxis <198349614+Orvaxis@users.noreply.github.com> Date: Thu, 9 Oct 2025 12:34:07 +0800 Subject: [PATCH 2/3] refactor: code optimization --- src/lib.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5a761f1..bd7a0e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,7 +116,7 @@ fn run( loop { select! { Ok(n) = device.read(&mut buffer) => { - if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions,&session_remove_tx, &up_pkt_sender, &config, &accept_sender).await { + if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &up_pkt_sender, &config, &accept_sender).await { let io_err: std::io::Error = e.into(); if io_err.kind() == std::io::ErrorKind::ConnectionRefused { log::trace!("Received junk data: {io_err}"); @@ -125,11 +125,9 @@ fn run( } } } - network_tuple = session_remove_rx.recv() => { - if let Some(network_tuple) = network_tuple { - sessions.remove(&network_tuple); - log::debug!("session destroyed: {network_tuple}"); - } + Some(network_tuple) = session_remove_rx.recv() => { + sessions.remove(&network_tuple); + log::debug!("session destroyed: {network_tuple}"); } Some(packet) = up_pkt_receiver.recv() => { process_upstream_recv(packet, &mut device, #[cfg(unix)]pi).await?; From 42264121b226729a8f86df42ea71a20a50afdefb Mon Sep 17 00:00:00 2001 From: Sajjad Pourali Date: Wed, 8 Oct 2025 21:43:10 -0700 Subject: [PATCH 3/3] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index bd7a0e1..1982d95 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -177,7 +177,9 @@ async fn process_device_read( let session_remove_tx = session_remove_tx.clone(); tokio::spawn(async move { rx.await.ok(); - session_remove_tx.send(network_tuple).ok(); + if let Err(e) = session_remove_tx.send(network_tuple) { + log::error!("Failed to send session removal for {network_tuple}: {e}"); + } }); let packet_sender = ip_stack_stream.stream_sender()?; accept_sender.send(ip_stack_stream)?;