From 82b1da2087c2e8d6d8f2b068e68a5409a932b2d5 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Thu, 7 May 2026 11:42:40 +0000 Subject: [PATCH 1/3] [multicast] add UDP multicast send/recv This adds multicast send/recv to the UDP probe so omicron's a4x2 multicast end-to-end test(s) (using thundermuffin as the send/recv probe) can validate forwarding through real OPTE + softnpu. Multicast is detected at runtime via `IpAddr::is_multicast()`. The sender configures TTL/hops, loop, and the outgoing interface, while the receiver binds wildcard with `SO_REUSEADDR` and joins via `join_multicast_v{4,6}`. The receiver tracks `rx_count` and `loss_count` via an 8-byte big-endian sequence prefix and emits a JSON summary on exit, where exit is non-zero if `rx_count == 0`. `Server::duration` was also added as a wallclock cutoff. TCP + multicast bails out at parse (multicast is a UDP-only concept at the IP layer). Note that workspace edition bumped 2021 to 2024. --- Cargo.toml | 2 +- app/Cargo.toml | 2 +- app/src/main.rs | 58 +++++++++++++++-- app/src/udp.rs | 155 +++++++++++++++++++++++++++++++++++++++------ app/src/util.rs | 25 ++++++++ package/Cargo.toml | 2 +- 6 files changed, 216 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f7df6c6..39da20b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -edition = "2021" +edition = "2024" repository = "https://github.com/oxidecomputer/thundermuffin" [workspace.dependencies] diff --git a/app/Cargo.toml b/app/Cargo.toml index 6bd6371..7452a9f 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "thundermuffin" version = "0.1.0" -edition = "2021" +edition.workspace = true [dependencies] anyhow.workspace = true diff --git a/app/src/main.rs b/app/src/main.rs index 5fb3248..a3448ec 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -1,6 +1,6 @@ -use anyhow::Result; +use anyhow::{Result, bail}; use clap::{Parser, Subcommand, ValueEnum}; -use std::net::IpAddr; +use std::net::{IpAddr, Ipv4Addr}; mod tcp; mod udp; @@ -18,11 +18,13 @@ struct Cli { #[arg(short, long, default_value_t = 4747)] port: u16, - /// Scope to use for IPv6 targets + /// Scope (zone index) to use for IPv6 targets. Also used as the outgoing + /// `IPV6_MULTICAST_IF` interface index when the destination/listen address + /// is an IPv6 multicast address. #[arg(short, long, default_value_t = 0)] scope: u32, - /// How big an individual send bufer is in bytes. + /// How big an individual send buffer is in bytes. #[arg(short, long, default_value_t = 64000)] buffer_size: usize, @@ -30,6 +32,25 @@ struct Cli { #[arg(long, default_value_t = 128)] backlog: i32, + /// Multicast TTL (IPv4) or hop limit (IPv6). Applied only when the + /// destination address is a multicast address. + #[arg(long, default_value_t = 64)] + multicast_ttl: u32, + + /// Enable `IP_MULTICAST_LOOP` / `IPV6_MULTICAST_LOOP`. Disabled by + /// default to avoid the sender seeing its own traffic on hosts that + /// also act as receivers. + #[arg(long)] + multicast_loop: bool, + + /// Outgoing `IP_MULTICAST_IF` for IPv4 multicast, expressed as the + /// IPv4 address bound to the desired interface. For IPv6 use + /// `--scope` (numeric ifindex) instead. When omitted the kernel selects + /// the interface from its routing table, which may surprise on + /// multi-homed hosts. + #[arg(long)] + multicast_iface: Option, + #[command(subcommand)] kind: Participant, } @@ -42,7 +63,8 @@ enum Participant { #[derive(Parser, Debug)] struct Client { - /// IP address of server + /// IP address of the server, or multicast group address when sending + /// multicast traffic. server: IpAddr, /// Determine test duration in terms of time or data. @@ -57,8 +79,18 @@ struct Client { #[derive(Parser, Debug)] struct Server { - /// IP address to listen on + /// IP address to listen on. When this is a multicast address the + /// receiver binds the wildcard address (`0.0.0.0` for IPv4, `[::]` for + /// IPv6) on `--port`, sets `SO_REUSEADDR`, and joins the multicast + /// group on the interface selected by `--multicast-iface` (IPv4) or + /// `--scope` (IPv6). listen: IpAddr, + + /// Wallclock duration (seconds) for UDP receivers. When unset the + /// receiver runs until interrupted, matching the prior behavior. TCP + /// servers ignore this field. + #[arg(short, long)] + duration: Option, } #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] @@ -81,6 +113,20 @@ enum Kind { fn main() -> Result<()> { let cli = Cli::parse(); + + // TCP cannot carry multicast. Reject early so callers (e.g. commtest) + // get a clear error rather than a silent unicast send to a multicast + // address. + if matches!(cli.transport, Transport::Tcp) { + let mcast = match cli.kind { + Participant::Client(ref c) => c.server.is_multicast(), + Participant::Server(ref s) => s.listen.is_multicast(), + }; + if mcast { + bail!("multicast is only supported with `--transport udp`"); + } + } + match cli.transport { Transport::Tcp => tcp::run(&cli), Transport::Udp => udp::run(&cli), diff --git a/app/src/udp.rs b/app/src/udp.rs index dc213a5..f50616e 100644 --- a/app/src/udp.rs +++ b/app/src/udp.rs @@ -1,9 +1,13 @@ -use crate::util::{buffer, show_speed}; +use crate::util::{SEQ_LEN, buffer, get_seq, put_seq, show_speed}; use crate::{Cli, Client, Participant, Server}; -use anyhow::Result; +use anyhow::{Context, Result}; use socket2::{Domain, Protocol, Socket, Type}; -use std::mem::MaybeUninit; -use std::net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::io::Read; +use std::net::{ + IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, +}; +use std::num::NonZeroU32; +use std::time::{Duration, Instant}; pub(crate) fn run(cli: &Cli) -> Result<()> { match cli.kind { @@ -13,39 +17,66 @@ pub(crate) fn run(cli: &Cli) -> Result<()> { } fn run_client(cli: &Cli, client: &Client) -> Result<()> { + let is_mcast = client.server.is_multicast(); + let (s, sa) = match client.server { IpAddr::V4(addr) => { let sa = SocketAddrV4::new(addr, cli.port); let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + if is_mcast { + s.set_multicast_ttl_v4(cli.multicast_ttl) + .context("set IP_MULTICAST_TTL")?; + s.set_multicast_loop_v4(cli.multicast_loop) + .context("set IP_MULTICAST_LOOP")?; + if let Some(iface) = cli.multicast_iface { + s.set_multicast_if_v4(&iface) + .context("set IP_MULTICAST_IF")?; + } + } (s, SocketAddr::V4(sa)) } IpAddr::V6(addr) => { let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); let s = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; + if is_mcast { + s.set_multicast_hops_v6(cli.multicast_ttl) + .context("set IPV6_MULTICAST_HOPS")?; + s.set_multicast_loop_v6(cli.multicast_loop) + .context("set IPV6_MULTICAST_LOOP")?; + // ifindex 0 means "kernel picks". Cross-platform behavior of + // explicit 0 is inconsistent, so skip the call entirely. + if let Some(idx) = NonZeroU32::new(cli.scope) { + s.set_multicast_if_v6(idx.get()) + .context("set IPV6_MULTICAST_IF")?; + } + } (s, SocketAddr::V6(sa)) } }; let sa = sa.into(); - let buf = buffer(cli.buffer_size); + let mut buf = buffer(cli.buffer_size); - let start = std::time::Instant::now(); + let start = Instant::now(); let mut interval = 0; let mut interval_sent = 0; - //let mut perf = Vec::new(); let mut total = 0; let mut count = 0; + let mut seq: u64 = 0; loop { + if is_mcast { + put_seq(&mut buf, seq); + seq = seq.wrapping_add(1); + } let n = s.send_to(&buf, &sa)?; interval_sent += n * 8; - let t = std::time::Instant::now(); + let t = Instant::now(); let d = t.duration_since(start); let ds = d.as_secs(); if ds > interval { - //perf.push(interval_sent); interval = ds; total += interval_sent; print!("[{}] ", count); @@ -65,35 +96,105 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { } fn run_server(cli: &Cli, server: &Server) -> Result<()> { + let is_mcast = server.listen.is_multicast(); + let s = match server.listen { IpAddr::V4(addr) => { - let sa = SocketAddrV4::new(addr, cli.port); let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; - s.bind(&sa.into())?; + if is_mcast { + s.set_reuse_address(true).context("SO_REUSEADDR")?; + let bind = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, cli.port); + s.bind(&bind.into())?; + let iface = + cli.multicast_iface.unwrap_or(Ipv4Addr::UNSPECIFIED); + s.join_multicast_v4(&addr, &iface) + .context("IP_ADD_MEMBERSHIP")?; + } else { + let sa = SocketAddrV4::new(addr, cli.port); + s.bind(&sa.into())?; + } s } IpAddr::V6(addr) => { - let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); let s = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; - s.bind(&sa.into())?; + if is_mcast { + s.set_reuse_address(true).context("SO_REUSEADDR")?; + let bind = + SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, cli.port, 0, 0); + s.bind(&bind.into())?; + s.join_multicast_v6(&addr, cli.scope) + .context("IPV6_ADD_MEMBERSHIP")?; + } else { + let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); + s.bind(&sa.into())?; + } s } }; + // When `--duration` is set, install a short read timeout so the recv + // loop wakes up periodically and can observe the wallclock deadline + // even if no datagrams are arriving. + if server.duration.is_some() { + s.set_read_timeout(Some(Duration::from_millis(500))) + .context("SO_RCVTIMEO")?; + } + let mut interval = 0; let mut interval_sent = 0; - //let mut perf = Vec::new(); let mut count = 0; - let start = std::time::Instant::now(); + let start = Instant::now(); + let deadline = server.duration.map(|d| start + Duration::from_secs(d)); + + let mut rx_count: u64 = 0; + let mut loss_count: u64 = 0; + let mut next_expected: Option = None; + let mut total_bits: u64 = 0; + + let mut buf = vec![0u8; cli.buffer_size]; loop { - let sz = cli.buffer_size; - let mut buf = vec![MaybeUninit::::uninit(); sz]; - let (n, _) = s.recv_from(&mut buf)?; + if let Some(deadline) = deadline + && Instant::now() >= deadline + { + break; + } + + // Read::read on `&Socket` calls into the same `recv` syscall as + // `recv_from`, but returns a `&[u8]`-shaped result. The source address + // is discarded either way. + let n = match Read::read(&mut &s, &mut buf) { + Ok(n) => n, + Err(e) + if matches!( + e.kind(), + std::io::ErrorKind::WouldBlock + | std::io::ErrorKind::TimedOut + ) => + { + continue; + } + Err(e) => return Err(e.into()), + }; + + rx_count += 1; interval_sent += n * 8; - let t = std::time::Instant::now(); + total_bits += (n * 8) as u64; + + if is_mcast + && n >= SEQ_LEN + && let Some(seq) = get_seq(&buf[..n]) + { + let expected = next_expected.unwrap_or(seq); + if seq > expected { + loss_count += seq - expected; + } + next_expected = Some(expected.max(seq.wrapping_add(1))); + } + + let t = Instant::now(); let d = t.duration_since(start); let ds = d.as_secs(); if ds > interval { @@ -104,4 +205,20 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { interval_sent = 0; } } + + let elapsed = Instant::now().duration_since(start).as_secs_f64(); + let bps = if elapsed > 0.0 { + total_bits as f64 / elapsed + } else { + 0.0 + }; + + // Machine-readable summary so commtest can parse the result without + // scraping `show_speed` output. + println!("{{\"rx\":{rx_count},\"loss\":{loss_count},\"bps\":{bps:.3}}}"); + + if rx_count == 0 { + anyhow::bail!("received zero datagrams"); + } + Ok(()) } diff --git a/app/src/util.rs b/app/src/util.rs index 7e1ab41..cd18fb6 100644 --- a/app/src/util.rs +++ b/app/src/util.rs @@ -1,5 +1,9 @@ const MUFFIN: &[u8] = b"muffin "; +/// Length of the big-endian `u64` sequence-number prefix that precedes the +/// muffin payload when sequence tracking is enabled. +pub const SEQ_LEN: usize = 8; + pub fn show_speed(mut s: f64) { if s > 1024.0 { s /= 1024.0; @@ -19,6 +23,10 @@ pub fn show_speed(mut s: f64) { } } +/// Build a send buffer of `size` bytes. The first [`SEQ_LEN`] bytes are +/// reserved for the sequence-number prefix (rewritten on each send) when +/// sequence tracking is enabled. The rest of the buffer is filled with the +/// repeating `muffin` filler so wire dumps stay recognizable. pub fn buffer(size: usize) -> Vec { let mut buf = vec![0u8; size]; for i in 0..buf.len() { @@ -26,3 +34,20 @@ pub fn buffer(size: usize) -> Vec { } buf } + +/// Write the big-endian sequence-number prefix into the first [`SEQ_LEN`] +/// bytes of `buf`. Caller must ensure `buf.len() >= SEQ_LEN`. +pub fn put_seq(buf: &mut [u8], seq: u64) { + buf[..SEQ_LEN].copy_from_slice(&seq.to_be_bytes()); +} + +/// Read the big-endian sequence-number prefix from the first [`SEQ_LEN`] +/// bytes of `buf`, returning `None` if the slice is too short. +pub fn get_seq(buf: &[u8]) -> Option { + if buf.len() < SEQ_LEN { + return None; + } + let mut bytes = [0u8; SEQ_LEN]; + bytes.copy_from_slice(&buf[..SEQ_LEN]); + Some(u64::from_be_bytes(bytes)) +} diff --git a/package/Cargo.toml b/package/Cargo.toml index 4e539e0..36a0253 100644 --- a/package/Cargo.toml +++ b/package/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "thundermuffin-package" version = "0.1.0" -edition = "2021" +edition.workspace = true [dependencies] omicron-zone-package = "0.9.1" From e7ce5975d53f91dbd5696afa959698d8924766ba Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Fri, 8 May 2026 01:13:34 +0000 Subject: [PATCH 2/3] [review, ssm] address feedback and extend Includes: - renames and generalizes `--ttl` for mcast and unicast needs - receivers now bind directly to the group addr with SO_REUSEADDR - the 8-byte BE seq prefix is now written/parsed on every UDP packet so loss/rx counts work for unicast too - recv-loop duration check uses start.elapsed() instead of a precomputed deadline - a bigger change: adding SSM (RFC 4607) for v4 + v6 via a repeatable `--multicast-source` on Server. v4 calls socket2's join_ssm_v4 once per source; however, for v6 we have to use a POSIX setsourcefilter (socket2 0.6.3 still has no v6 SSM API). Sources must parse as unicast at clap time, etc. - [deps] This bumps socket2 to 0.6.3 (set_nodelay -> set_tcp_nodelay, set_ttl -> set_ttl_v4) and adds libc dep for the setsourcefilter wrapper --- Cargo.lock | 97 +++--------------------------------- Cargo.toml | 3 +- app/Cargo.toml | 1 + app/src/main.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++---- app/src/ssm.rs | 93 ++++++++++++++++++++++++++++++++++ app/src/tcp.rs | 2 +- app/src/udp.rs | 102 +++++++++++++++++++++++++++----------- 7 files changed, 297 insertions(+), 130 deletions(-) create mode 100644 app/src/ssm.rs diff --git a/Cargo.lock b/Cargo.lock index c623a45..bf66e82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,16 +1212,6 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.10" @@ -1234,12 +1224,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1359,7 +1349,8 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "socket2 0.4.10", + "libc", + "socket2 0.6.3", ] [[package]] @@ -1393,7 +1384,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.2", + "socket2 0.6.3", "tokio-macros", "windows-sys 0.61.2", ] @@ -1850,15 +1841,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" -dependencies = [ - "windows-targets 0.53.5", -] - [[package]] name = "windows-sys" version = "0.61.2" @@ -1892,30 +1874,13 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", + "windows_i686_gnullvm", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows-targets" -version = "0.53.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" -dependencies = [ - "windows-link", - "windows_aarch64_gnullvm 0.53.1", - "windows_aarch64_msvc 0.53.1", - "windows_i686_gnu 0.53.1", - "windows_i686_gnullvm 0.53.1", - "windows_i686_msvc 0.53.1", - "windows_x86_64_gnu 0.53.1", - "windows_x86_64_gnullvm 0.53.1", - "windows_x86_64_msvc 0.53.1", -] - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -1928,12 +1893,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -1946,12 +1905,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" - [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -1964,24 +1917,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" - [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -1994,12 +1935,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -2012,12 +1947,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -2030,12 +1959,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -2048,12 +1971,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" - [[package]] name = "winnow" version = "0.5.40" diff --git a/Cargo.toml b/Cargo.toml index 39da20b..7035c82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,5 +20,6 @@ anyhow = "1.0.101" cargo_metadata = "0.18" clap = { version = "4.5.59", features = ["derive"] } indoc = "2" -socket2 = { version = "0.4.10", features = ["all"] } +libc = "0.2" +socket2 = { version = "0.6.3", features = ["all"] } toml = "0.8" diff --git a/app/Cargo.toml b/app/Cargo.toml index 7452a9f..a708d25 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -6,4 +6,5 @@ edition.workspace = true [dependencies] anyhow.workspace = true clap.workspace = true +libc.workspace = true socket2.workspace = true diff --git a/app/src/main.rs b/app/src/main.rs index a3448ec..9aeb27b 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -1,7 +1,9 @@ use anyhow::{Result, bail}; -use clap::{Parser, Subcommand, ValueEnum}; -use std::net::{IpAddr, Ipv4Addr}; +use clap::error::ErrorKind; +use clap::{CommandFactory, Parser, Subcommand, ValueEnum}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +mod ssm; mod tcp; mod udp; mod util; @@ -32,10 +34,11 @@ struct Cli { #[arg(long, default_value_t = 128)] backlog: i32, - /// Multicast TTL (IPv4) or hop limit (IPv6). Applied only when the - /// destination address is a multicast address. + /// TTL (IPv4) or hop limit (IPv6) for outgoing datagrams. For multicast + /// destinations this sets `IP_MULTICAST_TTL` / `IPV6_MULTICAST_HOPS`; + /// otherwise, it sets `IP_TTL` / `IPV6_UNICAST_HOPS`. #[arg(long, default_value_t = 64)] - multicast_ttl: u32, + ttl: u32, /// Enable `IP_MULTICAST_LOOP` / `IPV6_MULTICAST_LOOP`. Disabled by /// default to avoid the sender seeing its own traffic on hosts that @@ -80,10 +83,9 @@ struct Client { #[derive(Parser, Debug)] struct Server { /// IP address to listen on. When this is a multicast address the - /// receiver binds the wildcard address (`0.0.0.0` for IPv4, `[::]` for - /// IPv6) on `--port`, sets `SO_REUSEADDR`, and joins the multicast - /// group on the interface selected by `--multicast-iface` (IPv4) or - /// `--scope` (IPv6). + /// receiver binds directly to the group on `--port`, sets `SO_REUSEADDR` + /// (so multiple receivers can co-bind), and joins the group on the + /// interface selected by `--multicast-iface` (IPv4) or `--scope` (IPv6). listen: IpAddr, /// Wallclock duration (seconds) for UDP receivers. When unset the @@ -91,6 +93,109 @@ struct Server { /// servers ignore this field. #[arg(short, long)] duration: Option, + + /// Source addresses for [source-specific multicast][rfc4607] (SSM). + /// Repeatable: pass `--multicast-source` once per source. With one + /// source it's a `(S, G)` join. With N sources it's a + /// `({S1, S2, ...}, G)` INCLUDE filter. The kernel filters incoming + /// datagrams to only those sourced from one of these addresses + /// (`IP_ADD_SOURCE_MEMBERSHIP` per source on IPv4, `setsourcefilter` + /// with the full slist on IPv6). Each must be a unicast address with + /// family matching `listen`, and `listen` must be a multicast address. + /// + /// [rfc4607]: https://datatracker.ietf.org/doc/html/rfc4607 + #[arg(long, value_parser = parse_unicast_ipaddr)] + multicast_source: Vec, +} + +/// Reject obviously-non-unicast addresses (multicast, unspecified, loopback) +/// at clap parse time so SSM misconfiguration surfaces as a CLI error rather +/// than an opaque kernel `EADDRNOTAVAIL` after socket setup. +fn parse_unicast_ipaddr(s: &str) -> Result { + let addr: IpAddr = s + .parse() + .map_err(|e: std::net::AddrParseError| e.to_string())?; + if addr.is_multicast() || addr.is_unspecified() || addr.is_loopback() { + return Err(format!("must be a unicast address (got `{addr}`)")); + } + Ok(addr) +} + +impl Server { + fn validate( + &self, + cli: &Cli, + cmd: &mut clap::Command, + ) -> Result<(), clap::Error> { + if self.multicast_source.is_empty() { + return Ok(()); + } + if !self.listen.is_multicast() { + return Err(cmd.error( + ErrorKind::ValueValidation, + format!( + "`--multicast-source` requires `listen` to be a multicast \ + address (got `{}`)", + self.listen + ), + )); + } + for source in &self.multicast_source { + if source.is_ipv4() != self.listen.is_ipv4() { + return Err(cmd.error( + ErrorKind::ValueValidation, + format!( + "`--multicast-source` family must match the `listen` \ + address family (got `{source}` vs listen `{}`)", + self.listen + ), + )); + } + } + // SSM joins must be pinned to a specific interface. With + // kernel-picked interface (`INADDR_ANY` for v4, `ifindex = 0` for + // v6) the join can silently land on an interface where the source + // isn't reachable and deliver no traffic, which looks identical + // to a real network regression in CI. + match self.listen { + IpAddr::V4(_) if cli.multicast_iface.is_none() => Err(cmd.error( + ErrorKind::MissingRequiredArgument, + "`--multicast-source` (IPv4) requires `--multicast-iface` \ + to pin the SSM join to a specific interface", + )), + IpAddr::V6(_) if cli.scope == 0 => Err(cmd.error( + ErrorKind::MissingRequiredArgument, + "`--multicast-source` (IPv6) requires non-zero `--scope` \ + (interface index) to pin the SSM join to a specific interface", + )), + _ => Ok(()), + } + } + + /// IPv4 SSM sources. `validate` guarantees that, when `listen` is v4 + /// and a multicast address, every configured source is also v4, so this + /// never silently drops a v6 source meant for the v4 path. + pub(crate) fn multicast_source_v4(&self) -> Vec { + self.multicast_source + .iter() + .filter_map(|source| match source { + IpAddr::V4(addr) => Some(*addr), + IpAddr::V6(_) => None, + }) + .collect() + } + + /// IPv6 SSM sources. See [`Self::multicast_source_v4`] for the + /// corresponding invariant. + pub(crate) fn multicast_source_v6(&self) -> Vec { + self.multicast_source + .iter() + .filter_map(|source| match source { + IpAddr::V6(addr) => Some(*addr), + IpAddr::V4(_) => None, + }) + .collect() + } } #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] @@ -127,6 +232,12 @@ fn main() -> Result<()> { } } + if let Participant::Server(ref server) = cli.kind + && let Err(e) = server.validate(&cli, &mut Cli::command()) + { + e.exit(); + } + match cli.transport { Transport::Tcp => tcp::run(&cli), Transport::Udp => udp::run(&cli), diff --git a/app/src/ssm.rs b/app/src/ssm.rs new file mode 100644 index 0000000..9fafe94 --- /dev/null +++ b/app/src/ssm.rs @@ -0,0 +1,93 @@ +//! Source-specific multicast (SSM) functionality that isn't covered by `socket2`. +//! +//! `socket2` exposes `Socket::join_ssm_v4` for IPv4 SSM joins via +//! `IP_ADD_SOURCE_MEMBERSHIP`, but has no IPv6 equivalent in any current +//! release. For IPv6 we drop to POSIX [`setsourcefilter(3SOCKET)`] +//! (RFC 3678), which lives in `libsocket` on illumos and `libc` on Linux. +//! +//! [`setsourcefilter(3SOCKET)`]: https://illumos.org/man/3SOCKET/setsourcefilter + +use socket2::Socket; +use std::net::Ipv6Addr; +use std::os::fd::AsRawFd; + +// RFC 3678 / POSIX include-mode source filter. Standardized as `1` on every +// platform that exposes `setsourcefilter`. +const MCAST_INCLUDE: u32 = 1; + +unsafe extern "C" { + fn setsourcefilter( + socket: libc::c_int, + interface: u32, + group: *const libc::sockaddr, + grouplen: libc::socklen_t, + fmode: u32, + numsrc: u32, + slist: *const libc::sockaddr_storage, + ) -> libc::c_int; +} + +/// Join `socket` to the IPv6 source-specific multicast group `group` on +/// interface `ifindex`, with INCLUDE-mode filter listing `sources`. With +/// one entry it's a `(S, G)` join; with N it's a `({S1, ..., Sn}, G)` +/// channel. Caller must pass a non-empty slice (the empty-sources case +/// is the any-source `IPV6_ADD_MEMBERSHIP` join, handled elsewhere). +pub fn join_ssm_v6( + socket: &Socket, + group: &Ipv6Addr, + sources: &[Ipv6Addr], + ifindex: u32, +) -> std::io::Result<()> { + // Construct a `sockaddr_in6` with only family + address populated. All + // other fields (port, flowinfo, scope, platform-specific extras) stay + // zeroed-out. + let make_in6 = |addr: &Ipv6Addr| -> libc::sockaddr_in6 { + // All-zero is a valid bit pattern for `sockaddr_in6`. + let mut sa: libc::sockaddr_in6 = unsafe { std::mem::zeroed() }; + sa.sin6_family = libc::AF_INET6 as libc::sa_family_t; + sa.sin6_addr.s6_addr = addr.octets(); + sa + }; + + let group_sa = make_in6(group); + + // Build the slist as a `Vec`. `sockaddr_storage` is + // sized and aligned to hold any sockaddr family, so a `sockaddr_in6` + // lands at offset 0 with correct alignment. `ptr::write` only + // overwrites the leading `size_of::()` bytes per slot; + // trailing bytes stay zero-initialized. + let slist: Vec = sources + .iter() + .map(|source| { + let mut storage: libc::sockaddr_storage = + unsafe { std::mem::zeroed() }; + unsafe { + std::ptr::write( + &mut storage as *mut _ as *mut libc::sockaddr_in6, + make_in6(source), + ); + } + storage + }) + .collect(); + + // `setsourcefilter` is an FFI call. The pointers reference valid sockaddr + // structures of the indicated lengths and live for the duration of the + // call. `numsrc = sources.len()` matches the `slist` length (each entry + // is one `sockaddr_in6` written into a `sockaddr_storage`). + let ret = unsafe { + setsourcefilter( + socket.as_raw_fd(), + ifindex, + &group_sa as *const _ as *const libc::sockaddr, + std::mem::size_of::() as libc::socklen_t, + MCAST_INCLUDE, + sources.len() as u32, + slist.as_ptr(), + ) + }; + if ret < 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) +} diff --git a/app/src/tcp.rs b/app/src/tcp.rs index 392efb3..15a7ac2 100644 --- a/app/src/tcp.rs +++ b/app/src/tcp.rs @@ -87,7 +87,7 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { }; //TODO s.set_quickack(true)?; - s.set_nodelay(true)?; + s.set_tcp_nodelay(true)?; s.listen(cli.backlog)?; loop { diff --git a/app/src/udp.rs b/app/src/udp.rs index f50616e..e0718ad 100644 --- a/app/src/udp.rs +++ b/app/src/udp.rs @@ -1,3 +1,4 @@ +use crate::ssm::join_ssm_v6; use crate::util::{SEQ_LEN, buffer, get_seq, put_seq, show_speed}; use crate::{Cli, Client, Participant, Server}; use anyhow::{Context, Result}; @@ -25,7 +26,7 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; if is_mcast { - s.set_multicast_ttl_v4(cli.multicast_ttl) + s.set_multicast_ttl_v4(cli.ttl) .context("set IP_MULTICAST_TTL")?; s.set_multicast_loop_v4(cli.multicast_loop) .context("set IP_MULTICAST_LOOP")?; @@ -33,6 +34,8 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { s.set_multicast_if_v4(&iface) .context("set IP_MULTICAST_IF")?; } + } else { + s.set_ttl_v4(cli.ttl).context("set IP_TTL")?; } (s, SocketAddr::V4(sa)) } @@ -41,7 +44,7 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { let s = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; if is_mcast { - s.set_multicast_hops_v6(cli.multicast_ttl) + s.set_multicast_hops_v6(cli.ttl) .context("set IPV6_MULTICAST_HOPS")?; s.set_multicast_loop_v6(cli.multicast_loop) .context("set IPV6_MULTICAST_LOOP")?; @@ -51,6 +54,9 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { s.set_multicast_if_v6(idx.get()) .context("set IPV6_MULTICAST_IF")?; } + } else { + s.set_unicast_hops_v6(cli.ttl) + .context("set IPV6_UNICAST_HOPS")?; } (s, SocketAddr::V6(sa)) } @@ -66,10 +72,8 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { let mut count = 0; let mut seq: u64 = 0; loop { - if is_mcast { - put_seq(&mut buf, seq); - seq = seq.wrapping_add(1); - } + put_seq(&mut buf, seq); + seq = seq.wrapping_add(1); let n = s.send_to(&buf, &sa)?; interval_sent += n * 8; @@ -95,6 +99,47 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { Ok(()) } +/// Join `socket` to the IPv4 multicast `group` on `iface`. With `sources` +/// non-empty it's an SSM include-mode join, one +/// `IP_ADD_SOURCE_MEMBERSHIP` per source. Otherwise it's an any-source +/// `(*, G)` join via `IP_ADD_MEMBERSHIP`. +fn join_v4( + socket: &Socket, + group: Ipv4Addr, + iface: Ipv4Addr, + sources: &[Ipv4Addr], +) -> Result<()> { + if sources.is_empty() { + return socket + .join_multicast_v4(&group, &iface) + .context("IP_ADD_MEMBERSHIP"); + } + for source in sources { + socket + .join_ssm_v4(&group, source, &iface) + .context("IP_ADD_SOURCE_MEMBERSHIP")?; + } + Ok(()) +} + +/// Join `socket` to the IPv6 multicast `group` on `ifindex`. With `sources` +/// non-empty it's an SSM include-mode join via `setsourcefilter` (one call +/// with the full slist). Otherwise it's an any-source `(*, G)` join via +/// `IPV6_ADD_MEMBERSHIP`. +fn join_v6( + socket: &Socket, + group: Ipv6Addr, + ifindex: u32, + sources: &[Ipv6Addr], +) -> Result<()> { + if sources.is_empty() { + return socket + .join_multicast_v6(&group, ifindex) + .context("IPV6_ADD_MEMBERSHIP"); + } + join_ssm_v6(socket, &group, sources, ifindex).context("setsourcefilter") +} + fn run_server(cli: &Cli, server: &Server) -> Result<()> { let is_mcast = server.listen.is_multicast(); @@ -102,33 +147,33 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { IpAddr::V4(addr) => { let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + // Bind directly to the group address for multicast so the socket + // only receives datagrams sent to that group. + // + // Note: `SO_REUSEADDR` lets multiple receivers co-bind to the same + // group/port. + let sa = SocketAddrV4::new(addr, cli.port); if is_mcast { s.set_reuse_address(true).context("SO_REUSEADDR")?; - let bind = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, cli.port); - s.bind(&bind.into())?; + } + s.bind(&sa.into())?; + if is_mcast { let iface = cli.multicast_iface.unwrap_or(Ipv4Addr::UNSPECIFIED); - s.join_multicast_v4(&addr, &iface) - .context("IP_ADD_MEMBERSHIP")?; - } else { - let sa = SocketAddrV4::new(addr, cli.port); - s.bind(&sa.into())?; + join_v4(&s, addr, iface, &server.multicast_source_v4())?; } s } IpAddr::V6(addr) => { let s = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; + let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); if is_mcast { s.set_reuse_address(true).context("SO_REUSEADDR")?; - let bind = - SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, cli.port, 0, 0); - s.bind(&bind.into())?; - s.join_multicast_v6(&addr, cli.scope) - .context("IPV6_ADD_MEMBERSHIP")?; - } else { - let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); - s.bind(&sa.into())?; + } + s.bind(&sa.into())?; + if is_mcast { + join_v6(&s, addr, cli.scope, &server.multicast_source_v6())?; } s } @@ -146,7 +191,6 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { let mut interval_sent = 0; let mut count = 0; let start = Instant::now(); - let deadline = server.duration.map(|d| start + Duration::from_secs(d)); let mut rx_count: u64 = 0; let mut loss_count: u64 = 0; @@ -156,15 +200,16 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { let mut buf = vec![0u8; cli.buffer_size]; loop { - if let Some(deadline) = deadline - && Instant::now() >= deadline + if let Some(duration_secs) = server.duration + && start.elapsed().as_secs() >= duration_secs { break; } - // Read::read on `&Socket` calls into the same `recv` syscall as - // `recv_from`, but returns a `&[u8]`-shaped result. The source address - // is discarded either way. + // The source address is unused here. `Read::read` on `&Socket` routes + // through the same `recv` syscall as `recv_from` but operates on a + // plain `&mut [u8]`, avoiding the `MaybeUninit` buffer that + // socket2's typed datagram APIs require. let n = match Read::read(&mut &s, &mut buf) { Ok(n) => n, Err(e) @@ -183,8 +228,7 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { interval_sent += n * 8; total_bits += (n * 8) as u64; - if is_mcast - && n >= SEQ_LEN + if n >= SEQ_LEN && let Some(seq) = get_seq(&buf[..n]) { let expected = next_expected.unwrap_or(seq); From 473a90f8081e99f1b77f2625d97ab1f7817b3468 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Fri, 22 May 2026 16:12:03 +0000 Subject: [PATCH 3/3] [review] address feedback --- app/src/main.rs | 8 ++++---- app/src/ssm.rs | 26 ++++++++++++++++---------- app/src/udp.rs | 23 ++++++++++++++++++----- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/app/src/main.rs b/app/src/main.rs index 9aeb27b..a69e3fb 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -36,9 +36,10 @@ struct Cli { /// TTL (IPv4) or hop limit (IPv6) for outgoing datagrams. For multicast /// destinations this sets `IP_MULTICAST_TTL` / `IPV6_MULTICAST_HOPS`; - /// otherwise, it sets `IP_TTL` / `IPV6_UNICAST_HOPS`. + /// otherwise, it sets `IP_TTL` / `IPV6_UNICAST_HOPS`. The on-wire field + /// is 8 bits in both IPv4 and IPv6. #[arg(long, default_value_t = 64)] - ttl: u32, + ttl: u8, /// Enable `IP_MULTICAST_LOOP` / `IPV6_MULTICAST_LOOP`. Disabled by /// default to avoid the sender seeing its own traffic on hosts that @@ -89,8 +90,7 @@ struct Server { listen: IpAddr, /// Wallclock duration (seconds) for UDP receivers. When unset the - /// receiver runs until interrupted, matching the prior behavior. TCP - /// servers ignore this field. + /// receiver runs until interrupted. TCP servers ignore this field. #[arg(short, long)] duration: Option, diff --git a/app/src/ssm.rs b/app/src/ssm.rs index 9fafe94..3272311 100644 --- a/app/src/ssm.rs +++ b/app/src/ssm.rs @@ -39,14 +39,20 @@ pub fn join_ssm_v6( ifindex: u32, ) -> std::io::Result<()> { // Construct a `sockaddr_in6` with only family + address populated. All - // other fields (port, flowinfo, scope, platform-specific extras) stay - // zeroed-out. + // other fields (port, flowinfo, scope, platform-specific extras) are + // explicitly zeroed. let make_in6 = |addr: &Ipv6Addr| -> libc::sockaddr_in6 { - // All-zero is a valid bit pattern for `sockaddr_in6`. - let mut sa: libc::sockaddr_in6 = unsafe { std::mem::zeroed() }; - sa.sin6_family = libc::AF_INET6 as libc::sa_family_t; - sa.sin6_addr.s6_addr = addr.octets(); - sa + libc::sockaddr_in6 { + sin6_family: libc::AF_INET6 as libc::sa_family_t, + sin6_port: 0, + sin6_flowinfo: 0, + sin6_addr: libc::in6_addr { + s6_addr: addr.octets(), + }, + sin6_scope_id: 0, + #[cfg(any(target_os = "illumos", target_os = "solaris"))] + __sin6_src_id: 0, + } }; let group_sa = make_in6(group); @@ -73,8 +79,8 @@ pub fn join_ssm_v6( // `setsourcefilter` is an FFI call. The pointers reference valid sockaddr // structures of the indicated lengths and live for the duration of the - // call. `numsrc = sources.len()` matches the `slist` length (each entry - // is one `sockaddr_in6` written into a `sockaddr_storage`). + // call. `numsrc` is taken from `slist.len()` so both arguments unambiguously + // refer to the same buffer. let ret = unsafe { setsourcefilter( socket.as_raw_fd(), @@ -82,7 +88,7 @@ pub fn join_ssm_v6( &group_sa as *const _ as *const libc::sockaddr, std::mem::size_of::() as libc::socklen_t, MCAST_INCLUDE, - sources.len() as u32, + slist.len() as u32, slist.as_ptr(), ) }; diff --git a/app/src/udp.rs b/app/src/udp.rs index e0718ad..8b462f0 100644 --- a/app/src/udp.rs +++ b/app/src/udp.rs @@ -26,7 +26,7 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; if is_mcast { - s.set_multicast_ttl_v4(cli.ttl) + s.set_multicast_ttl_v4(u32::from(cli.ttl)) .context("set IP_MULTICAST_TTL")?; s.set_multicast_loop_v4(cli.multicast_loop) .context("set IP_MULTICAST_LOOP")?; @@ -35,7 +35,7 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { .context("set IP_MULTICAST_IF")?; } } else { - s.set_ttl_v4(cli.ttl).context("set IP_TTL")?; + s.set_ttl_v4(u32::from(cli.ttl)).context("set IP_TTL")?; } (s, SocketAddr::V4(sa)) } @@ -44,7 +44,7 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { let s = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; if is_mcast { - s.set_multicast_hops_v6(cli.ttl) + s.set_multicast_hops_v6(u32::from(cli.ttl)) .context("set IPV6_MULTICAST_HOPS")?; s.set_multicast_loop_v6(cli.multicast_loop) .context("set IPV6_MULTICAST_LOOP")?; @@ -55,7 +55,7 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { .context("set IPV6_MULTICAST_IF")?; } } else { - s.set_unicast_hops_v6(cli.ttl) + s.set_unicast_hops_v6(u32::from(cli.ttl)) .context("set IPV6_UNICAST_HOPS")?; } (s, SocketAddr::V6(sa)) @@ -194,6 +194,11 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { let mut rx_count: u64 = 0; let mut loss_count: u64 = 0; + // `out_of_order_count` covers any datagram whose sequence is below the + // running high-water mark. With only a sequence prefix the receiver + // cannot distinguish a reordered delivery from a duplicate, so both + // land in this bucket. + let mut out_of_order_count: u64 = 0; let mut next_expected: Option = None; let mut total_bits: u64 = 0; @@ -228,12 +233,18 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { interval_sent += n * 8; total_bits += (n * 8) as u64; + // The sender's sequence is a `u64` counter starting at 0. At line + // rate that's hundreds of years to wrap, so the receiver makes no + // attempt to detect or compensate for wraparound. `wrapping_add` is + // used purely to avoid an overflow panic on adversarial inputs. if n >= SEQ_LEN && let Some(seq) = get_seq(&buf[..n]) { let expected = next_expected.unwrap_or(seq); if seq > expected { loss_count += seq - expected; + } else if seq < expected { + out_of_order_count += 1; } next_expected = Some(expected.max(seq.wrapping_add(1))); } @@ -259,7 +270,9 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { // Machine-readable summary so commtest can parse the result without // scraping `show_speed` output. - println!("{{\"rx\":{rx_count},\"loss\":{loss_count},\"bps\":{bps:.3}}}"); + println!( + "{{\"rx\":{rx_count},\"loss\":{loss_count},\"out_of_order\":{out_of_order_count},\"bps\":{bps:.3}}}" + ); if rx_count == 0 { anyhow::bail!("received zero datagrams");