diff --git a/cabal.project b/cabal.project index 6223d1e9..ca730fe9 100644 --- a/cabal.project +++ b/cabal.project @@ -43,3 +43,12 @@ if impl(ghc >= 9.12.0) -- rejecting: cardano-crypto-class-2.3.1.0 (conflict: cardano-crypto-tests => cardano-crypto-class>=2.2.2 && <2.2.4) -- allow-newer: cardano-crypto-tests:cardano-crypto-class + +source-repository-package + type: git + location: https://github.com/IntersectMBO/ouroboros-network + tag: f2d88695147d9f236669caa214e11dba6b75eadc + --sha256: sha256-mRP9WxZ4xPOus/nFzSOMXCYopBGZcERDk+QqOD1LmQc= + subdir: ouroboros-network + +allow-newer: ouroboros-network:trace-dispatcher diff --git a/dmq-node/app/Main.hs b/dmq-node/app/Main.hs index b43819d2..e4fbcfe2 100644 --- a/dmq-node/app/Main.hs +++ b/dmq-node/app/Main.hs @@ -46,8 +46,8 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions) import DMQ.Configuration.Topology (readTopologyFile) import DMQ.Diffusion.Applications (diffusionApplications) import DMQ.Diffusion.Arguments -import DMQ.Diffusion.NodeKernel -import DMQ.Diffusion.PeerSelection (policy) +import DMQ.Diffusion.NodeKernel as NodeKernel +import DMQ.Diffusion.PeerSelectionPolicy (policy) import DMQ.Handlers.TopLevel (toplevelExceptionHandler) import DMQ.NodeToClient qualified as NtC import DMQ.NodeToClient.LocalStateQueryClient @@ -247,7 +247,7 @@ runDMQ commandLineConfig = do dmqLimitsAndTimeouts dmqNtNApps dmqNtCApps - (policy policyRngVar) + (policy policyRngVar nodeKernel.peerMetric) Diffusion.run dmqDiffusionArguments dmqDiffusionTracers diff --git a/dmq-node/changelog.d/20260512_150257_coot_sig_metric.md b/dmq-node/changelog.d/20260512_150257_coot_sig_metric.md new file mode 100644 index 00000000..06f336fa --- /dev/null +++ b/dmq-node/changelog.d/20260512_150257_coot_sig_metric.md @@ -0,0 +1,26 @@ + + + +### Non-Breaking + +- Added `DMQ.Diffusion.PeerSelection.PeerMetric`: peer-scoring metric that + tracks how promptly each peer announces signatures, used by peer selection + to prefer well-performing peers. +- Wired the metric into the sig-submission inbound client. + + diff --git a/dmq-node/dmq-node.cabal b/dmq-node/dmq-node.cabal index 70e4f224..90a65a7f 100644 --- a/dmq-node/dmq-node.cabal +++ b/dmq-node/dmq-node.cabal @@ -64,7 +64,8 @@ library DMQ.Diffusion.Arguments DMQ.Diffusion.NodeKernel DMQ.Diffusion.NodeKernel.Types - DMQ.Diffusion.PeerSelection + DMQ.Diffusion.PeerSelection.PeerMetric + DMQ.Diffusion.PeerSelectionPolicy DMQ.Handlers.TopLevel DMQ.NodeToClient DMQ.NodeToClient.LocalMsgNotification @@ -131,6 +132,7 @@ library optparse-applicative >=0.18 && <0.20, ouroboros-consensus:{ouroboros-consensus, cardano, diffusion}, ouroboros-network:{ouroboros-network, api, framework, framework-tracing, orphan-instances, protocols, tracing} ^>=1.1.0.0, + psqueues, quiet, random ^>=1.3, singletons, @@ -192,6 +194,7 @@ test-suite dmq-tests DMQ.Protocol.SigSubmissionV2.Test Test.DMQ.NodeToClient Test.DMQ.NodeToNode + Test.DMQ.PeerSelection.PeerMetric Test.DMQ.SigSubmission.App Test.DMQ.SigSubmission.Types @@ -215,6 +218,7 @@ test-suite dmq-tests io-sim, kes-agent-crypto, ouroboros-network:{ouroboros-network, api, framework, ouroboros-network-tests-lib, protocols, protocols-tests-lib, tests-lib}, + psqueues, quickcheck-instances, random, serialise, diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs index fe8980d1..6abb1672 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs @@ -41,6 +41,7 @@ import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool import DMQ.Configuration import DMQ.Diffusion.NodeKernel.Types +import DMQ.Diffusion.PeerSelection.PeerMetric (mkPeerMetric) import DMQ.Policy qualified as Policy import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId) import DMQ.Tracer @@ -94,6 +95,8 @@ newNodeKernel rng = do ps_POLICY_PEER_SHARE_STICKY_TIME ps_POLICY_PEER_SHARE_MAX_PEERS + peerMetric <- mkPeerMetric + pure NodeKernel { fetchClientRegistry , peerSharingRegistry , peerSharingAPI @@ -103,6 +106,7 @@ newNodeKernel rng = do , sigSharedTxStateVar , nextEpochVar , stakePools + , peerMetric } diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs index 0a7149d2..ba5cfca7 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs @@ -26,6 +26,7 @@ import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry) import Ouroboros.Network.TxSubmission.Inbound.V2 import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..)) +import DMQ.Diffusion.PeerSelection.PeerMetric (PeerMetric) import DMQ.Protocol.SigSubmission.Type (Sig, SigId) @@ -44,6 +45,7 @@ data NodeKernel crypto ntnAddr m = , sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto)) , stakePools :: !(StakePools m) , nextEpochVar :: !(StrictTVar m (Maybe UTCTime)) + , peerMetric :: !(PeerMetric m SigId ntnAddr) } diff --git a/dmq-node/src/DMQ/Diffusion/PeerSelection/PeerMetric.hs b/dmq-node/src/DMQ/Diffusion/PeerSelection/PeerMetric.hs new file mode 100644 index 00000000..f43a2f2c --- /dev/null +++ b/dmq-node/src/DMQ/Diffusion/PeerSelection/PeerMetric.hs @@ -0,0 +1,341 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module DMQ.Diffusion.PeerSelection.PeerMetric + ( ReportPeerMetric' (..) + , ReportPeerMetricI + , ReportPeerMetric + , hoist + , nullMetrics + , LocalPeerMetricState + , emptyLocalPeerMetricState + , reportMetric + , PeerMetricConfiguration (..) + , PeerMetric + , mkPeerMetric + , erasePeer + , announciness + -- * Re-exports + , TraceLabelPeer (..) + , TxMempoolResult (..) + -- * For testing purposes + , peerMetricVar + -- * Pure API exported for testing purposes + , PeerMetricState (..) + , emptyPeerMetricState + , reportSigIdImpl + , reportSigImpl + , erasePeerImpl + , announcinessImpl + , localState + ) where + +import Control.Concurrent.Class.MonadSTM.Strict +import Control.Monad.Class.MonadTime.SI +import Data.Functor.Identity (Identity) +import Data.List qualified as List +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.OrdPSQ (OrdPSQ) +import Data.OrdPSQ qualified as OrdPSQ + +import Network.Mux.Trace (TraceLabelPeer (..)) + +import Ouroboros.Network.TxSubmission.Inbound.V2 (TxMempoolResult (..)) + + +newtype PeerMetricConfiguration = PeerMetricConfiguration { + timeWindowToKeep :: DiffTime + } + deriving Show + +-- | Each peer keeps its own `LocalPeerMetricState`. A `sigid` enters this +-- state when it is received, leaves when the corresponding signature is +-- downloaded. +-- +newtype LocalPeerMetricState sigid = LocalPeerMetricState { + localState :: OrdPSQ sigid Time () + } + deriving Show + +emptyLocalPeerMetricState + :: LocalPeerMetricState sigid +emptyLocalPeerMetricState = LocalPeerMetricState { localState = OrdPSQ.empty } + +reportSigIdImpl + :: Ord sigid + => PeerMetricConfiguration + -> sigid + -> Time + -> LocalPeerMetricState sigid + -> LocalPeerMetricState sigid +reportSigIdImpl + PeerMetricConfiguration { timeWindowToKeep } + sigid + time + LocalPeerMetricState { localState } + = + LocalPeerMetricState { localState = localState' } + where + localState' = + snd + . OrdPSQ.atMostView ((-timeWindowToKeep) `addTime` time) + -- Unlike in `reportSigImpl` we overwrite the entry if it's already + -- there, keeping the most recent announcement time. This prevents a + -- peer from announcing sigids early to bank a good timestamp and then + -- sending the sigs only once they are forged. + . OrdPSQ.insert sigid time () + $ localState + + +-- | Internal, used by `reportSig` to remove an entry from +-- `LocalPeerMetricState` and get the time when it entered it. +-- +unreportSigId + :: Ord sigid + => sigid + -> LocalPeerMetricState sigid + -> (Maybe Time, LocalPeerMetricState sigid) +unreportSigId + sigid + LocalPeerMetricState { localState } + = + (mbTime, LocalPeerMetricState { localState = localState' }) + where + (mbTime, localState') = OrdPSQ.alter fn sigid localState + fn Nothing = (Nothing, Nothing) + fn (Just (t, _)) = (Just t, Nothing) + + +-- | Internal state of the metric. +-- +newtype PeerMetricState sigid peeraddr = PeerMetricState { + metricState :: OrdPSQ sigid Time peeraddr + } + +instance (Show sigid, Show peeraddr) + => Show (PeerMetricState sigid peeraddr) where + show PeerMetricState { metricState } = + unwords [ "PeerMetricState", show $ OrdPSQ.toAscList metricState ] + +emptyPeerMetricState :: PeerMetricState sigid peeraddr +emptyPeerMetricState = PeerMetricState OrdPSQ.empty + +-- | Mutable peer metrics state accessible via 'STM'. +-- +newtype PeerMetric m sigid peeraddr = PeerMetric { + peerMetricVar :: StrictTVar m (PeerMetricState sigid peeraddr) + } + +mkPeerMetric :: MonadSTM m + => m (PeerMetric m sigid peeraddr) +mkPeerMetric = + PeerMetric <$> + newTVarIO emptyPeerMetricState + +data ReportPeerMetric' m sigid f = ReportPeerMetric { + -- | Report a received `sigid` + reportSigId :: sigid + -> Time + -> LocalPeerMetricState sigid + -> LocalPeerMetricState sigid, + + -- | Report a received `sig` + reportSig :: LocalPeerMetricState sigid + -> f (sigid, TxMempoolResult) + -> STM m (LocalPeerMetricState sigid) + } + +type ReportPeerMetricI m sigid = ReportPeerMetric' m sigid Identity +type ReportPeerMetric m sigid peeraddr = ReportPeerMetric' m sigid (TraceLabelPeer peeraddr) + +hoist + :: (forall a. f a -> g a) + -> ReportPeerMetric' m sigid g + -> ReportPeerMetric' m sigid f +hoist + nat + ReportPeerMetric { + reportSigId, + reportSig + } + = + ReportPeerMetric { + reportSigId, + reportSig = \localState -> reportSig localState . nat + } + + +nullMetrics :: Applicative (STM m) => ReportPeerMetric' m sigid f +nullMetrics = ReportPeerMetric { + reportSigId = \_ _ localState -> localState, + reportSig = \localState _ -> pure localState + } + + +reportMetric + :: forall m sigid peeraddr. + ( MonadSTM m + , Ord sigid + ) + => PeerMetricConfiguration + -> PeerMetric m sigid peeraddr + -> ReportPeerMetric m sigid peeraddr +reportMetric config PeerMetric { peerMetricVar } = + ReportPeerMetric { + reportSigId = reportSigIdImpl config, + reportSig = \localState -> + stateTVar peerMetricVar + . reportSigImpl config localState + } + + +-- | An internal function which prunes the `PeerMetricState` keeping only +-- `timeWindowToKeep` of entries. +-- +-- Note: we can trust the time, since it's coming from the host +-- `getMonotonicTime`. +prune :: Ord sigid + => PeerMetricConfiguration + -> Maybe Time + -- ^ time when sigid was inserted to `LocalPeerMetricState` + -> PeerMetricState sigid peeraddr + -> PeerMetricState sigid peeraddr +prune PeerMetricConfiguration { timeWindowToKeep } + mbTime + PeerMetricState { + metricState + } + = PeerMetricState { + metricState = + case mbTime of + Nothing -> metricState + Just t -> snd $ OrdPSQ.atMostView ((-timeWindowToKeep) `addTime` t) metricState + } + + +reportSigImpl + :: forall sigid peeraddr. + Ord sigid + => PeerMetricConfiguration + -> LocalPeerMetricState sigid + -> TraceLabelPeer peeraddr (sigid, TxMempoolResult) + -> PeerMetricState sigid peeraddr + -> (LocalPeerMetricState sigid, PeerMetricState sigid peeraddr) +reportSigImpl + config + localState + (TraceLabelPeer _peeraddr (sigid, TxRejected)) + metricState + = + ( localState' + , prune config mbTime metricState + ) + where + localState' :: LocalPeerMetricState sigid + (mbTime, localState') = unreportSigId sigid localState +reportSigImpl + config + localState + (TraceLabelPeer peeraddr (sigid, TxAccepted)) + st + = + ( localState' + , PeerMetricState { metricState = metricState' } + ) + where + mbTime :: Maybe Time + localState' :: LocalPeerMetricState sigid + (mbTime, localState') = unreportSigId sigid localState + + -- First prune entries, then add a new one. This is important in the edge + -- case when there was an entry that is before the cut-off time, which we + -- prune first then add the new result. If we first added the new result then + -- pruned: the time would be preserved and then pruned - at the end the + -- current entry would be lost, while it shouldn't. + PeerMetricState { metricState } = prune config mbTime st + + metricState' :: OrdPSQ sigid Time peeraddr + metricState' = case mbTime of + Nothing -> metricState + Just time -> snd $ OrdPSQ.alter + (\case + Nothing -> ((), Just (time, peeraddr)) + Just a@(time', _) -> + -- keep the earliest entry + if time' < time + then ((), Just a) + else ((), Just (time, peeraddr)) + ) + sigid + metricState + + +erasePeerImpl + :: ( Ord peeraddr + , Ord sigid + ) + => peeraddr + -> PeerMetricState sigid peeraddr + -> PeerMetricState sigid peeraddr +erasePeerImpl + peeraddr + PeerMetricState { + metricState + } + = + PeerMetricState { + metricState = + -- O(n log n) filtering + OrdPSQ.fromList + . List.filter (\(_, _, peeraddr') -> peeraddr' /= peeraddr) + . OrdPSQ.toAscList + $ metricState + } + +-- | Erase a peer from `PeerMetric`. +-- +erasePeer + :: ( MonadSTM m + , Ord peeraddr + , Ord sigid + ) + => peeraddr + -> PeerMetric m sigid peeraddr + -> m () +erasePeer peeraddr PeerMetric { peerMetricVar } = + atomically $ modifyTVar peerMetricVar (erasePeerImpl peeraddr) + + +announcinessImpl + :: forall sigid peeraddr. Ord peeraddr + => PeerMetricState sigid peeraddr + -> Map peeraddr Int +announcinessImpl PeerMetricState { metricState } + = OrdPSQ.fold' count Map.empty metricState + where + count :: sigid + -> Time + -> peeraddr + -> Map peeraddr Int + -> Map peeraddr Int + count _ _ peeraddr m = + Map.alter fn peeraddr m + + fn :: Maybe Int -> Maybe Int + fn Nothing = Just 1 + fn (Just n) = Just (n + 1) + + +-- | Metric counters. +-- +announciness + :: forall m sigid peeraddr. + ( Ord peeraddr + , MonadSTM m + ) + => PeerMetric m sigid peeraddr + -> STM m (Map peeraddr Int) +announciness PeerMetric { peerMetricVar } = + announcinessImpl <$> readTVar peerMetricVar diff --git a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs b/dmq-node/src/DMQ/Diffusion/PeerSelectionPolicy.hs similarity index 89% rename from dmq-node/src/DMQ/Diffusion/PeerSelection.hs rename to dmq-node/src/DMQ/Diffusion/PeerSelectionPolicy.hs index 39996fb0..efa2ea54 100644 --- a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs +++ b/dmq-node/src/DMQ/Diffusion/PeerSelectionPolicy.hs @@ -1,22 +1,27 @@ -module DMQ.Diffusion.PeerSelection where +module DMQ.Diffusion.PeerSelectionPolicy where import Control.Concurrent.Class.MonadSTM.Strict import Data.List (sortOn, unfoldr) import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Data.Word (Word32) -import Ouroboros.Network.PeerSelection import System.Random (Random (..), StdGen, splitGen) +import DMQ.Diffusion.PeerSelection.PeerMetric (PeerMetric) +import DMQ.Diffusion.PeerSelection.PeerMetric qualified as PeerMetric + +import Ouroboros.Network.PeerSelection hiding (PeerMetrics) + -- | Trivial peer selection policy used as dummy value -- -policy :: forall peerAddr m. +policy :: forall sigId peerAddr m. ( MonadSTM m , Ord peerAddr ) => StrictTVar m StdGen + -> PeerMetric m sigId peerAddr -> PeerSelectionPolicy peerAddr m -policy rngVar = +policy rngVar peerMetrics = PeerSelectionPolicy { policyPickKnownPeersForPeerShare = simplePromotionPolicy, policyPickColdPeersToPromote = simplePromotionPolicy, @@ -40,10 +45,11 @@ policy rngVar = hotDemotionPolicy :: PickPolicy peerAddr (STM m) hotDemotionPolicy _ _ _ available pickNum = do available' <- addRand rngVar available (,) + scores <- PeerMetric.announciness peerMetrics return $ Set.fromList . map fst . take pickNum - . sortOn snd + . sortOn (\(peer, rn) -> (Map.findWithDefault 0 peer scores , rn)) . Map.assocs $ available' diff --git a/dmq-node/src/DMQ/NodeToNode.hs b/dmq-node/src/DMQ/NodeToNode.hs index bd26f1f2..9d2111c4 100644 --- a/dmq-node/src/DMQ/NodeToNode.hs +++ b/dmq-node/src/DMQ/NodeToNode.hs @@ -40,6 +40,7 @@ import Codec.CBOR.Encoding qualified as CBOR import Codec.CBOR.Read qualified as CBOR import Data.ByteString.Lazy qualified as BL import Data.Functor.Contravariant ((>$<)) +import Data.Functor.Identity (Identity (..)) import Data.Hashable (Hashable) import Data.Typeable import Data.Void (Void) @@ -54,6 +55,7 @@ import Cardano.KESAgent.KES.Crypto (Crypto (..)) import DMQ.Configuration (Configuration) import DMQ.Diffusion.NodeKernel.Types (NodeKernel (..)) +import DMQ.Diffusion.PeerSelection.PeerMetric qualified as PeerMetric import DMQ.NodeToNode.Version import DMQ.Policy qualified as Policy import DMQ.Protocol.SigSubmission.Codec (byteLimitsSigSubmission, @@ -207,6 +209,7 @@ ntnApps , sigChannelVar , sigMempoolSem , sigSharedTxStateVar + , peerMetric } Codecs { sigSubmissionCodecV1 @@ -247,6 +250,12 @@ ntnApps eicConnectionId = connId, eicControlMessage = controlMessage } channel = + let reportPeerMetrics = + PeerMetric.hoist (Mx.TraceLabelPeer (remoteAddress connId) . runIdentity) + . PeerMetric.reportMetric + Policy.peerMetricConfiguration + $ peerMetric + in withPeer (Mx.WithBearer connId >$< sigSubmissionLogicPeerTracer) sigChannelVar @@ -257,7 +266,7 @@ ntnApps mempoolWriter sigSize (remoteAddress connId) - $ \(peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) -> + ( \(peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) -> runPipelinedAnnotatedPeerWithLimits (Mx.WithBearer connId >$< sigSubmissionV2ProtocolTracer) sigSubmissionCodecV2 @@ -269,7 +278,12 @@ ntnApps (Mx.WithBearer connId >$< sigSubmissionInboundTracer) mempoolWriter peerSigAPI + reportPeerMetrics controlMessage + ) + `finally` + -- Remove the peer from `PeerMetric`. + PeerMetric.erasePeer (remoteAddress connId) peerMetric aSigSubmissionV1Client :: NodeToNodeVersion diff --git a/dmq-node/src/DMQ/Policy.hs b/dmq-node/src/DMQ/Policy.hs index 9a0a4df7..c3286ffe 100644 --- a/dmq-node/src/DMQ/Policy.hs +++ b/dmq-node/src/DMQ/Policy.hs @@ -1,12 +1,14 @@ -{-# LANGUAGE NumericUnderscores #-} - module DMQ.Policy ( sigDecisionPolicy , sigSubmissionIngressLimit + , peerMetricConfiguration ) where +import DMQ.Diffusion.PeerSelection.PeerMetric (PeerMetricConfiguration (..)) import DMQ.Protocol.SigSubmission.Type (NumTxIdsToReq) + import Network.Mux.Types (MiniProtocolLimits (..)) + import Ouroboros.Network.SizeInBytes (SizeInBytes) import Ouroboros.Network.TxSubmission.Inbound.V2 @@ -56,3 +58,6 @@ sigSubmissionIngressLimit = MiniProtocolLimits { addMargin :: Int -> Int addMargin = \x -> x + x `div` 10 + +peerMetricConfiguration :: PeerMetricConfiguration +peerMetricConfiguration = PeerMetricConfiguration { timeWindowToKeep = 3600 } diff --git a/dmq-node/src/DMQ/Protocol/SigSubmission/Type.hs b/dmq-node/src/DMQ/Protocol/SigSubmission/Type.hs index 9612595d..ad19908d 100644 --- a/dmq-node/src/DMQ/Protocol/SigSubmission/Type.hs +++ b/dmq-node/src/DMQ/Protocol/SigSubmission/Type.hs @@ -123,7 +123,7 @@ data SigRaw crypto = SigRaw { sigRawKESSignature :: SigKESSignature crypto -- ^ KES signature of all previous fields. -- - -- NOTE: this field must be lazy, otetherwise tests will fail. + -- NOTE: this field must be lazy, otherwise tests will fail. } deriving instance ( DSIGNAlgorithm (KES.DSIGN crypto) diff --git a/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs b/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs index aab45329..8940c8fb 100644 --- a/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs +++ b/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs @@ -12,15 +12,18 @@ module DMQ.SigSubmissionV2.Inbound sigSubmissionInbound ) where +import Data.Foldable qualified as Foldable +import Data.Functor.Identity (Identity (..)) import Data.Map.Strict qualified as Map import Data.Sequence.Strict qualified as StrictSeq import Data.Set qualified as Set import Control.Concurrent.Class.MonadSTM.Strict import Control.Exception (assert) -import Control.Monad (unless, when) +import Control.Monad (foldM, unless) import Control.Monad.Class.MonadAsync (MonadAsync (..)) import Control.Monad.Class.MonadThrow +import Control.Monad.Class.MonadTime.SI import "contra-tracer" Control.Tracer (Tracer, traceWith) import Network.TypedProtocol @@ -33,6 +36,9 @@ import Ouroboros.Network.TxSubmission.Inbound.V2 (PeerTxAPI (..), import Ouroboros.Network.TxSubmission.Inbound.V2.Types (TxSubmissionMempoolWriter (..)) +import DMQ.Diffusion.PeerSelection.PeerMetric (LocalPeerMetricState, + ReportPeerMetric' (..), ReportPeerMetricI, TxMempoolResult (..)) +import DMQ.Diffusion.PeerSelection.PeerMetric qualified as PeerMetric import DMQ.Protocol.SigSubmissionV2.Inbound import DMQ.Protocol.SigSubmissionV2.Type (NumIdsAck (NumIdsAck), NumIdsReq (..)) import DMQ.SigSubmissionV2.Types @@ -49,11 +55,12 @@ sigSubmissionInbound :: forall sigid sig idx m failure. ( MonadThrow m , MonadAsync m - , Ord sigid + , Ord sigid, MonadMonotonicTime m ) => Tracer m (TraceTxSubmissionInbound sigid sig) -> TxSubmissionMempoolWriter sigid sig idx m failure -> PeerTxAPI m sigid sig + -> ReportPeerMetricI m sigid -> ControlMessageSTM m -> SigSubmissionInboundPipelined sigid sig m () sigSubmissionInbound @@ -65,13 +72,18 @@ sigSubmissionInbound handleReceivedTxs, submitTxToMempool } + ReportPeerMetric { + reportSigId, + reportSig + } controlMessageSTM = - SigSubmissionInboundPipelined inboundIdle + SigSubmissionInboundPipelined (inboundIdle PeerMetric.emptyLocalPeerMetricState) where inboundIdle - :: m (InboundStIdle Z sigid sig m ()) - inboundIdle = do + :: LocalPeerMetricState sigid + -> m (InboundStIdle Z sigid sig m ()) + inboundIdle localState = do -- NOTE: The `tx-logic` is using `MVar` API, while `controlMessage` an -- `STM`, we need to compose both. We do that in two steps to avoid @@ -104,78 +116,99 @@ sigSubmissionInbound let !collected = length listOfTxsToMempool -- Only attempt to add sigs if we have some work to do - when (collected > 0) $ do - -- submitTxToMempool traces: - -- * `TraceTxSubmissionProcessed`, - -- * `TraceTxInboundAddedToMempool`, and - -- * `TraceTxInboundRejectedFromMempool` - -- events. - mapM_ (uncurry $ submitTxToMempool tracer) listOfTxsToMempool + localState' <- + if collected > 0 + then do + -- submitTxToMempool traces: + -- * `TraceTxSubmissionProcessed`, + -- * `TraceTxInboundAddedToMempool`, and + -- * `TraceTxInboundRejectedFromMempool` + -- events. + foldM (\st (sigid, sig) -> do + r <- submitTxToMempool tracer sigid sig + atomically $ reportSig st (Identity (sigid, r)) + ) + localState + listOfTxsToMempool + else return localState -- TODO: -- We can update the state so that other `sig-submission` servers will -- not try to add these sigs to the mempool. if Map.null sigsToRequest - then clientReqSigIds Zero sigd - else clientReqSigs sigd + then clientReqSigIds Zero localState' sigd + else clientReqSigs localState' sigd -- Pipelined request of sigs - clientReqSigs :: TxDecision sigid sig + clientReqSigs :: LocalPeerMetricState sigid + -> TxDecision sigid sig -> m (InboundStIdle Z sigid sig m ()) - clientReqSigs sigd@TxDecision { txdTxsToRequest = sigdSigsToRequest } = + clientReqSigs localState sigd@TxDecision { txdTxsToRequest = sigdSigsToRequest } = pure $ SendMsgRequestSigsPipelined sigdSigsToRequest - (clientReqSigIds (Succ Zero) sigd) + (clientReqSigIds (Succ Zero) localState sigd) clientReqSigIds :: forall (n :: N). Nat n + -> LocalPeerMetricState sigid -> TxDecision sigid sig -> m (InboundStIdle n sigid sig m ()) clientReqSigIds - n TxDecision { txdTxIdsToRequest = 0 } + n localState TxDecision { txdTxIdsToRequest = 0 } = case n of - Zero -> inboundIdle - Succ _ -> handleReplies n + Zero -> inboundIdle localState + Succ _ -> handleReplies n localState clientReqSigIds -- if there are no unacknowledged sigids, the protocol requires sending -- a blocking `MsgRequestSigIds` request. This is important, as otherwise -- the client side wouldn't have a chance to terminate the -- mini-protocol. - Zero TxDecision { txdTxIdsToAcknowledge = sigIdsToAck, - txdPipelineTxIds = False, - txdTxIdsToRequest = sigIdsToReq - } + Zero + localState + TxDecision { txdTxIdsToAcknowledge = sigIdsToAck, + txdPipelineTxIds = False, + txdTxIdsToRequest = sigIdsToReq + } = pure $ SendMsgRequestSigIdsBlocking (NumIdsAck . getNumTxIdsToAck $ sigIdsToAck) (NumIdsReq . getNumTxIdsToReq $ sigIdsToReq) (\sigids -> do + time <- getMonotonicTime let sigidsSeq = StrictSeq.fromList $ fst <$> sigids sigidsMap = Map.fromList sigids unless (StrictSeq.length sigidsSeq <= fromIntegral sigIdsToReq) $ throwIO ProtocolErrorSigIdsNotRequested + let localState' = + Foldable.foldl' (\st (sigid, _) -> reportSigId sigid time st) + localState + sigids handleReceivedTxIds sigIdsToReq sigidsSeq sigidsMap - inboundIdle + inboundIdle localState' ) clientReqSigIds - n@Zero TxDecision { txdTxIdsToAcknowledge = sigIdsToAck, - txdPipelineTxIds = True, - txdTxIdsToRequest = sigIdsToReq - } + n@Zero + localState + TxDecision { txdTxIdsToAcknowledge = sigIdsToAck, + txdPipelineTxIds = True, + txdTxIdsToRequest = sigIdsToReq + } = pure $ SendMsgRequestSigIdsPipelined (NumIdsAck . getNumTxIdsToAck $ sigIdsToAck) (NumIdsReq . getNumTxIdsToReq $ sigIdsToReq) - (handleReplies (Succ n)) + (handleReplies (Succ n) localState) clientReqSigIds - n@Succ{} TxDecision { txdTxIdsToAcknowledge = sigIdsToAck, - txdPipelineTxIds, - txdTxIdsToRequest = sigIdsToReq - } + n@Succ{} + localState + TxDecision { txdTxIdsToAcknowledge = sigIdsToAck, + txdPipelineTxIds, + txdTxIdsToRequest = sigIdsToReq + } = -- it is impossible that we have had `sig`'s to request (Succ{} - is an -- evidence for that), but no unacknowledged `sigid`s. @@ -183,46 +216,60 @@ sigSubmissionInbound pure $ SendMsgRequestSigIdsPipelined (NumIdsAck . getNumTxIdsToAck $ sigIdsToAck) (NumIdsReq . getNumTxIdsToReq $ sigIdsToReq) - (handleReplies (Succ n)) + (handleReplies (Succ n) localState) handleReplies :: forall (n :: N). Nat (S n) + -> LocalPeerMetricState sigid -> m (InboundStIdle (S n) sigid sig m ()) - handleReplies (Succ n'@Succ{}) = + handleReplies (Succ n'@Succ{}) localState = pure $ CollectPipelined Nothing - (handleReply (handleReplies n')) + (handleReply localState (handleReplies n')) - handleReplies (Succ Zero) = + handleReplies (Succ Zero) localState = pure $ CollectPipelined Nothing - (handleReply inboundIdle) + (handleReply localState inboundIdle) handleReply :: forall (n :: N). - m (InboundStIdle n sigid sig m ()) + LocalPeerMetricState sigid + -> (LocalPeerMetricState sigid -> m (InboundStIdle n sigid sig m ())) -- continuation -> Collect sigid sig -> m (InboundStIdle n sigid sig m ()) - handleReply k = \case + handleReply localState k = \case CollectSigIds sigIdsToReq sigids -> do + time <- getMonotonicTime let sigidsSeq = StrictSeq.fromList $ fst <$> sigids sigidsMap = Map.fromList sigids unless (StrictSeq.length sigidsSeq <= fromIntegral sigIdsToReq) $ throwIO ProtocolErrorSigIdsNotRequested + let localState' = + Foldable.foldl' (\st (sigid, _) -> reportSigId sigid time st) + localState + sigids handleReceivedTxIds (NumTxIdsToReq . getNumIdsReq $ sigIdsToReq) sigidsSeq sigidsMap - k + k localState' + CollectSigs sigids sigs -> do - let requested = Map.keysSet sigids - received = Map.fromList [ (txId sig, sig) | sig <- sigs ] + let requested = Map.keysSet sigids + received = Map.fromList [ (txId sig, sig) | sig <- sigs ] + notReceived = requested Set.\\ Map.keysSet received unless (Map.keysSet received `Set.isSubsetOf` requested) $ throwIO ProtocolErrorSigNotRequested + localState' <- atomically do + foldM (\st sigid -> reportSig st (Identity (sigid, TxRejected))) + localState + (Set.toList notReceived) + mbe <- handleReceivedTxs sigids received traceWith tracer $ TraceTxSubmissionCollected (txId `map` sigs) case mbe of -- one of `sig`s had a wrong size Just e -> traceWith tracer (TraceTxInboundError e) >> throwIO e - Nothing -> k + Nothing -> k localState' diff --git a/dmq-node/test/Main.hs b/dmq-node/test/Main.hs index effacb19..b6c26521 100644 --- a/dmq-node/test/Main.hs +++ b/dmq-node/test/Main.hs @@ -6,6 +6,7 @@ import Cardano.Crypto.Libsodium import Test.DMQ.NodeToClient qualified import Test.DMQ.NodeToNode qualified +import Test.DMQ.PeerSelection.PeerMetric qualified import Test.DMQ.SigSubmission.App qualified import DMQ.Protocol.LocalMsgNotification.Test qualified @@ -27,6 +28,7 @@ tests = testGroup "decentralised-message-queue:tests" [ Test.DMQ.NodeToClient.tests , Test.DMQ.NodeToNode.tests + , Test.DMQ.PeerSelection.PeerMetric.tests , Test.DMQ.SigSubmission.App.tests -- protocols diff --git a/dmq-node/test/Test/DMQ/PeerSelection/PeerMetric.hs b/dmq-node/test/Test/DMQ/PeerSelection/PeerMetric.hs new file mode 100644 index 00000000..53d5c72b --- /dev/null +++ b/dmq-node/test/Test/DMQ/PeerSelection/PeerMetric.hs @@ -0,0 +1,403 @@ +{-# LANGUAGE MultiWayIf #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Test.DMQ.PeerSelection.PeerMetric (tests) where + +import Control.Monad.Class.MonadTime.SI +import Data.Foldable qualified as Foldable +import Data.Function (on) +import Data.List qualified as List +import Data.List.NonEmpty (NonEmpty (..)) +import Data.List.NonEmpty qualified as NonEmpty +import Data.Map (Map) +import Data.Map.Strict qualified as Map +import Data.Ord (Down (..)) +import Data.OrdPSQ qualified as OrdPSQ + +import DMQ.Diffusion.PeerSelection.PeerMetric as PeerMetric + +import Test.Ouroboros.Network.Utils (Delay (..), renderRanges) +import Test.QuickCheck +import Test.Tasty (TestTree, testGroup) +import Test.Tasty.QuickCheck (testProperty) + + +tests :: TestTree +tests = testGroup "Test.DMQ.PeerSelection.PeerMetric" + [ testProperty "reportSigId" prop_reportSigId + , testProperty "erasePeer" prop_erasePeer + , testProperty "expired" prop_expired + , testProperty "announciness" prop_announciness + , testProperty "competingPeers" prop_competingPeers + ] + + +type SigId = Int +newtype PeerAddr = PeerAddr Int + deriving (Eq, Ord, Show) + +instance Arbitrary PeerAddr where + -- We generate a small number of peers to increase the chance of multiple + -- signatures from the same peer + arbitrary = PeerAddr . getNonNegative <$> resize 5 arbitrary + shrink (PeerAddr addr) = PeerAddr <$> shrink addr + +-- | Use the same time range as the sig generators so that the window +-- sometimes cuts into the generated sig history and sometimes spans all of it. +instance Arbitrary PeerMetricConfiguration where + arbitrary = PeerMetricConfiguration . getDelay <$> resize 5200 arbitrary `suchThat` (> 0) + shrink (PeerMetricConfiguration t) = + [ PeerMetricConfiguration t' + | Delay t' <- shrink (Delay t) + , t' > 0 + ] + +data SigType = SigValid { sigId :: SigId, peerAddr :: PeerAddr, sigTime :: Time } + | SigAnnounced { sigId :: SigId, peerAddr :: PeerAddr, sigTime :: Time } + | SigInvalid { sigId :: SigId, peerAddr :: PeerAddr, sigTime :: Time } + deriving Show + +instance Arbitrary SigType where + arbitrary = frequency + [ ( 6 + , SigValid <$> arbitrary + <*> arbitrary + <*> (Time . getDelay <$> resize 5200 arbitrary) + ) + , ( 2 + , SigAnnounced <$> arbitrary + <*> arbitrary + <*> (Time . getDelay <$> resize 5200 arbitrary) + ) + , ( 1 + , SigInvalid <$> arbitrary + <*> arbitrary + <*> (Time . getDelay <$> resize 5200 arbitrary) + ) + ] + shrink SigValid { sigId, peerAddr, sigTime = sigTime@(Time t) } = + [ SigValid { sigId, peerAddr, sigTime = Time t' } + | t' <- getDelay <$> shrink (Delay t) + ] + ++ + [ SigAnnounced { sigId, peerAddr, sigTime } + , SigInvalid { sigId, peerAddr, sigTime } + ] + shrink SigAnnounced { sigId, peerAddr, sigTime = sigTime@(Time t) } = + [ SigAnnounced { sigId, peerAddr, sigTime = Time t' } + | t' <- getDelay <$> shrink (Delay t) + ] + ++ + [ SigInvalid { sigId, peerAddr, sigTime } ] + shrink SigInvalid { sigId, peerAddr, sigTime = Time t } = + [ SigInvalid { sigId, peerAddr, sigTime = Time t' } + | t' <- getDelay <$> shrink (Delay t) + ] + +newtype Sigs = Sigs { getSigs :: NonEmpty SigType } + deriving Show + +fixupSigs :: NonEmpty SigType -> NonEmpty SigType +fixupSigs = NonEmpty.nubBy ((==) `on` sigId) + +instance Arbitrary Sigs where + arbitrary = + Sigs + . fixupSigs + . NonEmpty.fromList + . getNonEmpty + <$> arbitrary + shrink Sigs { getSigs = sigs } = + Sigs + . fixupSigs + . NonEmpty.fromList + . getNonEmpty + <$> shrink (NonEmpty . NonEmpty.toList $ sigs) + + +reportSigType + :: PeerMetricConfiguration + -> (Map PeerAddr (LocalPeerMetricState SigId), PeerMetricState SigId PeerAddr) + -> SigType + -> (Map PeerAddr (LocalPeerMetricState SigId), PeerMetricState SigId PeerAddr) +reportSigType config (localStMap, st) = \case + SigAnnounced { sigId, peerAddr, sigTime } -> + let alterFn :: Maybe (LocalPeerMetricState SigId) + -> Maybe (LocalPeerMetricState SigId) + alterFn Nothing = Just $ reportSigIdImpl config sigId sigTime emptyLocalPeerMetricState + alterFn (Just s) = Just $ reportSigIdImpl config sigId sigTime s + in (Map.alter alterFn peerAddr localStMap, st) + SigValid { sigId, peerAddr, sigTime } -> + let localSt = reportSigIdImpl config sigId sigTime + $ Map.findWithDefault emptyLocalPeerMetricState peerAddr localStMap + (localSt', st') = reportSigImpl config localSt (TraceLabelPeer peerAddr (sigId, TxAccepted)) st + in (Map.insert peerAddr localSt' localStMap, st') + SigInvalid { sigId, peerAddr, sigTime } -> + let localSt = reportSigIdImpl config sigId sigTime + $ Map.findWithDefault emptyLocalPeerMetricState peerAddr localStMap + (localSt', st') = reportSigImpl config localSt (TraceLabelPeer peerAddr (sigId, TxRejected)) st + in (Map.insert peerAddr localSt' localStMap, st') + + +prepareState :: PeerMetricConfiguration + -> NonEmpty SigType + -> (Map PeerAddr (LocalPeerMetricState SigId) -> PeerMetricState SigId PeerAddr -> SigType -> Property) + -> Property +prepareState config (sig :| sigs) k = + let (localMap, st) = Foldable.foldl' (reportSigType config) (Map.empty, emptyPeerMetricState) (reverse sigs) + in k localMap st sig + + +-- | Properties of `reportSigId` +-- +-- * inserted entry has the right time +-- * old entries are pruned +-- * not pruned entries are preserved +-- +prop_reportSigId :: PeerMetricConfiguration -> Sigs -> Property +prop_reportSigId config@PeerMetricConfiguration { timeWindowToKeep } (Sigs sigs) = + prepareState config sigs $ \localStMap _st sig -> + let peer :: PeerAddr + peer = peerAddr sig + + lst, lst' :: LocalPeerMetricState SigId + lst = Map.findWithDefault emptyLocalPeerMetricState peer localStMap + lst' = reportSigIdImpl config (sigId sig) (sigTime sig) lst + + lstM, lstM' :: Map SigId Time + lstM = Map.fromList + . map (\(k,p,_) -> (k,p)) + . OrdPSQ.toList + $ localState lst + lstM' = Map.fromList $ map (\(k,p,_) -> (k,p)) $ OrdPSQ.toList $ localState lst' + + cutoff = (-timeWindowToKeep) `addTime` sigTime sig + numPruned = Map.size $ Map.filter (<= cutoff) lstM + + in classify (cutoff > Time 0) "window < sigTime" + . classify (numPruned > 0) "entries pruned" + . counterexample (show sig) + . counterexample (show lstM) + . counterexample (show lstM') + $ + counterexample "wrong time inserted" + (Map.lookup (sigId sig) lstM' === Just (sigTime sig)) + .&&. + counterexample "old entries were pruned" + ( case Map.elems lstM' of + [] -> property True + ks -> property $ minimum ks > cutoff + ) + .&&. + counterexample "not pruned entries were preserved" + (Map.filter (> cutoff) lstM `Map.isSubmapOf` lstM') + + +-- | Check that `erasePeerImpl` deletes the peer and doesn't modify state of +-- other peers. +-- +prop_erasePeer :: PeerMetricConfiguration -> Sigs -> Property +prop_erasePeer config (Sigs sigs) = + prepareState config sigs $ + \_localStateMap st sig -> + let peer = peerAddr sig + a = announcinessImpl st + st' = erasePeerImpl peer st + a' = announcinessImpl st' + in label (if Map.member peer a then "peer present" else "peer absent") + $ Map.delete peer a === a' + +-- The first signature is valid, sigs are sorted by time, which hardens +-- the `prop_expired` test. +-- +newtype SigsFirstValid = SigsFirstValid { getSigsFirstValid :: NonEmpty SigType } + deriving Show + +fromSigs :: Sigs -> SigsFirstValid +fromSigs (Sigs sigs) = + SigsFirstValid + -- TODO: with GHC-9.10 we can use `NonEmpty.sortOn` + case List.sortOn (Down . sigTime) . NonEmpty.toList $ sigs of + a : as -> + SigValid + { sigId = sigId a, + peerAddr = peerAddr a, + sigTime = sigTime a + } + :| as + [] -> error "impossible happened" + +instance Arbitrary SigsFirstValid where + arbitrary = fromSigs <$> arbitrary + shrink (SigsFirstValid as) = fromSigs `map` shrink (Sigs as) + + +-- | Verify that `reportSigId` and `reportSig` are pruning old entries. +-- +prop_expired :: PeerMetricConfiguration -> SigsFirstValid -> Property +prop_expired config@PeerMetricConfiguration { timeWindowToKeep } (SigsFirstValid sigs) = + prepareState config sigs $ \localStateMap st sig -> + let time = (-timeWindowToKeep) `addTime` sigTime sig + (localStateMap', st') = reportSigType config (localStateMap, st) sig + lst = Map.findWithDefault emptyLocalPeerMetricState (peerAddr sig) localStateMap + lst' = localStateMap' Map.! peerAddr sig + metricPruned = case OrdPSQ.minView (metricState st) of + Just (_, oldest, _, _) -> oldest <= time + Nothing -> False + localPruned = case OrdPSQ.minView (localState lst) of + Just (_, oldest, _, _) -> oldest <= time + Nothing -> False + in classify (time > Time 0) "window < sigTime" + . classify metricPruned "metric entry pruned" + . classify localPruned "local entry pruned" + . counterexample (show st) $ + counterexample (show st') $ + counterexample (show lst) $ + counterexample (show lst') $ + ( -- verify that old entries in `metricState` are pruned + case OrdPSQ.minView (metricState st') of + Nothing -> + property True + Just (_, oldest, _, _) -> + case sig of + SigValid {} -> + counterexample (unwords [show oldest, "not later than", show time]) + (oldest > time) + SigInvalid {} -> + property True + SigAnnounced {} -> + property True + ) + .&&. + ( -- verify that old entries in `LocalPeerMetricState` are pruned + case OrdPSQ.minView (localState lst') of + Nothing -> property True + Just (_, oldest, _, _) -> + case sig of + SigValid {} -> + counterexample (unwords [show oldest, "not later than", show time]) + (oldest > time) + SigInvalid {} -> + property True + SigAnnounced {} -> + property True + ) + + +-- | Verify properties of `announcinessImpl` +-- +-- * if signature was announced or invalid the metric map is not changed +-- * if signature is valid, the score increments relative to the pruned state +-- of old entries; untouched peers are preserved. +-- +prop_announciness :: PeerMetricConfiguration -> Sigs -> Property +prop_announciness config@PeerMetricConfiguration { timeWindowToKeep } (Sigs sigs) = + prepareState config sigs $ \localStMap st sig -> + let a = announcinessImpl st + (_localStMap', st') = reportSigType config (localStMap, st) sig + a' = announcinessImpl st' + cutoff = (-timeWindowToKeep) `addTime` sigTime sig + metricPruned = case OrdPSQ.minView (metricState st) of + Just (_, oldest, _, _) -> oldest <= cutoff + Nothing -> False + in classify (cutoff > Time 0) "window < sigTime" + . classify metricPruned "metric entry pruned" + . label ("max score: " ++ renderRanges 4 (if null a' then 0 else maximum a')) + $ label ("number of peers: " ++ renderRanges 5 (Map.size a')) + $ counterexample (show (st, a)) + $ counterexample (show (st', a')) + $ case sig of + SigValid { peerAddr, sigTime } -> + -- Processing a `SigValid` at time `sigTime` prunes all entries with + -- time `<= sigTime - timeWindowToKeep` (regardless of peer). Compare + -- against `st` pruned at the new sig's time so peers that lost + -- entries to pruning are accounted for. + let aPruned = announcinessImpl (pruneState sigTime st) + in + -- untouched entries are preserved + Map.delete peerAddr aPruned === Map.delete peerAddr a' + .&&. + -- new entry is bumped & it is in the results + Just (maybe 1 succ (Map.lookup peerAddr aPruned)) + === + Map.lookup peerAddr a' + SigAnnounced {} -> + a === a' + SigInvalid { sigTime } -> + let aPruned = announcinessImpl (pruneState sigTime st) + in aPruned === a' + where + pruneState :: Time + -> PeerMetricState SigId PeerAddr + -> PeerMetricState SigId PeerAddr + pruneState t PeerMetricState { metricState } = + PeerMetricState { + metricState = + snd + $ OrdPSQ.atMostView ((-timeWindowToKeep) `addTime` t) metricState + } + + +-- | A pair of distinct peer addresses. +data TwoPeerAddrs = TwoPeerAddrs PeerAddr PeerAddr + deriving Show + +instance Arbitrary TwoPeerAddrs where + arbitrary = do + p <- arbitrary + q <- arbitrary `suchThat` (/= p) + return (TwoPeerAddrs p q) + shrink (TwoPeerAddrs p q) = + [ TwoPeerAddrs p' q' + | (p', q') <- shrink (p, q) + , p' /= q' + ] + + +-- | Bundles a 'PeerMetricConfiguration' with two announcement times generated +-- at the same scale, so the time gap and the window are in comparable ranges. +data ConfigWithTimes = ConfigWithTimes + { cwtConfig :: PeerMetricConfiguration + , cwtTime1 :: Time + , cwtTime2 :: Time + } deriving Show + +instance Arbitrary ConfigWithTimes where + arbitrary = ConfigWithTimes + <$> arbitrary + <*> (Time . getDelay <$> resize 5200 arbitrary) + <*> (Time . getDelay <$> resize 5200 arbitrary) + shrink ConfigWithTimes { cwtConfig = config + , cwtTime1 = Time t1 + , cwtTime2 = Time t2 + } = + [ ConfigWithTimes config' (Time t1') (Time t2') + | (config', Delay t1', Delay t2') <- shrink (config, Delay t1, Delay t2) + ] + + +-- | When two peers both get 'TxAccepted' for the same sigid (e.g. after a +-- mempool eviction and re-submission), the peer with the *earlier* announcement +-- time keeps the entry. 'peer1' is accepted first, 'peer2' second; the times +-- are independent so the test covers both displacement and no-displacement. +-- +prop_competingPeers :: ConfigWithTimes -> SigId -> TwoPeerAddrs -> Property +prop_competingPeers ConfigWithTimes { cwtConfig = config + , cwtTime1 = time1 + , cwtTime2 = time2 + } + sigId (TwoPeerAddrs peer1 peer2) = + let lst1 = reportSigIdImpl config sigId time1 emptyLocalPeerMetricState + lst2 = reportSigIdImpl config sigId time2 emptyLocalPeerMetricState + -- peer1 is accepted first, claiming the entry + (_, st1) = reportSigImpl config lst1 (TraceLabelPeer peer1 (sigId, TxAccepted)) emptyPeerMetricState + -- peer2 is accepted second + (_, st2) = reportSigImpl config lst2 (TraceLabelPeer peer2 (sigId, TxAccepted)) st1 + a = announcinessImpl st2 + PeerMetricConfiguration { timeWindowToKeep } = config + cutoff = (-timeWindowToKeep) `addTime` time2 + in if | time2 <= time1 -> label "displacement (peer2 announced earlier)" (a === Map.fromList [(peer2, 1)]) + | time1 > cutoff -> label "no displacement, peer1 keeps credit" (a === Map.fromList [(peer1, 1)]) + | -- peer1 announced earlier, but its entry falls outside the window and is + -- pruned when peer2's result arrives; peer2 is inserted fresh + otherwise -> label "no displacement, peer1 entry pruned" (a === Map.fromList [(peer2, 1)]) diff --git a/dmq-node/test/Test/DMQ/SigSubmission/App.hs b/dmq-node/test/Test/DMQ/SigSubmission/App.hs index 1d9a2cb3..5d70a897 100644 --- a/dmq-node/test/Test/DMQ/SigSubmission/App.hs +++ b/dmq-node/test/Test/DMQ/SigSubmission/App.hs @@ -7,6 +7,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeOperators #-} module Test.DMQ.SigSubmission.App (tests) where @@ -28,8 +29,10 @@ import Control.Tracer (Tracer (..), contramap) import System.Random (mkStdGen) import Data.ByteString.Lazy qualified as BSL -import Data.Foldable (traverse_) +import Data.Foldable (toList, traverse_) +import Data.Foldable qualified as Foldable import Data.Function (on) +import Data.Functor.Identity (runIdentity) import Data.Hashable import Data.List (nubBy) import Data.List qualified as List @@ -39,6 +42,8 @@ import Data.Maybe (fromMaybe) import Data.Set qualified as Set import Data.Typeable (Typeable) +import Network.TypedProtocol.Codec (AnyMessage (..)) + import Ouroboros.Network.Channel import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM) import Ouroboros.Network.Driver @@ -46,15 +51,20 @@ import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToReq (..)) import Ouroboros.Network.TxSubmission.Inbound.V2 import Ouroboros.Network.Util.ShowProxy +import DMQ.Diffusion.PeerSelection.PeerMetric (PeerMetricState (..), + TraceLabelPeer (..), announcinessImpl, peerMetricVar) +import DMQ.Diffusion.PeerSelection.PeerMetric qualified as PeerMetric import DMQ.Protocol.SigSubmissionV2.Codec (byteLimitsSigSubmissionV2, timeLimitsSigSubmissionV2) import DMQ.Protocol.SigSubmissionV2.Inbound (sigSubmissionV2InboundPeerPipelined) import DMQ.Protocol.SigSubmissionV2.Outbound (sigSubmissionV2OutboundPeer) -import DMQ.Protocol.SigSubmissionV2.Type (NumIdsAck (..), SigSubmissionV2) +import DMQ.Protocol.SigSubmissionV2.Type (Message (..), NumIdsAck (..), + SigSubmissionV2) import DMQ.SigSubmissionV2.Inbound (sigSubmissionInbound) import DMQ.SigSubmissionV2.Outbound (sigSubmissionOutbound) +import Test.DMQ.PeerSelection.PeerMetric () import Test.DMQ.SigSubmission.Types (SigStateTrace (..), SigSubmissionState (..), TestVersion, sigSubmissionCodec2) @@ -69,7 +79,8 @@ import Test.Tasty.QuickCheck (testProperty) tests :: TestTree tests = testGroup "Test.DMQ.SigSubmissionV2.App" - [ testProperty "sigSubmissionV2" prop_sigSubmissionV2 + [ testProperty "sigSubmissionV2" prop_sigSubmissionV2 + , testProperty "sigSubmissionV2_metric" prop_sigSubmissionV2_metric ] @@ -314,6 +325,7 @@ runSigSubmissionV2 tracer tracerSigLogic st0 sigDecisionPolicy = do verboseTracer (getMempoolWriter duplicateSigsVar inboundMempool) api + PeerMetric.nullMetrics ctrlMsgSTM runPipelinedPeerWithLimits (("INBOUND " ++ show addr,) `contramap` verboseTracer) @@ -352,3 +364,373 @@ runSigSubmissionV2 tracer tracerSigLogic st0 sigDecisionPolicy = do where go as [] = action (reverse as) go as ((x,y):xs) = withAsync x (\a -> withAsync y (\b -> go ((a, b):as) xs)) + + +-- | The peer address type used in the simulation. All trace-event type +-- aliases below are parameterised by this alias so that a change to the peer +-- address type causes a compile error rather than silently emptying the traces. +type SimPeerAddr = Int + +-- | Dynamic-trace type for inbound protocol events (emitted by the per-peer +-- protocol tracer in 'runSigSubmissionV2WithMetric'). +type SimProtocolEvent = TraceLabelPeer SimPeerAddr + (TraceSendRecv (SigSubmissionV2 TxId (Tx TxId))) + +-- | Dynamic-trace type for inbound application-layer events (emitted by the +-- per-peer application tracer in 'runSigSubmissionV2WithMetric'). +type SimAppEvent = TraceLabelPeer SimPeerAddr + (TraceTxSubmissionInbound TxId (Tx TxId)) + +-- | Dynamic-trace type for metric state snapshots (emitted by 'traceTVarIO' +-- on the peer-metric TVar in 'runSigSubmissionV2WithMetric'). +type SimMetricSnapshot = PeerMetricState TxId SimPeerAddr + +-- | Unified event type emitted into the dynamic trace so that peer events and +-- metric snapshots are extracted in a single ordered pass. Using a single +-- 'selectTraceEventsDynamicWithTime' call preserves the in-thread interleaving +-- (sig_add → snapshot → sig_add → snapshot …) that separate calls would lose. +data SimTraceEvent + = SimProtocolEvent SimProtocolEvent + | SimAppEvent SimAppEvent + | SimMetricSnapshot SimMetricSnapshot + deriving Show + + +sigSubmissionSimulationWithMetric + :: forall s. + PeerMetric.PeerMetricConfiguration + -> SigSubmissionState + -> IOSim s ([Tx TxId], [[Tx TxId]]) +sigSubmissionSimulationWithMetric config (SigSubmissionState state sigDecisionPolicy) = do + state' <- traverse (\(sigs, mbOutDelay, mbInDelay) -> do + let mbOutDelayTime = getSmallDelay . getPositive <$> mbOutDelay + mbInDelayTime = getSmallDelay . getPositive <$> mbInDelay + controlMessageVar <- newTVarIO Continue + return ( sigs + , controlMessageVar + , mbOutDelayTime + , mbInDelayTime + ) + ) + state + + state'' <- traverse (\(sigs, var, mbOutDelay, mbInDelay) -> + return ( sigs + , readTVar var + , mbOutDelay + , mbInDelay + ) + ) + state' + + let simDelayTime = Map.foldl' (\m (sigs, _, mbInDelay, mbOutDelay) -> + max m ( fromMaybe 1 (max <$> mbInDelay <*> mbOutDelay) + * realToFrac (length sigs `div` 4) + ) + ) + 0 + state'' + controlMessageVars = (\(_, x, _, _) -> x) + <$> Map.elems state' + + withAsync + (do threadDelay (simDelayTime + 1100) + atomically (traverse_ (`writeTVar` Terminate) controlMessageVars) + ) \_ -> do + let tracer :: forall a. (Show a, Typeable a) => Tracer (IOSim s) a + tracer = dynamicTracer <> sayTracer + runSigSubmissionV2WithMetric tracer tracer config state'' sigDecisionPolicy + + +-- | Variant of 'runSigSubmissionV2' that wires a real 'PeerMetric' into each +-- inbound peer and registers a 'traceTVarIO' callback so that every +-- meaningful change to the metric state is emitted into the IOSim dynamic +-- trace as a 'SimMetricSnapshot'. +-- +-- The inbound protocol tracer emits 'SimProtocolEvent' and the application +-- tracer emits 'SimAppEvent'; all three event kinds are consumed by +-- 'prop_sigSubmissionV2_metric'. +runSigSubmissionV2WithMetric + :: forall s. + Tracer (IOSim s) (String, TraceSendRecv (SigSubmissionV2 TxId (Tx TxId))) + -> Tracer (IOSim s) (TraceTxLogic SimPeerAddr TxId (Tx TxId)) + -> PeerMetric.PeerMetricConfiguration + -> Map SimPeerAddr ( [Tx TxId] + , ControlMessageSTM (IOSim s) + , Maybe DiffTime + , Maybe DiffTime + ) + -> TxDecisionPolicy + -> IOSim s ([Tx TxId], [[Tx TxId]]) +runSigSubmissionV2WithMetric tracer tracerSigLogic config st0 sigDecisionPolicy = do + st <- traverse (\(b, c, d, e) -> do + mempool <- newMempool b + (outChannel, inChannel) <- createConnectedChannels + return (mempool, c, d, e, outChannel, inChannel) + ) st0 + inboundMempool <- emptyMempool + let sigRng = mkStdGen 42 + + sigChannelsVar <- newMVar (TxChannels Map.empty) + sigMempoolSem <- newTxMempoolSem + sharedSigStateVar <- newSharedTxStateVar sigRng + traceTVarIO sharedSigStateVar \_ -> return . TraceDynamic . SigStateTrace + labelTVarIO sharedSigStateVar "shared-sig-state" + duplicateSigsVar <- LazySTM.newTVarIO [] + + peerMetric <- PeerMetric.mkPeerMetric + traceTVarIO (peerMetricVar peerMetric) $ \prev new -> + pure $ case prev of + Just p | PeerMetric.announcinessImpl p == PeerMetric.announcinessImpl new + -> DontTrace + _ -> TraceDynamic (SimMetricSnapshot new) + + withAsync (decisionLogicThreads tracerSigLogic sayTracer + sigDecisionPolicy sigChannelsVar sharedSigStateVar) $ \a -> do + let outbounds = (\(addr, (mempool, _, outDelay, _, outChannel, _)) -> do + labelThisThread ("outbound-" ++ show addr) + let outbound = sigSubmissionOutbound + (Tracer $ say . show) + (NumIdsAck $ getNumTxIdsToReq $ maxUnacknowledgedTxIds sigDecisionPolicy) + (getMempoolReader mempool) + (maxBound :: TestVersion) + runPeerWithLimits + (("OUTBOUND " ++ show addr,) `contramap` tracer) + sigSubmissionCodec2 + (byteLimitsSigSubmissionV2 (fromIntegral . BSL.length)) + timeLimitsSigSubmissionV2 + (maybe id delayChannel outDelay outChannel) + (sigSubmissionV2OutboundPeer outbound) + ) + <$> Map.assocs st + + let inbounds = (\(addr, (_, ctrlMsgSTM, _, inDelay, _, inChannel)) -> do + labelThisThread ("inbound-" ++ show addr) + withPeer tracerSigLogic + sigChannelsVar + sigMempoolSem + sigDecisionPolicy + sharedSigStateVar + (getMempoolReader inboundMempool) + (getMempoolWriter duplicateSigsVar inboundMempool) + getTxSize + addr $ \(api :: PeerTxAPI (IOSim s) TxId (Tx TxId)) -> do + let inbound = sigSubmissionInbound + (contramap (SimAppEvent . TraceLabelPeer addr) + (dynamicTracer :: Tracer (IOSim s) SimTraceEvent)) + (getMempoolWriter duplicateSigsVar inboundMempool) + api + (PeerMetric.hoist + (TraceLabelPeer addr . runIdentity) + (PeerMetric.reportMetric + config + peerMetric)) + ctrlMsgSTM + runPipelinedPeerWithLimits + (contramap (SimProtocolEvent . TraceLabelPeer addr) + (dynamicTracer :: Tracer (IOSim s) SimTraceEvent)) + sigSubmissionCodec2 + (byteLimitsSigSubmissionV2 (fromIntegral . BSL.length)) + timeLimitsSigSubmissionV2 + (maybe id delayChannel inDelay inChannel) + (sigSubmissionV2InboundPeerPipelined inbound) + ) <$> Map.assocs st + + withAsyncAll (inbounds `zip` outbounds) $ \as -> do + _ <- waitAllInbounds as + cancel a + + inmp <- readMempool inboundMempool + let outmp = map (\(sigs, _, _, _) -> sigs) + $ Map.elems st0 + + return (inmp, outmp) + where + waitAllInbounds :: [(Async (IOSim s) x, Async (IOSim s) x)] -> IOSim s [Either SomeException x] + waitAllInbounds [] = return [] + waitAllInbounds ((inbound, outbound):as) = do + r <- waitCatch inbound + cancel outbound + rs <- waitAllInbounds as + return (r : rs) + + withAsyncAll :: [(IOSim s a, IOSim s a)] + -> ([(Async (IOSim s) a, Async (IOSim s) a)] -> IOSim s b) + -> IOSim s b + withAsyncAll xs0 action = go [] xs0 + where + go as [] = action (reverse as) + go as ((x,y):xs) = withAsync x (\a -> withAsync y (\b -> go ((a, b):as) xs)) + + +-- | Checks that on every 'SimMetricSnapshot' in the merged trace the actual +-- 'announcinessImpl' agrees with the pure model rebuilt from 'SimProtocolEvent' +-- and 'SimAppEvent' events seen so far. +prop_sigSubmissionV2_metric :: PeerMetric.PeerMetricConfiguration + -> SigSubmissionState + -> Property +prop_sigSubmissionV2_metric config st@(SigSubmissionState peers _) = + let tr = runSimTrace (sigSubmissionSimulationWithMetric config st) + in label ("number of peers: " ++ renderRanges 3 (Map.size peers)) + $ case traceResult True tr of + Left e -> + counterexample (show e) + . counterexample (ppTrace tr) + $ False + Right _ -> + let events = selectTraceEventsDynamicWithTime @_ @SimTraceEvent tr + numSnapshots = length + . filter + (\case + (_, SimMetricSnapshot{}) -> True + _ -> False + ) + $ events + numAppEvents = length + . filter + (\case + (_, SimAppEvent{}) -> True + _ -> False + ) + $ events + anyNonEmpty = any + (\case + (_, SimMetricSnapshot sn) -> + not (Map.null (announcinessImpl sn)) + _ -> False + ) + events + in classify (numSnapshots > 0) "metric fired" + . classify anyNonEmpty "non-empty announciness observed" + . label ("snapshots: " ++ renderRanges 10 numSnapshots) + . label ("mempool adds: " ++ renderRanges 100 numAppEvents) + $ checkTrace config events + + +-- --------------------------------------------------------------------------- +-- Pure model +-- --------------------------------------------------------------------------- + +-- | State of the pure model, mirroring the two-stage bookkeeping inside +-- 'reportSigImpl': first we record when each peer announced each sigid, then +-- on acceptance we move the entry into the accepted window. +data PureModelState = PureModelState + { announced :: Map (TxId, SimPeerAddr) Time + -- ^ (sigid, peer) → time the peer first announced the sigid + , accepted :: Map TxId (Time, SimPeerAddr) + -- ^ accepted sigs still within the time window: sigid → (time, peer) + } + +emptyPureModelState :: PureModelState +emptyPureModelState = PureModelState Map.empty Map.empty + +-- | Advance the pure model when a sigid is announced using a protocol message. +-- Only 'TraceRecvMsg' of 'MsgReplySigIds' matters: it records the IOSim time at +-- which the inbound peer received the sigid announcement. +-- +updatePureModelOnSigAnnounced + :: Time + -> SimProtocolEvent + -> PureModelState + -> PureModelState +updatePureModelOnSigAnnounced t (TraceLabelPeer addr (TraceRecvMsg (AnyMessage msg))) st = + case msg of + MsgReplySigIds sigids -> + let txids = fst <$> toList sigids + in st { announced = Foldable.foldl' (\m txid -> Map.insert (txid, addr) t m) + (announced st) txids } + _ -> st +updatePureModelOnSigAnnounced _ _ st = st + + +-- | Advance the pure model when on mempool submission result. Only +-- 'TraceTxInboundAddedToMempool' matters: it moves the sigid from the +-- announced set into the accepted window. +-- +-- Note: the model is simple but good enough to provide the same announciness +-- metrics as the production implementation. For example we never evacuate +-- entries from `announced`, but we make sure we insert and prune `accepted` +-- map in a way to make the mode faithful. +-- +updatePureModelOnMempoolResult + :: PeerMetric.PeerMetricConfiguration + -> SimAppEvent + -> PureModelState + -> PureModelState + +-- valid signatures: +-- * prune old entries +-- * insert new entry +updatePureModelOnMempoolResult + (PeerMetric.PeerMetricConfiguration window) + (TraceLabelPeer addr (TraceTxInboundAddedToMempool sigids _)) + st0 + = + Foldable.foldl' fn st0 sigids + where + fn st sigid = + case Map.lookup (sigid, addr) (announced st) of + Nothing -> st + Just t -> + let accepted' = Map.filter (\(time, _) -> time > (-window) `addTime` t) (accepted st) + accepted'' = Map.insert sigid (t, addr) accepted' + in st { accepted = accepted'' } + +-- invalid signatures: +-- * prune old entries +updatePureModelOnMempoolResult + (PeerMetric.PeerMetricConfiguration window) + (TraceLabelPeer addr (TraceTxInboundRejectedFromMempool sigids _)) + st0 + = + Foldable.foldl' fn st0 sigids + where + fn st sigid = + case Map.lookup (sigid, addr) (announced st) of + Nothing -> st + Just t -> + let accepted' = Map.filter (\(time, _) -> time > (-window) `addTime` t) (accepted st) + in st { accepted = accepted' } + +-- any other `TraceTxSubmissionInbound` event can be ignored +updatePureModelOnMempoolResult _ _ st = st + + +-- | Derive announciness counts from the pure model state. +-- +expectedAnnounciness :: PureModelState -> Map SimPeerAddr Int +expectedAnnounciness PureModelState { accepted } = + Map.fromListWith (+) [ (peer, 1) | (_, peer) <- Map.elems accepted ] + + +-- --------------------------------------------------------------------------- +-- Trace checking +-- --------------------------------------------------------------------------- + +-- | Fold over the ordered event stream, updating the pure model on peer events +-- and asserting agreement on metric snapshots. +checkTrace + :: PeerMetric.PeerMetricConfiguration + -> [(Time, SimTraceEvent)] + -> Property +checkTrace config evs = + case Foldable.foldl' step (emptyPureModelState, property True, 0) evs of + (_, prop, maxScore) -> + label ("max score: " ++ renderRanges 2 maxScore) prop + where + step :: (PureModelState, Property, Int) + -> (Time, SimTraceEvent) + -> (PureModelState, Property, Int) + step (st, prop, maxScore) (t, SimProtocolEvent event) = (updatePureModelOnSigAnnounced t event st, prop, maxScore) + step (st, prop, maxScore) (_, SimAppEvent event) = (updatePureModelOnMempoolResult config event st, prop, maxScore) + step (st, prop, maxScore) (_, SimMetricSnapshot snapshot) = + let expected = expectedAnnounciness st + actual = announcinessImpl snapshot + maxScore' = if Map.null actual then maxScore else maximum actual + in ( st + , prop .&&. ( counterexample ("actual: " ++ show actual) + . counterexample ("expected: " ++ show expected) + $ actual === expected + ) + , maxScore' + ) diff --git a/scripts/ci/run-nixpkgs-fmt.sh b/scripts/ci/run-nixpkgs-fmt.sh index a4cf8f3e..2dffac31 100755 --- a/scripts/ci/run-nixpkgs-fmt.sh +++ b/scripts/ci/run-nixpkgs-fmt.sh @@ -2,4 +2,7 @@ set -euo pipefail -fd -e nix -X nixpkgs-fmt +# First, try to find the 'fd' command +FD="$(which fdfind 2>/dev/null || which fd 2>/dev/null)" + +$FD -e nix -X nixpkgs-fmt diff --git a/scripts/ci/run-stylish-haskell.sh b/scripts/ci/run-stylish-haskell.sh index bbc2ef11..563f774f 100755 --- a/scripts/ci/run-stylish-haskell.sh +++ b/scripts/ci/run-stylish-haskell.sh @@ -2,6 +2,9 @@ set -euo pipefail +# First, try to find the 'fd' command +FD="$(which fdfind 2>/dev/null || which fd 2>/dev/null)" + function usage { echo "Usage $(basename "$0") [-ch]" echo "Check files with 'stylish-haskell'; by default check all files." @@ -32,7 +35,7 @@ while getopts ${optstring} arg; do PATHS=$(git show --pretty='' --name-only HEAD) for path in $PATHS; do echo $path - fd -e hs --ignore-file ./scripts/ci/check-stylish-ignore --full-path $path -X stylish-haskell $STYLISH_HASKELL_ARGS + $FD -e hs --ignore-file ./scripts/ci/check-stylish-ignore --full-path $path -X stylish-haskell $STYLISH_HASKELL_ARGS done if [ $USE_GIT == 1 ]; then git --no-pager diff --exit-code @@ -44,7 +47,7 @@ while getopts ${optstring} arg; do for path in $PATHS; do if [ "${path##*.}" == "hs" ]; then echo $path - fd -e hs --ignore-file ./scripts/ci/check-stylish-ignore --full-path $path -X stylish-haskell $STYLISH_HASKELL_ARGS + $FD -e hs --ignore-file ./scripts/ci/check-stylish-ignore --full-path $path -X stylish-haskell $STYLISH_HASKELL_ARGS fi done if [ $USE_GIT == 1 ]; then @@ -62,7 +65,7 @@ done # TODO CPP pragmas in export lists are not supported by stylish-haskell FD_OPTS="-e hs --ignore-file ./scripts/ci/check-stylish-ignore -X stylish-haskell $STYLISH_HASKELL_ARGS" -fd . './dmq-node' $FD_OPTS +$FD . './dmq-node' $FD_OPTS if [ $USE_GIT == 1 ]; then git --no-pager diff --exit-code