From 9a05344213bb3243791d71a987387d5a9eb546a8 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 7 Apr 2026 16:22:59 +0200 Subject: [PATCH 1/3] Exit early if trying to close the peer connections twice Previously, the repeated close() calls were hanging indefinitely for PublisherPeerConnection and SubscriberPeerConnection. Now, we set `_closed=True` guard after the connection is closed for the first time and exit early on the repeated call. --- getstream/video/rtc/pc.py | 35 ++++++++++++++++++ getstream/video/rtc/reconnection.py | 4 +++ tests/{ => rtc}/test_connection_manager.py | 41 +++++++++++++++++++--- tests/rtc/test_subscriber_drain.py | 1 + 4 files changed, 77 insertions(+), 4 deletions(-) rename tests/{ => rtc}/test_connection_manager.py (83%) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index feab89a7..a9d10888 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -50,6 +50,7 @@ def __init__( ) super().__init__(configuration) self.manager = manager + self._closed = False self._connected_event = asyncio.Event() for transceiver in self.getTransceivers(): @@ -113,6 +114,14 @@ async def wait_for_connected(self, timeout: float = 15.0): logger.error(f"Publisher connection timed out after {timeout}s") raise TimeoutError(f"Connection timed out after {timeout} seconds") + async def close(self): + # Using self._closed guard here + # to avoid closing RTCPeerConnectionTwice by accident (it freezes on second time) + if self._closed: + return + await super().close() + self._closed = True + async def restartIce(self): """Restart ICE connection for reconnection scenarios.""" logger.info("Restarting ICE connection for publisher") @@ -138,6 +147,7 @@ def __init__( ) super().__init__(configuration) self.connection = connection + self._closed = False self._drain_video_frames = drain_video_frames self.track_map = {} # track_id -> (MediaRelay, original_track) @@ -245,6 +255,31 @@ def get_video_frame_tracker(self) -> Optional[Any]: return next(iter(self.video_frame_trackers.values())) return None + async def close(self): + # Using self._closed guard here + # to avoid closing RTCPeerConnectionTwice by accident (it freezes on second time) + if self._closed: + return + + # Clean up video drains + for blackhole, drain_task, drain_proxy in list(self._video_drains.values()): + drain_task.cancel() + drain_proxy.stop() + await blackhole.stop() + self._video_drains.clear() + + # Cancel background tasks + for task in list(self._background_tasks): + task.cancel() + self._background_tasks.clear() + + # Clear track maps + self.track_map.clear() + self.video_frame_trackers.clear() + + await super().close() + self._closed = True + async def restartIce(self): """Restart ICE connection for reconnection scenarios.""" logger.info("Restarting ICE connection for subscriber") diff --git a/getstream/video/rtc/reconnection.py b/getstream/video/rtc/reconnection.py index 003f117b..1d23feb4 100644 --- a/getstream/video/rtc/reconnection.py +++ b/getstream/video/rtc/reconnection.py @@ -284,6 +284,10 @@ async def _reconnect_migrate(self): current_publisher = self.connection_manager.publisher_pc current_subscriber = self.connection_manager.subscriber_pc + # Clear old references so _connect_internal creates fresh PCs + self.connection_manager.publisher_pc = None + self.connection_manager.subscriber_pc = None + self.connection_manager.connection_state = ConnectionState.MIGRATING if current_publisher and hasattr(current_publisher, "removeListener"): diff --git a/tests/test_connection_manager.py b/tests/rtc/test_connection_manager.py similarity index 83% rename from tests/test_connection_manager.py rename to tests/rtc/test_connection_manager.py index 91e8bbfe..fa9962a2 100644 --- a/tests/test_connection_manager.py +++ b/tests/rtc/test_connection_manager.py @@ -1,12 +1,23 @@ +import asyncio import contextlib +import uuid +from unittest.mock import AsyncMock, MagicMock, patch import pytest -from unittest.mock import AsyncMock, patch, MagicMock +from dotenv import load_dotenv +from getstream import AsyncStream +from getstream.video import rtc from getstream.video.rtc.connection_manager import ConnectionManager -from getstream.video.rtc.connection_utils import SfuJoinError, SfuConnectionError +from getstream.video.rtc.connection_utils import ( + ConnectionState, + SfuConnectionError, + SfuJoinError, +) from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2 +load_dotenv() + @contextlib.contextmanager def patched_dependencies(): @@ -45,8 +56,30 @@ def connection_manager(request): yield cm -class TestConnectRetry: - """Tests for connect() retry logic when SFU is full.""" +@pytest.fixture +def client(): + return AsyncStream(timeout=10.0) + + +class TestConnectionManager: + @pytest.mark.asyncio + @pytest.mark.integration + async def test_leave_twice_does_not_hang(self, client: AsyncStream): + """Integration test: join a real call and leave twice without hanging.""" + call_id = str(uuid.uuid4()) + call = client.video.call("default", call_id) + + async with await rtc.join(call, "test-user") as connection: + assert connection.connection_state == ConnectionState.JOINED + + await asyncio.sleep(2) + + await asyncio.wait_for(connection.leave(), timeout=10.0) + assert connection.connection_state == ConnectionState.LEFT + + # Second leave must not hang + await asyncio.wait_for(connection.leave(), timeout=10.0) + assert connection.connection_state == ConnectionState.LEFT @pytest.mark.asyncio @pytest.mark.parametrize("connection_manager", [2], indirect=True) diff --git a/tests/rtc/test_subscriber_drain.py b/tests/rtc/test_subscriber_drain.py index 1d00b688..44db03b5 100644 --- a/tests/rtc/test_subscriber_drain.py +++ b/tests/rtc/test_subscriber_drain.py @@ -13,6 +13,7 @@ def subscriber_pc(): """Create a SubscriberPeerConnection bypassing heavy parent inits.""" pc = SubscriberPeerConnection.__new__(SubscriberPeerConnection) pc.connection = Mock() + pc._closed = False pc._drain_video_frames = True pc.track_map = {} pc.video_frame_trackers = {} From bab3958bc057c71087ac2c5e0904c6138892870c Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 7 Apr 2026 16:40:07 +0200 Subject: [PATCH 2/3] Fix makefile --- Makefile | 1 - 1 file changed, 1 deletion(-) diff --git a/Makefile b/Makefile index f3208dcc..a0005561 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,6 @@ VIDEO_PATHS := \ getstream/video \ tests/rtc \ tests/test_audio_stream_track.py \ - tests/test_connection_manager.py \ tests/test_connection_utils.py \ tests/test_signaling.py \ tests/test_video_examples.py \ From ac4d370eee3e43bfaa8ba3167fcbb041bcbb0311 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 7 Apr 2026 17:16:39 +0200 Subject: [PATCH 3/3] Set _closed=True in peer connections before super is called --- getstream/video/rtc/pc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index a9d10888..0df0d777 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -119,8 +119,8 @@ async def close(self): # to avoid closing RTCPeerConnectionTwice by accident (it freezes on second time) if self._closed: return - await super().close() self._closed = True + await super().close() async def restartIce(self): """Restart ICE connection for reconnection scenarios.""" @@ -277,8 +277,8 @@ async def close(self): self.track_map.clear() self.video_frame_trackers.clear() - await super().close() self._closed = True + await super().close() async def restartIce(self): """Restart ICE connection for reconnection scenarios."""