feat(elevenlabs): integrate ElevenLabs STT & TTS with fallback support #82
- Implement ElevenLabs Scribe STT batch service and real-time streaming WebSocket client.
- Implement ElevenLabs TTS batch and stream services using eleven_flash_v2_5.
- Update STTWorker and TTSWorker to dispatch ElevenLabs and support primary/fallback providers.
- Add active and fallback controls (ACTIVE_STT_PROVIDER, STT_FALLBACK_PROVIDER, etc.) in settings.
- Add unit and pipeline integration tests for the ElevenLabs pipeline path.
- Resolve mypy, ruff lint, and pytest import paths across modules and tests.

Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
📝 Walkthrough
This PR adds comprehensive ElevenLabs Speech-to-Text and Text-to-Speech integration to the audio pipeline, replacing Deepgram as the default provider. It includes batch and streaming modes, full service implementations with circuit breakers and connection management, worker dispatch logic with fallback support, and extensive test coverage across unit and integration layers.
Changes
ElevenLabs Audio Pipeline Integration
Sequence Diagram(s)
sequenceDiagram
participant Worker as Audio Worker
participant Dispatch as Provider Dispatcher
participant Service as ElevenLabs Service
participant CircuitBreaker as Circuit Breaker
participant API as ElevenLabs API
participant Kafka as Kafka Publisher
Worker->>Dispatch: audio chunk / text to synthesize
Dispatch->>Dispatch: select provider & streaming mode
Dispatch->>Service: batch_transcribe() or synthesize()
Service->>CircuitBreaker: execute HTTP request
CircuitBreaker->>API: POST with audio/text + params
API-->>CircuitBreaker: response (audio/transcription)
CircuitBreaker-->>Service: decoded result
Service-->>Dispatch: {audio_bytes, latency} / {text, confidence}
Dispatch->>Kafka: publish AUDIO_SYNTHESIZED or TEXT_ORIGINAL
Kafka-->>Dispatch: ack
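The dispatch step in the diagram, combined with the primary/fallback behavior this PR describes, can be sketched roughly as follows. This is a minimal illustration and not the PR's actual worker code: `dispatch_with_fallback`, `failing_elevenlabs`, and `stub_fallback` are hypothetical names, and providers are modeled as plain async callables rather than service objects.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical provider type: an async callable taking audio bytes.
Provider = Callable[[bytes], Awaitable[dict]]


async def dispatch_with_fallback(
    audio: bytes,
    primary: Provider,
    fallback: Optional[Provider],
) -> dict:
    """Try the primary provider; on any exception, try the fallback once."""
    try:
        return await primary(audio)
    except Exception:
        if fallback is None:
            raise  # no fallback configured: surface the original error
        return await fallback(audio)


async def failing_elevenlabs(audio: bytes) -> dict:
    # Simulates a provider outage (assumption for the demo only).
    raise ConnectionError("simulated ElevenLabs outage")


async def stub_fallback(audio: bytes) -> dict:
    return {"text": "fallback transcript", "confidence": 0.9}


result = asyncio.run(
    dispatch_with_fallback(b"\x00\x01", failing_elevenlabs, stub_fallback)
)
```

The key property is that fallback is driven purely by exceptions, which is why several findings below ask the ElevenLabs paths to raise instead of returning quietly.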
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
LGTM
Actionable comments posted: 5
🧹 Nitpick comments (1)
tests/test_kafka/test_pipeline.py (1)
325-410: ⚡ Quick win
Add one failure-path test for ElevenLabs fallback dispatch.
These cases only prove the happy path. The new worker contract here is “ElevenLabs first, configured fallback on exception”, so it would be worth adding a test where ElevenLabs raises and the fallback provider publishes the final event exactly once. That’s the highest-risk integration behavior added in this PR.
Also applies to: 413-468
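A failure-path test along these lines might look like the sketch below. It does not use the real `TTSWorker`, fixtures, or patch targets from `tests/test_kafka/test_pipeline.py`; the `handle` function here is a hypothetical stand-in for the worker's primary/fallback contract, and the mocks come from the standard library's `unittest.mock.AsyncMock`.

```python
import asyncio
from unittest.mock import AsyncMock


async def handle(text, primary, fallback, producer):
    """Hypothetical stand-in for the worker: primary first, fallback on error."""
    try:
        result = await primary.synthesize(text=text)
    except Exception:
        result = await fallback.synthesize(text=text)
    await producer.send("audio.synthesized", result)


def test_fallback_publishes_once():
    fallback_audio = {"audio_bytes": b"fallback", "sample_rate": 24000}

    primary = AsyncMock()
    primary.synthesize = AsyncMock(side_effect=RuntimeError("simulated failure"))
    fallback = AsyncMock()
    fallback.synthesize = AsyncMock(return_value=fallback_audio)
    producer = AsyncMock()

    asyncio.run(handle("hello", primary, fallback, producer))

    # ElevenLabs was attempted first and raised.
    assert primary.synthesize.await_count == 1
    # The fallback ran exactly once and its output is what got published.
    assert fallback.synthesize.await_count == 1
    producer.send.assert_awaited_once_with("audio.synthesized", fallback_audio)
```

In the real suite the same assertions would be made against `mock_producer.send` after patching `get_elevenlabs_tts_service` and the fallback provider factory.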
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@app/external_services/elevenlabs_stt/service.py`:
- Around line 76-107: The code always sends files = {"file": ("audio.wav",
wav_data, "audio/wav")} even when encoding is non-PCM; detect whether you
wrapped the bytes into a WAV (i.e., when encoding.lower() in
("linear16","raw","pcm") and wav_data == bytes(header)+audio_bytes) and only
then use "audio.wav"/"audio/wav"; otherwise preserve the original media type and
extension based on the encoding variable (e.g., keep "audio.opus"/"audio/ogg" or
a passed-in content_type/filename) so compressed formats like opus are not
mislabeled as WAV; update the file tuple construction to choose filename and
MIME accordingly using encoding (or a provided content_type) instead of always
"audio.wav"/"audio/wav".
In `@app/external_services/elevenlabs_tts/service.py`:
- Around line 45-47: The methods in this module that accept the parameter
encoding (see the function signature with language: str = "en", encoding: str =
"linear16") must map supported encodings to ElevenLabs output_format values
(e.g. "linear16" -> "pcm_24000"/appropriate PCM format, "mp3" -> "mp3", "wav" ->
"wav") and set that mapped value on the request payload instead of always
sending pcm_24000; for any encoding that cannot be mapped, validate early and
raise/return an explicit error (e.g. ValueError) so callers know it is
unsupported; apply this change to every method that takes encoding (the ones
around the indicated ranges) and ensure the event/response metadata records the
same encoding/output_format used in the actual bytes.
In `@app/services/stt_worker.py`:
- Around line 201-218: The code clears self._audio_buffers[buffer_key] before
calling get_elevenlabs_stt_service().transcribe, which loses buffered chunks if
transcribe() raises; change the flow in the method that builds full_audio (use
buffer_key, self._audio_buffers, and the call to stt_service.transcribe) so you
do not mutate/clear self._audio_buffers[buffer_key] until after transcribe
returns successfully (or catch exceptions and only clear in the success path),
and if transcribe fails ensure you preserve or restore the original buffer so
the fallback in handle() can retry the full buffered utterance.
- Around line 173-184: The except block handling ElevenLabs streaming failures
currently logs and cleans up (logger.error, closing conn via
asyncio.create_task, managing self._background_tasks, and popping
self._streaming_connections[buffer_key]) but then returns, preventing handle()'s
fallback logic from running; after performing the cleanup steps in that except
Exception as e block, re-raise the caught exception (use plain raise to preserve
traceback) so the error propagates back into handle() and triggers the fallback
provider logic.
In `@app/services/tts_worker.py`:
- Around line 274-335: The ElevenLabs streaming handler
_handle_elevenlabs_streaming currently returns silently when synthesize_stream
yields no chunks; change it to treat that as a provider failure by raising an
exception when accumulated_bytes is empty after the async for loop so the normal
retry/fallback logic can run. Specifically, after the loop that fills
accumulated_bytes (from get_elevenlabs_tts_service().synthesize_stream) add a
check if not accumulated_bytes and raise a descriptive exception (e.g.,
RuntimeError or the module's ProviderError) including context like room_id,
sequence_number and target_language; keep existing behavior for the non-empty
path (creating final_payload / final_event and calling self._producer.send)
unchanged.
---
Nitpick comments:
In `@tests/test_kafka/test_pipeline.py`:
- Around line 325-410: Add a failing-path test that verifies ElevenLabs is
attempted first and, on exception, the configured fallback TTS provider is used
exactly once: in a new async test (similar to test_tts_worker_handle_elevenlabs)
patch get_elevenlabs_tts_service to raise from its synthesize /
synthesize_stream, patch the fallback provider factory (the service your worker
will call when ElevenLabs errors), and patch settings to enable the
ElevenLabs-first behavior; then assert the ElevenLabs mock was called and
raised, assert the fallback service's synthesize was called once, assert
mock_producer.send was called once with "audio.synthesized", and that the
published audio_data equals the fallback bytes and sample_rate. Ensure you reuse
TTSWorker, mock_producer, base_translation_event, and mock_get_redis patches
from the existing tests to mirror setup.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 38bf608b-cdb1-4823-b1c5-e0d44fa23b02
📒 Files selected for processing (15)
- app/core/config.py
- app/external_services/elevenlabs_stt/__init__.py
- app/external_services/elevenlabs_stt/config.py
- app/external_services/elevenlabs_stt/service.py
- app/external_services/elevenlabs_stt/streaming.py
- app/external_services/elevenlabs_tts/__init__.py
- app/external_services/elevenlabs_tts/config.py
- app/external_services/elevenlabs_tts/service.py
- app/services/stt_worker.py
- app/services/tts_worker.py
- pyproject.toml
- tests/external_services/__init__.py
- tests/external_services/test_elevenlabs_stt.py
- tests/external_services/test_elevenlabs_tts.py
- tests/test_kafka/test_pipeline.py
    wav_data = audio_bytes
    if encoding.lower() in ("linear16", "raw", "pcm"):
        # Construct a basic WAV header
        # 44 bytes header for PCM
        num_channels = 1
        bytes_per_sample = 2  # 16-bit
        byte_rate = sample_rate * num_channels * bytes_per_sample
        block_align = num_channels * bytes_per_sample
        data_size = len(audio_bytes)
        file_size = 36 + data_size

        header = bytearray(44)
        header[0:4] = b"RIFF"
        header[4:8] = file_size.to_bytes(4, "little")
        header[8:12] = b"WAVE"
        header[12:16] = b"fmt "
        header[16:20] = (16).to_bytes(4, "little")  # Subchunk1Size (16 for PCM)
        header[20:22] = (1).to_bytes(2, "little")  # AudioFormat (1 for PCM)
        header[22:24] = num_channels.to_bytes(2, "little")
        header[24:28] = sample_rate.to_bytes(4, "little")
        header[28:32] = byte_rate.to_bytes(4, "little")
        header[32:34] = block_align.to_bytes(2, "little")
        header[34:36] = (bytes_per_sample * 8).to_bytes(
            2, "little"
        )  # BitsPerSample
        header[36:40] = b"data"
        header[40:44] = data_size.to_bytes(4, "little")

        wav_data = bytes(header) + audio_bytes

    # Form data for ElevenLabs
    files = {"file": ("audio.wav", wav_data, "audio/wav")}
Preserve the original media type for non-PCM uploads.
This only wraps PCM-like inputs in a WAV header, but Line 107 still sends every request as audio.wav / audio/wav. If the pipeline passes a non-PCM encoding like opus, ElevenLabs receives raw compressed bytes mislabeled as WAV, so this branch won't transcribe correctly.
Suggested fix
    -    wav_data = audio_bytes
    +    upload_bytes = audio_bytes
    +    filename = "audio.bin"
    +    content_type = "application/octet-stream"
         if encoding.lower() in ("linear16", "raw", "pcm"):
             # Construct a basic WAV header
             # 44 bytes header for PCM
             num_channels = 1
             bytes_per_sample = 2  # 16-bit
    @@
    -        wav_data = bytes(header) + audio_bytes
    +        upload_bytes = bytes(header) + audio_bytes
    +        filename = "audio.wav"
    +        content_type = "audio/wav"
         # Form data for ElevenLabs
    -    files = {"file": ("audio.wav", wav_data, "audio/wav")}
    +    files = {"file": (filename, upload_bytes, content_type)}
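As an alternative to assembling the 44-byte header by hand, the same idea can be sketched with the standard library's `wave` module. This is only an illustration under stated assumptions: `build_upload` and the `CONTAINER_TYPES` table are hypothetical names, and the filename/MIME pairs for non-PCM encodings are guesses that would need to match what the pipeline actually emits.

```python
import io
import wave
from typing import Tuple

PCM_ENCODINGS = {"linear16", "raw", "pcm"}

# Hypothetical lookup for already-encoded audio; extend as needed.
CONTAINER_TYPES = {
    "opus": ("audio.ogg", "audio/ogg"),
    "mp3": ("audio.mp3", "audio/mpeg"),
}


def build_upload(
    audio_bytes: bytes, encoding: str, sample_rate: int
) -> Tuple[str, bytes, str]:
    """Return (filename, payload, content_type) for the multipart upload."""
    enc = encoding.lower()
    if enc in PCM_ENCODINGS:
        # Wrap raw 16-bit mono PCM in a WAV container.
        buf = io.BytesIO()
        with wave.open(buf, "wb") as wav:
            wav.setnchannels(1)
            wav.setsampwidth(2)  # 16-bit samples
            wav.setframerate(sample_rate)
            wav.writeframes(audio_bytes)
        return "audio.wav", buf.getvalue(), "audio/wav"
    # Compressed input is passed through untouched and labeled honestly.
    filename, content_type = CONTAINER_TYPES.get(
        enc, ("audio.bin", "application/octet-stream")
    )
    return filename, audio_bytes, content_type
```

The returned tuple can be used directly as the value of the `"file"` entry in the `files` dict, so only PCM input is ever labeled `audio/wav`.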
        language: str = "en",
        encoding: str = "linear16",  # default is linear16/pcm
    ) -> dict:
Honor encoding when selecting the ElevenLabs output_format.
Both methods accept encoding, but they always send pcm_24000 (or the static setting override). If the worker requests a different encoding, the event metadata will say one thing while the bytes are still PCM, which will break downstream consumers. Please map supported encodings to ElevenLabs output_format values here, or reject unsupported encodings before returning.
Also applies to: 67-73, 120-122, 138-142
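One way to make the encoding explicit is a small lookup that fails fast on anything unmapped. The sketch below is hypothetical: `resolve_output_format` is not part of the PR, and apart from `pcm_24000` (which this review mentions) the `output_format` strings are assumptions that should be confirmed against the ElevenLabs API reference before use.

```python
# Assumed mapping from pipeline encodings to ElevenLabs output_format values.
# Only "pcm_24000" appears in this review; verify the others before relying on them.
OUTPUT_FORMATS = {
    "linear16": "pcm_24000",
    "pcm": "pcm_24000",
    "mp3": "mp3_44100_128",
    "mulaw": "ulaw_8000",
}


def resolve_output_format(encoding: str) -> str:
    """Map a requested encoding to an output_format, or raise ValueError."""
    try:
        return OUTPUT_FORMATS[encoding.lower()]
    except KeyError:
        supported = ", ".join(sorted(OUTPUT_FORMATS))
        raise ValueError(
            f"Unsupported TTS encoding {encoding!r}; supported: {supported}"
        ) from None
```

Calling this once at the top of each synthesis method, and recording the resolved value in the event metadata, would keep the published encoding consistent with the bytes that are actually returned.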
    except Exception as e:
        logger.error(
            "Error in ElevenLabs streaming connection for %s: %s",
            buffer_key,
            e,
        )
        if conn:
            task = asyncio.create_task(conn.close())
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
        self._streaming_connections.pop(buffer_key, None)
Re-raise streaming failures after cleanup so fallback can run.
This except block logs and tears down the connection, but then returns normally. The new fallback logic in handle() only engages on raised exceptions, so an ElevenLabs streaming outage currently drops the chunk instead of switching providers.
Suggested fix
         except Exception as e:
             logger.error(
                 "Error in ElevenLabs streaming connection for %s: %s",
                 buffer_key,
                 e,
    @@
             if conn:
                 task = asyncio.create_task(conn.close())
                 self._background_tasks.add(task)
                 task.add_done_callback(self._background_tasks.discard)
             self._streaming_connections.pop(buffer_key, None)
    +        raise
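The effect of the bare `raise` can be seen in a reduced model of the two layers. Everything here is a hypothetical stand-in (`FakeConnection`, `stream_chunk`, `handle`), and for brevity the connection is closed with a direct `await` where the worker schedules a background task.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FakeConnection:
    """Hypothetical stand-in for the streaming WebSocket client."""

    def __init__(self) -> None:
        self.closed = False

    async def send(self, chunk: bytes) -> None:
        raise ConnectionError("simulated stream failure")

    async def close(self) -> None:
        self.closed = True


async def stream_chunk(connections: dict, key: str, chunk: bytes) -> None:
    conn = connections.get(key)
    try:
        await conn.send(chunk)
    except Exception as exc:
        logger.error("Streaming failed for %s: %s", key, exc)
        if conn:
            await conn.close()
        connections.pop(key, None)
        raise  # propagate so the caller can switch providers


async def handle(connections: dict, key: str, chunk: bytes) -> str:
    try:
        await stream_chunk(connections, key, chunk)
        return "primary"
    except Exception:
        return "fallback"  # stands in for the fallback provider call


connections = {"room:user": FakeConnection()}
conn = connections["room:user"]
outcome = asyncio.run(handle(connections, "room:user", b"chunk"))
```

Without the `raise`, `stream_chunk` would return normally after cleanup and `handle` would report the primary path as successful even though the chunk was dropped.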
    full_audio = b"".join(self._audio_buffers[buffer_key])
    self._audio_buffers[buffer_key] = []

    if not settings.ELEVEN_LABS_API_KEY:
        logger.info("ELEVEN_LABS_API_KEY not set. Mocking ElevenLabs STT response.")
        result: dict[str, Any] = {
            "text": "Hello, this is a simulated ElevenLabs transcription.",
            "detected_language": payload.source_language,
            "confidence": 1.0,
        }
    else:
        await self._handle_batch(payload, buffer_key, audio_bytes)
        stt_service = get_elevenlabs_stt_service()
        result = await stt_service.transcribe(
            full_audio,
            language=payload.source_language,
            sample_rate=payload.sample_rate,
            encoding=payload.encoding.value,
        )
Don't discard the buffered batch before the primary transcription succeeds.
This code empties the 5-chunk buffer before calling ElevenLabs. If transcribe() raises, the fallback path in handle() only has the current audio_bytes, so the earlier buffered audio is lost and the utterance can't be retried intact.
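A success-only clear could be structured as in the sketch below. It is a simplified model, not the worker's code: `transcribe_buffered` is a hypothetical helper, buffers are a plain dict of byte-chunk lists, and `transcribe` is any async callable. Only the chunks that were actually sent are dropped, so audio appended while the request is in flight also survives.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def transcribe_buffered(
    buffers: Dict[str, List[bytes]],
    key: str,
    transcribe: Callable[[bytes], Awaitable[dict]],
) -> dict:
    chunks = buffers[key]
    consumed = len(chunks)
    full_audio = b"".join(chunks)
    # If this raises, the buffer is untouched and a fallback can retry it.
    result = await transcribe(full_audio)
    # Success path only: drop exactly the chunks that were sent.
    del buffers[key][:consumed]
    return result


async def failing_transcribe(audio: bytes) -> dict:
    raise TimeoutError("simulated provider timeout")


async def ok_transcribe(audio: bytes) -> dict:
    return {"text": "hello", "num_bytes": len(audio)}
```

With this shape, whichever provider finally succeeds is the one that clears the buffer, so the fallback sees the full utterance and not just the latest chunk.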
    async def _handle_elevenlabs_streaming(
        self,
        payload: Any,
        text: str,
        encoding: str,
        pipeline_start: float,
    ) -> None:
        """Handle ElevenLabs HTTP streaming path."""
        accumulated_bytes = bytearray()
        sample_rate = 24000

        async for chunk_data in get_elevenlabs_tts_service().synthesize_stream(
            text=text,
            language=payload.target_language,
            encoding=encoding,
        ):
            chunk_bytes = chunk_data["audio_bytes"]
            sample_rate = chunk_data["sample_rate"]
            accumulated_bytes.extend(chunk_bytes)

            chunk_b64 = base64.b64encode(chunk_bytes).decode("ascii")
            synth_payload = SynthesizedAudioPayload(
                room_id=payload.room_id,
                user_id=payload.user_id,
                sequence_number=payload.sequence_number,
                audio_data=chunk_b64,
                target_language=payload.target_language,
                sample_rate=sample_rate,
                encoding=AudioEncoding(encoding),
            )
            synth_event = SynthesizedAudioEvent(payload=synth_payload)
            try:
                await self._publish_audio_to_redis(synth_event)
            except Exception as redis_err:
                logger.warning("Redis audio egress publish failed: %s", redis_err)

        if accumulated_bytes:
            full_audio_b64 = base64.b64encode(accumulated_bytes).decode("ascii")
            final_payload = SynthesizedAudioPayload(
                room_id=payload.room_id,
                user_id=payload.user_id,
                sequence_number=payload.sequence_number,
                audio_data=full_audio_b64,
                target_language=payload.target_language,
                sample_rate=sample_rate,
                encoding=AudioEncoding(encoding),
            )
            final_event = SynthesizedAudioEvent(payload=final_payload)
            await self._producer.send(
                AUDIO_SYNTHESIZED, final_event, key=payload.room_id
            )

            elapsed_ms = (time.monotonic() - pipeline_start) * 1000
            logger.info(
                "TTS (ElevenLabs Stream Final): seq=%d room=%s lang=%s "
                "provider=elevenlabs audio_size=%d latency=%.1fms",
                payload.sequence_number,
                payload.room_id,
                payload.target_language,
                len(accumulated_bytes),
                elapsed_ms,
            )
Treat an empty ElevenLabs stream as a provider failure.
If synthesize_stream() yields no chunks, this method exits without publishing anything and without throwing, so the translation is silently lost and fallback never runs. Raise when accumulated_bytes is empty so the normal retry/fallback path can handle it.
Possible fix
             async for chunk_data in get_elevenlabs_tts_service().synthesize_stream(
                 text=text,
                 language=payload.target_language,
                 encoding=encoding,
             ):
                 chunk_bytes = chunk_data["audio_bytes"]
                 sample_rate = chunk_data["sample_rate"]
                 accumulated_bytes.extend(chunk_bytes)
    @@
                 except Exception as redis_err:
                     logger.warning("Redis audio egress publish failed: %s", redis_err)
    +        if not accumulated_bytes:
    +            raise RuntimeError("ElevenLabs streaming returned no audio")
    +
             if accumulated_bytes:
                 full_audio_b64 = base64.b64encode(accumulated_bytes).decode("ascii")
                 final_payload = SynthesizedAudioPayload(
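The empty-stream check can be exercised in isolation with a small async generator. This is a sketch under assumptions: `stream_of` and `collect_audio` are hypothetical helpers that mimic only the chunk shape used above (`audio_bytes`, `sample_rate`), and the Redis and Kafka publishing is left out.

```python
import asyncio
from typing import AsyncIterator, List


async def stream_of(chunks: List[bytes]) -> AsyncIterator[dict]:
    """Hypothetical stand-in for synthesize_stream()."""
    for chunk in chunks:
        yield {"audio_bytes": chunk, "sample_rate": 24000}


async def collect_audio(stream: AsyncIterator[dict]) -> bytes:
    accumulated = bytearray()
    async for chunk in stream:
        accumulated.extend(chunk["audio_bytes"])
    if not accumulated:
        # No chunks at all is treated as a provider failure, not a success.
        raise RuntimeError("ElevenLabs streaming returned no audio")
    return bytes(accumulated)
```

Because the failure is an exception, the caller's existing retry and fallback handling applies to it without any new code path.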
Summary by CodeRabbit
New Features
Configuration