diff --git a/chain-bench-results.md b/chain-bench-results.md new file mode 100644 index 000000000..587303e20 --- /dev/null +++ b/chain-bench-results.md @@ -0,0 +1,269 @@ +# chain_bench: multi-writer Chain vs MPSC baseline vs per-writer Mesh + +Harness: `communication/examples/chain_bench.rs`. Apple Silicon (10 cores), +release, median of 3 after warmup. Delta = miniature ChangeBatch +(`Vec<(u64,i64)>`, amortized consolidation). Workload: all-to-all; worker w +mints `(t,+1)` each step, its successor retires it `(-1)` one step later for +`cancel%` of steps. MPSC = per-reader `Mutex>`, send clones +to every reader (models the current Progcaster intra-process path). + +Full tables at the end. Headlines: + +## Scenario C — unread backlog (the design goal): Chain wins by orders of magnitude + +N=8, 100% cancellation, 50k sends/worker with nobody reading: + +| structure | retained entries | retained units | catch-up | +|---|---|---|---| +| mpsc | 6,399,936 | 3,200,000 queued deltas | 46 ms | +| chain | **6,124** | **2 nodes** | **0.4 ms** | +| mesh (per-writer) | 799,992 | 16 nodes | 27 ms | + +Three findings in one table: (1) the chain's in-transit cancellation works — +retained state is the *net* in-flight window (~6k entries), 1000× below the +MPSC backlog and independent of elapsed sends; (2) the per-writer Mesh +pathology predicted in review is real — bounded *nodes* but unbounded +*content* (130× the chain's entries at 100% cancel), because cancellation is +inherently cross-writer; (3) even with zero cancellation the chain retains N× +less than MPSC, which pays the per-peer clone multiplier. + +## Scenario B — laggard protection: Chain's laggard work is flat in N + +Laggard recvs every 1024 steps; mean entries folded per laggard recv: + +| N | mpsc | chain | mesh | +|---|---|---|---| +| 2 | 4,076 | 2,576 | 2,415 | +| 4 | 6,004 | 2,343 | 2,127 | +| 8 | 11,155 | **2,193** | 2,120 | + +MPSC laggard work grows with N (and bursts: max 38k entries/recv at N=8); +the chain's is bounded by the live cancellation window — flat as N grows, +with proportionally lower mean recv latency. + +## Scenario A — everyone keeps up: MPSC wins, and the gap grows with N + +Sends/sec, 100% cancel: N=2: mpsc 5.4M vs chain 2.6M (2.1×); N=4: 3.1M vs +1.0M (3.0×); N=8: 1.07M vs 0.42M (2.6–7× across cancel rates, ~7× at the +worst point). This is the predicted head-mutex contention, confirmed: every +send serializes through one lock, and the benchmark sends in a tight loop +with no work between sends — the maximally contended regime. + +## Verdict: better for whom + +The chain delivers exactly what it was designed for — laggards and memory: +bounded backlog state (orders of magnitude), bounded laggard fold work (flat +in N), fast catch-up — and pays for it on the uncontended fast path, where +MPSC's clone-into-W-queues is 2–7× faster in this harness. Three reasons the +fast-path penalty overstates the real cost, and one real mitigation: + +- Real progress traffic is one batch per worker per *scheduling step* with + dataflow work between sends; the benchmark's send-only tight loop is the + worst case for lock contention. +- The Subgraph already accumulates locally and ships net diffs once per + round (the "thread-local tier"), so send frequency is bounded by step + rate, not update rate. +- MPSC's advantage shrinks as payloads grow (it clones per peer; the chain + writes once). +- If contention still bites, the contention-relief options that *preserve + atomicity* are an aggregation tree (groups of workers share a leaf + structure, a representative folds each leaf into the parent — cancellation + compounds per level) or a writer-local stash with try-lock (delay a send, + never split it). Note: **key-sharding does not work here** — a single send + is a multi-key batch, and sharding by key would split one transmission + across shards, violating the "never break up a send" rule; sharding by + batch (per-writer, the Mesh) destroys cross-writer cancellation instead. + +The real-workload and allocating-key follow-ups below were run after this +synthetic pass. Their short version: on this 10-core machine the chain +loses the throughput race but owns laggard work and backlog memory; +allocating timestamps narrow the throughput gap in the predicted direction +but do not close it here. + +## Real-workload measurements (experimental Progcaster on the chain) + +These numbers come from an **experimental `Progcaster` wiring not included in +this branch** (a crude `TIMELY_PROGRESS_CHAIN=1` flag with a type-erased +registry, kept out of the draft as too invasive to reveal — preserved +separately for anyone who wants to reproduce). They are recorded here because +the w=8 diagnosis is the most important finding. Flag off = the existing +channel broadcast. Apple Silicon, 10 cores, release. + +**event_driven 1000×1000 (progress-heavy), rounds/sec:** + +| workers | off | on (chain) | ratio | +|---|---|---|---| +| 1 | 1.064M | 1.135M | +7% (chain) | +| 2 | 565k | 272k → 336k* | −1.7× | +| 4 | 220k | 62k → 74k* | −3.0× | +| 8 | 86k | 13k | −6.6× | + +\* with the lock-free idle-recv fast path (commit fd7e5e6d); it helps the +mid-range but not w=8. + +**pagerank 2M nodes / 10M edges (data-heavy), wall seconds:** flag off vs on +is a wash — w=1/2 within noise, w=4 +2%, w=8 +3.4%. Progress is not the +bottleneck here, as expected. + +**Why w=8 loses, diagnosed not guessed.** Three experiments: profiles show an +*identical* function mix off vs on (no hot chain frame); an invocation-counter +experiment shows *identical per-round counts* of schedule/send/recv (no +activation amplification — the dirty-list dedup works); which leaves +per-operation cost. The chain trades MPSC's pairwise-private contention +(each queue cacheline shared by one writer + one reader) for globally shared +cachelines (~12 shared-cacheline ops per recv vs MPSC's ~1). The 6× is the +cost of having any shared meeting point, on a single socket, with no work +between sends. + +## Allocating keys (modeling DD/MZ Pointstamp timestamps) + +`Box<[u64;3]>` keys: every clone allocates, every drop frees, possibly on a +foreign thread — the cross-thread free traffic that motivated the chain at +ETHZ scale years ago. Ledger excluded (its O(live-state) recv is too slow). + +**Scenario A throughput (sends/sec), N=8, 100% cancellation:** + +| structure | u64 | alloc | drop | +|---|---|---|---| +| mpsc | 1.45M | 0.77M | −47% | +| mesh | 1.32M | 0.74M | −44% | +| cells | 1.05M | 0.75M | −29% | +| chain | 0.46M | 0.33M | −28% | + +The mechanism reproduces: clone-per-reader structures (mpsc, mesh) take ~1.5× +the relative hit of write-once structures (chain, cells). MPSC's lead over the +chain compresses 3.16× → 2.31× at N=8 (and 3.02× → 2.69× at N=4) — the gap +shrinks with both worker count and allocation, pointing toward a crossover we +cannot reach on 10 cores. + +**Scenario B laggard work (mean entries folded / recv), N=8 alloc:** chain +2,179 (flat in N and key type); mesh 2,817; mpsc 13,845 (max 139k, latency to +141 ms); cells 12,337. Allocation widens the chain's laggard moat. + +**Scenario C backlog (retained entries), N=8 alloc:** chain 266, cells 64, +mesh 800k, mpsc 6.4M. Orders of magnitude, unchanged by key type. + +## Bottom line + +The chain is a tool for the laggard-and-memory regime, not a throughput win on +a single socket. Its design goals are met decisively (bounded laggard work flat +in N; backlog state orders of magnitude below channels), allocation only +strengthens those and narrows the throughput loss, but the regime where it +would *also* win throughput — many cores, NUMA, allocating timestamps — is the +ETHZ setting this machine cannot reproduce. Recommended disposition: retain as +a flagged, unmerged experiment; the decisive next test is the same matrix on a +many-core Linux box. + +A surprise worth recording: **per-reader cells** (one accumulator cell per +reader, all writers merge in place) match MPSC throughput, give the *best* +backlog state (64 entries), but have MPSC-level laggard work — because the cell +only consolidates on read, so it grows by raw appends between a laggard's rare +reads. "MPSC that cancels at rest but not in flight." + +## Full results + +(See `/tmp/chain-bench-out.md` snapshot below.) +# chain_bench results + +Delta: miniature ChangeBatch (`Vec<(u64, i64)>`, amortized consolidation). +Workload: all-to-all; worker w mints (t,+1) each step; its successor +retires it (-1) one step later for `cancel%` of steps. Timing rows are +the median of 3 runs after a warmup run. + +## Scenario A: all keep up (400000 steps/worker; send + recv every step) + +| N | cancel% | structure | wall (s) | sends/sec | +|---|---------|-----------|----------|-----------| +| 2 | 100 | mpsc | 0.147 | 5428123 | +| 2 | 100 | chain | 0.302 | 2644752 | +| 2 | 100 | mesh | 0.330 | 2426396 | +| 2 | 50 | mpsc | 0.136 | 5882342 | +| 2 | 50 | chain | 0.273 | 2927853 | +| 2 | 50 | mesh | 0.267 | 2990975 | +| 2 | 0 | mpsc | 0.109 | 7331403 | +| 2 | 0 | chain | 0.198 | 4044385 | +| 2 | 0 | mesh | 0.226 | 3541374 | +| 4 | 100 | mpsc | 0.516 | 3101072 | +| 4 | 100 | chain | 1.540 | 1038627 | +| 4 | 100 | mesh | 1.620 | 987528 | +| 4 | 50 | mpsc | 0.464 | 3445407 | +| 4 | 50 | chain | 1.360 | 1176794 | +| 4 | 50 | mesh | 1.491 | 1072963 | +| 4 | 0 | mpsc | 0.428 | 3742396 | +| 4 | 0 | chain | 1.102 | 1452381 | +| 4 | 0 | mesh | 1.372 | 1166240 | +| 8 | 100 | mpsc | 2.994 | 1068725 | +| 8 | 100 | chain | 7.636 | 419063 | +| 8 | 100 | mesh | 7.867 | 406768 | +| 8 | 50 | mpsc | 2.669 | 1199062 | +| 8 | 50 | chain | 7.209 | 443915 | +| 8 | 50 | mesh | 7.798 | 410371 | +| 8 | 0 | mpsc | 2.805 | 1140937 | +| 8 | 0 | chain | 6.428 | 497823 | +| 8 | 0 | mesh | 7.242 | 441895 | + +## Scenario B: laggard (400000 steps/worker; worker 0 recvs every 1024 steps) + +| N | cancel% | structure | wall (s) | laggard recvs | mean entries/recv | max entries/recv | mean latency (µs) | max latency (µs) | +|---|---------|-----------|----------|---------------|-------------------|------------------|-------------------|------------------| +| 2 | 100 | mpsc | 0.117 | 390 | 4076 | 6488 | 121.5 | 258.1 | +| 2 | 100 | chain | 0.203 | 390 | 2576 | 3168 | 80.2 | 10399.5 | +| 2 | 100 | mesh | 0.194 | 390 | 2415 | 41102 | 79.4 | 11282.8 | +| 2 | 50 | mpsc | 0.094 | 390 | 3042 | 118637 | 97.8 | 8659.8 | +| 2 | 50 | chain | 0.154 | 390 | 1891 | 2287 | 68.7 | 8357.9 | +| 2 | 50 | mesh | 0.147 | 390 | 1671 | 1795 | 58.7 | 9339.0 | +| 2 | 0 | mpsc | 0.098 | 390 | 2050 | 3195 | 65.2 | 7199.0 | +| 2 | 0 | chain | 0.109 | 390 | 1180 | 1449 | 39.4 | 6636.7 | +| 2 | 0 | mesh | 0.096 | 390 | 1092 | 8884 | 29.3 | 4628.4 | +| 4 | 100 | mpsc | 0.531 | 390 | 6004 | 17672 | 220.8 | 8146.7 | +| 4 | 100 | chain | 1.176 | 390 | 2343 | 2736 | 175.7 | 11835.3 | +| 4 | 100 | mesh | 1.158 | 390 | 2127 | 3356 | 165.2 | 11332.7 | +| 4 | 50 | mpsc | 0.434 | 390 | 4971 | 55476 | 182.9 | 14163.3 | +| 4 | 50 | chain | 0.999 | 390 | 1740 | 2032 | 123.4 | 10331.8 | +| 4 | 50 | mesh | 1.018 | 390 | 1585 | 1680 | 114.7 | 6036.5 | +| 4 | 0 | mpsc | 0.405 | 390 | 3414 | 18212 | 173.1 | 25595.2 | +| 4 | 0 | chain | 0.778 | 390 | 1134 | 1333 | 88.9 | 5466.0 | +| 4 | 0 | mesh | 0.842 | 390 | 1051 | 1100 | 69.3 | 6135.2 | +| 8 | 100 | mpsc | 2.677 | 390 | 11155 | 38426 | 467.7 | 12300.8 | +| 8 | 100 | chain | 6.767 | 390 | 2193 | 2394 | 437.0 | 18932.1 | +| 8 | 100 | mesh | 7.896 | 390 | 2120 | 2394 | 365.9 | 11790.2 | +| 8 | 50 | mpsc | 2.737 | 390 | 9503 | 52156 | 411.0 | 24853.5 | +| 8 | 50 | chain | 6.052 | 390 | 1633 | 1800 | 262.4 | 11031.0 | +| 8 | 50 | mesh | 7.351 | 390 | 1582 | 1703 | 339.4 | 18233.4 | +| 8 | 0 | mpsc | 2.668 | 390 | 5781 | 18775 | 424.2 | 52151.5 | +| 8 | 0 | chain | 5.252 | 390 | 1074 | 1202 | 216.0 | 6562.3 | +| 8 | 0 | mesh | 6.816 | 390 | 1052 | 1217 | 188.7 | 5855.1 | + +## Scenario C: unread backlog (50000 steps/worker; recv only at the end) + +Retained units: queued deltas (mpsc) or live chain nodes (chain/mesh). + +| N | cancel% | structure | retained entries | retained units | folded by reader 0 | catch-up (s) | +|---|---------|-----------|------------------|----------------|--------------------|--------------| +| 2 | 100 | mpsc | 399996 | 200000 | 199998 | 0.005 | +| 2 | 100 | chain | 2964 | 2 | 2964 | 0.000 | +| 2 | 100 | mesh | 199998 | 4 | 199998 | 0.001 | +| 2 | 50 | mpsc | 299996 | 200000 | 149998 | 0.005 | +| 2 | 50 | chain | 66612 | 2 | 66612 | 0.001 | +| 2 | 50 | mesh | 149998 | 4 | 149998 | 0.001 | +| 2 | 0 | mpsc | 200000 | 200000 | 100000 | 0.003 | +| 2 | 0 | chain | 100000 | 2 | 100000 | 0.001 | +| 2 | 0 | mesh | 100000 | 4 | 100000 | 0.000 | +| 4 | 100 | mpsc | 1599984 | 800000 | 399996 | 0.017 | +| 4 | 100 | chain | 334 | 2 | 334 | 0.000 | +| 4 | 100 | mesh | 399996 | 8 | 399996 | 0.010 | +| 4 | 50 | mpsc | 1199984 | 800000 | 299996 | 0.015 | +| 4 | 50 | chain | 106810 | 2 | 106810 | 0.002 | +| 4 | 50 | mesh | 299996 | 8 | 299996 | 0.007 | +| 4 | 0 | mpsc | 800000 | 800000 | 200000 | 0.009 | +| 4 | 0 | chain | 200000 | 2 | 200000 | 0.004 | +| 4 | 0 | mesh | 200000 | 8 | 200000 | 0.003 | +| 8 | 100 | mpsc | 6399936 | 3200000 | 799992 | 0.046 | +| 8 | 100 | chain | 6124 | 2 | 6124 | 0.000 | +| 8 | 100 | mesh | 799992 | 16 | 799992 | 0.027 | +| 8 | 50 | mpsc | 4799936 | 3200000 | 599992 | 0.036 | +| 8 | 50 | chain | 254600 | 2 | 254600 | 0.007 | +| 8 | 50 | mesh | 599992 | 16 | 599992 | 0.027 | +| 8 | 0 | mpsc | 3200000 | 3200000 | 400000 | 0.029 | +| 8 | 0 | chain | 400000 | 2 | 400000 | 0.011 | +| 8 | 0 | mesh | 400000 | 16 | 400000 | 0.015 | diff --git a/chain-design-notes.md b/chain-design-notes.md new file mode 100644 index 000000000..adcf77c2a --- /dev/null +++ b/chain-design-notes.md @@ -0,0 +1,160 @@ +# `timely_communication::chain` design notes + +A single multi-writer, forward-linked, compacting chain: an intra-process +all-reduce intended to eventually replace the Progcaster's intra-process leg. +Lives at `communication/src/chain.rs`. + +## Why multi-writer (the cancellation argument; why per-writer lost) + +The first version used per-writer chains (`Mesh`: one `Chain` per worker, every +reader sweeps all chains). This fails the primary goal — **cross-writer +cancellation**. In progress traffic, one worker's `{(T, +1)}` is typically retired +by another worker's `{(T, -1)}`. With per-writer chains those updates never meet: +two workers sending `{(T, +1)}` and `{(T, -1)}` at distinct increasing `T` produce +internally-incompressible content per chain whose union cancels to nothing. +Compaction bounds the node *count* but not the *content*: the merged values grow +without bound, and a laggard does `O(elapsed sends)` fold work to catch up. + +With one shared multi-writer chain, atoms from all writers meet at the shared head, +where `merge_from` cancels them. Accumulated nodes hold only the *net* update, so a +laggard's catch-up work — and the unread backlog's content — is bounded by the net, +not by elapsed sends. This is why per-writer lost (maintainer-agreed decision). + +`Mesh` is retained as a benchmarking comparison structure only; its docs mark +the known cancellation pathology. + +## Contract + +- A `Chain` is a cloneable handle: any holder may `send` and create readers + (`Chain::reader`). Readers created later observe only later sends. +- `Chain::send(&self, v)` commits an atom. Every reader folds, exactly once, every + atom committed after its registration. Atoms may be merged with adjacent atoms + (never split) via `T: Chainable`, a commutative monoid + (`fn merge_from(&mut self, &Self)`). Commutativity is required because multiple + writers and multiple readers impose no cross-atom ordering. +- Live state is bounded by `O(#readers)` nodes, independent of send count, provided + readers occasionally `recv` (every `recv` and every reader drop compacts); node + content is the merged net of unread atoms. +- `Mesh::new(writers)` bundles `W` per-writer chains (comparison only): per- + worker `Chain` send handles plus `MeshReader` handles (`Mesh::reader`) + that sweep all chains. +- `Reader::recv(&mut self, out: &mut T)` folds; `recv_with(f)` hands each atom to + the caller; `is_caught_up()` is an O(1) peek (pin ptr == newest ptr). + +## Structure + +Forward links (old → new). Each node: `holders: AtomicUsize` (count of readers +pinned there; a pin means "I have folded everything up to and including this node") +and a payload `(value: Option, next: Option>)` behind ONE `RwLock`, so +walkers snapshot a consistent pair and compactors mutate both atomically. The chain +holds `newest: Mutex>` (writer's append point) and `oldest: +Mutex>` (compaction sweep origin; invariant: every pin is at or after +`oldest`). Pins are RAII (`Held`), as in the prototype. + +Send path (multi-writer): `send(&self, v)` locks the `newest` mutex — serializing +concurrent writers and reader registration — then the newest node's payload write +lock. If `holders == 0`, merge in place (zero allocation in steady state; this is +where cross-writer cancellation happens); else allocate, link `old.next = new`, +swap the pointer. Allocation occurs only when a reader has just caught up and +pinned the head, so laggard *content* stays the accumulated net. The `holders` +check happens under the payload *write* lock, which excludes concurrent pinning +(see deviations). + +Reader walk: from its pin, hand-over-hand — pin and fold each successor under that +successor's payload read lock, then drop the old pin. The fold frontier is thus +always a pinned node, which is what makes compaction's `holders` checks sufficient. +A node has `next == None` iff it is the chain's newest, which terminates the walk. + +Garbage collection of the prefix is by `Arc` refcounts: forward links mean nothing +points backward, so once `oldest` advances past an unpinned head node the abandoned +prefix simply deallocates. + +## Compaction rule and safety argument + +`a` may absorb its successor `b` (merge `b.value` into `a.value`, set +`a.next = b.next`) iff `a.holders == 0` AND `b.holders == 0` AND `b` is not the +newest node. Checks are made under both payload write locks (taken older-before- +newer), excluding concurrent pinning (pins are taken under a payload read lock, or +under the `newest` mutex for the newest node). + +- No lost atoms: values move only backwards, into the unique live predecessor. A + reader that has not folded `b` has its (pinned) fold frontier strictly before + `b`; `a.holders == 0` excludes a frontier exactly at `a`, and every path from an + earlier pin to `b` passes through `a`, where the value now lives. +- No double folds: a reader that has folded `b` is pinned at or after `b` and never + revisits `a`; pinned-at-`b` readers are excluded by `b.holders == 0`. +- Pins never dangle: pinned nodes are never absorbed (nor merged into), so a pin's + `next` always leads into the live chain. +- The newest node is never absorbed (the writer merges into it instead), so the + `newest` pointer never dangles. The sweep uses a snapshot of `newest`; the true + newest is at or after the snapshot, so never absorbed. + +Compaction runs as a full sweep from `oldest` at the start of every `recv` and on +every `Reader::drop`: advance `oldest` past unpinned head nodes (folded by everyone, +unpinnable thereafter), then merge every adjacent unpinned pair. Sweeps are +serialized by holding the `oldest` mutex throughout (two interleaved sweeps could +otherwise drain a value into a just-unlinked node). + +## Lock ordering + +1. `oldest` mutex (held across a whole sweep), +2. `newest` mutex, +3. node payload `RwLock`s in chain order (older before newer), at most two at once. + +Send: (2) then (3, newest node only) — taking the newest node's payload write lock +while still holding the `newest` mutex is consistent with the order (2 before 3), +and is required: releasing (2) first would let another writer swap the newest +pointer between the check and the merge. Reader registration: (2) only. Walk: (3), +one at a time. Sweep: (1), then (2) briefly (released before node locks), then (3) +pairwise in order. No cycle. Concurrent `send`s serialize on (2), which is the +multi-writer contention point (see chain-bench-results.md for how it trends with +worker count). + +## Deviations from the brief + +1. **Compaction never bypasses pinned nodes** (brief: "a pinned `b` may be bypassed + safely"). Counterexample to the brief's rule: chain `a → p → c` with reader R + pinned at `p`. Bypass `p` (`a.next = c`, `p.next` frozen at `c`); then absorb + `c` into `a` (`a.holders == 0` holds). R resumes via `p.next = c`, but `c`'s + value has moved to `a`, behind R's fold frontier — a lost atom. Frozen `next` + pointers of bypassed pinned nodes are side entrances into the chain that the + `a.holders` check cannot see. Requiring `b.holders == 0` as well removes them. + +2. **An `oldest` pointer and full sweeps replace "absorb pairs the walk crosses".** + With never-bypass, a reader's own walk cannot compact the region behind its pin, + and with forward links a node can only be unlinked by its predecessor — found + only by walking from behind. Without this, the segment between a laggard and the + active readers grows by one node per catch-up (each catch-up pins the newest + node, forcing the next send to allocate; the abandoned pin then sits unreachable + to every active walk). The `oldest`-origin sweep on every `recv` is strictly + stronger self-healing than the brief's crossed-pairs rule and is what actually + delivers the O(#readers) bound (observed: steady length 4 = #readers + 2 with + one laggard and one active reader, over 10k sends). + +3. **Pins are bumped under the node's payload read lock** (plus the `newest` mutex + for registration), not under the newest-pointer lock as the brief sketched. The + writer and the compactor re-check `holders` under the payload *write* lock, + giving the same exclusion. This finer grain is needed because mid-walk readers + pin hand-over-hand on interior nodes the newest-pointer lock says nothing about; + hand-over-hand pinning in turn is what stops a sweep from outrunning a walker + (absorbing a node the walker has passed but whose successor it has not folded). + +4. **`recv_with` invokes the callback under a node's payload read lock**; it must + not reenter the same chain. Documented on the method. + +5. `Reader` carries the bound on the struct so `Drop` (which cannot + add bounds) can run the compaction sweep. + +## Test results + +`cargo test -p timely_communication`: all pass (15 chain tests, including: the +randomized stress test with 3 writer threads sharing one chain, 3 active reader +threads, and one laggard over 50 phases, asserting chain length ≤ #readers + 2 at +quiescent checkpoints and exact final totals; concurrent-writers tests with one +and with many readers recv-ing while writers send; a cross-writer cancellation +test asserting that 200k paired ±v sends from two handles leave live_len ≤ 2 and +at most one folded atom for a laggard; and a fast-path test asserting chain length +is exactly 2 — the pinned caught-up node plus one accumulating newest — after N +sends with no recv). `cargo test -p timely`: all pass, untouched. Clippy +(workspace lint set): no findings on the new module (one pre-existing warning in +zero_copy/tcp.rs, untouched). diff --git a/communication/examples/chain_bench.rs b/communication/examples/chain_bench.rs new file mode 100644 index 000000000..bbbfd8165 --- /dev/null +++ b/communication/examples/chain_bench.rs @@ -0,0 +1,859 @@ +//! Benchmark harness comparing five intra-process progress-broadcast structures: +//! +//! 1. **MPSC baseline**: per-reader `Mutex>`; send clones the +//! delta into every reader's queue; recv drains its own queue and folds. +//! This models the current Progcaster intra-process path (per-peer clone, +//! no in-transit merging). +//! 2. **Chain**: the multi-writer compacting chain (`timely_communication::chain`), +//! where concurrent sends merge — and cancel — at the shared head. +//! 3. **Mesh**: per-writer chains, retained to demonstrate the cross-writer +//! cancellation pathology empirically. +//! 4. **Ledger**: the maximal-merging end of the spectrum. One mutex-protected +//! consolidated all-time sum of every submitted delta (with cancellation this +//! *is* the outstanding counts, so it stays small), plus an atomic version +//! bumped per submit so readers can cheaply skip when nothing changed. A +//! reader recvs by diffing the shared state against its own local copy +//! (a sorted merge-walk) and folding the diff. +//! 5. **Cells**: per-reader accumulator cells — the in-place accumulation +//! variant of MPSC. One `Mutex` per reader; submit merges the delta +//! into every reader's cell (consolidating, so cross-writer cancellation +//! happens per cell); recv takes its own cell wholesale and folds it. A +//! per-cell atomic version makes an idle recv a single atomic load. +//! +//! The delta type is a miniature ChangeBatch: `Vec<(K, i64)>` with merge = +//! append + amortized consolidation (sort, sum duplicates, drop zeros), mimicking +//! timely's progress updates, including cancellation. +//! +//! The key type `K` is a second axis: `u64` (timely's pointstamps-as-integers +//! stand-in), or an "alloc" key (`Box<[u64; 3]>` wrapping the same logical u64) +//! modeling DD/MZ Pointstamp timestamps that allocate — same comparison and +//! cancellation structure, but every clone allocates and every drop frees, +//! possibly on a different thread than the allocation. +//! +//! Workload: N workers, all-to-all (each worker is one writer and one reader). +//! At step `s`, worker `w` mints `{(t(s, w), +1)}` with `t(s, w) = s * N + w`, +//! and (for a configurable fraction of steps) also retires its predecessor's mint +//! from the previous step, `{(t(s-1, (w-1) mod N), -1)}`, so the all-worker union +//! over a window largely cancels. +//! +//! Scenarios (run for each N, cancellation fraction, and structure): +//! - A. ALL KEEP UP: every worker recvs every step; wall time and sends/sec. +//! - B. LAGGARD: worker 0 recvs every K = 1024 steps; others every step; total +//! wall time, the laggard's fold work (entries folded per recv), and the +//! laggard's recv latency. +//! - C. UNREAD BACKLOG: nobody recvs until the end; peak retained state and +//! final catch-up time. +//! +//! Modes: no argument runs the full N × cancel% matrix with `u64` keys; +//! `alloc` runs the five-structure × two-key comparison (scenarios A and B at +//! N ∈ {4, 8}, cancel 100%; scenario C at N = 8, cancel 100%); `smoke` runs a +//! tiny correctness pass over every structure, scenario, and key type. +//! +//! Timing rows are the median of three runs after one warmup run. + +use std::collections::VecDeque; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Barrier, Mutex}; +use std::time::{Duration, Instant}; + +use timely_communication::chain::{Chain, Chainable, Mesh, MeshReader, Reader}; + +/// The timestamp key: ordered, clonable, and constructible from a logical u64. +trait Key: Ord + Clone + Send + Sync + 'static { + const NAME: &'static str; + fn from_time(time: u64) -> Self; +} + +impl Key for u64 { + const NAME: &'static str = "u64"; + fn from_time(time: u64) -> Self { time } +} + +/// A heap-bearing key modeling DD/MZ Pointstamp timestamps that allocate: +/// the same ordering and cancellation structure as the wrapped u64, but every +/// clone allocates and every drop frees — and a delta cloned at the sender and +/// consumed by a reader is freed on the reader's thread, so cross-thread free +/// traffic occurs exactly as it would in production. +impl Key for Box<[u64; 3]> { + const NAME: &'static str = "alloc"; + fn from_time(time: u64) -> Self { Box::new([time, 0, 0]) } +} + +/// A miniature ChangeBatch: updates with amortized consolidation. +#[derive(Clone)] +struct Delta { + updates: Vec<(K, i64)>, + /// Consolidation is amortized: we consolidate when the length doubles past + /// the last consolidated length (as timely's `ChangeBatch` does). + clean: usize, +} + +impl Default for Delta { + fn default() -> Self { + Delta { updates: Vec::new(), clean: 0 } + } +} + +impl Delta { + fn from_updates(updates: &[(K, i64)]) -> Self { + Delta { updates: updates.to_vec(), clean: 0 } + } + fn maybe_consolidate(&mut self) { + if self.updates.len() > 32 && self.updates.len() > 2 * self.clean { + self.consolidate(); + } + } + /// Extends by cloning from a slice (the cross-endpoint broadcast path). + fn extend_from(&mut self, other: &[(K, i64)]) { + self.updates.extend_from_slice(other); + self.maybe_consolidate(); + } + /// Extends by moving owned updates (the fold path for structures whose + /// recv obtains ownership); the receiver then drops what it consumed. + fn extend_owned(&mut self, other: Vec<(K, i64)>) { + self.updates.extend(other); + self.maybe_consolidate(); + } + fn consolidate(&mut self) { + self.updates.sort_unstable_by(|x, y| x.0.cmp(&y.0)); + let mut write = 0; + for read in 0 .. self.updates.len() { + if write > 0 && self.updates[write - 1].0 == self.updates[read].0 { + self.updates[write - 1].1 += self.updates[read].1; + } + else { + if write > 0 && self.updates[write - 1].1 == 0 { write -= 1; } + self.updates.swap(write, read); + write += 1; + } + } + if write > 0 && self.updates[write - 1].1 == 0 { write -= 1; } + self.updates.truncate(write); + self.clean = self.updates.len(); + } + /// Incrementally consolidates the unconsolidated tail into the (sorted, + /// zero-free) consolidated prefix, leaving the whole fully consolidated. + /// Cost is O(tail + overlap) rather than a full re-sort, which matters for + /// the ledger: its shared state is diffed (and so must be consolidated) on + /// every recv, while submits only append near the tail. + fn consolidate_tail(&mut self) { + if self.updates.len() == self.clean { return; } + let mut tail = self.updates.split_off(self.clean); + tail.sort_unstable_by(|x, y| x.0.cmp(&y.0)); + let pos = self.updates.partition_point(|x| x.0 < tail[0].0); + let mut prefix = self.updates.split_off(pos).into_iter().peekable(); + let mut tail = tail.into_iter().peekable(); + loop { + let entry = match (prefix.peek(), tail.peek()) { + (Some(p), Some(t)) => { + if p.0 <= t.0 { prefix.next().unwrap() } else { tail.next().unwrap() } + } + (Some(_), None) => prefix.next().unwrap(), + (None, Some(_)) => tail.next().unwrap(), + (None, None) => break, + }; + match self.updates.last_mut() { + Some(last) if last.0 == entry.0 => { + last.1 += entry.1; + if last.1 == 0 { self.updates.pop(); } + } + _ => { + if entry.1 != 0 { self.updates.push(entry); } + } + } + } + self.clean = self.updates.len(); + } + /// The sum of all diffs (used for correctness checks). + fn diff_sum(&self) -> i64 { + self.updates.iter().map(|(_, diff)| *diff).sum() + } +} + +impl Chainable for Delta { + fn merge_from(&mut self, other: &Self) { + self.extend_from(&other.updates); + } +} + +/// The delta worker `w` sends at step `s`: its own mint, plus (for `cancel_pct` +/// percent of steps) the retirement of its predecessor's mint from step `s - 1`. +fn make_delta(step: u64, worker: u64, workers: u64, cancel_pct: u64) -> Vec<(K, i64)> { + let mut updates = vec![(K::from_time(step * workers + worker), 1)]; + if step > 0 && (step % 100) < cancel_pct { + let prev = (worker + workers - 1) % workers; + updates.push((K::from_time((step - 1) * workers + prev), -1)); + } + updates +} + +/// The sum of diffs over all deltas sent by all workers (for correctness checks). +fn expected_diff_sum(steps: u64, workers: u64, cancel_pct: u64) -> i64 { + let mut total = 0i64; + for step in 0 .. steps { + let paired = step > 0 && (step % 100) < cancel_pct; + total += workers as i64 * if paired { 0 } else { 1 }; + } + total +} + +/// One worker's communication endpoint: a writer plus its own reader. +trait Endpoint: Send { + /// Broadcasts a delta to all readers. + fn send(&self, updates: &[(K, i64)]); + /// Folds everything unread into `folded`; returns the number of entries folded. + fn recv(&mut self, folded: &mut Delta) -> usize; +} + +/// Per-reader queues; send clones into every queue (the Progcaster model). +struct MpscEndpoint { + queues: Arc>>>>, + index: usize, +} + +impl Endpoint for MpscEndpoint { + fn send(&self, updates: &[(K, i64)]) { + for queue in self.queues.iter() { + queue.lock().unwrap().push_back(Delta::from_updates(updates)); + } + } + fn recv(&mut self, folded: &mut Delta) -> usize { + let drained: Vec> = { + let mut queue = self.queues[self.index].lock().unwrap(); + queue.drain(..).collect() + }; + let mut entries = 0; + for delta in drained { + entries += delta.updates.len(); + folded.extend_owned(delta.updates); + } + entries + } +} + +/// A shared multi-writer chain; one reader per worker. +struct ChainEndpoint { + chain: Chain>, + reader: Reader>, +} + +impl Endpoint for ChainEndpoint { + fn send(&self, updates: &[(K, i64)]) { + self.chain.send(Delta::from_updates(updates)); + } + fn recv(&mut self, folded: &mut Delta) -> usize { + let mut entries = 0; + self.reader.recv_with(|delta| { + entries += delta.updates.len(); + folded.extend_from(&delta.updates); + }); + entries + } +} + +/// Per-writer chains; each worker writes its own and reads all (the pathology). +struct MeshEndpoint { + writer: Chain>, + reader: MeshReader>, +} + +impl Endpoint for MeshEndpoint { + fn send(&self, updates: &[(K, i64)]) { + self.writer.send(Delta::from_updates(updates)); + } + fn recv(&mut self, folded: &mut Delta) -> usize { + let mut entries = 0; + self.reader.recv_with(|delta| { + entries += delta.updates.len(); + folded.extend_from(&delta.updates); + }); + entries + } +} + +/// One mutex-protected consolidated sum of every submitted delta, plus an +/// atomic version bumped per submit so readers can cheaply skip when nothing +/// has changed since their last recv. +struct LedgerShared { + state: Mutex>, + version: AtomicUsize, +} + +/// The shared ledger plus a reader-local copy of the consolidated state (kept +/// outside the lock) and the version latched at the reader's last recv. +struct LedgerEndpoint { + shared: Arc>, + local: Vec<(K, i64)>, + seen: usize, +} + +impl Endpoint for LedgerEndpoint { + fn send(&self, updates: &[(K, i64)]) { + let mut state = self.shared.state.lock().unwrap(); + state.extend_from(updates); + self.shared.version.fetch_add(1, Ordering::Release); + } + fn recv(&mut self, folded: &mut Delta) -> usize { + if self.shared.version.load(Ordering::Acquire) == self.seen { + return 0; + } + let mut diff = Vec::new(); + { + let mut state = self.shared.state.lock().unwrap(); + self.seen = self.shared.version.load(Ordering::Acquire); + state.consolidate_tail(); + // Merge-walk the (sorted, zero-free) shared and local states, + // emitting entries whose counts differ as (key, shared - local). + let shared = &state.updates; + let local = &self.local; + let (mut i, mut j) = (0, 0); + while i < shared.len() || j < local.len() { + if j >= local.len() || (i < shared.len() && shared[i].0 < local[j].0) { + diff.push(shared[i].clone()); + i += 1; + } + else if i >= shared.len() || local[j].0 < shared[i].0 { + diff.push((local[j].0.clone(), -local[j].1)); + j += 1; + } + else { + if shared[i].1 != local[j].1 { + diff.push((shared[i].0.clone(), shared[i].1 - local[j].1)); + } + i += 1; + j += 1; + } + } + self.local.clone_from(&state.updates); + } + let entries = diff.len(); + folded.extend_owned(diff); + entries + } +} + +/// One per-reader accumulator cell: in-place consolidated state plus a version +/// bumped per submit so an idle recv is a single atomic load. +struct Cell { + state: Mutex>, + version: AtomicUsize, +} + +/// Per-reader accumulator cells: submit merges (consolidating, so cross-writer +/// cancellation happens per cell) into every reader's cell; recv takes its own +/// cell wholesale and folds it. +struct CellsEndpoint { + cells: Arc>>, + index: usize, + seen: usize, +} + +impl Endpoint for CellsEndpoint { + fn send(&self, updates: &[(K, i64)]) { + for cell in self.cells.iter() { + cell.state.lock().unwrap().extend_from(updates); + cell.version.fetch_add(1, Ordering::Release); + } + } + fn recv(&mut self, folded: &mut Delta) -> usize { + let cell = &self.cells[self.index]; + if cell.version.load(Ordering::Acquire) == self.seen { + return 0; + } + let taken = { + let mut state = cell.state.lock().unwrap(); + self.seen = cell.version.load(Ordering::Acquire); + std::mem::take(&mut *state) + }; + let entries = taken.updates.len(); + folded.extend_owned(taken.updates); + entries + } +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum Structure { Mpsc, Chain, Mesh, Ledger, Cells } + +impl Structure { + fn name(self) -> &'static str { + match self { + Structure::Mpsc => "mpsc", + Structure::Chain => "chain", + Structure::Mesh => "mesh", + Structure::Ledger => "ledger", + Structure::Cells => "cells", + } + } +} + +const STRUCTURES: [Structure; 5] = [Structure::Mpsc, Structure::Chain, Structure::Mesh, Structure::Ledger, Structure::Cells]; +/// Ledger-free subset for the alloc matrix (the ledger's O(live-state) recv is too slow here). +const ALLOC_STRUCTURES: [Structure; 4] = [Structure::Mpsc, Structure::Chain, Structure::Mesh, Structure::Cells]; + +/// Reports retained state: total entries, and retained "units" (queued deltas +/// for MPSC; live nodes for Chain/Mesh; the single ledger; cells for Cells). +/// For the chain structures this folds a probe reader registered before any +/// sends, which observes (without removing) the chain's full retained content. +type ProbeFn = Box (usize, usize) + Send>; + +/// Builds the endpoints (and, on request, a retained-state probe) for `workers` +/// workers. The probe must only be requested by scenario C: a never-recv-ing +/// probe reader would otherwise pin old chain state and distort scenarios A/B. +fn build(structure: Structure, workers: usize, with_probe: bool) -> (Vec>>, Option) { + match structure { + Structure::Mpsc => { + let queues: Arc>>>> = + Arc::new((0 .. workers).map(|_| Mutex::new(VecDeque::new())).collect()); + let endpoints = (0 .. workers) + .map(|index| Box::new(MpscEndpoint { queues: Arc::clone(&queues), index }) as Box>) + .collect(); + let probe = with_probe.then(|| { + let queues = Arc::clone(&queues); + Box::new(move || { + let mut entries = 0; + let mut units = 0; + for queue in queues.iter() { + let queue = queue.lock().unwrap(); + units += queue.len(); + entries += queue.iter().map(|delta| delta.updates.len()).sum::(); + } + (entries, units) + }) as ProbeFn + }); + (endpoints, probe) + } + Structure::Chain => { + let chain = Chain::>::new(); + let probe = with_probe.then(|| { + let mut reader = chain.reader(); + let diagnostic = chain.clone(); + Box::new(move || { + let mut entries = 0; + reader.recv_with(|delta| entries += delta.updates.len()); + (entries, diagnostic.live_len()) + }) as ProbeFn + }); + let endpoints = (0 .. workers) + .map(|_| Box::new(ChainEndpoint { chain: chain.clone(), reader: chain.reader() }) as Box>) + .collect(); + (endpoints, probe) + } + Structure::Mesh => { + let (writers, mesh) = Mesh::>::new(workers); + let probe_reader = with_probe.then(|| mesh.reader()); + let endpoints: Vec>> = writers + .into_iter() + .map(|writer| Box::new(MeshEndpoint { writer, reader: mesh.reader() }) as Box>) + .collect(); + let probe = probe_reader.map(|mut reader| { + Box::new(move || { + let mut entries = 0; + reader.recv_with(|delta| entries += delta.updates.len()); + (entries, mesh.live_lens().iter().sum()) + }) as ProbeFn + }); + (endpoints, probe) + } + Structure::Ledger => { + let shared = Arc::new(LedgerShared { + state: Mutex::new(Delta::default()), + version: AtomicUsize::new(0), + }); + let endpoints = (0 .. workers) + .map(|_| Box::new(LedgerEndpoint { + shared: Arc::clone(&shared), + local: Vec::new(), + seen: 0, + }) as Box>) + .collect(); + let probe = with_probe.then(|| { + let shared = Arc::clone(&shared); + Box::new(move || { + let mut state = shared.state.lock().unwrap(); + state.consolidate_tail(); + (state.updates.len(), 1usize) + }) as ProbeFn + }); + (endpoints, probe) + } + Structure::Cells => { + let cells: Arc>> = Arc::new( + (0 .. workers) + .map(|_| Cell { state: Mutex::new(Delta::default()), version: AtomicUsize::new(0) }) + .collect(), + ); + let endpoints = (0 .. workers) + .map(|index| Box::new(CellsEndpoint { cells: Arc::clone(&cells), index, seen: 0 }) as Box>) + .collect(); + let probe = with_probe.then(|| { + let cells = Arc::clone(&cells); + Box::new(move || { + let mut entries = 0; + for cell in cells.iter() { + let mut state = cell.state.lock().unwrap(); + state.consolidate_tail(); + entries += state.updates.len(); + } + (entries, cells.len()) + }) as ProbeFn + }); + (endpoints, probe) + } + } +} + +fn median_by_wall(mut runs: Vec<(Duration, M)>) -> (Duration, M) { + runs.sort_by_key(|(wall, _)| *wall); + runs.remove(runs.len() / 2) +} + +/// Scenario A: every worker sends and recvs every step. +fn scenario_a(structure: Structure, workers: usize, steps: u64, cancel_pct: u64) -> Duration { + let mut runs = Vec::new(); + for run in 0 .. 4 { + let (endpoints, _) = build::(structure, workers, false); + let barrier = Arc::new(Barrier::new(workers + 1)); + let expected = expected_diff_sum(steps, workers as u64, cancel_pct); + let threads: Vec<_> = endpoints + .into_iter() + .enumerate() + .map(|(worker, mut endpoint)| { + let barrier = Arc::clone(&barrier); + std::thread::spawn(move || { + let mut folded = Delta::default(); + barrier.wait(); + for step in 0 .. steps { + endpoint.send(&make_delta::(step, worker as u64, workers as u64, cancel_pct)); + endpoint.recv(&mut folded); + } + barrier.wait(); + endpoint.recv(&mut folded); + folded.consolidate(); + assert_eq!(folded.diff_sum(), expected); + }) + }) + .collect(); + barrier.wait(); + let start = Instant::now(); + barrier.wait(); + let wall = start.elapsed(); + for thread in threads { thread.join().unwrap(); } + if run > 0 { runs.push((wall, ())); } + } + median_by_wall(runs).0 +} + +/// Scenario B metrics for the laggard. +#[derive(Clone, Default)] +struct LaggardMetrics { + recvs: usize, + entries_total: usize, + entries_max: usize, + latency_total: Duration, + latency_max: Duration, +} + +/// Scenario B: worker 0 recvs only every `lag` steps; others every step. +fn scenario_b(structure: Structure, workers: usize, steps: u64, cancel_pct: u64, lag: u64) -> (Duration, LaggardMetrics) { + let mut runs = Vec::new(); + for run in 0 .. 4 { + let (endpoints, _) = build::(structure, workers, false); + let barrier = Arc::new(Barrier::new(workers + 1)); + let expected = expected_diff_sum(steps, workers as u64, cancel_pct); + let metrics = Arc::new(Mutex::new(LaggardMetrics::default())); + let threads: Vec<_> = endpoints + .into_iter() + .enumerate() + .map(|(worker, mut endpoint)| { + let barrier = Arc::clone(&barrier); + let metrics = Arc::clone(&metrics); + std::thread::spawn(move || { + let laggard = worker == 0; + let mut local = LaggardMetrics::default(); + let mut folded = Delta::default(); + barrier.wait(); + for step in 0 .. steps { + endpoint.send(&make_delta::(step, worker as u64, workers as u64, cancel_pct)); + if !laggard { + endpoint.recv(&mut folded); + } + else if step % lag == lag - 1 { + let start = Instant::now(); + let entries = endpoint.recv(&mut folded); + let latency = start.elapsed(); + local.recvs += 1; + local.entries_total += entries; + local.entries_max = local.entries_max.max(entries); + local.latency_total += latency; + local.latency_max = local.latency_max.max(latency); + } + } + barrier.wait(); + endpoint.recv(&mut folded); + folded.consolidate(); + assert_eq!(folded.diff_sum(), expected); + if laggard { *metrics.lock().unwrap() = local; } + }) + }) + .collect(); + barrier.wait(); + let start = Instant::now(); + barrier.wait(); + let wall = start.elapsed(); + for thread in threads { thread.join().unwrap(); } + if run > 0 { + let collected = metrics.lock().unwrap().clone(); + runs.push((wall, collected)); + } + } + median_by_wall(runs) +} + +/// Scenario C metrics. +#[derive(Clone, Default)] +struct BacklogMetrics { + retained_entries: usize, + retained_units: usize, + catchup: Duration, + folded_by_reader0: usize, +} + +/// Scenario C: nobody recvs until the end; then everyone catches up at once. +fn scenario_c(structure: Structure, workers: usize, steps: u64, cancel_pct: u64) -> BacklogMetrics { + let mut runs = Vec::new(); + for run in 0 .. 4 { + let (endpoints, probe) = build::(structure, workers, true); + let mut probe = probe.expect("probe requested"); + let barrier = Arc::new(Barrier::new(workers + 1)); + let expected = expected_diff_sum(steps, workers as u64, cancel_pct); + let folded0 = Arc::new(Mutex::new(0usize)); + let threads: Vec<_> = endpoints + .into_iter() + .enumerate() + .map(|(worker, mut endpoint)| { + let barrier = Arc::clone(&barrier); + let folded0 = Arc::clone(&folded0); + std::thread::spawn(move || { + barrier.wait(); // start sending + for step in 0 .. steps { + endpoint.send(&make_delta::(step, worker as u64, workers as u64, cancel_pct)); + } + barrier.wait(); // all sends done + barrier.wait(); // probe measured; catch up + let mut folded = Delta::default(); + let entries = endpoint.recv(&mut folded); + barrier.wait(); // catch-up done + folded.consolidate(); + assert_eq!(folded.diff_sum(), expected); + if worker == 0 { *folded0.lock().unwrap() = entries; } + }) + }) + .collect(); + barrier.wait(); // start sending + barrier.wait(); // all sends done + let (retained_entries, retained_units) = probe(); + let start = Instant::now(); + barrier.wait(); // catch up + barrier.wait(); // catch-up done + let catchup = start.elapsed(); + for thread in threads { thread.join().unwrap(); } + if run > 0 { + let metrics = BacklogMetrics { + retained_entries, + retained_units, + catchup, + folded_by_reader0: *folded0.lock().unwrap(), + }; + runs.push((catchup, metrics)); + } + } + median_by_wall(runs).1 +} + +/// The default mode: the full N × cancel% matrix, `u64` keys. +fn run_full_matrix() { + let worker_counts = [2usize, 4, 8]; + let cancel_pcts = [100u64, 50, 0]; + + let steps_a: u64 = 400_000; + let steps_b: u64 = 400_000; + let steps_c: u64 = 50_000; + let lag: u64 = 1024; + + println!("# chain_bench results"); + println!(); + println!("Delta: miniature ChangeBatch (`Vec<(u64, i64)>`, amortized consolidation)."); + println!("Workload: all-to-all; worker w mints (t,+1) each step; its successor"); + println!("retires it (-1) one step later for `cancel%` of steps. Timing rows are"); + println!("the median of 3 runs after a warmup run."); + println!(); + + // The ledger's recv diffs the full consolidated shared state, which at + // cancel% < 100 grows linearly with elapsed sends, making keep-up + // scenarios quadratic in steps. Those cells run with reduced steps (the + // table reports steps per cell; sends/sec remains the comparable rate, + // though it flatters the ledger, whose rate degrades as state grows). + let cell_steps = |structure: Structure, cancel_pct: u64, steps: u64| { + if structure == Structure::Ledger && cancel_pct < 100 { steps / 16 } else { steps } + }; + + // Scenario A. + println!("## Scenario A: all keep up ({} steps/worker; send + recv every step)", steps_a); + println!(); + println!("| N | cancel% | structure | steps | wall (s) | sends/sec |"); + println!("|---|---------|-----------|-------|----------|-----------|"); + for &workers in &worker_counts { + for &cancel_pct in &cancel_pcts { + for &structure in &STRUCTURES { + let steps = cell_steps(structure, cancel_pct, steps_a); + let wall = scenario_a::(structure, workers, steps, cancel_pct); + let rate = (workers as u64 * steps) as f64 / wall.as_secs_f64(); + println!("| {} | {} | {} | {} | {:.3} | {:.0} |", workers, cancel_pct, structure.name(), steps, wall.as_secs_f64(), rate); + } + } + } + println!(); + + // Scenario B. + println!("## Scenario B: laggard ({} steps/worker; worker 0 recvs every {} steps)", steps_b, lag); + println!(); + println!("| N | cancel% | structure | steps | wall (s) | laggard recvs | mean entries/recv | max entries/recv | mean latency (µs) | max latency (µs) |"); + println!("|---|---------|-----------|-------|----------|---------------|-------------------|------------------|-------------------|------------------|"); + for &workers in &worker_counts { + for &cancel_pct in &cancel_pcts { + for &structure in &STRUCTURES { + let steps = cell_steps(structure, cancel_pct, steps_b); + let (wall, metrics) = scenario_b::(structure, workers, steps, cancel_pct, lag); + print_b_row(&format!("| {} | {} | {} | {}", workers, cancel_pct, structure.name(), steps), wall, &metrics); + } + } + } + println!(); + + // Scenario C. + println!("## Scenario C: unread backlog ({} steps/worker; recv only at the end)", steps_c); + println!(); + println!("Retained units: queued deltas (mpsc), live chain nodes (chain/mesh),"); + println!("the single shared ledger (ledger; its retained entries are the"); + println!("consolidated shared state size = the net in-flight window), or"); + println!("per-reader cells (cells; retained entries are the sum of cell sizes)."); + println!(); + println!("| N | cancel% | structure | retained entries | retained units | folded by reader 0 | catch-up (s) |"); + println!("|---|---------|-----------|------------------|----------------|--------------------|--------------|"); + for &workers in &worker_counts { + for &cancel_pct in &cancel_pcts { + for &structure in &STRUCTURES { + let metrics = scenario_c::(structure, workers, steps_c, cancel_pct); + println!( + "| {} | {} | {} | {} | {} | {} | {:.3} |", + workers, cancel_pct, structure.name(), + metrics.retained_entries, metrics.retained_units, + metrics.folded_by_reader0, metrics.catchup.as_secs_f64(), + ); + } + } + } +} + +fn print_b_row(prefix: &str, wall: Duration, metrics: &LaggardMetrics) { + let recvs = metrics.recvs.max(1); + println!( + "{} | {:.3} | {} | {:.0} | {} | {:.1} | {:.1} |", + prefix, wall.as_secs_f64(), + metrics.recvs, + metrics.entries_total as f64 / recvs as f64, + metrics.entries_max, + metrics.latency_total.as_secs_f64() * 1e6 / recvs as f64, + metrics.latency_max.as_secs_f64() * 1e6, + ); +} + +fn alloc_rows_a(workers: usize, steps: u64, cancel_pct: u64) { + for &structure in &ALLOC_STRUCTURES { + let wall = scenario_a::(structure, workers, steps, cancel_pct); + let rate = (workers as u64 * steps) as f64 / wall.as_secs_f64(); + println!("| {} | {} | {} | {:.3} | {:.0} |", workers, K::NAME, structure.name(), wall.as_secs_f64(), rate); + } +} + +fn alloc_rows_b(workers: usize, steps: u64, cancel_pct: u64, lag: u64) { + for &structure in &ALLOC_STRUCTURES { + let (wall, metrics) = scenario_b::(structure, workers, steps, cancel_pct, lag); + print_b_row(&format!("| {} | {} | {}", workers, K::NAME, structure.name()), wall, &metrics); + } +} + +fn alloc_rows_c(workers: usize, steps: u64, cancel_pct: u64) { + for &structure in &ALLOC_STRUCTURES { + let metrics = scenario_c::(structure, workers, steps, cancel_pct); + println!( + "| {} | {} | {} | {} | {} | {} | {:.3} |", + workers, K::NAME, structure.name(), + metrics.retained_entries, metrics.retained_units, + metrics.folded_by_reader0, metrics.catchup.as_secs_f64(), + ); + } +} + +/// The `alloc` mode: five structures × two key types, cancel 100%. +/// Scenarios A and B at N ∈ {4, 8}; scenario C at N = 8. +fn run_alloc_matrix() { + let steps_a: u64 = 400_000; + let steps_b: u64 = 400_000; + let steps_c: u64 = 50_000; + let lag: u64 = 1024; + let cancel_pct: u64 = 100; + + println!("### Scenario A: all keep up ({} steps/worker; cancel {}%)", steps_a, cancel_pct); + println!(); + println!("| N | key | structure | wall (s) | sends/sec |"); + println!("|---|-----|-----------|----------|-----------|"); + for &workers in &[4usize, 8] { + alloc_rows_a::(workers, steps_a, cancel_pct); + alloc_rows_a::>(workers, steps_a, cancel_pct); + } + println!(); + + println!("### Scenario B: laggard ({} steps/worker; worker 0 recvs every {} steps; cancel {}%)", steps_b, lag, cancel_pct); + println!(); + println!("| N | key | structure | wall (s) | laggard recvs | mean entries/recv | max entries/recv | mean latency (µs) | max latency (µs) |"); + println!("|---|-----|-----------|----------|---------------|-------------------|------------------|-------------------|------------------|"); + for &workers in &[4usize, 8] { + alloc_rows_b::(workers, steps_b, cancel_pct, lag); + alloc_rows_b::>(workers, steps_b, cancel_pct, lag); + } + println!(); + + println!("### Scenario C: unread backlog ({} steps/worker; cancel {}%)", steps_c, cancel_pct); + println!(); + println!("| N | key | structure | retained entries | retained units | folded by reader 0 | catch-up (s) |"); + println!("|---|-----|-----------|------------------|----------------|--------------------|--------------|"); + alloc_rows_c::(8, steps_c, cancel_pct); + alloc_rows_c::>(8, steps_c, cancel_pct); +} + +/// The `smoke` mode: a tiny correctness pass (the scenarios assert the folded +/// diff sum) over every structure, scenario, and key type. +fn run_smoke() { + for &workers in &[2usize, 4] { + for &cancel_pct in &[100u64, 50, 0] { + for &structure in &STRUCTURES { + scenario_a::(structure, workers, 2_000, cancel_pct); + scenario_a::>(structure, workers, 2_000, cancel_pct); + scenario_b::(structure, workers, 2_000, cancel_pct, 64); + scenario_b::>(structure, workers, 2_000, cancel_pct, 64); + scenario_c::(structure, workers, 2_000, cancel_pct); + scenario_c::>(structure, workers, 2_000, cancel_pct); + } + } + } + println!("smoke: all structures × scenarios × key types passed"); +} + +fn main() { + match std::env::args().nth(1).as_deref() { + Some("alloc") => run_alloc_matrix(), + Some("smoke") => run_smoke(), + _ => run_full_matrix(), + } +} diff --git a/communication/src/chain.rs b/communication/src/chain.rs new file mode 100644 index 000000000..97724b6cf --- /dev/null +++ b/communication/src/chain.rs @@ -0,0 +1,929 @@ +//! An intra-process multi-writer, multi-reader all-reduce structure. +//! +//! This module provides a forward-linked, compacting chain of "atoms", intended to +//! eventually replace the intra-process leg of timely's progress broadcasting: all +//! workers write progress updates into one shared [`Chain`], and all workers (and +//! any network threads) read from it. +//! +//! # Why one multi-writer chain (rather than one chain per writer) +//! +//! The primary goal is *cross-writer cancellation*: in progress traffic, one +//! worker's `{(T, +1)}` is typically retired by another worker's `{(T, -1)}`. +//! With per-writer chains (see [`Mesh`], retained for comparison), each chain's +//! content is internally incompressible — two workers emitting `{(T, +1)}` and +//! `{(T, -1)}` at distinct increasing `T` build two chains whose accumulated +//! values never shrink, even though their union cancels to nothing; a laggard then +//! performs `O(elapsed)` fold work to catch up. With a single shared chain, the +//! writers' atoms meet at the shared head, where `merge_from` cancels them: the +//! accumulated nodes hold only the *net* update, so a laggard's catch-up work is +//! bounded by the net content, not by elapsed sends. +//! +//! # Contract +//! +//! - A [`Chain`] is a cloneable handle: any holder may [`Chain::send`] atoms, +//! and any holder may create readers ([`Reader`], via [`Chain::reader`]). +//! Readers created later observe only atoms sent after their creation. +//! - [`Chain::send`] commits an atom. Every reader eventually folds, exactly once, +//! every atom committed after its registration. Atoms may be merged with adjacent +//! atoms (never split) via [`Chainable`], which must be a **commutative** monoid: +//! commutativity is required because multiple writers and multiple readers impose +//! no cross-atom ordering (concurrent sends are merged in whatever order they win +//! the head, and a reader of a [`Mesh`] sees atoms from different chains in no +//! particular relative order). +//! - Live state is bounded by `O(#readers)` *nodes*, independent of the number of +//! sends, provided readers occasionally call [`Reader::recv`] (each `recv`, and +//! each reader drop, runs a compaction sweep over the whole chain). The *content* +//! of those nodes is the merged net of the unread atoms. +//! +//! # Structure +//! +//! The chain is a forward-linked list of nodes, oldest to newest. Each node holds +//! a payload `(value, next)` behind a single `RwLock`, so that walkers snapshot a +//! consistent pair and compactors mutate both atomically, plus an atomic `holders` +//! count of the readers currently *pinned* at the node. A reader pinned at a node +//! has folded every atom up to and including that node, and resumes from its `next`. +//! +//! The chain object holds a `newest` pointer (where writers append or merge) +//! and an `oldest` pointer (where compaction sweeps begin). Nodes strictly before +//! `oldest` are unreachable and reclaimed by `Arc` reference counting: forward links +//! mean old nodes are kept alive only by `oldest`, by reader pins, and by their +//! predecessors' `next` pointers, so abandoning a prefix frees it. +//! +//! # Send path +//! +//! `send` locks the `newest` pointer (serializing writers) and the newest node's +//! payload; if the node's `holders` is zero it merges the value in place (zero +//! allocation — the common case when no reader has just caught up); otherwise it +//! allocates a new node, links it from the old newest, and swaps the pointer. +//! In-place merging at the shared head is where cross-writer cancellation happens. +//! +//! # Compaction rule +//! +//! Node `a` may absorb its successor `b` (merging `b`'s value into `a` and setting +//! `a.next = b.next`) iff **both** `a.holders == 0` and `b.holders == 0`, and `b` +//! is not the newest node (writers merge into the newest node instead). The +//! `holders` checks are made under both payload write locks, which excludes +//! concurrent pinning (pins are taken under a payload read lock, or under the +//! newest-pointer lock for the newest node). +//! +//! Safety argument: +//! - *No lost atoms*: a value only moves backwards, from `b` into its unique live +//! predecessor `a`. Any reader that has not folded `b` is pinned strictly before +//! `b` (pinned exactly at `a` is excluded by `a.holders == 0`; mid-walk readers +//! pin hand-over-hand, so their fold frontier is always a pinned node), and every +//! path from a pin before `a` to `b` passes through `a`, where the value now lives. +//! - *No double folds*: a reader that has folded `b` is pinned at or after `b` and +//! never revisits `a`; readers pinned exactly at `b` are excluded by +//! `b.holders == 0`. +//! - *Pins never dangle*: pinned nodes are never absorbed nor bypassed, so a pin's +//! `next` always leads into the live chain. +//! +//! Note: this rule is deliberately stronger than "absorb iff `a.holders == 0`" +//! (bypassing pinned successors). Bypassing a pinned node `p` freezes `p.next` as a +//! side entrance into the chain; a later absorption of `*p.next` into a node behind +//! `p` would then move an unfolded value behind the pinned reader's fold frontier, +//! losing it. Restricting absorption to unpinned pairs removes the side entrances. +//! +//! Because pinned nodes are never bypassed, a sweep from a reader's own pin cannot +//! compact the region behind it. Instead, each `recv` (and each reader drop) runs a +//! full sweep from the chain's `oldest` pointer: it first advances `oldest` past +//! unpinned head nodes (all readers are pinned at or after `oldest`, so an unpinned +//! head has been folded by everyone and can be abandoned to the reference counter), +//! and then merges every adjacent unpinned pair up to the newest node. This is the +//! "self-healing" of the design: no leaked adjacency survives any reader's `recv`. +//! +//! # Lock ordering +//! +//! 1. the chain's `oldest` mutex (held for the duration of a sweep, serializing +//! sweeps against one another), +//! 2. the chain's `newest` mutex (serializing writers against one another and +//! against reader registration), +//! 3. node payload locks, in chain order (older before newer), at most two at once. +//! +//! Every code path acquires locks consistently with this order, so no cycle exists. +//! Walkers hold at most one payload read lock at a time; `send` holds the `newest` +//! mutex (2) and then the newest node's payload write lock (3) — consistent with +//! the order; sweeps hold the `oldest` mutex, briefly the `newest` mutex (released +//! before node locks are taken), and pairwise payload write locks in chain order. +//! +//! [`Mesh`] operations touch one chain at a time, so the per-chain order suffices. + +use std::sync::{Arc, Mutex, RwLock}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// A commutative monoid into which atoms can be merged. +/// +/// Adjacent atoms in a chain may be merged (never split) before a reader observes +/// them. Commutativity is required because multiple writers and multiple readers +/// impose no cross-atom ordering: the merged result must not depend on the order +/// in which contributions are folded. +pub trait Chainable { + /// Merges `other` into `self`. + fn merge_from(&mut self, other: &Self); +} + +macro_rules! implement_chainable_int { + ($($index_type:ty,)*) => ( + $( + impl Chainable for $index_type { + #[inline] fn merge_from(&mut self, other: &Self) { *self = self.wrapping_add(*other); } + } + )* + ) +} + +implement_chainable_int!(u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize,); + +/// The payload of a node: its (possibly merged) value, and the next node. +/// +/// Both live behind one `RwLock` so that walkers snapshot a consistent pair, +/// and so that compaction can mutate both atomically. +struct Payload { + /// The merged atoms recorded at this node, if any (`None` only for sentinels). + value: Option, + /// The next (newer) node; `None` iff this node is the chain's newest. + next: Option>>, +} + +/// A node in the chain. +struct Node { + /// The number of readers currently pinned at this node. + /// + /// Incremented either under the chain's `newest` mutex (for the newest node) + /// or under this node's payload read lock (mid-walk, hand-over-hand); writers + /// and the compactor re-check this count under the payload write lock, + /// which excludes concurrent pinning. + holders: AtomicUsize, + /// The node's value and successor. + payload: RwLock>, +} + +impl Node { + fn new(value: Option) -> Self { + Self { + holders: AtomicUsize::new(0), + payload: RwLock::new(Payload { value, next: None }), + } + } +} + +/// An RAII pin on a node: while held, the node will be neither absorbed nor +/// bypassed, nor merged into by writers, and its `next` leads into the live +/// chain. +/// +/// Construction does *not* increment `holders`: callers increment it inside the +/// appropriate critical section (see [`Node::holders`]) and then wrap the node. +/// Dropping the pin decrements `holders`. +struct Held { + node: Arc>, +} + +impl Held { + /// Wraps an already-incremented pin on `node`. + fn pinned(node: Arc>) -> Self { Self { node } } +} + +impl Drop for Held { + fn drop(&mut self) { + self.node.holders.fetch_sub(1, Ordering::SeqCst); + } +} + +/// State shared by the chain handles and the readers. +struct ChainInner { + /// The newest node, where writers append or merge. + newest: Mutex>>, + /// The oldest retained node, where compaction sweeps begin. + /// + /// Invariant: every reader is pinned at or after `oldest`. + oldest: Mutex>>, + /// A counter incremented by every send, for cheap change detection. + /// + /// Pollers comparing [`Chain::version`] against a remembered value can tell + /// "something may have been sent" without touching any chain lock. The counter + /// is a hint: a poller observing a stale value must have another occasion to + /// poll again (e.g. a bounded park), but a changed value reliably indicates + /// new content. + version: AtomicUsize, +} + +impl ChainInner { + /// Compacts the chain: advances `oldest` past unpinned head nodes, and merges + /// every adjacent pair of unpinned nodes older than the newest node. + /// + /// Holds the `oldest` mutex throughout, serializing sweeps; concurrent sweeps + /// merging overlapping pairs would otherwise drain values into unlinked nodes. + fn sweep(&self) { + let mut oldest = self.oldest.lock().expect("lock poisoned"); + let newest = self.newest.lock().expect("lock poisoned").clone(); + + // Advance `oldest` past unpinned head nodes: all reader pins are at or after + // `*oldest`, so an unpinned head node has been folded by every reader, and no + // reader can pin it anymore (readers pin only their pin's successors, or the + // newest node). The abandoned prefix is reclaimed by `Arc` reference counts. + while !Arc::ptr_eq(&oldest, &newest) && oldest.holders.load(Ordering::SeqCst) == 0 { + let next = oldest.payload.read().expect("lock poisoned").next.clone(); + match next { + Some(node) => *oldest = node, + None => break, + } + } + + // Merge adjacent unpinned pairs in `[oldest, newest)`. The pair's `holders` + // are checked under both payload write locks, excluding concurrent pinning. + // We never absorb the (snapshot) newest node: a writer may be merging into + // the true newest node, and the newest pointer must not be left dangling. + // (If the true newest has moved past our snapshot, we conservatively treat + // the snapshot as un-absorbable this sweep; the true newest is newer still.) + let mut cursor = Arc::clone(&oldest); + while !Arc::ptr_eq(&cursor, &newest) { + let mut advance = None; + { + let mut payload_a = cursor.payload.write().expect("lock poisoned"); + let Some(node_b) = payload_a.next.clone() else { break }; + if !Arc::ptr_eq(&node_b, &newest) && cursor.holders.load(Ordering::SeqCst) == 0 { + let mut payload_b = node_b.payload.write().expect("lock poisoned"); + if node_b.holders.load(Ordering::SeqCst) == 0 { + // Absorb `node_b` into `cursor`. + if let Some(value_b) = payload_b.value.take() { + match payload_a.value.as_mut() { + Some(value_a) => value_a.merge_from(&value_b), + None => payload_a.value = Some(value_b), + } + } + payload_a.next = payload_b.next.clone(); + // Leave `cursor` in place: its new successor may also be absorbable. + } + else { + drop(payload_b); + advance = Some(node_b); + } + } + else { + advance = Some(node_b); + } + } + if let Some(node) = advance { cursor = node; } + } + } + + /// The number of nodes currently retained, from `oldest` through `newest`. + /// + /// A diagnostic; `O(length)`, and approximate under concurrent activity. + fn live_len(&self) -> usize { + // Hold the `oldest` mutex to serialize against sweeps. + let oldest = self.oldest.lock().expect("lock poisoned"); + let mut count = 1; + let mut cursor = Arc::clone(&oldest); + loop { + let next = cursor.payload.read().expect("lock poisoned").next.clone(); + match next { + Some(node) => { count += 1; cursor = node; } + None => break, + } + } + count + } +} + +/// A handle to a multi-writer, multi-reader compacting chain. +/// +/// Cloneable; any holder may [`Chain::send`] atoms and create [`Reader`]s. +/// Concurrent sends are serialized at the chain's head, where they merge — and, +/// for cancelling updates, annihilate. +pub struct Chain { + inner: Arc>, +} + +impl Clone for Chain { + fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner) } } +} + +impl Default for Chain { + fn default() -> Self { Self::new() } +} + +impl Chain { + /// Creates a new, empty chain. + pub fn new() -> Chain { + let sentinel = Arc::new(Node::new(None)); + let inner = Arc::new(ChainInner { + newest: Mutex::new(Arc::clone(&sentinel)), + oldest: Mutex::new(sentinel), + version: AtomicUsize::new(0), + }); + Chain { inner } + } + + /// Commits an atom: every reader registered before this call will fold `value` + /// exactly once, possibly merged with adjacent atoms (including atoms from + /// other writers — this is where cross-writer cancellation happens). + pub fn send(&self, value: T) { + // Lock order: the `newest` mutex (2), then the newest node's payload + // write lock (3) — consistent with the documented ordering. The `newest` + // mutex serializes concurrent writers. + let mut newest = self.inner.newest.lock().expect("lock poisoned"); + let node = Arc::clone(&newest); + let mut payload = node.payload.write().expect("lock poisoned"); + // The `holders` check happens under the payload write lock: mid-walk pins + // are taken under the payload read lock, and reader registration under the + // `newest` mutex (which we hold), so neither can race this check. + if node.holders.load(Ordering::SeqCst) == 0 { + // Fast path: no reader is pinned here, so none has folded this node's + // value yet; merge in place without allocating. + match payload.value.as_mut() { + Some(current) => current.merge_from(&value), + None => payload.value = Some(value), + } + } + else { + // Slow path: some reader has folded this node; append a new node. + let appended = Arc::new(Node::new(Some(value))); + payload.next = Some(Arc::clone(&appended)); + drop(payload); + *newest = appended; + } + // Bump the version only after the atom is committed (locks released or about + // to be): a poller observing the new version must be able to read the atom. + // Bumping before the commit invites a lost wakeup: the poller sees the new + // version, finds nothing on its walk, and latches the version anyway. + self.inner.version.fetch_add(1, Ordering::SeqCst); + } + + /// Creates a new reader, which will observe exactly the atoms sent after this call. + pub fn reader(&self) -> Reader { + // Pin the newest node under the `newest` mutex: this excludes writers, + // so atoms sent after we return go to nodes after our pin. + let newest = self.inner.newest.lock().expect("lock poisoned"); + newest.holders.fetch_add(1, Ordering::SeqCst); + let pin = Held::pinned(Arc::clone(&newest)); + // Latch the version before releasing the lock: sends after our pin bump + // the version past this value, so the reader's fast path cannot miss them. + let seen_version = self.inner.version.load(Ordering::SeqCst); + drop(newest); + Reader { inner: Arc::clone(&self.inner), pin: Some(pin), seen_version } + } + + /// The number of nodes currently retained by the chain. + /// + /// A diagnostic; `O(length)`, and approximate under concurrent activity. + pub fn live_len(&self) -> usize { self.inner.live_len() } + + /// A counter incremented by every send: a lock-free change-detection hint. + /// + /// A changed value (relative to one remembered by the caller) reliably indicates + /// that atoms have been sent since; an unchanged value may be momentarily stale. + pub fn version(&self) -> usize { self.inner.version.load(Ordering::SeqCst) } +} + +/// A reading endpoint of a [`Chain`]. +/// +/// Each reader folds, exactly once, every atom sent after its creation. Dropping a +/// reader releases its pin and compacts the chain, so departed readers leak nothing. +/// +/// The `T: Chainable` bound on the type itself allows `Drop` to compact the chain. +pub struct Reader { + /// The chain version this reader had folded everything up to (fast path). + seen_version: usize, + inner: Arc>, + /// The reader's pin; `Some` except transiently during drop. + pin: Option>, +} + +impl Reader { + /// Folds every atom sent since the last call (or since creation) into `out`. + pub fn recv(&mut self, out: &mut T) { + self.recv_with(|value| out.merge_from(value)); + } + + /// Hands every atom sent since the last call (or since creation) to `logic`. + /// + /// Atoms are presented oldest first, though adjacent atoms may have been merged. + /// `logic` is invoked while a chain lock is held, and must not call back into + /// this chain. + pub fn recv_with(&mut self, mut logic: impl FnMut(&T)) { + // Fast path: if no atom has been committed since we last caught up, there + // is nothing to fold and no pin will move, so no compaction can be due. + // A lock-free version compare keeps idle polling off the chain's locks. + let version = self.inner.version.load(Ordering::SeqCst); + if version == self.seen_version { return; } + let mut folded = false; + // Walk forward from our pin, hand-over-hand: pin and fold each successor + // before unpinning its predecessor, so compaction (which skips pinned nodes + // and successors of pinned nodes) can never outrun our fold frontier. + loop { + let pinned = &self.pin.as_ref().expect("pin present outside of drop").node; + let next = pinned.payload.read().expect("lock poisoned").next.clone(); + // A node has no successor iff it is the chain's newest: we are caught up. + let Some(node) = next else { break }; + { + // Pin and fold under the payload read lock: writers re-check + // `holders` under the payload write lock before merging in place, + // so we either fold a value no writer will extend, or the + // writer sees our pin and appends a fresh node (which we will + // visit next, or on a later call). + let payload = node.payload.read().expect("lock poisoned"); + node.holders.fetch_add(1, Ordering::SeqCst); + if let Some(value) = payload.value.as_ref() { + logic(value); + } + } + // Re-pin at the folded node; dropping the old pin decrements its count. + self.pin = Some(Held::pinned(node)); + folded = true; + } + // Latch the version read at entry: atoms committed during the walk that we + // happened to fold are folded; any we missed left version past our latch. + self.seen_version = version; + // Compact only after a productive walk: our pin moved, which is the event + // that creates merge opportunities, and this is what keeps live state + // bounded even when this reader is the laggard everyone else moved past. + if folded { + self.inner.sweep(); + } + } + + /// Indicates whether the reader has folded every atom sent so far. + /// + /// `O(1)`: compares the reader's pin against the chain's newest node, so + /// polling threads can skip work. A `false` may be stale by the time it is + /// observed, but a `true` is accurate as of the call. + pub fn is_caught_up(&self) -> bool { + let newest = self.inner.newest.lock().expect("lock poisoned"); + Arc::ptr_eq(&self.pin.as_ref().expect("pin present outside of drop").node, &newest) + } +} + +impl Drop for Reader { + fn drop(&mut self) { + // Release the pin first, then heal the adjacency around it, so a departing + // laggard (e.g. `drop_dataflow`) does not leave a pinned position behind. + self.pin = None; + self.inner.sweep(); + } +} + +/// A bundle of `W` chains, one per writer, swept by every reader. +/// +/// **Retained for benchmarking comparison only**: per-writer chains have a known +/// cancellation pathology. Updates from different writers never meet, so a `+1` +/// on one chain and its retiring `-1` on another accumulate indefinitely instead +/// of cancelling; a laggard's catch-up work is then `O(elapsed sends)` rather +/// than `O(net content)`. Use a single shared [`Chain`] instead, whose head +/// merging performs cross-writer cancellation. +pub struct Mesh { + chains: Vec>, +} + +impl Mesh { + /// Creates `writers` chains, returning per-writer send handles and the mesh. + pub fn new(writers: usize) -> (Vec>, Mesh) { + let chains: Vec> = (0 .. writers).map(|_| Chain::new()).collect(); + let handles = chains.clone(); + (handles, Mesh { chains }) + } + + /// Creates a reader that sweeps all chains, observing atoms sent after this call. + pub fn reader(&self) -> MeshReader { + MeshReader { readers: self.chains.iter().map(Chain::reader).collect() } + } + + /// The number of nodes currently retained by each chain (a diagnostic). + pub fn live_lens(&self) -> Vec { + self.chains.iter().map(Chain::live_len).collect() + } +} + +/// A reading endpoint over all chains of a [`Mesh`]. +pub struct MeshReader { + readers: Vec>, +} + +impl MeshReader { + /// Folds every atom sent on any chain since the last call into `out`. + pub fn recv(&mut self, out: &mut T) { + for reader in self.readers.iter_mut() { + reader.recv(out); + } + } + + /// Hands every atom sent on any chain since the last call to `logic`. + /// + /// Atoms from one chain are presented oldest first; atoms from different + /// chains are interleaved arbitrarily (whence the commutativity requirement). + pub fn recv_with(&mut self, mut logic: impl FnMut(&T)) { + for reader in self.readers.iter_mut() { + reader.recv_with(&mut logic); + } + } + + /// Indicates whether the reader has folded every atom sent so far, on all chains. + pub fn is_caught_up(&self) -> bool { + self.readers.iter().all(Reader::is_caught_up) + } +} + +#[cfg(test)] +mod tests { + + use std::sync::{Arc, Barrier}; + use super::{Chain, Mesh}; + + /// A tiny deterministic PRNG (xorshift64*), to avoid a `rand` dependency. + struct Rng(u64); + impl Rng { + fn new(seed: u64) -> Self { Rng(seed.max(1)) } + fn next(&mut self) -> u64 { + self.0 ^= self.0 >> 12; + self.0 ^= self.0 << 25; + self.0 ^= self.0 >> 27; + self.0.wrapping_mul(0x2545F4914F6CDD1D) + } + fn below(&mut self, bound: u64) -> u64 { self.next() % bound } + } + + #[test] + fn single_reader_observes_all() { + let chain = Chain::::new(); + let mut reader = chain.reader(); + for i in 1 ..= 100 { chain.send(i); } + let mut total = 0; + reader.recv(&mut total); + assert_eq!(total, 5050); + assert!(reader.is_caught_up()); + } + + #[test] + fn multiple_readers_each_observe_all() { + let chain = Chain::::new(); + let mut readers = (0 .. 4).map(|_| chain.reader()).collect::>(); + for i in 1 ..= 100 { chain.send(i); } + for reader in readers.iter_mut() { + let mut total = 0; + reader.recv(&mut total); + assert_eq!(total, 5050); + } + } + + #[test] + fn late_reader_sees_only_subsequent() { + let chain = Chain::::new(); + let mut early = chain.reader(); + for i in 1 ..= 10 { chain.send(i); } + let mut late = chain.reader(); + assert!(late.is_caught_up()); + for i in 1 ..= 10 { chain.send(100 * i); } + let (mut early_total, mut late_total) = (0, 0); + early.recv(&mut early_total); + late.recv(&mut late_total); + assert_eq!(early_total, 55 + 5500); + assert_eq!(late_total, 5500); + } + + #[test] + fn repeated_recv_yields_only_new() { + let chain = Chain::::new(); + let mut reader = chain.reader(); + chain.send(3); + let mut total = 0; + reader.recv(&mut total); + assert_eq!(total, 3); + reader.recv(&mut total); + assert_eq!(total, 3); + chain.send(4); + reader.recv(&mut total); + assert_eq!(total, 7); + } + + #[test] + fn empty_recv_is_noop() { + let chain = Chain::::new(); + let mut reader = chain.reader(); + assert!(reader.is_caught_up()); + let mut total = 0; + let mut atoms = 0; + reader.recv(&mut total); + reader.recv_with(|_| atoms += 1); + assert_eq!(total, 0); + assert_eq!(atoms, 0); + } + + /// With one laggard pinned at the start and one active reader keeping pace, + /// the chain length must stay bounded by a small constant (#readers + 2). + #[test] + fn compaction_bounds_with_laggard() { + let chain = Chain::::new(); + let laggard = chain.reader(); + let mut active = chain.reader(); + let mut total = 0; + for _ in 0 .. 10_000 { + chain.send(1); + active.recv(&mut total); + assert!(chain.live_len() <= 4, "live_len {} exceeds bound", chain.live_len()); + } + assert_eq!(total, 10_000); + let mut laggard = laggard; + let mut behind = 0; + laggard.recv(&mut behind); + assert_eq!(behind, 10_000); + } + + /// Sums survive heavy sending with only occasional recvs. + #[test] + fn sums_preserved_under_heavy_send() { + let chain = Chain::::new(); + let mut reader = chain.reader(); + let mut total = 0; + let mut expected = 0; + let mut rng = Rng::new(0xDECAF); + for i in 0 .. 100_000u64 { + let value = rng.below(1000); + expected += value; + chain.send(value); + if i % 1017 == 0 { reader.recv(&mut total); } + } + reader.recv(&mut total); + assert_eq!(total, expected); + } + + /// Dropping a laggard releases its pin, and the chain compacts afterwards. + #[test] + fn reader_drop_releases_pin() { + let chain = Chain::::new(); + let laggard = chain.reader(); + let mut active = chain.reader(); + let mut total = 0; + for _ in 0 .. 1000 { + chain.send(1); + active.recv(&mut total); + } + // The laggard's pin retains the prefix. + assert!(chain.live_len() >= 3); + drop(laggard); + // The drop's sweep advances past the released pin and compacts; everything + // behind the active reader's pin is reclaimed. + assert!(chain.live_len() <= 2, "live_len {} after drop", chain.live_len()); + assert_eq!(total, 1000); + } + + /// Once all readers have caught up (and pinned the newest node), further sends + /// allocate exactly one new node and then merge into it in place: the chain is + /// exactly the pinned caught-up node plus one accumulating newest node. + #[test] + fn in_place_merge_fast_path() { + let chain = Chain::::new(); + let mut readers = (0 .. 3).map(|_| chain.reader()).collect::>(); + for i in 1 ..= 5 { chain.send(i); } + // Two recvs each: the first folds and re-pins at the newest node; the + // second's sweep advances `oldest` past the abandoned prefix. + let mut totals = vec![0; readers.len()]; + for _ in 0 .. 2 { + for (reader, total) in readers.iter_mut().zip(totals.iter_mut()) { + reader.recv(total); + } + } + assert_eq!(chain.live_len(), 1); + // N sends with no recv: the first allocates (the newest node is pinned by + // all readers); the rest merge in place into the new newest node. + for _ in 0 .. 1000 { chain.send(1); } + assert_eq!(chain.live_len(), 2); + for (reader, total) in readers.iter_mut().zip(totals.iter_mut()) { + reader.recv(total); + assert_eq!(*total, 15 + 1000); + } + } + + /// Multiple writers sending sequentially through clones of the handle. + #[test] + fn multiple_writers_sequential() { + let chain = Chain::::new(); + let other = chain.clone(); + let mut reader = chain.reader(); + for i in 1 ..= 10 { chain.send(i); other.send(100 * i); } + let mut total = 0; + reader.recv(&mut total); + assert_eq!(total, 55 + 5500); + } + + /// Cross-writer cancellation: with no reader catching up mid-stream, paired + /// `+v`/`-v` sends from two handles merge in place at the shared head, so the + /// chain stays at one or two nodes regardless of the number of sends. + #[test] + fn cross_writer_cancellation_bounds_state() { + let chain = Chain::::new(); + let other = chain.clone(); + let mut reader = chain.reader(); + for i in 1 ..= 100_000i64 { + chain.send(i); + other.send(-i); + } + // The reader's registration pin is on the sentinel; everything since is + // merged into at most one additional node. + assert!(chain.live_len() <= 2, "live_len {}", chain.live_len()); + let mut total = 0i64; + let mut atoms = 0; + reader.recv_with(|value| { total += *value; atoms += 1; }); + assert_eq!(total, 0); + assert!(atoms <= 1, "laggard folded {} atoms", atoms); + } + + /// Concurrent writers, one reader: the reader's final total must equal the + /// sum of everything sent, and live state must stay bounded at quiescence. + #[test] + fn concurrent_writers_one_reader() { + const WRITERS: usize = 4; + const SENDS: u64 = 50_000; + let chain = Chain::::new(); + let mut reader = chain.reader(); + let barrier = Arc::new(Barrier::new(WRITERS + 1)); + let mut threads = Vec::new(); + for index in 0 .. WRITERS { + let chain = chain.clone(); + let barrier = Arc::clone(&barrier); + threads.push(std::thread::spawn(move || -> u64 { + let mut rng = Rng::new(0xABCD + index as u64); + let mut sent = 0; + barrier.wait(); + for _ in 0 .. SENDS { + let value = rng.below(100); + sent += value; + chain.send(value); + } + sent + })); + } + barrier.wait(); + // Recv concurrently with the writers, then drain after they finish. + let mut total = 0; + for _ in 0 .. 100 { + reader.recv(&mut total); + std::thread::yield_now(); + } + let expected: u64 = threads.into_iter().map(|t| t.join().unwrap()).sum(); + reader.recv(&mut total); + assert_eq!(total, expected); + assert!(reader.is_caught_up()); + reader.recv(&mut total); + assert!(chain.live_len() <= 2, "live_len {}", chain.live_len()); + } + + /// Concurrent writers, many readers (each reader on its own thread, recv-ing + /// while the writers send): every reader's total must equal the sum sent. + #[test] + fn concurrent_writers_many_readers() { + const WRITERS: usize = 3; + const READERS: usize = 3; + const SENDS: u64 = 20_000; + let chain = Chain::::new(); + let readers = (0 .. READERS).map(|_| chain.reader()).collect::>(); + let barrier = Arc::new(Barrier::new(WRITERS + READERS)); + let done = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let mut threads = Vec::new(); + for index in 0 .. WRITERS { + let chain = chain.clone(); + let barrier = Arc::clone(&barrier); + let done = Arc::clone(&done); + threads.push(std::thread::spawn(move || -> u64 { + let mut rng = Rng::new(0xFACE + index as u64); + let mut sent = 0; + barrier.wait(); + for _ in 0 .. SENDS { + let value = rng.below(100); + sent += value; + chain.send(value); + if rng.below(64) == 0 { std::thread::yield_now(); } + } + done.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + sent + })); + } + let mut reader_threads = Vec::new(); + for mut reader in readers { + let barrier = Arc::clone(&barrier); + let done = Arc::clone(&done); + reader_threads.push(std::thread::spawn(move || -> u64 { + let mut total = 0; + barrier.wait(); + while done.load(std::sync::atomic::Ordering::SeqCst) < WRITERS { + reader.recv(&mut total); + std::thread::yield_now(); + } + reader.recv(&mut total); + total + })); + } + let expected: u64 = threads.into_iter().map(|t| t.join().unwrap()).sum(); + for thread in reader_threads { + assert_eq!(thread.join().unwrap(), expected); + } + assert!(chain.live_len() <= READERS + 2, "live_len {}", chain.live_len()); + } + + #[test] + fn mesh_readers_observe_all_writers() { + let (writers, mesh) = Mesh::::new(3); + let mut readers = (0 .. 2).map(|_| mesh.reader()).collect::>(); + for (index, writer) in writers.iter().enumerate() { + for i in 1 ..= 10 { writer.send((index as u64 + 1) * i); } + } + for reader in readers.iter_mut() { + let mut total = 0; + reader.recv(&mut total); + assert_eq!(total, 55 + 110 + 165); + assert!(reader.is_caught_up()); + } + } + + /// Randomized stress: `W` writer threads sharing one chain, `R` reader threads + /// recv-ing at random intervals, one deliberate laggard that recvs rarely. + /// At quiescent checkpoints the chain's length must stay within bounds, + /// and every reader's final total must equal the sum of all atoms sent. + #[test] + fn stress_randomized() { + + const WRITERS: usize = 3; + const ACTIVES: usize = 3; + const READERS: usize = ACTIVES + 1; // plus one laggard + const PHASES: usize = 50; + + let chain = Chain::::new(); + let readers = (0 .. READERS).map(|_| chain.reader()).collect::>(); + + // Four barrier waits per phase: start, quiesce, checkpoint, done. + let barrier = Arc::new(Barrier::new(WRITERS + READERS + 1)); + + let mut threads = Vec::new(); + + for index in 0 .. WRITERS { + let chain = chain.clone(); + let barrier = Arc::clone(&barrier); + threads.push(std::thread::spawn(move || -> u64 { + let mut rng = Rng::new(0xC0FFEE + index as u64); + let mut sent = 0; + for _ in 0 .. PHASES { + barrier.wait(); // start + for _ in 0 .. rng.below(200) { + let value = rng.below(100); + sent += value; + chain.send(value); + if rng.below(16) == 0 { std::thread::yield_now(); } + } + barrier.wait(); // quiesce + barrier.wait(); // checkpoint + barrier.wait(); // done + } + sent + })); + } + + for (index, mut reader) in readers.into_iter().enumerate() { + let barrier = Arc::clone(&barrier); + let laggard = index == 0; + threads.push(std::thread::spawn(move || -> u64 { + let mut rng = Rng::new(0xBEEF + index as u64); + let mut total = 0; + for phase in 0 .. PHASES { + barrier.wait(); // start + if !laggard { + // Recv at random moments while writers are sending. + for _ in 0 .. rng.below(4) { + std::thread::yield_now(); + reader.recv(&mut total); + } + } + barrier.wait(); // quiesce + // Drain and heal: two recvs leave the chain fully compacted. + // The laggard recvs rarely, pinning old positions for a while. + if !laggard { + reader.recv(&mut total); + reader.recv(&mut total); + assert!(reader.is_caught_up()); + } + else if phase % 8 == 7 { + reader.recv(&mut total); + } + barrier.wait(); // checkpoint + barrier.wait(); // done + } + reader.recv(&mut total); // final drain + total + })); + } + + // The main thread asserts the chain length at each quiescent checkpoint. + for _ in 0 .. PHASES { + barrier.wait(); // start + barrier.wait(); // quiesce + barrier.wait(); // checkpoint + let len = chain.live_len(); + assert!(len <= READERS + 2, "live_len {} exceeds bound {}", len, READERS + 2); + barrier.wait(); // done + } + + let outcomes: Vec = threads.into_iter().map(|t| t.join().unwrap()).collect(); + let expected: u64 = outcomes[.. WRITERS].iter().sum(); + for total in outcomes[WRITERS ..].iter() { + assert_eq!(*total, expected); + } + } +} diff --git a/communication/src/lib.rs b/communication/src/lib.rs index 3daac7b43..b41376a07 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -95,6 +95,7 @@ #![forbid(missing_docs)] pub mod allocator; +pub mod chain; pub mod networking; pub mod initialize; pub mod logging;