From 7eaa24ecc5df1776f0ddc4c94c95b0c7a1099729 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Tue, 5 May 2026 15:34:56 +0000 Subject: [PATCH] fix(replication): omni sequential send and read --- .../issues/omni-table-subscriber-deadlock.md | 15 ++++++++--- .../replication/logical/subscriber/stream.rs | 25 ++++++------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pgdog/docs/issues/omni-table-subscriber-deadlock.md b/pgdog/docs/issues/omni-table-subscriber-deadlock.md index bbfd8b50c..006d83e21 100644 --- a/pgdog/docs/issues/omni-table-subscriber-deadlock.md +++ b/pgdog/docs/issues/omni-table-subscriber-deadlock.md @@ -75,7 +75,10 @@ successive `[0.000 MB/sec]` log lines, confirms this deadlock. ## Solutions -### Solution 1: sequential per-destination apply in `send()` +### Solution 1: sequential per-destination apply in `send()` ✅ implemented + +**What:** collapse the three loops in `send()` into a single loop that completes +the full write→read cycle for each destination before moving to the next. Collapse the three loops in `send()` (`stream.rs:238-277`) into one that completes write→read per destination before moving on: @@ -83,10 +86,9 @@ destination before moving on: ```rust for conn in &mut conns { conn.send(&vec![bind.clone().into(), Execute::new().into(), Flush.into()].into()).await?; - conn.flush().await?; for _ in 0..2 { let msg = conn.read().await?; - // ... existing response handling ... + // ... response handling ... } } ``` @@ -99,6 +101,13 @@ until `Sync` in `commit()`), so two omni rows still produce a cross-row, cross-d This is the case in `repro_deadlock.sh`. **Not sufficient alone** for any workload with multi-row omni transactions. +**Parallelism cost:** sequential apply trades per-row latency for safety — destination +round-trips are now additive rather than overlapping. Parallel sends cannot be restored +without additional mechanisms: they break consistent lock ordering, allowing sub-0 to +acquire dest-0's lock while sub-1 acquires dest-1's lock before either reads back, +re-introducing the cycle. The structural fix is destination-partitioned apply (see +below), which assigns disjoint destination shards to each subscriber so parallel sends +within a subscriber are safe. --- ### Solution 2: `lock_timeout` on destination connections diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index 4d4052701..a492df483 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -223,28 +223,19 @@ impl StreamSubscriber { // Dispatch a pre-built bind to the matching shard(s). async fn send(&mut self, val: &Shard, bind: &Bind) -> Result<(), Error> { - let mut conns: Vec<_> = self - .connections - .iter_mut() - .enumerate() - .filter(|(shard, _)| match val { - Shard::Direct(direct) => *shard == *direct, - Shard::Multi(multi) => multi.contains(shard), + for (shard, conn) in self.connections.iter_mut().enumerate() { + let routes = match val { + Shard::Direct(direct) => shard == *direct, + Shard::Multi(multi) => multi.contains(&shard), _ => true, - }) - .map(|(_, server)| server) - .collect(); + }; + if !routes { + continue; + } - for conn in &mut conns { conn.send(&vec![bind.clone().into(), Execute::new().into(), Flush.into()].into()) .await?; - } - - for conn in &mut conns { - conn.flush().await?; - } - for conn in &mut conns { // Keep server connections always synchronized. for _ in 0..2 { let msg = conn.read().await?;