diff --git a/Cargo.lock b/Cargo.lock index c623a45..bf66e82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,16 +1212,6 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.10" @@ -1234,12 +1224,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1359,7 +1349,8 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", - "socket2 0.4.10", + "libc", + "socket2 0.6.3", ] [[package]] @@ -1393,7 +1384,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.2", + "socket2 0.6.3", "tokio-macros", "windows-sys 0.61.2", ] @@ -1850,15 +1841,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" -dependencies = [ - "windows-targets 0.53.5", -] - [[package]] name = "windows-sys" version = "0.61.2" @@ -1892,30 +1874,13 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", + "windows_i686_gnullvm", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows-targets" -version = "0.53.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" -dependencies = [ - "windows-link", - "windows_aarch64_gnullvm 0.53.1", - "windows_aarch64_msvc 0.53.1", - "windows_i686_gnu 0.53.1", - "windows_i686_gnullvm 0.53.1", - "windows_i686_msvc 0.53.1", - "windows_x86_64_gnu 0.53.1", - "windows_x86_64_gnullvm 0.53.1", - "windows_x86_64_msvc 0.53.1", -] - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -1928,12 +1893,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -1946,12 +1905,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" - [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -1964,24 +1917,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" - [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -1994,12 +1935,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -2012,12 +1947,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -2030,12 +1959,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -2048,12 +1971,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" - [[package]] name = "winnow" version = "0.5.40" diff --git a/Cargo.toml b/Cargo.toml index f7df6c6..7035c82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -edition = "2021" +edition = "2024" repository = "https://github.com/oxidecomputer/thundermuffin" [workspace.dependencies] @@ -20,5 +20,6 @@ anyhow = "1.0.101" cargo_metadata = "0.18" clap = { version = "4.5.59", features = ["derive"] } indoc = "2" -socket2 = { version = "0.4.10", features = ["all"] } +libc = "0.2" +socket2 = { version = "0.6.3", features = ["all"] } toml = "0.8" diff --git a/app/Cargo.toml b/app/Cargo.toml index 6bd6371..a708d25 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "thundermuffin" version = "0.1.0" -edition = "2021" +edition.workspace = true [dependencies] anyhow.workspace = true clap.workspace = true +libc.workspace = true socket2.workspace = true diff --git a/app/src/main.rs b/app/src/main.rs index 5fb3248..a69e3fb 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -1,7 +1,9 @@ -use anyhow::Result; -use clap::{Parser, Subcommand, ValueEnum}; -use std::net::IpAddr; +use anyhow::{Result, bail}; +use clap::error::ErrorKind; +use clap::{CommandFactory, Parser, Subcommand, ValueEnum}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +mod ssm; mod tcp; mod udp; mod util; @@ -18,11 +20,13 @@ struct Cli { #[arg(short, long, default_value_t = 4747)] port: u16, - /// Scope to use for IPv6 targets + /// Scope (zone index) to use for IPv6 targets. Also used as the outgoing + /// `IPV6_MULTICAST_IF` interface index when the destination/listen address + /// is an IPv6 multicast address. #[arg(short, long, default_value_t = 0)] scope: u32, - /// How big an individual send bufer is in bytes. + /// How big an individual send buffer is in bytes. #[arg(short, long, default_value_t = 64000)] buffer_size: usize, @@ -30,6 +34,27 @@ struct Cli { #[arg(long, default_value_t = 128)] backlog: i32, + /// TTL (IPv4) or hop limit (IPv6) for outgoing datagrams. For multicast + /// destinations this sets `IP_MULTICAST_TTL` / `IPV6_MULTICAST_HOPS`; + /// otherwise, it sets `IP_TTL` / `IPV6_UNICAST_HOPS`. The on-wire field + /// is 8 bits in both IPv4 and IPv6. + #[arg(long, default_value_t = 64)] + ttl: u8, + + /// Enable `IP_MULTICAST_LOOP` / `IPV6_MULTICAST_LOOP`. Disabled by + /// default to avoid the sender seeing its own traffic on hosts that + /// also act as receivers. + #[arg(long)] + multicast_loop: bool, + + /// Outgoing `IP_MULTICAST_IF` for IPv4 multicast, expressed as the + /// IPv4 address bound to the desired interface. For IPv6 use + /// `--scope` (numeric ifindex) instead. When omitted the kernel selects + /// the interface from its routing table, which may surprise on + /// multi-homed hosts. + #[arg(long)] + multicast_iface: Option, + #[command(subcommand)] kind: Participant, } @@ -42,7 +67,8 @@ enum Participant { #[derive(Parser, Debug)] struct Client { - /// IP address of server + /// IP address of the server, or multicast group address when sending + /// multicast traffic. server: IpAddr, /// Determine test duration in terms of time or data. @@ -57,8 +83,119 @@ struct Client { #[derive(Parser, Debug)] struct Server { - /// IP address to listen on + /// IP address to listen on. When this is a multicast address the + /// receiver binds directly to the group on `--port`, sets `SO_REUSEADDR` + /// (so multiple receivers can co-bind), and joins the group on the + /// interface selected by `--multicast-iface` (IPv4) or `--scope` (IPv6). listen: IpAddr, + + /// Wallclock duration (seconds) for UDP receivers. When unset the + /// receiver runs until interrupted. TCP servers ignore this field. + #[arg(short, long)] + duration: Option, + + /// Source addresses for [source-specific multicast][rfc4607] (SSM). + /// Repeatable: pass `--multicast-source` once per source. With one + /// source it's a `(S, G)` join. With N sources it's a + /// `({S1, S2, ...}, G)` INCLUDE filter. The kernel filters incoming + /// datagrams to only those sourced from one of these addresses + /// (`IP_ADD_SOURCE_MEMBERSHIP` per source on IPv4, `setsourcefilter` + /// with the full slist on IPv6). Each must be a unicast address with + /// family matching `listen`, and `listen` must be a multicast address. + /// + /// [rfc4607]: https://datatracker.ietf.org/doc/html/rfc4607 + #[arg(long, value_parser = parse_unicast_ipaddr)] + multicast_source: Vec, +} + +/// Reject obviously-non-unicast addresses (multicast, unspecified, loopback) +/// at clap parse time so SSM misconfiguration surfaces as a CLI error rather +/// than an opaque kernel `EADDRNOTAVAIL` after socket setup. +fn parse_unicast_ipaddr(s: &str) -> Result { + let addr: IpAddr = s + .parse() + .map_err(|e: std::net::AddrParseError| e.to_string())?; + if addr.is_multicast() || addr.is_unspecified() || addr.is_loopback() { + return Err(format!("must be a unicast address (got `{addr}`)")); + } + Ok(addr) +} + +impl Server { + fn validate( + &self, + cli: &Cli, + cmd: &mut clap::Command, + ) -> Result<(), clap::Error> { + if self.multicast_source.is_empty() { + return Ok(()); + } + if !self.listen.is_multicast() { + return Err(cmd.error( + ErrorKind::ValueValidation, + format!( + "`--multicast-source` requires `listen` to be a multicast \ + address (got `{}`)", + self.listen + ), + )); + } + for source in &self.multicast_source { + if source.is_ipv4() != self.listen.is_ipv4() { + return Err(cmd.error( + ErrorKind::ValueValidation, + format!( + "`--multicast-source` family must match the `listen` \ + address family (got `{source}` vs listen `{}`)", + self.listen + ), + )); + } + } + // SSM joins must be pinned to a specific interface. With + // kernel-picked interface (`INADDR_ANY` for v4, `ifindex = 0` for + // v6) the join can silently land on an interface where the source + // isn't reachable and deliver no traffic, which looks identical + // to a real network regression in CI. + match self.listen { + IpAddr::V4(_) if cli.multicast_iface.is_none() => Err(cmd.error( + ErrorKind::MissingRequiredArgument, + "`--multicast-source` (IPv4) requires `--multicast-iface` \ + to pin the SSM join to a specific interface", + )), + IpAddr::V6(_) if cli.scope == 0 => Err(cmd.error( + ErrorKind::MissingRequiredArgument, + "`--multicast-source` (IPv6) requires non-zero `--scope` \ + (interface index) to pin the SSM join to a specific interface", + )), + _ => Ok(()), + } + } + + /// IPv4 SSM sources. `validate` guarantees that, when `listen` is v4 + /// and a multicast address, every configured source is also v4, so this + /// never silently drops a v6 source meant for the v4 path. + pub(crate) fn multicast_source_v4(&self) -> Vec { + self.multicast_source + .iter() + .filter_map(|source| match source { + IpAddr::V4(addr) => Some(*addr), + IpAddr::V6(_) => None, + }) + .collect() + } + + /// IPv6 SSM sources. See [`Self::multicast_source_v4`] for the + /// corresponding invariant. + pub(crate) fn multicast_source_v6(&self) -> Vec { + self.multicast_source + .iter() + .filter_map(|source| match source { + IpAddr::V6(addr) => Some(*addr), + IpAddr::V4(_) => None, + }) + .collect() + } } #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] @@ -81,6 +218,26 @@ enum Kind { fn main() -> Result<()> { let cli = Cli::parse(); + + // TCP cannot carry multicast. Reject early so callers (e.g. commtest) + // get a clear error rather than a silent unicast send to a multicast + // address. + if matches!(cli.transport, Transport::Tcp) { + let mcast = match cli.kind { + Participant::Client(ref c) => c.server.is_multicast(), + Participant::Server(ref s) => s.listen.is_multicast(), + }; + if mcast { + bail!("multicast is only supported with `--transport udp`"); + } + } + + if let Participant::Server(ref server) = cli.kind + && let Err(e) = server.validate(&cli, &mut Cli::command()) + { + e.exit(); + } + match cli.transport { Transport::Tcp => tcp::run(&cli), Transport::Udp => udp::run(&cli), diff --git a/app/src/ssm.rs b/app/src/ssm.rs new file mode 100644 index 0000000..3272311 --- /dev/null +++ b/app/src/ssm.rs @@ -0,0 +1,99 @@ +//! Source-specific multicast (SSM) functionality that isn't covered by `socket2`. +//! +//! `socket2` exposes `Socket::join_ssm_v4` for IPv4 SSM joins via +//! `IP_ADD_SOURCE_MEMBERSHIP`, but has no IPv6 equivalent in any current +//! release. For IPv6 we drop to POSIX [`setsourcefilter(3SOCKET)`] +//! (RFC 3678), which lives in `libsocket` on illumos and `libc` on Linux. +//! +//! [`setsourcefilter(3SOCKET)`]: https://illumos.org/man/3SOCKET/setsourcefilter + +use socket2::Socket; +use std::net::Ipv6Addr; +use std::os::fd::AsRawFd; + +// RFC 3678 / POSIX include-mode source filter. Standardized as `1` on every +// platform that exposes `setsourcefilter`. +const MCAST_INCLUDE: u32 = 1; + +unsafe extern "C" { + fn setsourcefilter( + socket: libc::c_int, + interface: u32, + group: *const libc::sockaddr, + grouplen: libc::socklen_t, + fmode: u32, + numsrc: u32, + slist: *const libc::sockaddr_storage, + ) -> libc::c_int; +} + +/// Join `socket` to the IPv6 source-specific multicast group `group` on +/// interface `ifindex`, with INCLUDE-mode filter listing `sources`. With +/// one entry it's a `(S, G)` join; with N it's a `({S1, ..., Sn}, G)` +/// channel. Caller must pass a non-empty slice (the empty-sources case +/// is the any-source `IPV6_ADD_MEMBERSHIP` join, handled elsewhere). +pub fn join_ssm_v6( + socket: &Socket, + group: &Ipv6Addr, + sources: &[Ipv6Addr], + ifindex: u32, +) -> std::io::Result<()> { + // Construct a `sockaddr_in6` with only family + address populated. All + // other fields (port, flowinfo, scope, platform-specific extras) are + // explicitly zeroed. + let make_in6 = |addr: &Ipv6Addr| -> libc::sockaddr_in6 { + libc::sockaddr_in6 { + sin6_family: libc::AF_INET6 as libc::sa_family_t, + sin6_port: 0, + sin6_flowinfo: 0, + sin6_addr: libc::in6_addr { + s6_addr: addr.octets(), + }, + sin6_scope_id: 0, + #[cfg(any(target_os = "illumos", target_os = "solaris"))] + __sin6_src_id: 0, + } + }; + + let group_sa = make_in6(group); + + // Build the slist as a `Vec`. `sockaddr_storage` is + // sized and aligned to hold any sockaddr family, so a `sockaddr_in6` + // lands at offset 0 with correct alignment. `ptr::write` only + // overwrites the leading `size_of::()` bytes per slot; + // trailing bytes stay zero-initialized. + let slist: Vec = sources + .iter() + .map(|source| { + let mut storage: libc::sockaddr_storage = + unsafe { std::mem::zeroed() }; + unsafe { + std::ptr::write( + &mut storage as *mut _ as *mut libc::sockaddr_in6, + make_in6(source), + ); + } + storage + }) + .collect(); + + // `setsourcefilter` is an FFI call. The pointers reference valid sockaddr + // structures of the indicated lengths and live for the duration of the + // call. `numsrc` is taken from `slist.len()` so both arguments unambiguously + // refer to the same buffer. + let ret = unsafe { + setsourcefilter( + socket.as_raw_fd(), + ifindex, + &group_sa as *const _ as *const libc::sockaddr, + std::mem::size_of::() as libc::socklen_t, + MCAST_INCLUDE, + slist.len() as u32, + slist.as_ptr(), + ) + }; + if ret < 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) +} diff --git a/app/src/tcp.rs b/app/src/tcp.rs index 392efb3..15a7ac2 100644 --- a/app/src/tcp.rs +++ b/app/src/tcp.rs @@ -87,7 +87,7 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { }; //TODO s.set_quickack(true)?; - s.set_nodelay(true)?; + s.set_tcp_nodelay(true)?; s.listen(cli.backlog)?; loop { diff --git a/app/src/udp.rs b/app/src/udp.rs index dc213a5..8b462f0 100644 --- a/app/src/udp.rs +++ b/app/src/udp.rs @@ -1,9 +1,14 @@ -use crate::util::{buffer, show_speed}; +use crate::ssm::join_ssm_v6; +use crate::util::{SEQ_LEN, buffer, get_seq, put_seq, show_speed}; use crate::{Cli, Client, Participant, Server}; -use anyhow::Result; +use anyhow::{Context, Result}; use socket2::{Domain, Protocol, Socket, Type}; -use std::mem::MaybeUninit; -use std::net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::io::Read; +use std::net::{ + IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, +}; +use std::num::NonZeroU32; +use std::time::{Duration, Instant}; pub(crate) fn run(cli: &Cli) -> Result<()> { match cli.kind { @@ -13,39 +18,69 @@ pub(crate) fn run(cli: &Cli) -> Result<()> { } fn run_client(cli: &Cli, client: &Client) -> Result<()> { + let is_mcast = client.server.is_multicast(); + let (s, sa) = match client.server { IpAddr::V4(addr) => { let sa = SocketAddrV4::new(addr, cli.port); let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + if is_mcast { + s.set_multicast_ttl_v4(u32::from(cli.ttl)) + .context("set IP_MULTICAST_TTL")?; + s.set_multicast_loop_v4(cli.multicast_loop) + .context("set IP_MULTICAST_LOOP")?; + if let Some(iface) = cli.multicast_iface { + s.set_multicast_if_v4(&iface) + .context("set IP_MULTICAST_IF")?; + } + } else { + s.set_ttl_v4(u32::from(cli.ttl)).context("set IP_TTL")?; + } (s, SocketAddr::V4(sa)) } IpAddr::V6(addr) => { let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); let s = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; + if is_mcast { + s.set_multicast_hops_v6(u32::from(cli.ttl)) + .context("set IPV6_MULTICAST_HOPS")?; + s.set_multicast_loop_v6(cli.multicast_loop) + .context("set IPV6_MULTICAST_LOOP")?; + // ifindex 0 means "kernel picks". Cross-platform behavior of + // explicit 0 is inconsistent, so skip the call entirely. + if let Some(idx) = NonZeroU32::new(cli.scope) { + s.set_multicast_if_v6(idx.get()) + .context("set IPV6_MULTICAST_IF")?; + } + } else { + s.set_unicast_hops_v6(u32::from(cli.ttl)) + .context("set IPV6_UNICAST_HOPS")?; + } (s, SocketAddr::V6(sa)) } }; let sa = sa.into(); - let buf = buffer(cli.buffer_size); + let mut buf = buffer(cli.buffer_size); - let start = std::time::Instant::now(); + let start = Instant::now(); let mut interval = 0; let mut interval_sent = 0; - //let mut perf = Vec::new(); let mut total = 0; let mut count = 0; + let mut seq: u64 = 0; loop { + put_seq(&mut buf, seq); + seq = seq.wrapping_add(1); let n = s.send_to(&buf, &sa)?; interval_sent += n * 8; - let t = std::time::Instant::now(); + let t = Instant::now(); let d = t.duration_since(start); let ds = d.as_secs(); if ds > interval { - //perf.push(interval_sent); interval = ds; total += interval_sent; print!("[{}] ", count); @@ -64,36 +99,157 @@ fn run_client(cli: &Cli, client: &Client) -> Result<()> { Ok(()) } +/// Join `socket` to the IPv4 multicast `group` on `iface`. With `sources` +/// non-empty it's an SSM include-mode join, one +/// `IP_ADD_SOURCE_MEMBERSHIP` per source. Otherwise it's an any-source +/// `(*, G)` join via `IP_ADD_MEMBERSHIP`. +fn join_v4( + socket: &Socket, + group: Ipv4Addr, + iface: Ipv4Addr, + sources: &[Ipv4Addr], +) -> Result<()> { + if sources.is_empty() { + return socket + .join_multicast_v4(&group, &iface) + .context("IP_ADD_MEMBERSHIP"); + } + for source in sources { + socket + .join_ssm_v4(&group, source, &iface) + .context("IP_ADD_SOURCE_MEMBERSHIP")?; + } + Ok(()) +} + +/// Join `socket` to the IPv6 multicast `group` on `ifindex`. With `sources` +/// non-empty it's an SSM include-mode join via `setsourcefilter` (one call +/// with the full slist). Otherwise it's an any-source `(*, G)` join via +/// `IPV6_ADD_MEMBERSHIP`. +fn join_v6( + socket: &Socket, + group: Ipv6Addr, + ifindex: u32, + sources: &[Ipv6Addr], +) -> Result<()> { + if sources.is_empty() { + return socket + .join_multicast_v6(&group, ifindex) + .context("IPV6_ADD_MEMBERSHIP"); + } + join_ssm_v6(socket, &group, sources, ifindex).context("setsourcefilter") +} + fn run_server(cli: &Cli, server: &Server) -> Result<()> { + let is_mcast = server.listen.is_multicast(); + let s = match server.listen { IpAddr::V4(addr) => { - let sa = SocketAddrV4::new(addr, cli.port); let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + // Bind directly to the group address for multicast so the socket + // only receives datagrams sent to that group. + // + // Note: `SO_REUSEADDR` lets multiple receivers co-bind to the same + // group/port. + let sa = SocketAddrV4::new(addr, cli.port); + if is_mcast { + s.set_reuse_address(true).context("SO_REUSEADDR")?; + } s.bind(&sa.into())?; + if is_mcast { + let iface = + cli.multicast_iface.unwrap_or(Ipv4Addr::UNSPECIFIED); + join_v4(&s, addr, iface, &server.multicast_source_v4())?; + } s } IpAddr::V6(addr) => { - let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); let s = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; + let sa = SocketAddrV6::new(addr, cli.port, 0, cli.scope); + if is_mcast { + s.set_reuse_address(true).context("SO_REUSEADDR")?; + } s.bind(&sa.into())?; + if is_mcast { + join_v6(&s, addr, cli.scope, &server.multicast_source_v6())?; + } s } }; + // When `--duration` is set, install a short read timeout so the recv + // loop wakes up periodically and can observe the wallclock deadline + // even if no datagrams are arriving. + if server.duration.is_some() { + s.set_read_timeout(Some(Duration::from_millis(500))) + .context("SO_RCVTIMEO")?; + } + let mut interval = 0; let mut interval_sent = 0; - //let mut perf = Vec::new(); let mut count = 0; - let start = std::time::Instant::now(); + let start = Instant::now(); + + let mut rx_count: u64 = 0; + let mut loss_count: u64 = 0; + // `out_of_order_count` covers any datagram whose sequence is below the + // running high-water mark. With only a sequence prefix the receiver + // cannot distinguish a reordered delivery from a duplicate, so both + // land in this bucket. + let mut out_of_order_count: u64 = 0; + let mut next_expected: Option = None; + let mut total_bits: u64 = 0; + + let mut buf = vec![0u8; cli.buffer_size]; loop { - let sz = cli.buffer_size; - let mut buf = vec![MaybeUninit::::uninit(); sz]; - let (n, _) = s.recv_from(&mut buf)?; + if let Some(duration_secs) = server.duration + && start.elapsed().as_secs() >= duration_secs + { + break; + } + + // The source address is unused here. `Read::read` on `&Socket` routes + // through the same `recv` syscall as `recv_from` but operates on a + // plain `&mut [u8]`, avoiding the `MaybeUninit` buffer that + // socket2's typed datagram APIs require. + let n = match Read::read(&mut &s, &mut buf) { + Ok(n) => n, + Err(e) + if matches!( + e.kind(), + std::io::ErrorKind::WouldBlock + | std::io::ErrorKind::TimedOut + ) => + { + continue; + } + Err(e) => return Err(e.into()), + }; + + rx_count += 1; interval_sent += n * 8; - let t = std::time::Instant::now(); + total_bits += (n * 8) as u64; + + // The sender's sequence is a `u64` counter starting at 0. At line + // rate that's hundreds of years to wrap, so the receiver makes no + // attempt to detect or compensate for wraparound. `wrapping_add` is + // used purely to avoid an overflow panic on adversarial inputs. + if n >= SEQ_LEN + && let Some(seq) = get_seq(&buf[..n]) + { + let expected = next_expected.unwrap_or(seq); + if seq > expected { + loss_count += seq - expected; + } else if seq < expected { + out_of_order_count += 1; + } + next_expected = Some(expected.max(seq.wrapping_add(1))); + } + + let t = Instant::now(); let d = t.duration_since(start); let ds = d.as_secs(); if ds > interval { @@ -104,4 +260,22 @@ fn run_server(cli: &Cli, server: &Server) -> Result<()> { interval_sent = 0; } } + + let elapsed = Instant::now().duration_since(start).as_secs_f64(); + let bps = if elapsed > 0.0 { + total_bits as f64 / elapsed + } else { + 0.0 + }; + + // Machine-readable summary so commtest can parse the result without + // scraping `show_speed` output. + println!( + "{{\"rx\":{rx_count},\"loss\":{loss_count},\"out_of_order\":{out_of_order_count},\"bps\":{bps:.3}}}" + ); + + if rx_count == 0 { + anyhow::bail!("received zero datagrams"); + } + Ok(()) } diff --git a/app/src/util.rs b/app/src/util.rs index 7e1ab41..cd18fb6 100644 --- a/app/src/util.rs +++ b/app/src/util.rs @@ -1,5 +1,9 @@ const MUFFIN: &[u8] = b"muffin "; +/// Length of the big-endian `u64` sequence-number prefix that precedes the +/// muffin payload when sequence tracking is enabled. +pub const SEQ_LEN: usize = 8; + pub fn show_speed(mut s: f64) { if s > 1024.0 { s /= 1024.0; @@ -19,6 +23,10 @@ pub fn show_speed(mut s: f64) { } } +/// Build a send buffer of `size` bytes. The first [`SEQ_LEN`] bytes are +/// reserved for the sequence-number prefix (rewritten on each send) when +/// sequence tracking is enabled. The rest of the buffer is filled with the +/// repeating `muffin` filler so wire dumps stay recognizable. pub fn buffer(size: usize) -> Vec { let mut buf = vec![0u8; size]; for i in 0..buf.len() { @@ -26,3 +34,20 @@ pub fn buffer(size: usize) -> Vec { } buf } + +/// Write the big-endian sequence-number prefix into the first [`SEQ_LEN`] +/// bytes of `buf`. Caller must ensure `buf.len() >= SEQ_LEN`. +pub fn put_seq(buf: &mut [u8], seq: u64) { + buf[..SEQ_LEN].copy_from_slice(&seq.to_be_bytes()); +} + +/// Read the big-endian sequence-number prefix from the first [`SEQ_LEN`] +/// bytes of `buf`, returning `None` if the slice is too short. +pub fn get_seq(buf: &[u8]) -> Option { + if buf.len() < SEQ_LEN { + return None; + } + let mut bytes = [0u8; SEQ_LEN]; + bytes.copy_from_slice(&buf[..SEQ_LEN]); + Some(u64::from_be_bytes(bytes)) +} diff --git a/package/Cargo.toml b/package/Cargo.toml index 4e539e0..36a0253 100644 --- a/package/Cargo.toml +++ b/package/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "thundermuffin-package" version = "0.1.0" -edition = "2021" +edition.workspace = true [dependencies] omicron-zone-package = "0.9.1"