feat(tts): integrate Deepgram TTS provider, implement fallback logic,…#81
Conversation
|
Warning Review limit reached
More reviews will be available in 53 minutes and 25 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (7)
📝 WalkthroughWalkthroughThis PR integrates Deepgram Aura-2 as a new TTS provider with automatic fallback support, enhances the WebSocket audio handler with keepalive and heartbeat tasks, and updates tests accordingly. Configuration adds Deepgram API endpoints and fallback settings; a new service layer implements non-streaming and streaming synthesis with circuit-breaker protection; TTSWorker routes provider selection through batch and synthesis methods with try/catch fallback; WebSocket handler maintains monotonic sequence counters and adds periodic background tasks; tests mock Redis pubsub correctly and assert audio chunk publication. ChangesDeepgram TTS Integration with Fallback Routing and WebSocket Improvements
🎯 4 (Complex) | ⏱️ ~50 minutes Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
… and update tests - Add Deepgram TTS (Aura-2) service and language/voice mapping - Implement automatic fallback from primary to secondary TTS provider on failure - Add WebSocket keepalive task (20s ping) and heartbeat task (15s) - Prevent sequence counter reset on reconnect to avoid audio packet corruption - Update `test_audio_websocket_ingest` to assert chunk publication Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
a602867 to
4e92c48
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
app/services/tts_worker.py (1)
135-201:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winStreaming path logs incorrect provider during fallback.
When fallback triggers Voice.ai streaming (WS or HTTP), the log statements at lines 198 and 262 use
settings.ACTIVE_TTS_PROVIDERinstead of the actual provider that was dispatched. During fallback from Deepgram to Voice.ai, logs would incorrectly show "deepgram" instead of "voiceai".Consider passing the
providerparameter to_handle_ws_streamingand_handle_http_streamingfor accurate logging.Proposed fix
async def _handle_ws_streaming( self, payload: Any, text: str, encoding: str, pipeline_start: float, + provider: str, ) -> None: """Handle Voice.ai WebSocket multi-context streaming path.""" ... logger.info( "TTS (WS Final): seq=%d room=%s lang=%s " "provider=%s audio_size=%d latency=%.1fms", payload.sequence_number, payload.room_id, payload.target_language, - settings.ACTIVE_TTS_PROVIDER, + provider, len(accumulated_bytes), elapsed_ms, )Apply the same change to
_handle_http_streamingand update the call sites in_dispatch_providerto passprovider.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/services/tts_worker.py` around lines 135 - 201, The WS/HTTP streaming handlers (_handle_ws_streaming and _handle_http_streaming) currently log settings.ACTIVE_TTS_PROVIDER which is wrong during fallback; add a provider parameter to both function signatures, use that provider variable in the logger.info calls (replace settings.ACTIVE_TTS_PROVIDER), and update all call sites (notably in _dispatch_provider) to pass the actual provider string when invoking these handlers so logs reflect the dispatched provider.
🧹 Nitpick comments (3)
app/external_services/deepgram_tts/service.py (1)
105-152: 💤 Low valueStreaming path bypasses circuit breaker protection.
synthesize()usesself._breaker.call()to wrap HTTP requests, butsynthesize_stream()makes requests directly without circuit breaker protection. If the Deepgram streaming endpoint experiences repeated failures, the circuit won't trip, potentially overwhelming the failing service.Consider wrapping the stream initiation in the circuit breaker for consistency with the batch path.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/external_services/deepgram_tts/service.py` around lines 105 - 152, synthesize_stream currently calls self.client.stream directly and bypasses the circuit breaker used by synthesize; update synthesize_stream to wrap the stream initiation with the same breaker (self._breaker.call) so the HTTP request and initial response handling are protected—i.e., invoke self._breaker.call to perform the async POST to settings.DEEPGRAM_TTS_API_URL (including headers, params, and json payload), then enter the returned response context and proceed with response.raise_for_status() and async for chunk in response.aiter_bytes(...) as before; ensure you reference the existing self._breaker.call API and keep sample_rate/params logic intact.app/modules/meeting/ws_router.py (2)
308-308: ⚡ Quick winRemove redundant import.
jsonis already imported at module level (line 8). Use the existing import instead of re-importing locally.♻️ Proposed fix
async def heartbeat_task() -> None: """Sends pipeline heartbeat JSON messages every 15s. This allows the frontend to distinguish between 'no one is speaking' and 'pipeline is broken' — the watchdog can be cleared on heartbeat receipt. """ - import json as _json - try: while True: await asyncio.sleep(15) try: await websocket.send_text( - _json.dumps( + json.dumps( { "type": "pipeline_heartbeat", "ts": int(time.time() * 1000), } ) )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/modules/meeting/ws_router.py` at line 308, The module-level import already provides json, so remove the redundant local import statement "import json as _json" in ws_router.py and update any nearby uses of the alias _json to use the existing module-level name "json" (ensure references inside functions or methods in this file such as the websocket routing handlers use json instead of _json).
289-299: 💤 Low valueDocstring is slightly misleading.
send_bytes(b"")sends an empty binary data frame, not a WebSocket ping control frame (opcode 0x9). This still achieves the keepalive goal, but the docstring could be more accurate.📝 Suggested docstring clarification
async def keepalive_task() -> None: - """Sends WebSocket ping frames every 20s to keep the connection alive.""" + """Sends empty binary frames every 20s to keep the connection alive."""🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/modules/meeting/ws_router.py` around lines 289 - 299, The docstring for keepalive_task is misleading: it currently says it "Sends WebSocket ping frames every 20s" but the implementation uses websocket.send_bytes(b""), which sends an empty binary data frame, not a ping control frame; update the docstring of keepalive_task to accurately state it sends an empty binary frame as a keepalive (or alternately change the implementation to use websocket.ping() if a true ping control frame is desired), and mention the 20s interval and cancellation behavior so future readers match behavior to code.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/meeting/test_ws_router.py`:
- Around line 131-137: The test file containing the assertion
mock_audio_ingest.publish_audio_chunk.assert_called_once_with(...) needs code
formatting to satisfy CI; run your formatter (e.g., `uv run ruff format` and/or
Black) on that test file, re-stage the formatted file, and commit the changes so
the pipeline passes.
---
Outside diff comments:
In `@app/services/tts_worker.py`:
- Around line 135-201: The WS/HTTP streaming handlers (_handle_ws_streaming and
_handle_http_streaming) currently log settings.ACTIVE_TTS_PROVIDER which is
wrong during fallback; add a provider parameter to both function signatures, use
that provider variable in the logger.info calls (replace
settings.ACTIVE_TTS_PROVIDER), and update all call sites (notably in
_dispatch_provider) to pass the actual provider string when invoking these
handlers so logs reflect the dispatched provider.
---
Nitpick comments:
In `@app/external_services/deepgram_tts/service.py`:
- Around line 105-152: synthesize_stream currently calls self.client.stream
directly and bypasses the circuit breaker used by synthesize; update
synthesize_stream to wrap the stream initiation with the same breaker
(self._breaker.call) so the HTTP request and initial response handling are
protected—i.e., invoke self._breaker.call to perform the async POST to
settings.DEEPGRAM_TTS_API_URL (including headers, params, and json payload),
then enter the returned response context and proceed with
response.raise_for_status() and async for chunk in response.aiter_bytes(...) as
before; ensure you reference the existing self._breaker.call API and keep
sample_rate/params logic intact.
In `@app/modules/meeting/ws_router.py`:
- Line 308: The module-level import already provides json, so remove the
redundant local import statement "import json as _json" in ws_router.py and
update any nearby uses of the alias _json to use the existing module-level name
"json" (ensure references inside functions or methods in this file such as the
websocket routing handlers use json instead of _json).
- Around line 289-299: The docstring for keepalive_task is misleading: it
currently says it "Sends WebSocket ping frames every 20s" but the implementation
uses websocket.send_bytes(b""), which sends an empty binary data frame, not a
ping control frame; update the docstring of keepalive_task to accurately state
it sends an empty binary frame as a keepalive (or alternately change the
implementation to use websocket.ping() if a true ping control frame is desired),
and mention the 20s interval and cancellation behavior so future readers match
behavior to code.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 94e02fc8-ed7a-4ae1-8f1d-2800325056b6
📒 Files selected for processing (7)
app/core/config.pyapp/external_services/deepgram_tts/__init__.pyapp/external_services/deepgram_tts/config.pyapp/external_services/deepgram_tts/service.pyapp/modules/meeting/ws_router.pyapp/services/tts_worker.pytests/meeting/test_ws_router.py
Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
… and update tests
test_audio_websocket_ingestto assert chunk publicationSummary by CodeRabbit
New Features
Chores