Skip to content

Commit a892e76

Browse files
author
Sebastian Molenda
committed
Aligned status emmiting
1 parent 0656e6a commit a892e76

7 files changed

Lines changed: 133 additions & 28 deletions

File tree

pubnub/enums.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class PNStatusCategory(Enum):
3737
PNTLSConnectionFailedCategory = 15
3838
PNTLSUntrustedCertificateCategory = 16
3939
PNInternalExceptionCategory = 17
40+
PNSubscriptionChangedCategory = 18
41+
PNConnectionErrorCategory = 19
4042

4143

4244
class PNOperationType(object):

pubnub/event_engine/effects.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ async def receive_messages_async(self, channels, groups, timetoken, region):
128128
recieve_failure = events.ReceiveFailureEvent('Empty response', 1, timetoken=timetoken)
129129
self.event_engine.trigger(recieve_failure)
130130
elif response.status.error:
131+
if self.stop_event.is_set():
132+
self.logger.debug(f'Recieve messages cancelled: {response.status.error_data.__dict__}')
133+
return
131134
self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}')
132135
recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken)
133136
self.event_engine.trigger(recieve_failure)
@@ -437,6 +440,9 @@ def set_pn(self, pubnub: PubNub):
437440
self.message_worker = BaseMessageWorker(pubnub)
438441

439442
def emit(self, invocation: invocations.PNEmittableInvocation):
443+
if isinstance(invocation, list):
444+
for inv in invocation:
445+
self.emit(inv)
440446
if isinstance(invocation, invocations.EmitMessagesInvocation):
441447
self.emit_message(invocation)
442448
if isinstance(invocation, invocations.EmitStatusInvocation):
@@ -452,5 +458,9 @@ def emit_status(self, invocation: invocations.EmitStatusInvocation):
452458
pn_status = PNStatus()
453459
pn_status.category = invocation.status
454460
pn_status.operation = invocation.operation
461+
if invocation.context and invocation.context.channels:
462+
pn_status.affected_channels = invocation.context.channels
463+
if invocation.context and invocation.context.groups:
464+
pn_status.affected_groups = invocation.context.groups
455465
pn_status.error = False
456466
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)

pubnub/event_engine/models/invocations.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Union
1+
from typing import List, Optional, Union
22
from pubnub.exceptions import PubNubException
33
from pubnub.enums import PNOperationType, PNStatusCategory
44

@@ -90,10 +90,16 @@ def __init__(self, messages: Union[None, List[str]]) -> None:
9090

9191

9292
class EmitStatusInvocation(PNEmittableInvocation):
93-
def __init__(self, status: Union[None, PNStatusCategory], operation: Union[None, PNOperationType] = None) -> None:
93+
def __init__(
94+
self,
95+
status: Optional[PNStatusCategory],
96+
operation: Optional[PNOperationType] = None,
97+
context=None,
98+
) -> None:
9499
super().__init__()
95100
self.status = status
96101
self.operation = operation
102+
self.context = context
97103

98104

99105
"""

pubnub/event_engine/models/states.py

Lines changed: 101 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
7373

7474
return PNTransition(
7575
state=HandshakingState,
76-
context=self._context
76+
context=self._context,
77+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
78+
operation=PNOperationType.PNSubscribeOperation,
79+
context=self._context)
7780
)
7881

7982
def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
@@ -122,7 +125,15 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
122125

123126
return PNTransition(
124127
state=HandshakingState,
125-
context=self._context
128+
context=self._context,
129+
invocation=[
130+
invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
131+
operation=PNOperationType.PNSubscribeOperation,
132+
context=self._context),
133+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
134+
operation=PNOperationType.PNSubscribeOperation,
135+
context=self._context),
136+
]
126137
)
127138

128139
def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
@@ -148,7 +159,10 @@ def reconnecting(self, event: events.HandshakeFailureEvent, context: PNContext)
148159

149160
return PNTransition(
150161
state=HandshakeReconnectingState,
151-
context=self._context
162+
context=self._context,
163+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNConnectionErrorCategory,
164+
operation=PNOperationType.PNSubscribeOperation,
165+
context=self._context)
152166
)
153167

154168
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -183,8 +197,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
183197
return PNTransition(
184198
state=UnsubscribedState,
185199
context=self._context,
186-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
187-
operation=PNOperationType.PNUnsubscribeOperation)
200+
invocation=[
201+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
202+
operation=PNOperationType.PNSubscribeOperation,
203+
context=self._context),
204+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
205+
operation=PNOperationType.PNSubscribeOperation,
206+
context=self._context),
207+
]
188208
)
189209

190210

@@ -218,7 +238,10 @@ def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTra
218238

219239
return PNTransition(
220240
state=HandshakeStoppedState,
221-
context=self._context
241+
context=self._context,
242+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
243+
operation=PNOperationType.PNSubscribeOperation,
244+
context=self._context)
222245
)
223246

224247
def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
@@ -230,7 +253,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
230253

231254
return PNTransition(
232255
state=HandshakeReconnectingState,
233-
context=self._context
256+
context=self._context,
257+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
258+
operation=PNOperationType.PNSubscribeOperation,
259+
context=self._context)
234260
)
235261

236262
def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, context: PNContext) -> PNTransition:
@@ -240,7 +266,10 @@ def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, cont
240266

241267
return PNTransition(
242268
state=HandshakeReconnectingState,
243-
context=self._context
269+
context=self._context,
270+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNConnectionErrorCategory,
271+
operation=PNOperationType.PNSubscribeOperation,
272+
context=self._context)
244273
)
245274

246275
def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContext) -> PNTransition:
@@ -305,7 +334,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
305334

306335
return PNTransition(
307336
state=HandshakingState,
308-
context=self._context
337+
context=self._context,
338+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
339+
operation=PNOperationType.PNSubscribeOperation,
340+
context=self._context)
309341
)
310342

311343
def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
@@ -340,8 +372,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
340372
return PNTransition(
341373
state=UnsubscribedState,
342374
context=self._context,
343-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
344-
operation=PNOperationType.PNUnsubscribeOperation)
375+
invocation=[
376+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
377+
operation=PNOperationType.PNSubscribeOperation,
378+
context=self._context),
379+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
380+
operation=PNOperationType.PNSubscribeOperation,
381+
context=self._context),
382+
]
345383
)
346384

347385

@@ -374,8 +412,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
374412
return PNTransition(
375413
state=UnsubscribedState,
376414
context=self._context,
377-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
378-
operation=PNOperationType.PNUnsubscribeOperation)
415+
invocation=[
416+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
417+
operation=PNOperationType.PNSubscribeOperation,
418+
context=self._context),
419+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
420+
operation=PNOperationType.PNSubscribeOperation,
421+
context=self._context),
422+
]
379423
)
380424

381425

@@ -412,7 +456,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
412456

413457
return PNTransition(
414458
state=self.__class__,
415-
context=self._context
459+
context=self._context,
460+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
461+
operation=PNOperationType.PNSubscribeOperation,
462+
context=self._context)
416463
)
417464

418465
def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
@@ -446,7 +493,10 @@ def receiving_failure(self, event: events.ReceiveFailureEvent, context: PNContex
446493
self._context.timetoken = event.timetoken
447494
return PNTransition(
448495
state=ReceiveReconnectingState,
449-
context=self._context
496+
context=self._context,
497+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNUnexpectedDisconnectCategory,
498+
operation=PNOperationType.PNSubscribeOperation,
499+
context=self._context)
450500
)
451501

452502
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -477,8 +527,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
477527
return PNTransition(
478528
state=UnsubscribedState,
479529
context=self._context,
480-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
481-
operation=PNOperationType.PNUnsubscribeOperation)
530+
invocation=[
531+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
532+
operation=PNOperationType.PNSubscribeOperation,
533+
context=self._context),
534+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
535+
operation=PNOperationType.PNSubscribeOperation,
536+
context=self._context),
537+
]
482538
)
483539

484540

@@ -515,7 +571,10 @@ def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context:
515571

516572
return PNTransition(
517573
state=ReceiveReconnectingState,
518-
context=self._context
574+
context=self._context,
575+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.UnexpectedDisconnectCategory,
576+
operation=PNOperationType.PNSubscribeOperation,
577+
context=self._context)
519578
)
520579

521580
def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
@@ -527,7 +586,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
527586

528587
return PNTransition(
529588
state=ReceiveReconnectingState,
530-
context=self._context
589+
context=self._context,
590+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
591+
operation=PNOperationType.PNSubscribeOperation,
592+
context=self._context)
531593
)
532594

533595
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -602,7 +664,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
602664

603665
return PNTransition(
604666
state=ReceivingState,
605-
context=self._context
667+
context=self._context,
668+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
669+
operation=PNOperationType.PNSubscribeOperation,
670+
context=self._context)
606671
)
607672

608673
def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
@@ -637,8 +702,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
637702
return PNTransition(
638703
state=UnsubscribedState,
639704
context=self._context,
640-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
641-
operation=PNOperationType.PNUnsubscribeOperation)
705+
invocation=[
706+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
707+
operation=PNOperationType.PNSubscribeOperation,
708+
context=self._context),
709+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
710+
operation=PNOperationType.PNSubscribeOperation,
711+
context=self._context),
712+
]
642713
)
643714

644715

@@ -671,8 +742,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
671742
return PNTransition(
672743
state=UnsubscribedState,
673744
context=self._context,
674-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
675-
operation=PNOperationType.PNUnsubscribeOperation)
745+
invocation=[
746+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
747+
operation=PNOperationType.PNSubscribeOperation,
748+
context=self._context),
749+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
750+
operation=PNOperationType.PNSubscribeOperation,
751+
context=self._context),
752+
]
676753
)
677754

678755

pubnub/pubnub_asyncio.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ def __init__(self):
559559
self.error_queue = Queue()
560560

561561
def status(self, pubnub, status):
562+
super().status(pubnub, status)
562563
if utils.is_subscribed_event(status) and not self.connected_event.is_set():
563564
self.connected_event.set()
564565
elif utils.is_unsubscribed_event(status) and not self.disconnected_event.is_set():

tests/functional/event_engine/test_state_machine.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,23 @@
22
from pubnub.event_engine.statemachine import StateMachine
33

44

5+
class FakePN:
6+
def __init__(self) -> None:
7+
self._subscription_manager = self
8+
self._listener_manager = self
9+
10+
def announce_status(self, pn_status):
11+
...
12+
13+
514
def test_initialize_with_state():
615
machine = StateMachine(states.UnsubscribedState)
716
assert states.UnsubscribedState.__name__ == machine.get_state_name()
817

918

1019
def test_unsubscribe_state_trigger_sub_changed():
1120
machine = StateMachine(states.UnsubscribedState)
21+
machine.get_dispatcher().set_pn(FakePN())
1222
machine.trigger(events.SubscriptionChangedEvent(
1323
channels=['test'], groups=[]
1424
))
@@ -17,6 +27,7 @@ def test_unsubscribe_state_trigger_sub_changed():
1727

1828
def test_unsubscribe_state_trigger_sub_restored():
1929
machine = StateMachine(states.UnsubscribedState)
30+
machine.get_dispatcher().set_pn(FakePN())
2031
machine.trigger(events.SubscriptionChangedEvent(
2132
channels=['test'], groups=[]
2233
))

tests/integrational/asyncio/test_subscribe.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ async def test_subscribe_publish_unsubscribe():
129129
# )
130130
@pytest.mark.asyncio
131131
async def test_encrypted_subscribe_publish_unsubscribe():
132-
133132
pubnub = PubNubAsyncio(pnconf_enc_env_copy(enable_subscribe=True))
134133
pubnub.config.uuid = 'test-subscribe-asyncio-uuid'
135134

@@ -341,7 +340,6 @@ async def test_cg_join_leave():
341340

342341
pubnub.add_listener(callback_messages)
343342
pubnub.subscribe().channel_groups(gr).execute()
344-
345343
callback_messages_future = asyncio.ensure_future(callback_messages.wait_for_connect())
346344
presence_messages_future = asyncio.ensure_future(callback_presence.wait_for_presence_on(ch))
347345
await asyncio.wait([callback_messages_future, presence_messages_future])

0 commit comments

Comments
 (0)