Skip to content

Exit early if trying to close the peer connections twice#234

Merged
dangusev merged 3 commits intomainfrom
fix/connectionmanager-leave-called-twice-hangs
Apr 7, 2026
Merged

Exit early if trying to close the peer connections twice#234
dangusev merged 3 commits intomainfrom
fix/connectionmanager-leave-called-twice-hangs

Conversation

@dangusev
Copy link
Copy Markdown
Contributor

@dangusev dangusev commented Apr 7, 2026

Previously, the repeated close() calls were hanging indefinitely for PublisherPeerConnection and SubscriberPeerConnection.

Now, we set _closed=True guard when* the connection is closed for the first time and exit early on the repeated call.

Summary by CodeRabbit

  • Bug Fixes

    • Made peer-connection close() idempotent and improved shutdown to reliably cancel background tasks, drains, and related resources to prevent hangs.
    • Ensured reconnection clears previous peer connections before establishing new ones.
  • Tests

    • Added integration test verifying calling leave() twice does not hang.
    • Adjusted subscriber test setup to initialize closed-state flag.
  • Chores

    • Updated test selection configuration in Makefile.

Previously, the repeated close() calls were hanging indefinitely for PublisherPeerConnection and SubscriberPeerConnection.

Now, we set `_closed=True` guard after the connection is closed for the first time and exit early on the repeated call.
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 7, 2026

📝 Walkthrough

Walkthrough

Added idempotent close() implementations to Publisher/Subscriber peer connections with explicit subscriber resource cleanup; reconnection flow clears stale peer-connection refs before creating new ones; tests updated to ensure double-leave doesn't hang and to initialize subscriber _closed flag.

Changes

Cohort / File(s) Summary
Peer Connection Close Idempotence
getstream/video/rtc/pc.py
Added self._closed in constructors and overridden async close() in PublisherPeerConnection and SubscriberPeerConnection to be idempotent. SubscriberPeerConnection.close() additionally cancels/awaits video drain tasks and MediaBlackhole stops, cancels background tasks, and clears track maps before calling super().close().
Reconnection Flow Cleanup
getstream/video/rtc/reconnection.py
_reconnect_migrate now clears connection_manager.publisher_pc and connection_manager.subscriber_pc before setting MIGRATING and invoking _connect_internal, ensuring new peer connections are created rather than reused.
RTC Connection Tests
tests/rtc/test_connection_manager.py
Replaced previous test class with TestConnectionManager; added client fixture and integration test test_leave_twice_does_not_hang which joins a session and calls leave() twice (each with timeout), asserting state becomes LEFT.
Subscriber Drain Test Setup
tests/rtc/test_subscriber_drain.py
Test fixture now explicitly sets subscriber_pc._closed = False during setup to ensure known closed-state before exercising drain behavior.
Makefile Test Paths
Makefile
Removed tests/test_connection_manager.py from VIDEO_PATHS, altering which tests are included in the test-video target.

Sequence Diagram(s)

sequenceDiagram
  participant CM as ConnectionManager
  participant Reconnect as _reconnect_migrate
  participant Connector as _connect_internal
  participant OldCleanup as _cleanup_connections

  CM->>Reconnect: start migrate
  Reconnect->>CM: clear CM.publisher_pc / CM.subscriber_pc
  Reconnect->>CM: set state = MIGRATING
  Reconnect->>Connector: invoke _connect_internal (creates new PCs)
  Connector-->>CM: new publisher_pc / subscriber_pc set
  Reconnect->>OldCleanup: leave old instances for cleanup
  OldCleanup->>OldCleanup: cleanup old peer connections asynchronously
Loading
sequenceDiagram
  participant SubPC as SubscriberPeerConnection
  participant Drain as VideoDrainTask
  participant Proxy as DrainProxy
  participant Blackhole as MediaBlackhole

  SubPC->>SubPC: if _closed True -> return
  SubPC->>Drain: cancel each drain task
  SubPC->>Proxy: stop each drain proxy
  SubPC->>Blackhole: await blackhole.stop()
  SubPC->>SubPC: cancel background tasks, clear maps
  SubPC->>SubPC: set _closed = True
  SubPC->>Super: await super().close()
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐇 I tidy wires with nimble paws,
I cancel drains and patch the flaws,
Two leaves called — I don't misbehave,
Old peers hop off; new ones I pave,
I close once, then nap in the cave.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 57.14% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: adding idempotent close() methods with early exit logic to prevent hanging on repeated close() calls to peer connections.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/connectionmanager-leave-called-twice-hangs

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
getstream/video/rtc/pc.py (1)

258-281: Consider wrapping cleanup in try/except to ensure super().close() is always called.

If await blackhole.stop() raises an exception, super().close() and self._closed = True are never executed. This could leave the underlying RTCPeerConnection unclosed, and subsequent close() calls will retry the same failing cleanup.

🛡️ Proposed defensive cleanup
     async def close(self):
         # Using self._closed guard here
         # to avoid closing RTCPeerConnectionTwice by accident (it freezes on second time)
         if self._closed:
             return
 
         # Clean up video drains
         for blackhole, drain_task, drain_proxy in list(self._video_drains.values()):
-            drain_task.cancel()
-            drain_proxy.stop()
-            await blackhole.stop()
+            try:
+                drain_task.cancel()
+                drain_proxy.stop()
+                await blackhole.stop()
+            except Exception as e:
+                logger.warning(f"Error cleaning up video drain: {e}")
         self._video_drains.clear()
 
         # Cancel background tasks
         for task in list(self._background_tasks):
             task.cancel()
         self._background_tasks.clear()
 
         # Clear track maps
         self.track_map.clear()
         self.video_frame_trackers.clear()
 
         await super().close()
         self._closed = True
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@getstream/video/rtc/pc.py` around lines 258 - 281, The close() method must
ensure super().close() and self._closed = True run even if cleanup steps raise;
wrap the cleanup of self._video_drains and cancellation of
self._background_tasks in a try/except/finally (or try/finally) so exceptions
from await blackhole.stop() (or drain_task.cancel()/drain_proxy.stop()) are
caught/logged and do not prevent calling await super().close() and setting
self._closed; locate the close method and apply the change around the blocks
using self._video_drains, self._background_tasks, and the call to
super().close() so cleanup errors are handled but the underlying
RTCPeerConnection is always closed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@getstream/video/rtc/pc.py`:
- Around line 258-281: The close() method must ensure super().close() and
self._closed = True run even if cleanup steps raise; wrap the cleanup of
self._video_drains and cancellation of self._background_tasks in a
try/except/finally (or try/finally) so exceptions from await blackhole.stop()
(or drain_task.cancel()/drain_proxy.stop()) are caught/logged and do not prevent
calling await super().close() and setting self._closed; locate the close method
and apply the change around the blocks using self._video_drains,
self._background_tasks, and the call to super().close() so cleanup errors are
handled but the underlying RTCPeerConnection is always closed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 975e6331-f4ae-4672-9bde-699c7aa2e190

📥 Commits

Reviewing files that changed from the base of the PR and between f3972a6 and 9a05344.

📒 Files selected for processing (4)
  • getstream/video/rtc/pc.py
  • getstream/video/rtc/reconnection.py
  • tests/rtc/test_connection_manager.py
  • tests/rtc/test_subscriber_drain.py

@dangusev dangusev merged commit bc76d35 into main Apr 7, 2026
39 of 41 checks passed
@dangusev dangusev deleted the fix/connectionmanager-leave-called-twice-hangs branch April 7, 2026 16:04
@aliev aliev restored the fix/connectionmanager-leave-called-twice-hangs branch April 7, 2026 21:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants