Skip to content

feat(elevenlabs): integrate ElevenLabs STT & TTS with fallback support#82

Merged
aniebiet-afia merged 1 commit into
mainfrom
feat/eleven_labs_tts
May 30, 2026
Merged

feat(elevenlabs): integrate ElevenLabs STT & TTS with fallback support#82
aniebiet-afia merged 1 commit into
mainfrom
feat/eleven_labs_tts

Conversation

@aniebietafia
Copy link
Copy Markdown
Contributor

@aniebietafia aniebietafia commented May 30, 2026

  • Implement ElevenLabs Scribe STT batch service and real-time streaming WebSocket client.
  • Implement ElevenLabs TTS batch and stream services using eleven_flash_v2_5.
  • Update STTWorker and TTSWorker to dispatch ElevenLabs and support primary/fallback providers.
  • Add active and fallback controls (ACTIVE_STT_PROVIDER, STT_FALLBACK_PROVIDER, etc.) in settings.
  • Add unit and pipeline integration tests for the ElevenLabs pipeline path.
  • Resolve mypy, ruff lint, and pytest import paths across modules and tests.

Summary by CodeRabbit

New Features

  • Added ElevenLabs as a supported audio provider for speech-to-text and text-to-speech services
  • Enabled real-time streaming capabilities for both speech-to-text and text-to-speech

Configuration

  • Default audio provider changed from Deepgram to ElevenLabs for speech-to-text and text-to-speech

Review Change Stack

- Implement ElevenLabs Scribe STT batch service and real-time streaming WebSocket client.
- Implement ElevenLabs TTS batch and stream services using eleven_flash_v2_5.
- Update STTWorker and TTSWorker to dispatch ElevenLabs and support primary/fallback providers.
- Add active and fallback controls (ACTIVE_STT_PROVIDER, STT_FALLBACK_PROVIDER, etc.) in settings.
- Add unit and pipeline integration tests for the ElevenLabs pipeline path.
- Resolve mypy, ruff lint, and pytest import paths across modules and tests.

Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 30, 2026

📝 Walkthrough

Walkthrough

This PR adds comprehensive ElevenLabs Speech-to-Text and Text-to-Speech integration to the audio pipeline, replacing Deepgram as the default provider. It includes batch and streaming modes, full service implementations with circuit breakers and connection management, worker dispatch logic with fallback support, and extensive test coverage across unit and integration layers.

Changes

ElevenLabs Audio Pipeline Integration

Layer / File(s) Summary
Configuration and Package Setup
app/core/config.py, app/external_services/elevenlabs_stt/__init__.py, app/external_services/elevenlabs_tts/__init__.py
Settings now include ELEVEN_LABS_API_KEY and full ElevenLabs STT/TTS configuration fields (API URLs, WebSocket URLs, model names, streaming toggles). Default audio providers switched from "deepgram" to "elevenlabs" for both TTS and STT. Package initializers export service factories and streaming clients.
ElevenLabs STT Batch Service
app/external_services/elevenlabs_stt/config.py, app/external_services/elevenlabs_stt/service.py
Batch transcription service with lazy httpx.AsyncClient, AsyncCircuitBreaker protection, and WAV header construction for raw PCM audio. Config helpers generate API headers and normalize language codes from locale strings. Transcription result includes text, per-word confidence averaging, detected language, and latency measurement.
ElevenLabs STT Streaming Client
app/external_services/elevenlabs_stt/streaming.py
WebSocket real-time speech-to-text client with configurable session establishment, base64-encoded audio chunk transmission, transcript callback spawning, exponential-backoff reconnection (max 5 attempts), and graceful shutdown with task cleanup.
ElevenLabs TTS Service
app/external_services/elevenlabs_tts/config.py, app/external_services/elevenlabs_tts/service.py
Batch and streaming synthesis methods with language-code mapping, lazy HTTP client, circuit breaker, and sample-rate derivation from output format. Streaming uses client.stream() and yields audio chunks; batch performs single POST and returns audio bytes plus metadata. Config helpers construct API headers and normalize language inputs.
STT Worker Integration
app/services/stt_worker.py
Worker dispatch selects active STT provider from config; streaming vs batch mode determined by per-provider settings and API-key availability. Batch mode buffers audio chunks until threshold, then calls ElevenLabsSTTService.transcribe() and publishes TranscriptionEvent to Kafka, broadcasts speaker change, and publishes captions to Redis. Streaming mode maintains per-user WebSocket connections and forwards transcript callbacks. Fallback provider retried on exception when enabled.
TTS Worker Integration
app/services/tts_worker.py
Worker dispatch routes ElevenLabs to streaming handler when ELEVENLABS_TTS_USE_STREAMING enabled. Streaming handler iteratively publishes Redis chunks and accumulated bytes to Kafka. Non-streaming uses ElevenLabsTTSService.synthesize() for single Kafka event.
Test Coverage
tests/external_services/test_elevenlabs_stt.py, tests/external_services/test_elevenlabs_tts.py, tests/test_kafka/test_pipeline.py, pyproject.toml
Unit tests validate config helper language mappings, batch/streaming service HTTP interactions with circuit breaker behavior. Integration tests verify STT and TTS worker dispatch, Kafka event emission, and audio data correctness. Pytest pythonpath config added for test imports.

Sequence Diagram(s)

sequenceDiagram
  participant Worker as Audio Worker
  participant Dispatch as Provider Dispatcher
  participant Service as ElevenLabs Service
  participant CircuitBreaker as Circuit Breaker
  participant API as ElevenLabs API
  participant Kafka as Kafka Publisher
  
  Worker->>Dispatch: audio chunk / text to synthesize
  Dispatch->>Dispatch: select provider & streaming mode
  Dispatch->>Service: batch_transcribe() or synthesize()
  Service->>CircuitBreaker: execute HTTP request
  CircuitBreaker->>API: POST with audio/text + params
  API-->>CircuitBreaker: response (audio/transcription)
  CircuitBreaker-->>Service: decoded result
  Service-->>Dispatch: {audio_bytes, latency} / {text, confidence}
  Dispatch->>Kafka: publish AUDIO_SYNTHESIZED or TEXT_ORIGINAL
  Kafka-->>Dispatch: ack
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • Brints/spoken-api#81: Modifies TTS provider dispatch and fallback behavior in tts_worker.py, overlapping with ElevenLabs routing logic changes in this PR.
  • Brints/spoken-api#77: Extends TTS worker streaming handler logic for VoiceAI WebSocket integration, similar pattern to ElevenLabs streaming dispatch added here.

Suggested labels

backend, tests, config, size/XL

Poem

🐰 Whispers into the ether now,
ElevenLabs takes the stage,
Batch and streaming, both in vow,
Audio flows page by page.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 54.17% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: integrating ElevenLabs STT and TTS services with fallback support, which aligns with the PR objectives and the substantial changes across config, services, workers, and tests.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/eleven_labs_tts

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@aniebiet-afia
Copy link
Copy Markdown
Contributor

LGTM

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (1)
tests/test_kafka/test_pipeline.py (1)

325-410: ⚡ Quick win

Add one failure-path test for ElevenLabs fallback dispatch.

These cases only prove the happy path. The new worker contract here is “ElevenLabs first, configured fallback on exception”, so it would be worth adding a test where ElevenLabs raises and the fallback provider publishes the final event exactly once. That’s the highest-risk integration behavior added in this PR.

Also applies to: 413-468

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_kafka/test_pipeline.py` around lines 325 - 410, Add a failing-path
test that verifies ElevenLabs is attempted first and, on exception, the
configured fallback TTS provider is used exactly once: in a new async test
(similar to test_tts_worker_handle_elevenlabs) patch get_elevenlabs_tts_service
to raise from its synthesize / synthesize_stream, patch the fallback provider
factory (the service your worker will call when ElevenLabs errors), and patch
settings to enable the ElevenLabs-first behavior; then assert the ElevenLabs
mock was called and raised, assert the fallback service's synthesize was called
once, assert mock_producer.send was called once with "audio.synthesized", and
that the published audio_data equals the fallback bytes and sample_rate. Ensure
you reuse TTSWorker, mock_producer, base_translation_event, and mock_get_redis
patches from the existing tests to mirror setup.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@app/external_services/elevenlabs_stt/service.py`:
- Around line 76-107: The code always sends files = {"file": ("audio.wav",
wav_data, "audio/wav")} even when encoding is non-PCM; detect whether you
wrapped the bytes into a WAV (i.e., when encoding.lower() in
("linear16","raw","pcm") and wav_data == bytes(header)+audio_bytes) and only
then use "audio.wav"/"audio/wav"; otherwise preserve the original media type and
extension based on the encoding variable (e.g., keep "audio.opus"/"audio/ogg" or
a passed-in content_type/filename) so compressed formats like opus are not
mislabeled as WAV; update the file tuple construction to choose filename and
MIME accordingly using encoding (or a provided content_type) instead of always
"audio.wav"/"audio/wav".

In `@app/external_services/elevenlabs_tts/service.py`:
- Around line 45-47: The methods in this module that accept the parameter
encoding (see the function signature with language: str = "en", encoding: str =
"linear16") must map supported encodings to ElevenLabs output_format values
(e.g. "linear16" -> "pcm_24000"/appropriate PCM format, "mp3" -> "mp3", "wav" ->
"wav") and set that mapped value on the request payload instead of always
sending pcm_24000; for any encoding that cannot be mapped, validate early and
raise/return an explicit error (e.g. ValueError) so callers know it is
unsupported; apply this change to every method that takes encoding (the ones
around the indicated ranges) and ensure the event/response metadata records the
same encoding/output_format used in the actual bytes.

In `@app/services/stt_worker.py`:
- Around line 201-218: The code clears self._audio_buffers[buffer_key] before
calling get_elevenlabs_stt_service().transcribe, which loses buffered chunks if
transcribe() raises; change the flow in the method that builds full_audio (use
buffer_key, self._audio_buffers, and the call to stt_service.transcribe) so you
do not mutate/clear self._audio_buffers[buffer_key] until after transcribe
returns successfully (or catch exceptions and only clear in the success path),
and if transcribe fails ensure you preserve or restore the original buffer so
the fallback in handle() can retry the full buffered utterance.
- Around line 173-184: The except block handling ElevenLabs streaming failures
currently logs and cleans up (logger.error, closing conn via
asyncio.create_task, managing self._background_tasks, and popping
self._streaming_connections[buffer_key]) but then returns, preventing handle()'s
fallback logic from running; after performing the cleanup steps in that except
Exception as e block, re-raise the caught exception (use plain raise to preserve
traceback) so the error propagates back into handle() and triggers the fallback
provider logic.

In `@app/services/tts_worker.py`:
- Around line 274-335: The ElevenLabs streaming handler
_handle_elevenlabs_streaming currently returns silently when synthesize_stream
yields no chunks; change it to treat that as a provider failure by raising an
exception when accumulated_bytes is empty after the async for loop so the normal
retry/fallback logic can run. Specifically, after the loop that fills
accumulated_bytes (from get_elevenlabs_tts_service().synthesize_stream) add a
check if not accumulated_bytes and raise a descriptive exception (e.g.,
RuntimeError or the module's ProviderError) including context like room_id,
sequence_number and target_language; keep existing behavior for the non-empty
path (creating final_payload / final_event and calling self._producer.send)
unchanged.

---

Nitpick comments:
In `@tests/test_kafka/test_pipeline.py`:
- Around line 325-410: Add a failing-path test that verifies ElevenLabs is
attempted first and, on exception, the configured fallback TTS provider is used
exactly once: in a new async test (similar to test_tts_worker_handle_elevenlabs)
patch get_elevenlabs_tts_service to raise from its synthesize /
synthesize_stream, patch the fallback provider factory (the service your worker
will call when ElevenLabs errors), and patch settings to enable the
ElevenLabs-first behavior; then assert the ElevenLabs mock was called and
raised, assert the fallback service's synthesize was called once, assert
mock_producer.send was called once with "audio.synthesized", and that the
published audio_data equals the fallback bytes and sample_rate. Ensure you reuse
TTSWorker, mock_producer, base_translation_event, and mock_get_redis patches
from the existing tests to mirror setup.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 38bf608b-cdb1-4823-b1c5-e0d44fa23b02

📥 Commits

Reviewing files that changed from the base of the PR and between 36e95e1 and 0c1a6ce.

📒 Files selected for processing (15)
  • app/core/config.py
  • app/external_services/elevenlabs_stt/__init__.py
  • app/external_services/elevenlabs_stt/config.py
  • app/external_services/elevenlabs_stt/service.py
  • app/external_services/elevenlabs_stt/streaming.py
  • app/external_services/elevenlabs_tts/__init__.py
  • app/external_services/elevenlabs_tts/config.py
  • app/external_services/elevenlabs_tts/service.py
  • app/services/stt_worker.py
  • app/services/tts_worker.py
  • pyproject.toml
  • tests/external_services/__init__.py
  • tests/external_services/test_elevenlabs_stt.py
  • tests/external_services/test_elevenlabs_tts.py
  • tests/test_kafka/test_pipeline.py

Comment on lines +76 to +107
wav_data = audio_bytes
if encoding.lower() in ("linear16", "raw", "pcm"):
# Construct a basic WAV header
# 44 bytes header for PCM
num_channels = 1
bytes_per_sample = 2 # 16-bit
byte_rate = sample_rate * num_channels * bytes_per_sample
block_align = num_channels * bytes_per_sample
data_size = len(audio_bytes)
file_size = 36 + data_size

header = bytearray(44)
header[0:4] = b"RIFF"
header[4:8] = file_size.to_bytes(4, "little")
header[8:12] = b"WAVE"
header[12:16] = b"fmt "
header[16:20] = (16).to_bytes(4, "little") # Subchunk1Size (16 for PCM)
header[20:22] = (1).to_bytes(2, "little") # AudioFormat (1 for PCM)
header[22:24] = num_channels.to_bytes(2, "little")
header[24:28] = sample_rate.to_bytes(4, "little")
header[28:32] = byte_rate.to_bytes(4, "little")
header[32:34] = block_align.to_bytes(2, "little")
header[34:36] = (bytes_per_sample * 8).to_bytes(
2, "little"
) # BitsPerSample
header[36:40] = b"data"
header[40:44] = data_size.to_bytes(4, "little")

wav_data = bytes(header) + audio_bytes

# Form data for ElevenLabs
files = {"file": ("audio.wav", wav_data, "audio/wav")}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve the original media type for non-PCM uploads.

This only wraps PCM-like inputs in a WAV header, but Line 107 still sends every request as audio.wav / audio/wav. If the pipeline passes a non-PCM encoding like opus, ElevenLabs receives raw compressed bytes mislabeled as WAV, so this branch won't transcribe correctly.

Suggested fix
-        wav_data = audio_bytes
+        upload_bytes = audio_bytes
+        filename = "audio.bin"
+        content_type = "application/octet-stream"
         if encoding.lower() in ("linear16", "raw", "pcm"):
             # Construct a basic WAV header
             # 44 bytes header for PCM
             num_channels = 1
             bytes_per_sample = 2  # 16-bit
@@
-            wav_data = bytes(header) + audio_bytes
+            upload_bytes = bytes(header) + audio_bytes
+            filename = "audio.wav"
+            content_type = "audio/wav"

         # Form data for ElevenLabs
-        files = {"file": ("audio.wav", wav_data, "audio/wav")}
+        files = {"file": (filename, upload_bytes, content_type)}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/external_services/elevenlabs_stt/service.py` around lines 76 - 107, The
code always sends files = {"file": ("audio.wav", wav_data, "audio/wav")} even
when encoding is non-PCM; detect whether you wrapped the bytes into a WAV (i.e.,
when encoding.lower() in ("linear16","raw","pcm") and wav_data ==
bytes(header)+audio_bytes) and only then use "audio.wav"/"audio/wav"; otherwise
preserve the original media type and extension based on the encoding variable
(e.g., keep "audio.opus"/"audio/ogg" or a passed-in content_type/filename) so
compressed formats like opus are not mislabeled as WAV; update the file tuple
construction to choose filename and MIME accordingly using encoding (or a
provided content_type) instead of always "audio.wav"/"audio/wav".

Comment on lines +45 to +47
language: str = "en",
encoding: str = "linear16", # default is linear16/pcm
) -> dict:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Honor encoding when selecting the ElevenLabs output_format.

Both methods accept encoding, but they always send pcm_24000 (or the static setting override). If the worker requests a different encoding, the event metadata will say one thing while the bytes are still PCM, which will break downstream consumers. Please map supported encodings to ElevenLabs output_format values here, or reject unsupported encodings before returning.

Also applies to: 67-73, 120-122, 138-142

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/external_services/elevenlabs_tts/service.py` around lines 45 - 47, The
methods in this module that accept the parameter encoding (see the function
signature with language: str = "en", encoding: str = "linear16") must map
supported encodings to ElevenLabs output_format values (e.g. "linear16" ->
"pcm_24000"/appropriate PCM format, "mp3" -> "mp3", "wav" -> "wav") and set that
mapped value on the request payload instead of always sending pcm_24000; for any
encoding that cannot be mapped, validate early and raise/return an explicit
error (e.g. ValueError) so callers know it is unsupported; apply this change to
every method that takes encoding (the ones around the indicated ranges) and
ensure the event/response metadata records the same encoding/output_format used
in the actual bytes.

Comment on lines +173 to +184
except Exception as e:
logger.error(
"Error in ElevenLabs streaming connection for %s: %s",
buffer_key,
e,
)
if conn:
task = asyncio.create_task(conn.close())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
self._streaming_connections.pop(buffer_key, None)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Re-raise streaming failures after cleanup so fallback can run.

This except block logs and tears down the connection, but then returns normally. The new fallback logic in handle() only engages on raised exceptions, so an ElevenLabs streaming outage currently drops the chunk instead of switching providers.

Suggested fix
         except Exception as e:
             logger.error(
                 "Error in ElevenLabs streaming connection for %s: %s",
                 buffer_key,
                 e,
@@
             if conn:
                 task = asyncio.create_task(conn.close())
                 self._background_tasks.add(task)
                 task.add_done_callback(self._background_tasks.discard)
             self._streaming_connections.pop(buffer_key, None)
+            raise
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/services/stt_worker.py` around lines 173 - 184, The except block handling
ElevenLabs streaming failures currently logs and cleans up (logger.error,
closing conn via asyncio.create_task, managing self._background_tasks, and
popping self._streaming_connections[buffer_key]) but then returns, preventing
handle()'s fallback logic from running; after performing the cleanup steps in
that except Exception as e block, re-raise the caught exception (use plain raise
to preserve traceback) so the error propagates back into handle() and triggers
the fallback provider logic.

Comment on lines +201 to +218
full_audio = b"".join(self._audio_buffers[buffer_key])
self._audio_buffers[buffer_key] = []

if not settings.ELEVEN_LABS_API_KEY:
logger.info("ELEVEN_LABS_API_KEY not set. Mocking ElevenLabs STT response.")
result: dict[str, Any] = {
"text": "Hello, this is a simulated ElevenLabs transcription.",
"detected_language": payload.source_language,
"confidence": 1.0,
}
else:
await self._handle_batch(payload, buffer_key, audio_bytes)
stt_service = get_elevenlabs_stt_service()
result = await stt_service.transcribe(
full_audio,
language=payload.source_language,
sample_rate=payload.sample_rate,
encoding=payload.encoding.value,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Don't discard the buffered batch before the primary transcription succeeds.

This code empties the 5-chunk buffer before calling ElevenLabs. If transcribe() raises, the fallback path in handle() only has the current audio_bytes, so the earlier buffered audio is lost and the utterance can't be retried intact.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/services/stt_worker.py` around lines 201 - 218, The code clears
self._audio_buffers[buffer_key] before calling
get_elevenlabs_stt_service().transcribe, which loses buffered chunks if
transcribe() raises; change the flow in the method that builds full_audio (use
buffer_key, self._audio_buffers, and the call to stt_service.transcribe) so you
do not mutate/clear self._audio_buffers[buffer_key] until after transcribe
returns successfully (or catch exceptions and only clear in the success path),
and if transcribe fails ensure you preserve or restore the original buffer so
the fallback in handle() can retry the full buffered utterance.

Comment on lines +274 to +335
async def _handle_elevenlabs_streaming(
self,
payload: Any,
text: str,
encoding: str,
pipeline_start: float,
) -> None:
"""Handle ElevenLabs HTTP streaming path."""
accumulated_bytes = bytearray()
sample_rate = 24000

async for chunk_data in get_elevenlabs_tts_service().synthesize_stream(
text=text,
language=payload.target_language,
encoding=encoding,
):
chunk_bytes = chunk_data["audio_bytes"]
sample_rate = chunk_data["sample_rate"]
accumulated_bytes.extend(chunk_bytes)

chunk_b64 = base64.b64encode(chunk_bytes).decode("ascii")
synth_payload = SynthesizedAudioPayload(
room_id=payload.room_id,
user_id=payload.user_id,
sequence_number=payload.sequence_number,
audio_data=chunk_b64,
target_language=payload.target_language,
sample_rate=sample_rate,
encoding=AudioEncoding(encoding),
)
synth_event = SynthesizedAudioEvent(payload=synth_payload)
try:
await self._publish_audio_to_redis(synth_event)
except Exception as redis_err:
logger.warning("Redis audio egress publish failed: %s", redis_err)

if accumulated_bytes:
full_audio_b64 = base64.b64encode(accumulated_bytes).decode("ascii")
final_payload = SynthesizedAudioPayload(
room_id=payload.room_id,
user_id=payload.user_id,
sequence_number=payload.sequence_number,
audio_data=full_audio_b64,
target_language=payload.target_language,
sample_rate=sample_rate,
encoding=AudioEncoding(encoding),
)
final_event = SynthesizedAudioEvent(payload=final_payload)
await self._producer.send(
AUDIO_SYNTHESIZED, final_event, key=payload.room_id
)

elapsed_ms = (time.monotonic() - pipeline_start) * 1000
logger.info(
"TTS (ElevenLabs Stream Final): seq=%d room=%s lang=%s "
"provider=elevenlabs audio_size=%d latency=%.1fms",
payload.sequence_number,
payload.room_id,
payload.target_language,
len(accumulated_bytes),
elapsed_ms,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Treat an empty ElevenLabs stream as a provider failure.

If synthesize_stream() yields no chunks, this method exits without publishing anything and without throwing, so the translation is silently lost and fallback never runs. Raise when accumulated_bytes is empty so the normal retry/fallback path can handle it.

Possible fix
         async for chunk_data in get_elevenlabs_tts_service().synthesize_stream(
             text=text,
             language=payload.target_language,
             encoding=encoding,
         ):
             chunk_bytes = chunk_data["audio_bytes"]
             sample_rate = chunk_data["sample_rate"]
             accumulated_bytes.extend(chunk_bytes)
@@
             except Exception as redis_err:
                 logger.warning("Redis audio egress publish failed: %s", redis_err)
 
+        if not accumulated_bytes:
+            raise RuntimeError("ElevenLabs streaming returned no audio")
+
         if accumulated_bytes:
             full_audio_b64 = base64.b64encode(accumulated_bytes).decode("ascii")
             final_payload = SynthesizedAudioPayload(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/services/tts_worker.py` around lines 274 - 335, The ElevenLabs streaming
handler _handle_elevenlabs_streaming currently returns silently when
synthesize_stream yields no chunks; change it to treat that as a provider
failure by raising an exception when accumulated_bytes is empty after the async
for loop so the normal retry/fallback logic can run. Specifically, after the
loop that fills accumulated_bytes (from
get_elevenlabs_tts_service().synthesize_stream) add a check if not
accumulated_bytes and raise a descriptive exception (e.g., RuntimeError or the
module's ProviderError) including context like room_id, sequence_number and
target_language; keep existing behavior for the non-empty path (creating
final_payload / final_event and calling self._producer.send) unchanged.

Copy link
Copy Markdown
Contributor

@aniebiet-afia aniebiet-afia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great work

@aniebiet-afia aniebiet-afia merged commit fc27b0f into main May 30, 2026
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants