diff --git a/plans/20260328_01_server_batched_sub_processing.md b/plans/20260328_01_server_batched_sub_processing.md new file mode 100644 index 0000000000..411968de42 --- /dev/null +++ b/plans/20260328_01_server_batched_sub_processing.md @@ -0,0 +1,152 @@ +# Server: batched SUB command processing + +Implementation plan for Part 1 of [RFC 2026-03-28-subscription-performance](../rfcs/2026-03-28-subscription-performance.md). + +## Current state + +When a batch of ~135 SUB commands arrives, the server already batches: +- Queue record lookups (`getQueueRecs` in `receive`, Server.hs:1151) +- Command verification (`verifyLoadedQueue`, Server.hs:1152) + +But command processing is per-command (`foldrM process` in `client`, Server.hs:1372-1375). Each SUB calls `subscribeQueueAndDeliver` which calls `tryPeekMsg` - one DB query per queue. For Postgres, that's ~135 individual `SELECT ... FROM messages WHERE recipient_id = ? ORDER BY message_id ASC LIMIT 1` queries per batch. + +## Goal + +Replace ~135 individual message peek queries with 1 batched query per batch. No protocol changes. + +## Implementation + +### Step 1: Add `tryPeekMsgs` to MsgStoreClass + +File: `src/Simplex/Messaging/Server/MsgStore/Types.hs` + +Add to `MsgStoreClass`: + +```haskell +tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message) +``` + +Returns a map from recipient ID to earliest pending message for each queue that has one. Queues with no messages are absent from the map. + +### Step 2: Parameterize `deliver` to accept pre-fetched message + +File: `src/Simplex/Messaging/Server.hs` + +Currently `deliver` (inside `subscribeQueueAndDeliver`, line 1641) calls `tryPeekMsg ms q`. Add a parameter for an optional pre-fetched message: + +```haskell +deliver :: Maybe Message -> (Bool, Maybe Sub) -> M s ResponseAndMessage +deliver prefetchedMsg (hasSub, sub_) = do + stats <- asks serverStats + fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do + msg_ <- maybe (tryPeekMsg ms q) (pure . Just) prefetchedMsg + ... +``` + +When `Nothing` is passed, falls back to individual `tryPeekMsg` (existing behavior). When `Just msg` is passed, uses it directly (batched path). + +### Step 3: Pre-fetch messages before the processing loop + +File: `src/Simplex/Messaging/Server.hs` + +Currently (lines 1372-1375): + +```haskell +forever $ + atomically (readTBQueue rcvQ) + >>= foldrM process ([], []) + >>= \(rs_, msgs) -> ... +``` + +Add a pre-fetch step before the existing loop: + +```haskell +forever $ do + batch <- atomically (readTBQueue rcvQ) + msgMap <- prefetchMsgs batch + foldrM (process msgMap) ([], []) batch + >>= \(rs_, msgs) -> ... +``` + +`prefetchMsgs` scans the batch, collects queues from SUB commands that have a verified queue (`q_ = Just (q, _)`), calls `tryPeekMsgs` once, returns the map. For batches with no SUBs it returns an empty map (no DB call). + +`process` passes the looked-up message (or Nothing) through to `processCommand` and down to `deliver`. + +The `foldrM process` loop, `processCommand`, `subscribeQueueAndDeliver`, and all other command handlers stay structurally the same. Only `deliver` gains one parameter, and the `client` loop gains one pre-fetch call. + +### Step 4: Review + +Review the typeclass signature and server usage. Confirm the interface has the right shape before implementing store backends. + +### Step 5: Implement for each store backend + +#### Postgres + +File: `src/Simplex/Messaging/Server/MsgStore/Postgres.hs` + +Single query using `DISTINCT ON`: + +```sql +SELECT DISTINCT ON (recipient_id) + recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body +FROM messages +WHERE recipient_id IN ? +ORDER BY recipient_id, message_id ASC +``` + +Build `Map RecipientId Message` from results. + +#### STM + +File: `src/Simplex/Messaging/Server/MsgStore/STM.hs` + +Loop over queues, call `tryPeekMsg` for each, collect into map. + +#### Journal + +File: `src/Simplex/Messaging/Server/MsgStore/Journal.hs` + +Loop over queues, call `tryPeekMsg` for each, collect into map. + +### Step 6: Handle edge cases + +1. **Mixed batches**: `prefetchMsgs` collects only SUB queues. Non-SUB commands get Nothing for the pre-fetched message and process unchanged. + +2. **Already-subscribed queues**: Include in pre-fetch - `deliver` is called for re-SUBs too (delivers pending message). + +3. **Service subscriptions**: The pre-fetch doesn't care about service state. `sharedSubscribeQueue` handles service association in STM; message peek is the same. + +4. **Error queues**: Verification errors from `receive` are Left values in the batch. `prefetchMsgs` only looks at Right values with SUB commands. + +5. **Empty pre-fetch**: If batch has no SUBs (e.g., all ACKs), `prefetchMsgs` returns empty map, no DB call made. + +### Step 7: Batch other commands (future, not in scope) + +The same pattern (pre-fetch before loop, parameterize handler) can extend to: +- `ACK` with `tryDelPeekMsg` - batch delete+peek +- `GET` with `tryPeekMsg` - same map lookup + +Lower priority since these don't have the N-at-once pattern of subscriptions. + +## File changes summary + +| File | Change | +|---|---| +| `src/Simplex/Messaging/Server/MsgStore/Types.hs` | Add `tryPeekMsgs` to typeclass | +| `src/Simplex/Messaging/Server/MsgStore/Postgres.hs` | Implement `tryPeekMsgs` with batch SQL | +| `src/Simplex/Messaging/Server/MsgStore/STM.hs` | Implement `tryPeekMsgs` as loop | +| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Implement `tryPeekMsgs` as loop | +| `src/Simplex/Messaging/Server.hs` | Add `prefetchMsgs`, parameterize `deliver` | + +## Testing + +1. Existing server tests must pass unchanged (correctness preserved). +2. Add a test that subscribes a batch of queues (some with pending messages, some without) and verifies all get correct SOK + MSG responses. +3. Prometheus metrics: existing `qSub` stat should still increment correctly. + +## Performance expectation + +For 300K queues across ~2200 batches: +- Before: ~300K individual DB queries +- After: ~2200 batched DB queries (one per batch of ~135) +- ~136x reduction in DB round-trips diff --git a/rfcs/2026-03-28-subscription-performance.md b/rfcs/2026-03-28-subscription-performance.md new file mode 100644 index 0000000000..e0af42bf89 --- /dev/null +++ b/rfcs/2026-03-28-subscription-performance.md @@ -0,0 +1,90 @@ +# Subscription performance + +No protocol changes. This is an implementation RFC addressing subscription performance bottlenecks in both the SMP router and the agent. + +## Problem + +Subscribing large numbers of queues is slow. A messaging client with ~300K queues per router across 3 routers takes over 1 hour to subscribe. For comparison, the NTF server with ~1M queues per router across 12 routers took 20-30 minutes (prior to NTF client services, now in master). + +Even on fast networks (cloud VMs), a client with 1.1M active subscriptions needed ~1.5M attempts (commands sent) to fully subscribe - ~36% retry rate caused by the timeout cascade described below. + +### Root causes + +#### 1. Router: per-command processing in batches + +Batch verification and queue lookups are already done efficiently for the whole batch in `Server.hs`. But `processCommand` is called per-command in a loop - each SUB does its own individual DB query for message peek/delivery. With ~135 SUBs per batch (current SMP version), that's 135 individual DB queries per batch instead of 1 batched query. + +For 300K queues, that's ~2200 batches x 135 queries = ~300K individual DB queries on the router, which is the dominant bottleneck when using PostgreSQL storage. + +NSUB is cheaper because it just registers for notifications without message delivery - no per-queue DB query. + +#### 2. Agent: all queues read and sent at once + +`getUserServerRcvQueueSubs` reads all queues for a `(userId, server)` pair in one query with no LIMIT. For 300K queues, the entire result set is loaded into memory, then all ~2200 batches are queued to send without waiting for responses. + +The NTF server agent uses cursor-style reading with configurable batch sizes (900 subs per chunk, 90K per DB fetch) and waits for each chunk to be processed before fetching the next. + +#### 3. No backpressure on sends + +`nonBlockingWriteTBQueue` bypasses the `sndQ` bound by forking a thread when the queue is full. All batches are queued immediately, and all their response timers start simultaneously. A 30-second per-response timeout means later batches time out not because the router is slow to respond to them specifically, but because they're waiting in the router's receive queue behind thousands of earlier commands. + +This causes cascading timeouts: timed-out responses trigger `resubscribeSMPSession`, which retries all pending subs. Three consecutive timeouts can trigger connection drop via the monitor thread, causing a full reconnection and retry of everything. + +## Solution + +### Part 1: Router - batched command processing + +Move the per-command processing loop inside command handlers so that commands of the same type within a batch can be processed together. + +Current flow: +``` +receive batch -> verify all -> lookup queues all -> for each command: processCommand (individual DB query) +``` + +Proposed flow: +``` +receive batch -> verify all -> lookup queues all -> group by command type -> process group: + SUB group: one batched message peek query for all queues + NSUB group: batch registration (already cheap, but can batch DB writes) + other commands: process individually as before +``` + +For SUB, the batched processing would: +1. Collect all queue IDs from the SUB group +2. Perform a single DB query to peek messages for all queues +3. Distribute results back to individual responses + +This reduces ~135 DB queries per batch to 1, cutting router-side DB load by ~100x for subscriptions. + +Commands where batching doesn't matter (SEND, ACK, KEY, etc.) continue to be processed individually. + +### Part 2: Agent - cursor-based subscription with backpressure + +Replace the all-at-once fetch-and-send pattern with cursor-style batching, similar to what the NTF server agent does. + +Changes to `subscribeUserServer`: +1. Fetch queues in fixed-size batches (e.g., configurable, default ~1000) using LIMIT/OFFSET or cursor-based pagination. +2. Send each batch and wait for responses before sending the next. +3. Remove the use of `nonBlockingWriteTBQueue` for subscription batches - use blocking writes or structured backpressure so response timers don't start until the batch is actually sent. + +This ensures: +- Memory usage is bounded (not 300K queue records in memory at once) +- Response timeouts are meaningful (timer starts when the router receives the batch, not when it's queued locally) +- Retries are scoped to the failed batch, not all pending subs +- Works on slow/lossy networks by naturally pacing sends + +### Part 3: Response timeout for batches + +The current per-response 30-second timeout doesn't account for batch processing time. Options: + +1. **Stagger deadlines**: later responses in a batch get proportionally more time. The `rcvConcurrency` field was designed for this but is never used. +2. **Per-batch timeout**: instead of timing individual responses, timeout the entire batch with a budget proportional to batch size. +3. **No timeout for subscription responses**: since subscriptions are sent as batches with backpressure (Part 2), and the connection is monitored by pings, individual response timeouts may not be needed. A subscription that doesn't get a response will be retried on reconnect. + +## Priority and ordering + +Part 1 (router batching) gives the biggest improvement and is independent of Parts 2/3. + +Part 2 (agent cursor + backpressure) eliminates the retry cascade and is critical for slow networks. + +Part 3 (timeout handling) is a refinement that can be addressed after Parts 1 and 2. diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index ab001bbc3b..66100e97d8 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1366,14 +1366,20 @@ client labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" let THandleParams {thVersion} = thParams' clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams') - process t acc@(rs, msgs) = + process msgMap t acc@(rs, msgs) = (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_))) - <$> processCommand clntServiceId thVersion t - forever $ - atomically (readTBQueue rcvQ) - >>= foldrM process ([], []) + <$> processCommand clntServiceId thVersion msgMap t + forever $ do + batch <- atomically (readTBQueue rcvQ) + msgMap <- prefetchMsgs batch + foldrM (process msgMap) ([], []) batch >>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_) where + prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message)) + prefetchMsgs batch = + let subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch] + in if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs + processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage) processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of PRXY srv auth -> ifM allowProxy getRelay (pure $ Just $ ERR $ PROXY BASIC_AUTH) @@ -1454,8 +1460,8 @@ client mkIncProxyStats ps psOwn own sel = do incStat $ sel ps when own $ incStat $ sel psOwn - processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) - processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of + processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage) + processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command) Cmd SSender command -> case command of SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k @@ -1479,7 +1485,9 @@ client pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth Cmd SRecipient command -> case command of - SUB -> withQueue' subscribeQueueAndDeliver + SUB -> case msgMap of + Left e -> pure $ Just (err e, Nothing) + Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -1620,8 +1628,8 @@ client suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg) suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q - subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage - subscribeQueueAndDeliver q qr@QueueRec {rcvServiceId} = + subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage + subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} = liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case @@ -1642,7 +1650,6 @@ client deliver (hasSub, sub_) = do stats <- asks serverStats fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do - msg_ <- tryPeekMsg ms q msg' <- forM msg_ $ \msg -> liftIO $ do ts <- getSystemSeconds sub <- maybe (atomically getSub) pure sub_ @@ -2087,7 +2094,7 @@ client -- rejectOrVerify filters allowed commands, no need to repeat it here. -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types). -- `fst` removes empty message that is only returned for `SUB` command - Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'') + Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'') -- encode response r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs index 77d9973e6b..b855346d4a 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -41,7 +41,7 @@ import Data.List (intersperse) import qualified Data.Map.Strict as M import Data.Text (Text) import Data.Time.Clock.System (SystemTime (..)) -import Database.PostgreSQL.Simple (Binary (..), Only (..), (:.) (..)) +import Database.PostgreSQL.Simple (Binary (..), In (..), Only (..), (:.) (..)) import qualified Database.PostgreSQL.Simple as DB import qualified Database.PostgreSQL.Simple.Copy as DB import Database.PostgreSQL.Simple.SqlQQ (sql) @@ -246,6 +246,25 @@ instance MsgStoreClass PostgresMsgStore where tryPeekMsg ms q = isolateQueue ms q "tryPeekMsg" $ tryPeekMsg_ q () {-# INLINE tryPeekMsg #-} + tryPeekMsgs :: PostgresMsgStore -> [PostgresQueue] -> ExceptT ErrorType IO (M.Map RecipientId Message) + tryPeekMsgs _ms [] = pure M.empty + tryPeekMsgs ms qs = + uninterruptibleMask_ $ + withDB' "tryPeekMsgs" (queueStore_ ms) $ \db -> + M.fromList . map toRcvMsg <$> + DB.query + db + [sql| + SELECT DISTINCT ON (recipient_id) + recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + FROM messages + WHERE recipient_id IN ? + ORDER BY recipient_id, message_id ASC + |] + (Only (In (map recipientId' qs))) + where + toRcvMsg (Only rId :. msg) = (rId, toMessage msg) + tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message) tryDelMsg ms q msgId = uninterruptibleMask_ $ diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index acb661a408..a14dfd4242 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -41,7 +41,9 @@ import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) import Data.Kind -import Data.Maybe (fromMaybe) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes, fromMaybe) import Data.Text (Text) import Data.Time.Clock.System (SystemTime (systemSeconds)) import Simplex.Messaging.Protocol @@ -91,6 +93,9 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message) tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure {-# INLINE tryPeekMsg #-} + + tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message) + tryPeekMsgs st qs = M.fromList . catMaybes <$> mapM (\q -> (recipientId q,) <$$> tryPeekMsg st q) qs tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) tryDelMsg st q msgId' =