Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ setUserService' c userId enable = do
let changed = enable /= wasEnabled
when changed $ TM.insert userId enable $ useClientServices c
pure (True, changed)
unless ok $ throwE $ CMD PROHIBITED "setNetworkConfig"
unless ok $ throwE $ CMD PROHIBITED "setUserService"
when (changed && not enable) $ withStore' c (`deleteClientServices` userId)

newConnAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ConnId
Expand Down
4 changes: 1 addition & 3 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,7 @@ getServiceCredentials c userId srv =
Just service -> pure service
Nothing -> do
cred <- genCredentials g Nothing (25, 24 * 999999) "simplex"
let tlsCreds = tlsCredentials [cred]
createClientService db userId srv tlsCreds
pure (tlsCreds, Nothing)
createClientService db userId srv $ tlsCredentials [cred]
serviceSignKey <- liftEitherWith INTERNAL $ C.x509ToPrivate' $ snd serviceCreds
let creds = ServiceCredentials {serviceRole = SRMessaging, serviceCreds, serviceCertHash = XV.Fingerprint kh, serviceSignKey}
pure (creds, serviceId_)
Expand Down
32 changes: 16 additions & 16 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -410,23 +410,23 @@ deleteUsersWithoutConns db = do
forM_ userIds $ DB.execute db "DELETE FROM users WHERE user_id = ?" . Only
pure userIds

createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO ()
createClientService db userId srv (kh, (cert, pk)) = do
createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO ((C.KeyHash, TLS.Credential), Maybe ServiceId)
createClientService db userId srv tlsCreds@(kh, (cert, pk)) = do
serverKeyHash_ <- createServer db srv
DB.execute
db
[sql|
INSERT INTO client_services
(user_id, host, port, server_key_hash, service_cert_hash, service_cert, service_priv_key)
VALUES (?,?,?,?,?,?,?)
ON CONFLICT (user_id, host, port, server_key_hash)
DO UPDATE SET
service_cert_hash = EXCLUDED.service_cert_hash,
service_cert = EXCLUDED.service_cert,
service_priv_key = EXCLUDED.service_priv_key,
service_id = NULL
|]
(userId, host srv, port srv, serverKeyHash_, kh, cert, pk)
(rs :: [Only Int]) <-
DB.query
db
[sql|
INSERT INTO client_services
(user_id, host, port, server_key_hash, service_cert_hash, service_cert, service_priv_key)
VALUES (?,?,?,?,?,?,?)
ON CONFLICT (user_id, host, port, server_key_hash) DO NOTHING
RETURNING 1
|]
(userId, host srv, port srv, serverKeyHash_, kh, cert, pk)
if null rs
then fromMaybe (tlsCreds, Nothing) <$> getClientServiceCredentials db userId srv
else pure (tlsCreds, Nothing)

getClientServiceCredentials :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ((C.KeyHash, TLS.Credential), Maybe ServiceId))
getClientServiceCredentials db userId srv =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ getUsedSMPServers st =
smp_host, smp_port, smp_keyhash, smp_server_id,
ntf_service_id, smp_notifier_count, smp_notifier_ids_hash
FROM smp_servers
WHERE EXISTS (SELECT 1 FROM subscriptions WHERE status IN ?)
WHERE EXISTS (SELECT 1 FROM subscriptions WHERE smp_server_id = smp_servers.smp_server_id AND status IN ?)
|]
(Only (In subscribeNtfStatuses))
where
Expand Down
11 changes: 6 additions & 5 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
pure $ as ++ as'
CSService serviceId changedSubs -> do
modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients
modifyTVar' totalServiceSubs $ subtractServiceSubs changedSubs -- server count and IDs hash for all services
modifyTVar' totalServiceSubs $ addServiceSubs changedSubs -- server count and IDs hash for all services
cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers
updateSubDisconnected = case clntSub of
-- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service
Expand Down Expand Up @@ -701,12 +701,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
loadedCounts <- loadedQueueCounts $ fromMsgStore ms
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts}
where
getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do
getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, totalServiceSubs, subClients} = do
subsCount <- M.size <$> getSubscribedClients queueSubscribers
subClientsCount <- IS.size <$> readTVarIO subClients
subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers
pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount}
getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0, emptyTimeBuckets) =<< getServerClients srv
subServiceSubsCount <- fst <$> readTVarIO totalServiceSubs
pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount, subServiceSubsCount}
getDeliveredMetrics ts' = foldM countClnt (RTSubscriberMetrics 0 0 0 0, emptyTimeBuckets) =<< getServerClients srv
where
countClnt acc@(metrics, times) Client {subscriptions} = do
(cnt, times') <- foldM countSubs (0, times) =<< readTVarIO subscriptions
Expand Down Expand Up @@ -1863,7 +1864,7 @@ client
let incSrvStat sel n = liftIO $ atomicModifyIORef'_ (sel $ servicesSel stats) (+ n)
diff = fromIntegral $ count' - count
if -- `count == -1` only for subscriptions by old NTF servers
| count == -1 && (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1
| count == -1 || (diff == 0 && idsHash == idsHash') -> incSrvStat srvSubOk 1
| diff > 0 -> incSrvStat srvSubMore 1 >> incSrvStat srvSubMoreTotal diff
| diff < 0 -> incSrvStat srvSubFewer 1 >> incSrvStat srvSubFewerTotal (- diff)
| otherwise -> incSrvStat srvSubDiff 1
Expand Down
11 changes: 10 additions & 1 deletion src/Simplex/Messaging/Server/Prometheus.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ data RealTimeMetrics = RealTimeMetrics
data RTSubscriberMetrics = RTSubscriberMetrics
{ subsCount :: Int,
subClientsCount :: Int,
subServicesCount :: Int
subServicesCount :: Int,
subServiceSubsCount :: Int64
}

{-# FOURMOLU_DISABLE\n#-}
Expand Down Expand Up @@ -517,6 +518,10 @@ prometheusMetrics sm rtm ts =
\# TYPE simplex_smp_subscribtion_services_total gauge\n\
\simplex_smp_subscribtion_services_total " <> mshow (subServicesCount smpSubs) <> "\n# smp.subServicesCount\n\
\\n\
\# HELP simplex_smp_subscribtion_service_subs_total Total queues subscribed via services\n\
\# TYPE simplex_smp_subscribtion_service_subs_total gauge\n\
\simplex_smp_subscribtion_service_subs_total " <> mshow (subServiceSubsCount smpSubs) <> "\n# smp.subServiceSubsCount\n\
\\n\
\# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\
\# TYPE simplex_smp_subscription_ntf_total gauge\n\
\simplex_smp_subscription_ntf_total " <> mshow (subsCount ntfSubs) <> "\n# ntf.subsCount\n\
Expand All @@ -529,6 +534,10 @@ prometheusMetrics sm rtm ts =
\# TYPE simplex_smp_subscribtion_nts_services_total gauge\n\
\simplex_smp_subscribtion_nts_services_total " <> mshow (subServicesCount ntfSubs) <> "\n# ntf.subServicesCount\n\
\\n\
\# HELP simplex_smp_subscription_ntf_service_subs_total Total queues subscribed via NTF services\n\
\# TYPE simplex_smp_subscription_ntf_service_subs_total gauge\n\
\simplex_smp_subscription_ntf_service_subs_total " <> mshow (subServiceSubsCount ntfSubs) <> "\n# ntf.subServiceSubsCount\n\
\\n\
\# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\
\# TYPE simplex_smp_loaded_queues_queue_count gauge\n\
\simplex_smp_loaded_queues_queue_count " <> mshow (loadedQueueCount loadedCounts) <> "\n# loadedCounts.loadedQueueCount\n\
Expand Down
83 changes: 83 additions & 0 deletions tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import Data.Foldable (foldrM)
import Data.Hashable (hash)
import qualified Data.IntSet as IS
import Data.List.NonEmpty (NonEmpty)
import Data.List (isPrefixOf)
import Data.Maybe (catMaybes)
import Data.String (IsString (..))
import Text.Read (readMaybe)
import Data.Type.Equality
import qualified Data.X509.Validation as XV
import GHC.Stack (withFrozenCallStack)
Expand Down Expand Up @@ -90,6 +92,7 @@ serverTests = do
describe "Service message subscriptions" $ do
testServiceDeliverSubscribe
testServiceUpgradeAndDowngrade
testServiceSubsTotalCount
describe "Store log" testWithStoreLog
describe "Restore messages" testRestoreMessages
describe "Restore messages (old / v2)" testRestoreExpireMessages
Expand Down Expand Up @@ -862,6 +865,86 @@ testServiceUpgradeAndDowngrade =
Resp "25" _ OK <- signSendRecv sh rKey ("25", rId, ACK mId6)
pure ()

testServiceSubsTotalCount :: SpecWith (ASrvTransport, AStoreType)
testServiceSubsTotalCount =
it "should track totalServiceSubs correctly via SUBS and SUB" $ \(at@(ATransport t), msType) -> do
g <- C.newRandom
creds <- genCredentials g Nothing (0, 2400) "localhost"
let (_fp, tlsCred) = tlsCredentials [creds]
serviceKeys@(_, servicePK) <- atomically $ C.generateKeyPair g
let aServicePK = C.APrivateAuthKey C.SEd25519 servicePK
cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {prometheusInterval = Just 1}
withSmpServerConfigOn at cfg' testPort $ \_ -> runSMPClient t $ \h -> do
-- Phase 1: create 2 queues as service, reconnect with SUBS, check metric = 2
(rPub1, rKey1) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
(dhPub1, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g
(rPub2, rKey2) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
(dhPub2, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g

(rId1, rId2, serviceId) <- runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do
Resp "1" NoEntity (Ids_ rId1 _sId1 _srvDh1 serviceId) <- serviceSignSendRecv sh rKey1 servicePK ("1", NoEntity, New rPub1 dhPub1)
Resp "2" NoEntity (Ids_ rId2 _sId2 _srvDh2 serviceId') <- serviceSignSendRecv sh rKey2 servicePK ("2", NoEntity, New rPub2 dhPub2)
serviceId' `shouldBe` serviceId
pure (rId1, rId2, serviceId)

runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do
let idsHash = queueIdsHash [rId1, rId2]
signSend_ sh aServicePK Nothing ("3", serviceId, SUBS 2 idsHash)
void $
receiveInAnyOrder sh
[ \case
Resp "3" serviceId' (SOKS n idsHash') -> do
n `shouldBe` 2
idsHash' `shouldBe` idsHash
serviceId' `shouldBe` serviceId
pure $ Just ()
_ -> pure Nothing,
\case
Resp "" NoEntity ALLS -> pure $ Just ()
_ -> pure Nothing
]
threadDelay 1500000
readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 2

-- Phase 2: associate 1 more queue via SUB, reconnect with SUBS 3, check metric = 3
(rPub3, rKey3) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
(dhPub3, _ :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g
(sPub3, sKey3) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
Resp "4" NoEntity (Ids rId3 sId3 _) <- signSendRecv h rKey3 ("4", NoEntity, New rPub3 dhPub3)
Resp "5" _ OK <- signSendRecv h sKey3 ("5", sId3, SKEY sPub3)

runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do
Resp "6" _ (SOK (Just serviceId')) <- serviceSignSendRecv sh rKey3 servicePK ("6", rId3, SUB)
serviceId' `shouldBe` serviceId

runSMPServiceClient t (tlsCred, serviceKeys) $ \sh -> do
let idsHash = queueIdsHash [rId1, rId2, rId3]
signSend_ sh aServicePK Nothing ("7", serviceId, SUBS 3 idsHash)
void $
receiveInAnyOrder sh
[ \case
Resp "7" serviceId' (SOKS n idsHash') -> do
n `shouldBe` 3
idsHash' `shouldBe` idsHash
serviceId' `shouldBe` serviceId
pure $ Just ()
_ -> pure Nothing,
\case
Resp "" NoEntity ALLS -> pure $ Just ()
_ -> pure Nothing
]
threadDelay 1500000
readFile testPrometheusMetricsFile >>= \m -> readServiceSubsMetric m `shouldBe` Just 3

readServiceSubsMetric :: String -> Maybe Int
readServiceSubsMetric content =
case filter ("simplex_smp_subscribtion_service_subs_total " `isPrefixOf`) (lines content) of
(line : _) -> case words line of
[_, val, _] -> readMaybe val
[_, val] -> readMaybe val
_ -> Nothing
[] -> Nothing

receiveInAnyOrder :: (HasCallStack, Transport c) => THandleSMP c 'TClient -> [(CorrId, EntityId, Either ErrorType BrokerMsg) -> IO (Maybe b)] -> IO [b]
receiveInAnyOrder h = fmap reverse . go []
where
Expand Down
Loading