@@ -16,7 +16,9 @@ when defined(broker):
1616 times,
1717 random,
1818 nmqtt/ utils/ passwords,
19- nmqtt/ utils/ version
19+ nmqtt/ utils/ version,
20+ nmqtt/ utils/ acl,
21+ nmqtt/ utils/ trie
2022 from parsecfg import loadConfig, getSectionValue
2123 from os import fileExists
2224
@@ -142,6 +144,7 @@ when defined(broker):
142144 connections: Table [string , MqttCtx ]
143145 retained: Table [string , RetainedMsg ] # Topic, RetaindMsg
144146 subscribers: Table [string , seq [MqttCtx ]]
147+ subTrie: TopicTrie # Trie index for O(depth) subscription matching
145148 version: uint8
146149 clientIdMaxLen: int
147150 clientKickOld: bool
@@ -150,6 +153,7 @@ when defined(broker):
150153 passClientId: bool
151154 maxConnections: int
152155 passwords: Table [string , string ]
156+ acl: AclStore
153157
154158 RetainedMsg = object
155159 msg: string
@@ -159,7 +163,7 @@ when defined(broker):
159163
160164when defined (broker):
161165 var
162- mqttbroker = MqttBroker ()
166+ mqttbroker = MqttBroker (acl: newAclStore (), subTrie: newTopicTrie () )
163167 r = initRand (toInt (epochTime ()))
164168
165169
@@ -289,6 +293,8 @@ when defined(broker):
289293 mqttbroker.subscribers[topic].insert (ctx)
290294 else :
291295 mqttbroker.subscribers[topic] = @ [ctx]
296+ # First subscriber on this filter, add to the trie index
297+ mqttbroker.subTrie.subscribe (topic)
292298 except :
293299 wrn (" Crash when adding a new subcriber" )
294300
@@ -298,6 +304,9 @@ when defined(broker):
298304 try :
299305 if mqttbroker.subscribers.hasKey (topic):
300306 mqttbroker.subscribers[topic] = filter (mqttbroker.subscribers[topic], proc (x: MqttCtx ): bool = x != ctx)
307+ if mqttbroker.subscribers[topic].len () == 0 :
308+ mqttbroker.subscribers.del (topic)
309+ mqttbroker.subTrie.unsubscribe (topic)
301310 except :
302311 wrn (" Crash when removing subscriber with specific topic" )
303312
@@ -314,6 +323,7 @@ when defined(broker):
314323
315324 for t in delTop:
316325 mqttbroker.subscribers.del (t)
326+ mqttbroker.subTrie.unsubscribe (t)
317327
318328when defined (broker):
319329 proc qosAlign (qP, qS: uint8 ): uint8 =
@@ -641,20 +651,22 @@ when defined(broker):
641651 await c.work ()
642652
643653when defined (broker):
644- proc publishToSubscribers (seqctx: seq [MqttCtx ], pkt: Pkt , topic, message: string , qos: uint8 , retain: bool , senderId: string ) {.async .} =
645- # # Publish async to clients
654+ proc publishToSubscribers (seqctx: seq [MqttCtx ], pkt: Pkt , subFilter, pubTopic, message: string , qos: uint8 , retain: bool , senderId: string ) {.async .} =
655+ # # Publish async to clients.
656+ # # `subFilter` is the subscription key (e.g. "topic/subtopic/#") for QoS lookup.
657+ # # `pubTopic` is the actual published topic (e.g. "topic/subtopic/specific") sent to the client.
646658 for c in seqctx:
647659 if c.state != Connected :
648- asyncCheck removeSubscriber (c, topic )
660+ asyncCheck removeSubscriber (c, subFilter )
649661 continue
650662 let
651663 msgId = c.nextMsgId ()
652- qosSub = qosAlign (qos, c.subscribed[topic ])
664+ qosSub = qosAlign (qos, c.subscribed[subFilter ])
653665
654666 if mqttbroker.passClientId:
655- c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: topic , qos: qosSub, retain: retain, message: senderId & " :" & message, typ: Publish )
667+ c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: pubTopic , qos: qosSub, retain: retain, message: senderId & " :" & message, typ: Publish )
656668 else :
657- c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: topic , qos: qosSub, retain: retain, message: message, typ: Publish )
669+ c.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, topic: pubTopic , qos: qosSub, retain: retain, message: message, typ: Publish )
658670 await c.work ()
659671
660672when defined (broker):
@@ -810,12 +822,25 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
810822 (message, offset) = pkt.getstring (offset, false )
811823
812824 when defined (broker):
813- # Send message to all subscribers on "#"
814- if mqttbroker.subscribers.hasKey (" #" ):
815- await publishToSubscribers (mqttbroker.subscribers[" #" ], pkt, " #" , message, qos, retain, ctx.clientid)
816- # Send message to all subscribers on _the topic_
817- if mqttbroker.subscribers.hasKey (topic):
818- await publishToSubscribers (mqttbroker.subscribers[topic], pkt, topic, message, qos, retain, ctx.clientid)
825+ # Check ACL: does the publishing client have write access?
826+ if not mqttbroker.acl.checkPublish (ctx.username, ctx.clientId, topic):
827+ if mqttbroker.verbosity >= 1 :
828+ verbose (" ACL >> " & ctx.clientId & " denied publish to " & topic)
829+ # Per MQTT v3.1.1, the broker silently drops the message but still
830+ # completes the QoS handshake so the client doesn't stall.
831+ if qos == 1 :
832+ ctx.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, state: WorkNew , qos: 1 , typ: PubAck )
833+ await ctx.work ()
834+ elif qos == 2 :
835+ ctx.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, state: WorkNew , qos: 2 , typ: PubRec )
836+ await ctx.work ()
837+ return
838+
839+ # Route the published message to all matching subscribers.
840+ # Uses the trie index for O(topic_depth) matching instead of O(n) scan.
841+ for subFilter in mqttbroker.subTrie.matchingFilters (topic):
842+ if mqttbroker.subscribers.hasKey (subFilter):
843+ await publishToSubscribers (mqttbroker.subscribers[subFilter], pkt, subFilter, topic, message, qos, retain, ctx.clientid)
819844
820845 if mqttbroker.verbosity >= 1 :
821846 verbose (" Client >> " & ctx.clientId & " has published a message" )
@@ -869,36 +894,44 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
869894
870895proc onPubAck (ctx: MqttCtx , pkt: Pkt ) {.async .} =
871896 let (msgId, _) = pkt.getu16 (0 )
872- assert msgId in ctx.workQueue
873- assert ctx.workQueue[msgId].wk == PubWork
874- assert ctx.workQueue[msgId].state == WorkSent
875- assert ctx.workQueue[msgId].qos == 1
897+ if msgId notin ctx.workQueue:
898+ ctx.dmp " PubAck for unknown msgId: " & $ msgId
899+ return
900+ if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 1 :
901+ ctx.dmp " PubAck unexpected state for msgId: " & $ msgId
902+ return
876903 ctx.workQueue.del msgId
877904
878905proc onPubRec (ctx: MqttCtx , pkt: Pkt ) {.async .} =
879906 let (msgId, _) = pkt.getu16 (0 )
880- assert msgId in ctx.workQueue
881- assert ctx.workQueue[msgId].wk == PubWork
882- assert ctx.workQueue[msgId].state == WorkSent
883- assert ctx.workQueue[msgId].qos == 2
907+ if msgId notin ctx.workQueue:
908+ ctx.dmp " PubRec for unknown msgId: " & $ msgId
909+ return
910+ if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 2 :
911+ ctx.dmp " PubRec unexpected state for msgId: " & $ msgId
912+ return
884913 ctx.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, state: WorkNew , qos: 2 , typ: PubRel )
885914 await ctx.work ()
886915
887916proc onPubRel (ctx: MqttCtx , pkt: Pkt ) {.async .} =
888917 let (msgId, _) = pkt.getu16 (0 )
889- assert msgId in ctx.workQueue
890- assert ctx.workQueue[msgId].wk == PubWork
891- assert ctx.workQueue[msgId].state == WorkSent
892- assert ctx.workQueue[msgId].qos == 2
918+ if msgId notin ctx.workQueue:
919+ ctx.dmp " PubRel for unknown msgId: " & $ msgId
920+ return
921+ if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 2 :
922+ ctx.dmp " PubRel unexpected state for msgId: " & $ msgId
923+ return
893924 ctx.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, state: WorkNew , qos: 2 , typ: PubComp )
894925 await ctx.work ()
895926
896927proc onPubComp (ctx: MqttCtx , pkt: Pkt ) {.async .} =
897928 let (msgId, _) = pkt.getu16 (0 )
898- assert msgId in ctx.workQueue
899- assert ctx.workQueue[msgId].wk == PubWork
900- assert ctx.workQueue[msgId].state == WorkSent
901- assert ctx.workQueue[msgId].qos == 2
929+ if msgId notin ctx.workQueue:
930+ ctx.dmp " PubComp for unknown msgId: " & $ msgId
931+ return
932+ if ctx.workQueue[msgId].wk != PubWork or ctx.workQueue[msgId].qos != 2 :
933+ ctx.dmp " PubComp unexpected state for msgId: " & $ msgId
934+ return
902935 ctx.workQueue.del msgId
903936
904937# when defined(broker):
@@ -921,6 +954,15 @@ proc onSubscribe(ctx: MqttCtx, pkt: Pkt) {.async.} =
921954 (topic, offset) = pkt.getstring (offset, parseInt ($ nextLen))
922955 (qos, offset) = pkt.getu8 (offset)
923956
957+ # Check ACL: does the client have read access for this topic?
958+ if not mqttbroker.acl.checkSubscribe (ctx.username, ctx.clientId, topic):
959+ if mqttbroker.verbosity >= 1 :
960+ verbose (" ACL >> " & ctx.clientId & " denied subscribe to " & topic)
961+ # Send SubAck with failure return code (0x80) per MQTT v3.1.1 spec
962+ ctx.workQueue[msgId] = Work (wk: PubWork , msgId: msgId, state: WorkNew , qos: 0 , typ: SubAck )
963+ await ctx.work ()
964+ return
965+
924966 ctx.subscribed[topic] = qos
925967 await addSubscriber (ctx, topic)
926968
0 commit comments