diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 8483bfebfb..7e5c804364 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -828,7 +828,7 @@ setUserService' c userId enable = do let changed = enable /= wasEnabled when changed $ TM.insert userId enable $ useClientServices c pure (True, changed) - unless ok $ throwE $ CMD PROHIBITED "setNetworkConfig" + unless ok $ throwE $ CMD PROHIBITED "setUserService" when (changed && not enable) $ withStore' c (`deleteClientServices` userId) newConnAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ConnId diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c1f7ab6859..92de1dd49c 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -625,9 +625,7 @@ getServiceCredentials c userId srv = Just service -> pure service Nothing -> do cred <- genCredentials g Nothing (25, 24 * 999999) "simplex" - let tlsCreds = tlsCredentials [cred] - createClientService db userId srv tlsCreds - pure (tlsCreds, Nothing) + createClientService db userId srv $ tlsCredentials [cred] serviceSignKey <- liftEitherWith INTERNAL $ C.x509ToPrivate' $ snd serviceCreds let creds = ServiceCredentials {serviceRole = SRMessaging, serviceCreds, serviceCertHash = XV.Fingerprint kh, serviceSignKey} pure (creds, serviceId_) diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 32e5e6aceb..f6d1daebe7 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -410,23 +410,23 @@ deleteUsersWithoutConns db = do forM_ userIds $ DB.execute db "DELETE FROM users WHERE user_id = ?" . Only pure userIds -createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO () -createClientService db userId srv (kh, (cert, pk)) = do +createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO ((C.KeyHash, TLS.Credential), Maybe ServiceId) +createClientService db userId srv tlsCreds@(kh, (cert, pk)) = do serverKeyHash_ <- createServer db srv - DB.execute - db - [sql| - INSERT INTO client_services - (user_id, host, port, server_key_hash, service_cert_hash, service_cert, service_priv_key) - VALUES (?,?,?,?,?,?,?) - ON CONFLICT (user_id, host, port, server_key_hash) - DO UPDATE SET - service_cert_hash = EXCLUDED.service_cert_hash, - service_cert = EXCLUDED.service_cert, - service_priv_key = EXCLUDED.service_priv_key, - service_id = NULL - |] - (userId, host srv, port srv, serverKeyHash_, kh, cert, pk) + (rs :: [Only Int]) <- + DB.query + db + [sql| + INSERT INTO client_services + (user_id, host, port, server_key_hash, service_cert_hash, service_cert, service_priv_key) + VALUES (?,?,?,?,?,?,?) + ON CONFLICT (user_id, host, port, server_key_hash) DO NOTHING + RETURNING 1 + |] + (userId, host srv, port srv, serverKeyHash_, kh, cert, pk) + if null rs + then fromMaybe (tlsCreds, Nothing) <$> getClientServiceCredentials db userId srv + else pure (tlsCreds, Nothing) getClientServiceCredentials :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ((C.KeyHash, TLS.Credential), Maybe ServiceId)) getClientServiceCredentials db userId srv = diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs index aca573d21f..7cf5a438b2 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs @@ -270,7 +270,7 @@ getUsedSMPServers st = smp_host, smp_port, smp_keyhash, smp_server_id, ntf_service_id, smp_notifier_count, smp_notifier_ids_hash FROM smp_servers - WHERE EXISTS (SELECT 1 FROM subscriptions WHERE status IN ?) + WHERE EXISTS (SELECT 1 FROM subscriptions WHERE smp_server_id = smp_servers.smp_server_id AND status IN ?) |] (Only (In subscribeNtfStatuses)) where diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e50416af67..ab001bbc3b 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -292,7 +292,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt pure $ as ++ as' CSService serviceId changedSubs -> do modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients - modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs -- server count and IDs hash for all services + modifyTVar' totalServiceSubs $ addServiceSubs changedSubs -- server count and IDs hash for all services cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers updateSubDisconnected = case clntSub of -- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service @@ -701,12 +701,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt loadedCounts <- loadedQueueCounts $ fromMsgStore ms pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts} where - getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do + getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, totalServiceSubs, subClients} = do subsCount <- M.size <$> getSubscribedClients queueSubscribers subClientsCount <- IS.size <$> readTVarIO subClients subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers - pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount} - getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0, emptyTimeBuckets) =<< getServerClients srv + subServiceSubsCount <- fst <$> readTVarIO totalServiceSubs + pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount, subServiceSubsCount} + getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0 0, emptyTimeBuckets) =<< getServerClients srv where countClnt acc@(metrics, times) Client {subscriptions} = do (cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions @@ -1863,7 +1864,7 @@ client let incSrvStat sel n = liftIO $ atomicModifyIORef'_ (sel $ servicesSel stats) (+ n) diff = fromIntegral $ count' - count if -- `count == -1` only for subscriptions by old NTF servers - | count == -1 && (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1 + | count == -1 || (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1 | diff > 0 -> incSrvStat srvSubMore 1 >> incSrvStat srvSubMoreTotal diff | diff < 0 -> incSrvStat srvSubFewer 1 >> incSrvStat srvSubFewerTotal (- diff) | otherwise -> incSrvStat srvSubDiff 1 diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 76e288afe2..32e8bd9a10 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -52,7 +52,8 @@ data RealTimeMetrics = RealTimeMetrics data RTSubscriberMetrics = RTSubscriberMetrics { subsCount :: Int, subClientsCount :: Int, - subServicesCount :: Int + subServicesCount :: Int, + subServiceSubsCount :: Int64 } {-# FOURMOLU_DISABLE\n#-} @@ -517,6 +518,10 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_subscribtion_services_total gauge\n\ \simplex_smp_subscribtion_services_total " <> mshow (subServicesCount smpSubs) <> "\n# smp.subServicesCount\n\ \\n\ + \# HELP simplex_smp_subscribtion_service_subs_total Total queues subscribed via services\n\ + \# TYPE simplex_smp_subscribtion_service_subs_total gauge\n\ + \simplex_smp_subscribtion_service_subs_total " <> mshow (subServiceSubsCount smpSubs) <> "\n# smp.subServiceSubsCount\n\ + \\n\ \# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\ \# TYPE simplex_smp_subscription_ntf_total gauge\n\ \simplex_smp_subscription_ntf_total " <> mshow (subsCount ntfSubs) <> "\n# ntf.subsCount\n\ @@ -529,6 +534,10 @@ prometheusMetrics sm rtm ts = \# TYPE simplex_smp_subscribtion_nts_services_total gauge\n\ \simplex_smp_subscribtion_nts_services_total " <> mshow (subServicesCount ntfSubs) <> "\n# ntf.subServicesCount\n\ \\n\ + \# HELP simplex_smp_subscription_ntf_service_subs_total Total queues subscribed via NTF services\n\ + \# TYPE simplex_smp_subscription_ntf_service_subs_total gauge\n\ + \simplex_smp_subscription_ntf_service_subs_total " <> mshow (subServiceSubsCount ntfSubs) <> "\n# ntf.subServiceSubsCount\n\ + \\n\ \# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\ \# TYPE simplex_smp_loaded_queues_queue_count gauge\n\ \simplex_smp_loaded_queues_queue_count " <> mshow (loadedQueueCount loadedCounts) <> "\n# loadedCounts.loadedQueueCount\n\ diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index deace417ee..7f342f6ae3 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -33,8 +33,10 @@ import Data.Foldable (foldrM) import Data.Hashable (hash) import qualified Data.IntSet as IS import Data.List.NonEmpty (NonEmpty) +import Data.List (isPrefixOf) import Data.Maybe (catMaybes) import Data.String (IsString (..)) +import Text.Read (readMaybe) import Data.Type.Equality import qualified Data.X509.Validation as XV import GHC.Stack (withFrozenCallStack) @@ -90,6 +92,7 @@ serverTests = do describe "Service message subscriptions" $ do testServiceDeliverSubscribe testServiceUpgradeAndDowngrade + testServiceSubsTotalCount describe "Store log" testWithStoreLog describe "Restore messages" testRestoreMessages describe "Restore messages (old / v2)" testRestoreExpireMessages @@ -862,6 +865,86 @@ testServiceUpgradeAndDowngrade = Resp "25" _ OK <- signSendRecv sh rKey ("25", rId, ACK mId6) pure () +testServiceSubsTotalCount :: SpecWith (ASrvTransport, AStoreType) +testServiceSubsTotalCount = + it "should track totalServiceSubs correctly via SUBS and SUB" $ \(at@(ATransport t), msType) -> do + g <- C.newRandom + creds <- genCredentials g Nothing (0, 2400) "localhost" + let (_fp, tlsCred) = tlsCredentials [creds] + serviceKeys@(_, servicePK) <- atomically $ C.generateKeyPair g + let aServicePK = C.APrivateAuthKey C.SEd25519 servicePK + cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {prometheusInterval = Just 1} + withSmpServerConfigOn at cfg' testPort $ \_ -> runSMPClient t $ \h -> do + -- Phase 1: create 2 queues as service, reconnect with SUBS, check metric = 2 + (rPub1, rKey1) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (dhPub1, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + (rPub2, rKey2) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (dhPub2, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + + (rId1, rId2, serviceId) <- runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + Resp "1" NoEntity (Ids_ rId1 _sId1 _srvDh1 serviceId) <- serviceSignSendRecv sh rKey1 servicePK ("1", NoEntity, New rPub1 dhPub1) + Resp "2" NoEntity (Ids_ rId2 _sId2 _srvDh2 serviceId') <- serviceSignSendRecv sh rKey2 servicePK ("2", NoEntity, New rPub2 dhPub2) + serviceId' `shouldBe` serviceId + pure (rId1, rId2, serviceId) + + runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + let idsHash = queueIdsHash [rId1, rId2] + signSend_ sh aServicePK Nothing ("3", serviceId, SUBS 2 idsHash) + void $ + receiveInAnyOrder sh + [ \case + Resp "3" serviceId' (SOKS n idsHash') -> do + n `shouldBe` 2 + idsHash' `shouldBe` idsHash + serviceId' `shouldBe` serviceId + pure $ Just () + _ -> pure Nothing, + \case + Resp "" NoEntity ALLS -> pure $ Just () + _ -> pure Nothing + ] + threadDelay 1500000 + readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 2 + + -- Phase 2: associate 1 more queue via SUB, reconnect with SUBS 3, check metric = 3 + (rPub3, rKey3) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + (dhPub3, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g + (sPub3, sKey3) <- atomically $ C.generateAuthKeyPair C.SEd25519 g + Resp "4" NoEntity (Ids rId3 sId3 _) <- signSendRecv h rKey3 ("4", NoEntity, New rPub3 dhPub3) + Resp "5" _ OK <- signSendRecv h sKey3 ("5", sId3, SKEY sPub3) + + runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + Resp "6" _ (SOK (Just serviceId')) <- serviceSignSendRecv sh rKey3 servicePK ("6", rId3, SUB) + serviceId' `shouldBe` serviceId + + runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do + let idsHash = queueIdsHash [rId1, rId2, rId3] + signSend_ sh aServicePK Nothing ("7", serviceId, SUBS 3 idsHash) + void $ + receiveInAnyOrder sh + [ \case + Resp "7" serviceId' (SOKS n idsHash') -> do + n `shouldBe` 3 + idsHash' `shouldBe` idsHash + serviceId' `shouldBe` serviceId + pure $ Just () + _ -> pure Nothing, + \case + Resp "" NoEntity ALLS -> pure $ Just () + _ -> pure Nothing + ] + threadDelay 1500000 + readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 3 + +readServiceSubsMetric :: String -> Maybe Int +readServiceSubsMetric content = + case filter ("simplex_smp_subscribtion_service_subs_total " `isPrefixOf`) (lines content) of + (line : _) -> case words line of + [_, val, _] -> readMaybe val + [_, val] -> readMaybe val + _ -> Nothing + [] -> Nothing + receiveInAnyOrder :: (HasCallStack, Transport c) => THandleSMP c 'TClient -> [(CorrId, EntityId, Either ErrorType BrokerMsg) -> IO (Maybe b)] -> IO [b] receiveInAnyOrder h = fmap reverse . go [] where