Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ VIDEO_PATHS := \
getstream/video \
tests/rtc \
tests/test_audio_stream_track.py \
tests/test_connection_manager.py \
tests/test_connection_utils.py \
tests/test_signaling.py \
tests/test_video_examples.py \
Expand Down
35 changes: 35 additions & 0 deletions getstream/video/rtc/pc.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
)
super().__init__(configuration)
self.manager = manager
self._closed = False
self._connected_event = asyncio.Event()

for transceiver in self.getTransceivers():
Expand Down Expand Up @@ -113,6 +114,14 @@ async def wait_for_connected(self, timeout: float = 15.0):
logger.error(f"Publisher connection timed out after {timeout}s")
raise TimeoutError(f"Connection timed out after {timeout} seconds")

async def close(self):
# Using self._closed guard here
# to avoid closing RTCPeerConnectionTwice by accident (it freezes on second time)
if self._closed:
return
self._closed = True
await super().close()

async def restartIce(self):
"""Restart ICE connection for reconnection scenarios."""
logger.info("Restarting ICE connection for publisher")
Expand All @@ -138,6 +147,7 @@ def __init__(
)
super().__init__(configuration)
self.connection = connection
self._closed = False
self._drain_video_frames = drain_video_frames

self.track_map = {} # track_id -> (MediaRelay, original_track)
Expand Down Expand Up @@ -245,6 +255,31 @@ def get_video_frame_tracker(self) -> Optional[Any]:
return next(iter(self.video_frame_trackers.values()))
return None

async def close(self):
# Using self._closed guard here
# to avoid closing RTCPeerConnectionTwice by accident (it freezes on second time)
if self._closed:
return

# Clean up video drains
for blackhole, drain_task, drain_proxy in list(self._video_drains.values()):
drain_task.cancel()
drain_proxy.stop()
await blackhole.stop()
self._video_drains.clear()

# Cancel background tasks
for task in list(self._background_tasks):
task.cancel()
self._background_tasks.clear()

# Clear track maps
self.track_map.clear()
self.video_frame_trackers.clear()

self._closed = True
await super().close()

async def restartIce(self):
"""Restart ICE connection for reconnection scenarios."""
logger.info("Restarting ICE connection for subscriber")
Expand Down
4 changes: 4 additions & 0 deletions getstream/video/rtc/reconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ async def _reconnect_migrate(self):
current_publisher = self.connection_manager.publisher_pc
current_subscriber = self.connection_manager.subscriber_pc

# Clear old references so _connect_internal creates fresh PCs
self.connection_manager.publisher_pc = None
self.connection_manager.subscriber_pc = None

self.connection_manager.connection_state = ConnectionState.MIGRATING

if current_publisher and hasattr(current_publisher, "removeListener"):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import asyncio
import contextlib
import uuid
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from dotenv import load_dotenv

from getstream import AsyncStream
from getstream.video import rtc
from getstream.video.rtc.connection_manager import ConnectionManager
from getstream.video.rtc.connection_utils import SfuJoinError, SfuConnectionError
from getstream.video.rtc.connection_utils import (
ConnectionState,
SfuConnectionError,
SfuJoinError,
)
from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2

load_dotenv()


@contextlib.contextmanager
def patched_dependencies():
Expand Down Expand Up @@ -45,8 +56,30 @@ def connection_manager(request):
yield cm


class TestConnectRetry:
"""Tests for connect() retry logic when SFU is full."""
@pytest.fixture
def client():
return AsyncStream(timeout=10.0)


class TestConnectionManager:
@pytest.mark.asyncio
@pytest.mark.integration
async def test_leave_twice_does_not_hang(self, client: AsyncStream):
"""Integration test: join a real call and leave twice without hanging."""
call_id = str(uuid.uuid4())
call = client.video.call("default", call_id)

async with await rtc.join(call, "test-user") as connection:
assert connection.connection_state == ConnectionState.JOINED

await asyncio.sleep(2)

await asyncio.wait_for(connection.leave(), timeout=10.0)
assert connection.connection_state == ConnectionState.LEFT

# Second leave must not hang
await asyncio.wait_for(connection.leave(), timeout=10.0)
assert connection.connection_state == ConnectionState.LEFT

@pytest.mark.asyncio
@pytest.mark.parametrize("connection_manager", [2], indirect=True)
Expand Down
1 change: 1 addition & 0 deletions tests/rtc/test_subscriber_drain.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def subscriber_pc():
"""Create a SubscriberPeerConnection bypassing heavy parent inits."""
pc = SubscriberPeerConnection.__new__(SubscriberPeerConnection)
pc.connection = Mock()
pc._closed = False
pc._drain_video_frames = True
pc.track_map = {}
pc.video_frame_trackers = {}
Expand Down