From c3d0425f9278353bb7aaed01e82ae7e870f249a0 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 27 Mar 2026 17:20:47 +0000 Subject: [PATCH 1/4] services: fix minor issues --- src/Simplex/Messaging/Agent.hs | 2 +- src/Simplex/Messaging/Server.hs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 8483bfebf..7e5c80436 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -828,7 +828,7 @@ setUserService' c userId enable = do let changed = enable /= wasEnabled when changed $ TM.insert userId enable $ useClientServices c pure (True, changed) - unless ok $ throwE $ CMD PROHIBITED "setNetworkConfig" + unless ok $ throwE $ CMD PROHIBITED "setUserService" when (changed && not enable) $ withStore' c (`deleteClientServices` userId) newConnAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ConnId diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e50416af6..2d0656225 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1863,7 +1863,7 @@ client let incSrvStat sel n = liftIO $ atomicModifyIORef'_ (sel $ servicesSel stats) (+ n) diff = fromIntegral $ count' - count if -- `count == -1` only for subscriptions by old NTF servers - | count == -1 && (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1 + | count == -1 || (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1 | diff > 0 -> incSrvStat srvSubMore 1 >> incSrvStat srvSubMoreTotal diff | diff < 0 -> incSrvStat srvSubFewer 1 >> incSrvStat srvSubFewerTotal (- diff) | otherwise -> incSrvStat srvSubDiff 1 From 21a65b229510ba79016e4c8a56a0e33f67e47f39 Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Sat, 28 Mar 2026 07:32:52 +0000 Subject: [PATCH 2/4] fix accounting for subscribed service queues, add prometheus stats --- src/Simplex/Messaging/Server.hs | 9 +-- src/Simplex/Messaging/Server/Prometheus.hs | 11 ++- tests/ServerTests.hs | 83 ++++++++++++++++++++++ 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 2d0656225..ab001bbc3 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -292,7 +292,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt pure $ as ++ as' CSService serviceId changedSubs -> do modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients - modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs -- server count and IDs hash for all services + modifyTVar' totalServiceSubs $ addServiceSubs changedSubs -- server count and IDs hash for all services cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers updateSubDisconnected = case clntSub of -- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service @@ -701,12 +701,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt loadedCounts <- loadedQueueCounts $ fromMsgStore ms pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts} where - getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do + getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, totalServiceSubs, subClients} = do subsCount <- M.size <$> getSubscribedClients queueSubscribers subClientsCount <- IS.size <$> readTVarIO subClients subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers - pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount} - getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0, emptyTimeBuckets) =<< getServerClients srv + subServiceSubsCount <- fst <$> readTVarIO totalServiceSubs + pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount, subServiceSubsCount} + getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0 0, emptyTimeBuckets) =<< getServerClients srv where countClnt acc@(metrics, times) Client {subscriptions} = do (cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 76e288afe..32e8bd9a1 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -52,7 +52,8 @@ data RealTimeMetrics = RealTimeMetrics data RTSubscriberMetrics = RTSubscriberMetrics { subsCount :: Int, subClientsCount :: Int, - subServicesCount :: Int + subServicesCount :: Int, + subServiceSubsCount :: Int64 } {-# FOURMOLU_DISABLE\n#-} @@ -517,6 +518,10 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_subscribtion_services_total gauge\n\ \simplex_smp_subscribtion_services_total " <> mshow (subServicesCount smpSubs) <> "\n# smp.subServicesCount\n\ \\n\ + \# HELP simplex_smp_subscribtion_service_subs_total Total queues subscribed via services\n\ + \# TYPE simplex_smp_subscribtion_service_subs_total gauge\n\ + \simplex_smp_subscribtion_service_subs_total " <> mshow (subServiceSubsCount smpSubs) <> "\n# smp.subServiceSubsCount\n\ + \\n\ \# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\ \# TYPE simplex_smp_subscription_ntf_total gauge\n\ \simplex_smp_subscription_ntf_total " <> mshow (subsCount ntfSubs) <> "\n# ntf.subsCount\n\ @@ -529,6 +534,10 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_subscribtion_nts_services_total gauge\n\ \simplex_smp_subscribtion_nts_services_total " <> mshow (subServicesCount ntfSubs) <> "\n# ntf.subServicesCount\n\ \\n\ + \# HELP simplex_smp_subscription_ntf_service_subs_total Total queues subscribed via NTF services\n\ + \# TYPE simplex_smp_subscription_ntf_service_subs_total gauge\n\ + \simplex_smp_subscription_ntf_service_subs_total " <> mshow (subServiceSubsCount ntfSubs) <> "\n# ntf.subServiceSubsCount\n\ + \\n\ \# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\ \# TYPE simplex_smp_loaded_queues_queue_count gauge\n\ \simplex_smp_loaded_queues_queue_count " <> mshow (loadedQueueCount loadedCounts) <> "\n# loadedCounts.loadedQueueCount\n\ diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index deace417e..7f342f6ae 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -33,8 +33,10 @@ import Data.Foldable (foldrM) import Data.Hashable (hash) import qualified Data.IntSet as IS import Data.List.NonEmpty (NonEmpty) +import Data.List (isPrefixOf) import Data.Maybe (catMaybes) import Data.String (IsString (..)) +import Text.Read (readMaybe) import Data.Type.Equality import qualified Data.X509.Validation as XV import GHC.Stack (withFrozenCallStack) @@ -90,6 +92,7 @@ serverTests = do describe "Service message subscriptions" $ do testServiceDeliverSubscribe testServiceUpgradeAndDowngrade + testServiceSubsTotalCount describe "Store log" testWithStoreLog describe "Restore messages" testRestoreMessages describe "Restore messages (old / v2)" testRestoreExpireMessages @@ -862,6 +865,86 @@ testServiceUpgradeAndDowngrade = Resp "25" _ OK <- signSendRecv sh rKey ("25", rId, ACK mId6) pure () +testServiceSubsTotalCount :: SpecWith (ASrvTransport, AStoreType) +testServiceSubsTotalCount = + it "should track totalServiceSubs correctly via SUBS and SUB" $ \(at@(ATransport t), msType) -> do + g <- C.newRandom + creds <- genCredentials g Nothing (0, 2400) "localhost" + let (_fp, tlsCred) = tlsCredentials [creds] + serviceKeys@(_, servicePK) <- atomically $ C.generateKeyPair g + let aServicePK = C.APrivateAuthKey C.SEd25519 servicePK + cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {prometheusInterval = Just 1} + withSmpServerConfigOn at cfg' testPort $ \_ -> runSMPClient t $ \h -> do + -- Phase 1: create 2 queues as service, reconnect with SUBS, check metric = 2 + (rPub1, rKey1) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (dhPub1, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + (rPub2, rKey2) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (dhPub2, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + + (rId1, rId2, serviceId) <- runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + Resp "1" NoEntity (Ids_ rId1 _sId1 _srvDh1 serviceId) <- serviceSignSendRecv sh rKey1 servicePK ("1", NoEntity, New rPub1 dhPub1) + Resp "2" NoEntity (Ids_ rId2 _sId2 _srvDh2 serviceId') <- serviceSignSendRecv sh rKey2 servicePK ("2", NoEntity, New rPub2 dhPub2) + serviceId' `shouldBe` serviceId + pure (rId1, rId2, serviceId) + + runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + let idsHash = queueIdsHash [rId1, rId2] + signSend_ sh aServicePK Nothing ("3", serviceId, SUBS 2 idsHash) + void $ + receiveInAnyOrder sh + [ \case + Resp "3" serviceId' (SOKS n idsHash') -> do + n `shouldBe` 2 + idsHash' `shouldBe` idsHash + serviceId' `shouldBe` serviceId + pure $ Just () + _ -> pure Nothing, + \case + Resp "" NoEntity ALLS -> pure $ Just () + _ -> pure Nothing + ] + threadDelay 1500000 + readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 2 + + -- Phase 2: associate 1 more queue via SUB, reconnect with SUBS 3, check metric = 3 + (rPub3, rKey3) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (dhPub3, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + (sPub3, sKey3) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + Resp "4" NoEntity (Ids rId3 sId3 _) <- signSendRecv h rKey3 ("4", NoEntity, New rPub3 dhPub3) + Resp "5" _ OK <- signSendRecv h sKey3 ("5", sId3, SKEY sPub3) + + runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + Resp "6" _ (SOK (Just serviceId')) <- serviceSignSendRecv sh rKey3 servicePK ("6", rId3, SUB) + serviceId' `shouldBe` serviceId + + runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + let idsHash = queueIdsHash [rId1, rId2, rId3] + signSend_ sh aServicePK Nothing ("7", serviceId, SUBS 3 idsHash) + void $ + receiveInAnyOrder sh + [ \case + Resp "7" serviceId' (SOKS n idsHash') -> do + n `shouldBe` 3 + idsHash' `shouldBe` idsHash + serviceId' `shouldBe` serviceId + pure $ Just () + _ -> pure Nothing, + \case + Resp "" NoEntity ALLS -> pure $ Just () + _ -> pure Nothing + ] + threadDelay 1500000 + readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 3 + +readServiceSubsMetric :: String -> Maybe Int +readServiceSubsMetric content = + case filter ("simplex_smp_subscribtion_service_subs_total " `isPrefixOf`) (lines content) of + (line : _) -> case words line of + [_, val, _] -> readMaybe val + [_, val] -> readMaybe val + _ -> Nothing + [] -> Nothing + receiveInAnyOrder :: (HasCallStack, Transport c) => THandleSMP c 'TClient -> [(CorrId, EntityId, Either ErrorType BrokerMsg) -> IO (Maybe b)] -> IO [b] receiveInAnyOrder h = fmap reverse . go [] where From c2e7b5bc86f579cb741f31f51e551b75b1e8214a Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Sat, 28 Mar 2026 07:37:03 +0000 Subject: [PATCH 3/4] fix uncorrelated subquery --- src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs index aca573d21..7cf5a438b 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs @@ -270,7 +270,7 @@ getUsedSMPServers st = smp_host, smp_port, smp_keyhash, smp_server_id, ntf_service_id, smp_notifier_count, smp_notifier_ids_hash FROM smp_servers - WHERE EXISTS (SELECT 1 FROM subscriptions WHERE status IN ?) + WHERE EXISTS (SELECT 1 FROM subscriptions WHERE smp_server_id = smp_servers.smp_server_id AND status IN ?) |] (Only (In subscribeNtfStatuses)) where From ccd0b6e5faed1735174d5a837e715899cb91ca60 Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Sat, 28 Mar 2026 07:58:22 +0000 Subject: [PATCH 4/4] fix potential race condition when inserting service defensively, as it is also prevented by how client is created --- src/Simplex/Messaging/Agent/Client.hs | 4 +-- .../Messaging/Agent/Store/AgentStore.hs | 32 +++++++++---------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c1f7ab685..92de1dd49 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -625,9 +625,7 @@ getServiceCredentials c userId srv = Just service -> pure service Nothing -> do cred <- genCredentials g Nothing (25, 24 * 999999) "simplex" - let tlsCreds = tlsCredentials [cred] - createClientService db userId srv tlsCreds - pure (tlsCreds, Nothing) + createClientService db userId srv $ tlsCredentials [cred] serviceSignKey <- liftEitherWith INTERNAL $ C.x509ToPrivate' $ snd serviceCreds let creds = ServiceCredentials {serviceRole = SRMessaging, serviceCreds, serviceCertHash = XV.Fingerprint kh, serviceSignKey} pure (creds, serviceId_) diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 32e5e6ace..f6d1daebe 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -410,23 +410,23 @@ deleteUsersWithoutConns db = do forM_ userIds $ DB.execute db "DELETE FROM users WHERE user_id = ?" . Only pure userIds -createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO () -createClientService db userId srv (kh, (cert, pk)) = do +createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO ((C.KeyHash, TLS.Credential), Maybe ServiceId) +createClientService db userId srv tlsCreds@(kh, (cert, pk)) = do serverKeyHash_ <- createServer db srv - DB.execute - db - [sql| - INSERT INTO client_services - (user_id, host, port, server_key_hash, service_cert_hash, service_cert, service_priv_key) - VALUES (?,?,?,?,?,?,?) - ON CONFLICT (user_id, host, port, server_key_hash) - DO UPDATE SET - service_cert_hash = EXCLUDED.service_cert_hash, - service_cert = EXCLUDED.service_cert, - service_priv_key = EXCLUDED.service_priv_key, - service_id = NULL - |] - (userId, host srv, port srv, serverKeyHash_, kh, cert, pk) + (rs :: [Only Int]) <- + DB.query + db + [sql| + INSERT INTO client_services + (user_id, host, port, server_key_hash, service_cert_hash, service_cert, service_priv_key) + VALUES (?,?,?,?,?,?,?) + ON CONFLICT (user_id, host, port, server_key_hash) DO NOTHING + RETURNING 1 + |] + (userId, host srv, port srv, serverKeyHash_, kh, cert, pk) + if null rs + then fromMaybe (tlsCreds, Nothing) <$> getClientServiceCredentials db userId srv + else pure (tlsCreds, Nothing) getClientServiceCredentials :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ((C.KeyHash, TLS.Credential), Maybe ServiceId)) getClientServiceCredentials db userId srv =