152 changes: 152 additions & 0 deletions plans/20260328_01_server_batched_sub_processing.md
@@ -0,0 +1,152 @@
# Server: batched SUB command processing

Implementation plan for Part 1 of [RFC 2026-03-28-subscription-performance](../rfcs/2026-03-28-subscription-performance.md).

## Current state

When a batch of ~135 SUB commands arrives, the server already batches:
- Queue record lookups (`getQueueRecs` in `receive`, Server.hs:1151)
- Command verification (`verifyLoadedQueue`, Server.hs:1152)

But command processing is per-command (`foldrM process` in `client`, Server.hs:1372-1375). Each SUB calls `subscribeQueueAndDeliver` which calls `tryPeekMsg` - one DB query per queue. For Postgres, that's ~135 individual `SELECT ... FROM messages WHERE recipient_id = ? ORDER BY message_id ASC LIMIT 1` queries per batch.

## Goal

Replace ~135 individual message peek queries with 1 batched query per batch. No protocol changes.

## Implementation

### Step 1: Add `tryPeekMsgs` to MsgStoreClass

File: `src/Simplex/Messaging/Server/MsgStore/Types.hs`

Add to `MsgStoreClass`:

```haskell
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
```

Returns a map from recipient ID to earliest pending message for each queue that has one. Queues with no messages are absent from the map.
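The "absent from the map" contract can be illustrated with a toy in-memory store. This is a minimal sketch only: `QueueId`, `Msg`, `ToyStore`, `peekOne` and `peekMany` are hypothetical stand-ins for `RecipientId`, `Message`, the store, `tryPeekMsg` and `tryPeekMsgs`, and none of them exist in the codebase.

```haskell
import qualified Data.Map.Strict as M
import Data.Maybe (mapMaybe)

-- Hypothetical stand-ins for RecipientId and Message.
type QueueId = Int
type Msg = String

-- Toy store: each queue maps to its pending messages, oldest first.
type ToyStore = M.Map QueueId [Msg]

-- Peek the oldest pending message of a single queue, if any.
peekOne :: ToyStore -> QueueId -> Maybe Msg
peekOne st q = case M.lookup q st of
  Just (m : _) -> Just m
  _ -> Nothing

-- Batched peek: queues without pending messages (or unknown queues)
-- are simply absent from the resulting map.
peekMany :: ToyStore -> [QueueId] -> M.Map QueueId Msg
peekMany st qs = M.fromList $ mapMaybe (\q -> (,) q <$> peekOne st q) qs
```

A caller then distinguishes "has a message" from "empty queue" with a plain `M.lookup`, without a second query.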

### Step 2: Parameterize `deliver` to accept pre-fetched message

File: `src/Simplex/Messaging/Server.hs`

Currently `deliver` (inside `subscribeQueueAndDeliver`, line 1641) calls `tryPeekMsg ms q`. Add a parameter for an optional pre-fetched message:

```haskell
deliver :: Maybe Message -> (Bool, Maybe Sub) -> M s ResponseAndMessage
deliver prefetchedMsg (hasSub, sub_) = do
  stats <- asks serverStats
  fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
    msg_ <- maybe (tryPeekMsg ms q) (pure . Just) prefetchedMsg
    ...
```

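When `Nothing` is passed, `deliver` falls back to an individual `tryPeekMsg` call (existing behavior). When `Just msg` is passed, it uses the message directly (batched path). Note that a SUB whose queue has no pending message is absent from the pre-fetched map and therefore also arrives as `Nothing`, so with this signature empty queues would still cost one query each. The `Server.hs` change in this PR avoids that by removing the `tryPeekMsg` call from `deliver` and passing `M.lookup entId msgs` to `subscribeQueueAndDeliver` directly.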

### Step 3: Pre-fetch messages before the processing loop

File: `src/Simplex/Messaging/Server.hs`

Currently (lines 1372-1375):

```haskell
forever $
  atomically (readTBQueue rcvQ)
    >>= foldrM process ([], [])
    >>= \(rs_, msgs) -> ...
```

Add a pre-fetch step before the existing loop:

```haskell
forever $ do
  batch <- atomically (readTBQueue rcvQ)
  msgMap <- prefetchMsgs batch
  foldrM (process msgMap) ([], []) batch
    >>= \(rs_, msgs) -> ...
```

`prefetchMsgs` scans the batch, collects queues from SUB commands that have a verified queue (`q_ = Just (q, _)`), calls `tryPeekMsgs` once, returns the map. For batches with no SUBs it returns an empty map (no DB call).
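The scan-and-skip behavior described above can be sketched on a simplified batch model. The types `Cmd` and `Entry` and the functions `subQueues` and `prefetch` are illustrative assumptions, not the real `VerifiedTransmission` machinery; the batched peek is passed in as a function so that the "no SUBs, no DB call" property is visible.

```haskell
import qualified Data.Map.Strict as M

-- Hypothetical, simplified model of a verified batch entry.
data Cmd = SUB | ACK | GET deriving (Eq, Show)

type QueueId = Int

-- Left = verification error, Right = (verified queue, command).
type Entry = Either String (QueueId, Cmd)

-- Collect the queues of verified SUB commands only;
-- errors and non-SUB commands fail the pattern match and are skipped.
subQueues :: [Entry] -> [QueueId]
subQueues batch = [q | Right (q, SUB) <- batch]

-- Run the supplied batched peek only when there is something to fetch.
prefetch :: Monad m => ([QueueId] -> m (M.Map QueueId msg)) -> [Entry] -> m (M.Map QueueId msg)
prefetch peekMany batch = case subQueues batch of
  [] -> pure M.empty
  qs -> peekMany qs
```

Passing the peek as an argument keeps the sketch independent of any store backend; in the plan this role is played by `tryPeekMsgs`.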

`process` passes the looked-up message (or Nothing) through to `processCommand` and down to `deliver`.

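The `foldrM process` loop, `processCommand`, `subscribeQueueAndDeliver`, and all other command handlers stay structurally the same. The only changes are one extra parameter for the pre-fetched message, threaded from `process` through `processCommand` and `subscribeQueueAndDeliver` down to `deliver`, and one pre-fetch call in the `client` loop.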

### Step 4: Review

Review the typeclass signature and server usage. Confirm the interface has the right shape before implementing store backends.

### Step 5: Implement for each store backend

#### Postgres

File: `src/Simplex/Messaging/Server/MsgStore/Postgres.hs`

Single query using `DISTINCT ON`:

```sql
SELECT DISTINCT ON (recipient_id)
  recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
WHERE recipient_id IN ?
ORDER BY recipient_id, message_id ASC
```

Build `Map RecipientId Message` from results.
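What `DISTINCT ON (recipient_id)` combined with `ORDER BY recipient_id, message_id ASC` selects can be modelled in pure Haskell. This is a sketch of the query semantics only, assuming a hypothetical `Row` shape of (recipient id, message sequence number, body); it is not how the store builds its result.

```haskell
import qualified Data.Map.Strict as M

-- Hypothetical row shape: (recipient id, message sequence number, body).
type Row = (Int, Int, String)

-- Keep, for every recipient, the row with the smallest sequence number,
-- mirroring DISTINCT ON (recipient_id) ... ORDER BY recipient_id, message_id ASC.
earliestPerRecipient :: [Row] -> M.Map Int (Int, String)
earliestPerRecipient rows =
  M.fromListWith older [(rId, (seqNo, body)) | (rId, seqNo, body) <- rows]
  where
    -- Choose whichever of the two candidates has the lower sequence number.
    older a b = if fst a <= fst b then a else b
```

Recipients with no rows never appear as keys, which matches the "absent from the map" contract of `tryPeekMsgs`.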

#### STM

File: `src/Simplex/Messaging/Server/MsgStore/STM.hs`

Loop over queues, call `tryPeekMsg` for each, collect into map.

#### Journal

File: `src/Simplex/Messaging/Server/MsgStore/Journal.hs`

Loop over queues, call `tryPeekMsg` for each, collect into map.

### Step 6: Handle edge cases

1. **Mixed batches**: `prefetchMsgs` collects only SUB queues. Non-SUB commands get Nothing for the pre-fetched message and process unchanged.

2. **Already-subscribed queues**: Include in pre-fetch - `deliver` is called for re-SUBs too (delivers pending message).

3. **Service subscriptions**: The pre-fetch doesn't care about service state. `sharedSubscribeQueue` handles service association in STM; message peek is the same.

4. **Error queues**: Verification errors from `receive` are Left values in the batch. `prefetchMsgs` only looks at Right values with SUB commands.

5. **Empty pre-fetch**: If batch has no SUBs (e.g., all ACKs), `prefetchMsgs` returns empty map, no DB call made.

### Step 7: Batch other commands (future, not in scope)

The same pattern (pre-fetch before loop, parameterize handler) can extend to:
- `ACK` with `tryDelPeekMsg` - batch delete+peek
- `GET` with `tryPeekMsg` - same map lookup

Lower priority since these don't have the N-at-once pattern of subscriptions.

## File changes summary

| File | Change |
|---|---|
| `src/Simplex/Messaging/Server/MsgStore/Types.hs` | Add `tryPeekMsgs` to typeclass |
| `src/Simplex/Messaging/Server/MsgStore/Postgres.hs` | Implement `tryPeekMsgs` with batch SQL |
| `src/Simplex/Messaging/Server/MsgStore/STM.hs` | Implement `tryPeekMsgs` as loop |
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Implement `tryPeekMsgs` as loop |
| `src/Simplex/Messaging/Server.hs` | Add `prefetchMsgs`, parameterize `deliver` |

## Testing

1. Existing server tests must pass unchanged (correctness preserved).
2. Add a test that subscribes a batch of queues (some with pending messages, some without) and verifies all get correct SOK + MSG responses.
3. Prometheus metrics: existing `qSub` stat should still increment correctly.

## Performance expectation

For 300K queues across ~2200 batches:
- Before: ~300K individual DB queries
- After: ~2200 batched DB queries (one per batch of ~135)
- ~135x reduction in DB round-trips for message peeks (bounded by the batch size)
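
The round-trip counts above can be reproduced with ceiling division. A minimal sketch, assuming every batch carries exactly 135 SUB commands; `batchCount` and `reduction` are illustrative helper names, not part of the codebase.

```haskell
-- Number of batches needed for a given number of queues (ceiling division).
batchCount :: Int -> Int -> Int
batchCount queues perBatch = (queues + perBatch - 1) `div` perBatch

-- Ratio of per-queue queries (before) to per-batch queries (after).
reduction :: Int -> Int -> Double
reduction queues perBatch =
  fromIntegral queues / fromIntegral (batchCount queues perBatch)
```

For 300,000 queues and 135 commands per batch this gives 2223 batches and a ratio just under 135. The ratio can never exceed the batch size, so smaller or partially filled batches reduce the gain.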
90 changes: 90 additions & 0 deletions rfcs/2026-03-28-subscription-performance.md
@@ -0,0 +1,90 @@
# Subscription performance

No protocol changes. This is an implementation RFC addressing subscription performance bottlenecks in both the SMP router and the agent.

## Problem

Subscribing large numbers of queues is slow. A messaging client with ~300K queues per router across 3 routers takes over 1 hour to subscribe. For comparison, the NTF server with ~1M queues per router across 12 routers took 20-30 minutes (prior to NTF client services, now in master).

Even on fast networks (cloud VMs), a client with 1.1M active subscriptions needed ~1.5M attempts (commands sent) to fully subscribe - ~36% retry rate caused by the timeout cascade described below.

### Root causes

#### 1. Router: per-command processing in batches

Batch verification and queue lookups are already done efficiently for the whole batch in `Server.hs`. But `processCommand` is called per-command in a loop - each SUB does its own individual DB query for message peek/delivery. With ~135 SUBs per batch (current SMP version), that's 135 individual DB queries per batch instead of 1 batched query.

For 300K queues, that's ~2200 batches x 135 queries = ~300K individual DB queries on the router, which is the dominant bottleneck when using PostgreSQL storage.

NSUB is cheaper because it just registers for notifications without message delivery - no per-queue DB query.

#### 2. Agent: all queues read and sent at once

`getUserServerRcvQueueSubs` reads all queues for a `(userId, server)` pair in one query with no LIMIT. For 300K queues, the entire result set is loaded into memory, then all ~2200 batches are queued to send without waiting for responses.

The NTF server agent uses cursor-style reading with configurable batch sizes (900 subs per chunk, 90K per DB fetch) and waits for each chunk to be processed before fetching the next.

#### 3. No backpressure on sends

`nonBlockingWriteTBQueue` bypasses the `sndQ` bound by forking a thread when the queue is full. All batches are queued immediately, and all their response timers start simultaneously. A 30-second per-response timeout means later batches time out not because the router is slow to respond to them specifically, but because they're waiting in the router's receive queue behind thousands of earlier commands.

This causes cascading timeouts: timed-out responses trigger `resubscribeSMPSession`, which retries all pending subs. Three consecutive timeouts can trigger connection drop via the monitor thread, causing a full reconnection and retry of everything.

## Solution

### Part 1: Router - batched command processing

Move the per-command processing loop inside command handlers so that commands of the same type within a batch can be processed together.

Current flow:
```
receive batch -> verify all -> lookup queues all -> for each command: processCommand (individual DB query)
```

Proposed flow:
```
receive batch -> verify all -> lookup queues all -> group by command type -> process group:
  SUB group: one batched message peek query for all queues
  NSUB group: batch registration (already cheap, but can batch DB writes)
  other commands: process individually as before
```

For SUB, the batched processing would:
1. Collect all queue IDs from the SUB group
2. Perform a single DB query to peek messages for all queues
3. Distribute results back to individual responses

This reduces ~135 message peek queries per batch to 1, cutting the number of router-side DB queries for subscriptions by roughly two orders of magnitude.
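
The third step, distributing the batched result back to individual responses in the original command order, might look like the following sketch. `Cmd`, `Resp` and `respond` are hypothetical simplifications; the real responses are `ResponseAndMessage` values.

```haskell
import qualified Data.Map.Strict as M

type QueueId = Int

data Cmd = SUB | ACK deriving (Eq, Show)

-- Simplified response model: a SUB response carries the pre-fetched
-- message (if any); every other command is handled individually.
data Resp = SubOk (Maybe String) | Individually Cmd deriving (Eq, Show)

-- Walk the batch in order and attach the pre-fetched message to each SUB.
respond :: M.Map QueueId String -> [(QueueId, Cmd)] -> [(QueueId, Resp)]
respond msgs = map step
  where
    step (q, SUB) = (q, SubOk (M.lookup q msgs))
    step (q, c) = (q, Individually c)
```

Because the batch is traversed once in its original order, response ordering is unaffected by the grouping used for the query.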

Commands where batching doesn't matter (SEND, ACK, KEY, etc.) continue to be processed individually.

### Part 2: Agent - cursor-based subscription with backpressure

Replace the all-at-once fetch-and-send pattern with cursor-style batching, similar to what the NTF server agent does.

Changes to `subscribeUserServer`:
1. Fetch queues in fixed-size batches (configurable size, e.g. ~1000 by default) using LIMIT/OFFSET or cursor-based pagination.
2. Send each batch and wait for responses before sending the next.
3. Remove the use of `nonBlockingWriteTBQueue` for subscription batches - use blocking writes or structured backpressure so response timers don't start until the batch is actually sent.

This ensures:
- Memory usage is bounded (not 300K queue records in memory at once)
- Response timeouts are meaningful (timer starts when the batch is actually sent, not when it's queued locally)
- Retries are scoped to the failed batch, not all pending subs
- Works on slow/lossy networks by naturally pacing sends
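
The fetch, send and wait cycle can be sketched as a cursor loop. This is an illustration under stated assumptions: the "table" is an in-memory list of positive row ids sorted in ascending order, and `fetchAfter`, `subscribeAll` and `handleChunk` are hypothetical names rather than agent functions.

```haskell
-- Hypothetical queue record key: a positive, monotonically increasing row id.
type RowId = Int

-- Cursor-style fetch: at most `limit` ids strictly greater than the cursor.
-- Stands in for: SELECT ... WHERE id > ? ORDER BY id LIMIT ?
fetchAfter :: [RowId] -> RowId -> Int -> [RowId]
fetchAfter table cursor limit = take limit (filter (> cursor) table)

-- Process all rows chunk by chunk. The next fetch happens only after
-- `handleChunk` (send the batch and wait for its responses) has returned,
-- which is what provides the backpressure.
subscribeAll :: Monad m => Int -> [RowId] -> ([RowId] -> m ()) -> m ()
subscribeAll limit table handleChunk = go 0
  where
    go cursor = case fetchAfter table cursor limit of
      [] -> pure ()
      chunk -> handleChunk chunk >> go (last chunk)
```

At most one chunk is held in memory at a time, and a failed chunk can be retried without touching the rows before or after it.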

### Part 3: Response timeout for batches

The current per-response 30-second timeout doesn't account for batch processing time. Options:

1. **Stagger deadlines**: later responses in a batch get proportionally more time. The `rcvConcurrency` field was designed for this but is never used.
2. **Per-batch timeout**: instead of timing individual responses, timeout the entire batch with a budget proportional to batch size.
3. **No timeout for subscription responses**: since subscriptions are sent as batches with backpressure (Part 2), and the connection is monitored by pings, individual response timeouts may not be needed. A subscription that doesn't get a response will be retried on reconnect.
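
Options 1 and 2 differ only in where the size-proportional allowance is applied. A minimal sketch, assuming timeouts in microseconds and a hypothetical per-item allowance; neither function exists in the codebase.

```haskell
-- Option 1 (staggered deadlines): the i-th response (0-based) in a batch
-- gets the base timeout plus an allowance proportional to its position.
staggeredTimeout :: Int -> Int -> Int -> Int
staggeredTimeout base perItem i = base + perItem * i

-- Option 2 (per-batch timeout): a single budget for the whole batch,
-- proportional to the number of commands in it.
batchTimeout :: Int -> Int -> Int -> Int
batchTimeout base perItem batchSize = base + perItem * batchSize
```

With either formula the first response keeps roughly the current base timeout, while later responses are no longer penalized for time spent waiting behind earlier commands in the same batch.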

## Priority and ordering

Part 1 (router batching) gives the biggest improvement and is independent of Parts 2/3.

Part 2 (agent cursor + backpressure) eliminates the retry cascade and is critical for slow networks.

Part 3 (timeout handling) is a refinement that can be addressed after Parts 1 and 2.
31 changes: 19 additions & 12 deletions src/Simplex/Messaging/Server.hs
@@ -1366,14 +1366,20 @@ client
 labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
 let THandleParams {thVersion} = thParams'
 clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
-process t acc@(rs, msgs) =
+process msgMap t acc@(rs, msgs) =
 (maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
-<$> processCommand clntServiceId thVersion t
-forever $
-atomically (readTBQueue rcvQ)
->>= foldrM process ([], [])
+<$> processCommand clntServiceId thVersion msgMap t
+forever $ do
+batch <- atomically (readTBQueue rcvQ)
+msgMap <- prefetchMsgs batch
+foldrM (process msgMap) ([], []) batch
 >>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
 where
+prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message))
+prefetchMsgs batch =
+let subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch]
+in if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs
+
 processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
 processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
 PRXY srv auth -> ifM allowProxy getRelay (pure $ Just $ ERR $ PROXY BASIC_AUTH)
@@ -1454,8 +1460,8 @@ client
 mkIncProxyStats ps psOwn own sel = do
 incStat $ sel ps
 when own $ incStat $ sel psOwn
-processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
-processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of
+processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
+processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of
 Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
 Cmd SSender command -> case command of
 SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
@@ -1479,7 +1485,9 @@ client
 pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
 Cmd SRecipient command ->
 case command of
-SUB -> withQueue' subscribeQueueAndDeliver
+SUB -> case msgMap of
+Left e -> pure $ Just (err e, Nothing)
+Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
 GET -> withQueue getMessage
 ACK msgId -> withQueue $ acknowledgeMsg msgId
 KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1620,8 +1628,8 @@ client
 suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
 suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q

-subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
-subscribeQueueAndDeliver q qr@QueueRec {rcvServiceId} =
+subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
+subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
 liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
 Nothing ->
 sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
@@ -1642,7 +1650,6 @@ client
 deliver (hasSub, sub_) = do
 stats <- asks serverStats
 fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
-msg_ <- tryPeekMsg ms q
 msg' <- forM msg_ $ \msg -> liftIO $ do
 ts <- getSystemSeconds
 sub <- maybe (atomically getSub) pure sub_
@@ -2087,7 +2094,7 @@ client
 -- rejectOrVerify filters allowed commands, no need to repeat it here.
 -- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
 -- `fst` removes empty message that is only returned for `SUB` command
-Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'')
+Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
 -- encode response
 r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
 [] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right
21 changes: 20 additions & 1 deletion src/Simplex/Messaging/Server/MsgStore/Postgres.hs
@@ -41,7 +41,7 @@ import Data.List (intersperse)
 import qualified Data.Map.Strict as M
 import Data.Text (Text)
 import Data.Time.Clock.System (SystemTime (..))
-import Database.PostgreSQL.Simple (Binary (..), Only (..), (:.) (..))
+import Database.PostgreSQL.Simple (Binary (..), In (..), Only (..), (:.) (..))
 import qualified Database.PostgreSQL.Simple as DB
 import qualified Database.PostgreSQL.Simple.Copy as DB
 import Database.PostgreSQL.Simple.SqlQQ (sql)
@@ -246,6 +246,25 @@ instance MsgStoreClass PostgresMsgStore where
 tryPeekMsg ms q = isolateQueue ms q "tryPeekMsg" $ tryPeekMsg_ q ()
 {-# INLINE tryPeekMsg #-}

+tryPeekMsgs :: PostgresMsgStore -> [PostgresQueue] -> ExceptT ErrorType IO (M.Map RecipientId Message)
+tryPeekMsgs _ms [] = pure M.empty
+tryPeekMsgs ms qs =
+uninterruptibleMask_ $
+withDB' "tryPeekMsgs" (queueStore_ ms) $ \db ->
+M.fromList . map toRcvMsg <$>
+DB.query
+db
+[sql|
+SELECT DISTINCT ON (recipient_id)
+recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
+FROM messages
+WHERE recipient_id IN ?
+ORDER BY recipient_id, message_id ASC
+|]
+(Only (In (map recipientId' qs)))
+where
+toRcvMsg (Only rId :. msg) = (rId, toMessage msg)
+
 tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message)
 tryDelMsg ms q msgId =
 uninterruptibleMask_ $
7 changes: 6 additions & 1 deletion src/Simplex/Messaging/Server/MsgStore/Types.hs
@@ -41,7 +41,9 @@ import Control.Monad.Trans.Except
 import Data.Functor (($>))
 import Data.Int (Int64)
 import Data.Kind
-import Data.Maybe (fromMaybe)
+import Data.Map.Strict (Map)
+import qualified Data.Map.Strict as M
+import Data.Maybe (catMaybes, fromMaybe)
 import Data.Text (Text)
 import Data.Time.Clock.System (SystemTime (systemSeconds))
 import Simplex.Messaging.Protocol
@@ -91,6 +93,9 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
 tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
 tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
 {-# INLINE tryPeekMsg #-}
+
+tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
+tryPeekMsgs st qs = M.fromList . catMaybes <$> mapM (\q -> (recipientId q,) <$$> tryPeekMsg st q) qs

 tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
 tryDelMsg st q msgId' =