diff --git a/Cargo.lock b/Cargo.lock index b77c8b9d..f8128d45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1350,6 +1350,8 @@ dependencies = [ "crc32fast", "derror-macro", "dyn-clone", + "foldhash", + "hashbrown 0.15.5", "heapless", "illumos-sys-hdrs", "ingot", diff --git a/Cargo.toml b/Cargo.toml index a5a9cbdc..68c00bd3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,8 @@ criterion = "0.8" ctor = "0.10" darling = "0.23" dyn-clone = "1.0" +foldhash = { version = "0.1", default-features = false } +hashbrown = { version = "0.15", default-features = false, features = ["inline-more"] } heapless = "0.8" ingot = "0.1.1" ipnetwork = { version = "0.21", default-features = false } diff --git a/lib/opte/Cargo.toml b/lib/opte/Cargo.toml index dbf31c01..8b4b345d 100644 --- a/lib/opte/Cargo.toml +++ b/lib/opte/Cargo.toml @@ -14,6 +14,8 @@ engine = [ "dep:cfg-if", "dep:crc32fast", "dep:derror-macro", + "dep:foldhash", + "dep:hashbrown", "dep:heapless", "dep:itertools", "dep:smoltcp", @@ -42,6 +44,8 @@ bitflags = { workspace = true , features = ["serde"] } cfg-if = { workspace = true, optional = true } crc32fast = { workspace = true, optional = true } dyn-clone.workspace = true +foldhash = { workspace = true, optional = true } +hashbrown = { workspace = true, optional = true } heapless = { workspace = true, optional = true } itertools = { workspace = true, optional = true } postcard.workspace = true diff --git a/lib/opte/src/engine/flow_table.rs b/lib/opte/src/engine/flow_table.rs index cbbb7d42..20b8c9f3 100644 --- a/lib/opte/src/engine/flow_table.rs +++ b/lib/opte/src/engine/flow_table.rs @@ -13,7 +13,6 @@ use super::packet::InnerFlowId; use crate::ddi::time::MILLIS; use crate::ddi::time::Moment; use alloc::boxed::Box; -use alloc::collections::BTreeMap; use alloc::ffi::CString; use alloc::string::String; use alloc::sync::Arc; @@ -23,6 +22,8 @@ use core::num::NonZeroU32; use core::sync::atomic::AtomicBool; use core::sync::atomic::AtomicU64; use core::sync::atomic::Ordering; +use foldhash::fast::FixedState; +use hashbrown::HashMap; #[cfg(all(not(feature = "std"), not(test)))] use illumos_sys_hdrs::uintptr_t; use opte_api::OpteError; @@ -37,6 +38,9 @@ pub const FLOW_DEF_TTL: Ttl = Ttl::new_seconds(FLOW_DEF_EXPIRE_SECS); pub const FLOW_TABLE_DEF_MAX_ENTRIES: u32 = 8192; +/// Capacity below which a table's allocation is left alone during reclaim. +const FT_MIN_CAPACITY: usize = 64; + type Result = core::result::Result; /// The Time To Live in milliseconds. @@ -78,13 +82,16 @@ impl ExpiryPolicy for Ttl { pub type FlowTableDump = Vec<(InnerFlowId, T)>; +/// Per-table flow hasher, seeded from the kernel PRNG in [`hash_seed`]. +type FlowHasher = FixedState; + #[derive(Debug)] pub struct FlowTable { port_c: CString, name_c: CString, limit: NonZeroU32, policy: Box>, - map: BTreeMap>>, + map: HashMap>, FlowHasher>, } impl FlowTable @@ -150,6 +157,8 @@ where for (flow_id, entry) in &self.map { flows.push((*flow_id, entry.dump())); } + // HashMap order is arbitrary; sort for stable dumps. + flows.sort_unstable_by_key(|(flow_id, _)| *flow_id); flows } @@ -182,9 +191,21 @@ where true }); + self.reclaim(); expired } + /// Shrinks the backing allocation toward `2 * len` once a table drains to + /// under a quarter full. Only firing on a deep drain keeps tables near + /// capacity from thrashing and bounds how often the rehash runs under the + /// port lock. + fn reclaim(&mut self) { + let target = self.map.len().saturating_mul(2).max(FT_MIN_CAPACITY); + if self.map.capacity() > target.saturating_mul(2) { + self.map.shrink_to(target); + } + } + /// Get the maximum number of entries this flow table may hold. pub fn get_limit(&self) -> NonZeroU32 { self.limit @@ -220,7 +241,7 @@ where name_c: CString::new(name).unwrap(), limit, policy, - map: BTreeMap::new(), + map: HashMap::with_hasher(FixedState::with_seed(hash_seed())), } } @@ -264,6 +285,23 @@ fn flow_expired_probe( } } +/// Returns a hasher seed: the kernel PRNG in the kmod, a fixed value under +/// std/test. +fn hash_seed() -> u64 { + cfg_if! { + if #[cfg(all(not(feature = "std"), not(test)))] { + let mut seed = [0u8; 8]; + // SAFETY: writes exactly `seed.len()` bytes into a valid buffer. + unsafe { + random_get_pseudo_bytes(seed.as_mut_ptr(), seed.len()); + } + u64::from_ne_bytes(seed) + } else { + 0 + } + } +} + /// A type that can be "dumped" for the purposes of presenting an /// external view into internal state of the [`FlowEntry`]. pub trait Dump { @@ -379,6 +417,11 @@ unsafe extern "C" { ifid: *const InnerFlowId, epoch: uintptr_t, ); + + fn random_get_pseudo_bytes( + ptr: *mut u8, + len: usize, + ) -> illumos_sys_hdrs::c_int; } impl Dump for () { @@ -398,6 +441,17 @@ mod test { pub const FT_SIZE: Option = NonZeroU32::new(16); + fn flow_id(dst_port: u16) -> InnerFlowId { + InnerFlowId { + proto: Protocol::TCP.into(), + addrs: AddrPair::V4 { + src: "192.168.2.10".parse().unwrap(), + dst: "76.76.21.21".parse().unwrap(), + }, + proto_info: PortInfo { src_port: 37890, dst_port }.into(), + } + } + #[test] fn flow_expired() { let flowid = InnerFlowId { @@ -442,4 +496,73 @@ mod test { ft.clear(); assert_eq!(ft.num_flows(), 0); } + + #[test] + fn flow_add_get_remove() { + let flowid = flow_id(443); + let mut ft = + FlowTable::new("port", "flow-crud-test", FT_SIZE.unwrap(), None); + + assert!(ft.get(&flowid).is_none()); + ft.add(flowid, ()).unwrap(); + assert!(ft.get(&flowid).is_some()); + assert_eq!(ft.num_flows(), 1); + + assert!(ft.remove(&flowid).is_some()); + assert!(ft.get(&flowid).is_none()); + assert_eq!(ft.num_flows(), 0); + } + + #[test] + fn flow_table_enforces_limit() { + let limit = FT_SIZE.unwrap().get(); + let mut ft = + FlowTable::new("port", "flow-limit-test", FT_SIZE.unwrap(), None); + + for dst_port in 0..limit as u16 { + ft.add(flow_id(dst_port), ()).unwrap(); + } + assert_eq!(ft.num_flows(), limit); + + let err = ft.add(flow_id(limit as u16), ()).unwrap_err(); + assert!(matches!(err, OpteError::MaxCapacity(_))); + assert_eq!(ft.num_flows(), limit); + } + + #[test] + fn flow_dump_is_sorted() { + let mut ft = + FlowTable::new("port", "flow-dump-test", FT_SIZE.unwrap(), None); + for dst_port in [5u16, 1, 3, 2, 4] { + ft.add(flow_id(dst_port), ()).unwrap(); + } + + let dumped: Vec = + ft.dump().into_iter().map(|(flow_id, _)| flow_id).collect(); + let mut expected = dumped.clone(); + expected.sort_unstable(); + assert_eq!(dumped, expected); + assert_eq!(dumped.len(), 5); + } + + #[test] + fn flow_table_reclaims_after_drain() { + let limit = NonZeroU32::new(4096).unwrap(); + let mut ft = FlowTable::new("port", "flow-reclaim-test", limit, None); + + for dst_port in 0..2000u16 { + ft.add(flow_id(dst_port), ()).unwrap(); + } + let grown = ft.map.capacity(); + assert!(grown >= 2000); + + let now = Moment::now() + Duration::new(FLOW_DEF_EXPIRE_SECS + 1, 0); + ft.expire_flows(now, |_| FLOW_ID_DEFAULT); + assert_eq!(ft.num_flows(), 0); + + assert!(ft.map.capacity() < grown); + + ft.add(flow_id(1), ()).unwrap(); + assert!(ft.get(&flow_id(1)).is_some()); + } }