# Copyright (c) Microsoft. All rights reserved.

"""All-encompassing example of camb.ai integration with Semantic Kernel.

Demonstrates all 8 camb.ai features:
  1. Text-to-Speech (TTS) via CambTextToAudio service
  2. List Voices via CambPlugin
  3. Translation via CambPlugin
  4. Transcription (STT) via CambAudioToText service
  5. Translated TTS via CambPlugin
  6. Voice Cloning via CambPlugin
  7. Text-to-Sound via CambPlugin
  8. Audio Separation via CambPlugin

Requires:
  - CAMB_API_KEY in a .env file at the repo root or in environment
  - An audio sample clip (defaults to sabrina-original-clip.mp3)
  - pip install semantic-kernel[camb]

Usage:
  cd python
  source .venv/bin/activate
  python samples/concepts/audio/camb_all_features.py
"""

import asyncio
import base64
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path

from dotenv import load_dotenv

# Load .env from repo root
_repo_root = Path(__file__).resolve().parents[4]
load_dotenv(_repo_root / ".env")

from semantic_kernel.connectors.ai.camb import (  # noqa: E402
    CambAudioToText,
    CambAudioToTextExecutionSettings,
    CambPlugin,
    CambTextToAudio,
    CambTextToAudioExecutionSettings,
)
from semantic_kernel.contents.audio_content import AudioContent  # noqa: E402

# The script refuses to start without credentials and an input clip, because
# every example below needs at least one of them.
API_KEY = os.environ.get("CAMB_API_KEY")
if not API_KEY:
    raise RuntimeError("Set CAMB_API_KEY in .env or environment")

# NOTE(review): the default clip lives in a sibling checkout outside this
# repository; set CAMB_AUDIO_SAMPLE to point at any local audio file instead.
_DEFAULT_SAMPLE = _repo_root.parent / "yt-dlp" / "voices" / "original" / "sabrina-original-clip.mp3"
AUDIO_SAMPLE = os.environ.get("CAMB_AUDIO_SAMPLE", str(_DEFAULT_SAMPLE))
if not Path(AUDIO_SAMPLE).exists():
    raise RuntimeError(f"Audio sample not found at {AUDIO_SAMPLE}")


def play(path: str) -> None:
    """Play audio with afplay (macOS).

    On any other platform the file is not played; its location is printed.
    """
    if sys.platform == "darwin":
        print(f" Playing: {path}")
        # check=False: a failed playback must not abort the example run.
        subprocess.run(["afplay", path], check=False)
    else:
        print(f" Audio saved at: {path}")


def save_and_play(data: bytes, suffix: str = ".wav") -> str:
    """Save audio bytes to a temp file and play.

    Args:
        data: Raw audio bytes to write.
        suffix: File extension given to the temporary file.

    Returns:
        The path of the temporary file. The file is left on disk so it can be
        inspected after the run.
    """
    # NamedTemporaryFile creates the file atomically; tempfile.mktemp only
    # returns a name and is deprecated because another process can claim the
    # path before it is opened.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
        f.write(data)
        path = f.name
    play(path)
    return path


def save_and_play_b64(audio_b64: str, suffix: str = ".wav") -> str:
    """Decode base64 audio, save to temp file, and play.

    Args:
        audio_b64: Base64-encoded audio.
        suffix: File extension given to the temporary file.

    Returns:
        The path of the temporary file.
    """
    data = base64.b64decode(audio_b64)
    return save_and_play(data, suffix)


# NOTE: main() prints each example's docstring as its section header, so the
# example docstrings below are part of the program output.


# ---------------------------------------------------------------------------
# 1. Text-to-Speech via CambTextToAudio service
# ---------------------------------------------------------------------------
async def example_tts() -> None:
    """1. Text-to-Speech: convert text to audio using the SK service interface."""
    tts = CambTextToAudio(api_key=API_KEY)
    settings = CambTextToAudioExecutionSettings(
        voice_id=147320,
        language="en-us",
        output_format="wav",
    )

    results = await tts.get_audio_contents(
        "Hello from Semantic Kernel! This is a test of the camb dot A I text to speech integration.",
        settings=settings,
    )

    audio = results[0]
    print(f" Audio: {len(audio.data)} bytes, mime_type={audio.mime_type}, model={audio.ai_model_id}")
    save_and_play(audio.data)


# ---------------------------------------------------------------------------
# 2. List Voices via CambPlugin
# ---------------------------------------------------------------------------
async def example_list_voices() -> None:
    """2. List Voices: show available voices from camb.ai."""
    plugin = CambPlugin(api_key=API_KEY)
    result = await plugin.list_voices()
    voices = json.loads(result)
    print(f" Found {len(voices)} voices")
    # Only the first five voices are shown to keep the output short.
    for v in voices[:5]:
        print(f" - {v['name']} (id={v['id']}, gender={v['gender']})")


# ---------------------------------------------------------------------------
# 3. Translation via CambPlugin
# ---------------------------------------------------------------------------
async def example_translation() -> None:
    """3. Translation: translate text between languages."""
    plugin = CambPlugin(api_key=API_KEY)

    # English (1) -> Spanish (2)
    result = await plugin.translate(
        text="Hello! How are you today? Semantic Kernel is great.",
        source_language=1,
        target_language=2,
    )
    print(f" English -> Spanish: {result}")


# ---------------------------------------------------------------------------
# 4. Transcription (STT) via CambAudioToText service
# ---------------------------------------------------------------------------
async def example_transcription() -> None:
    """4. Transcription: transcribe audio to text using the SK service interface."""
    stt = CambAudioToText(api_key=API_KEY)
    settings = CambAudioToTextExecutionSettings(language=1)  # 1 = English
    audio_content = AudioContent(uri=AUDIO_SAMPLE)

    print(f" Transcribing {Path(AUDIO_SAMPLE).name}... (this may take a minute)")
    results = await stt.get_text_contents(audio_content, settings=settings)

    text = results[0].text
    # Long transcriptions are truncated to 200 characters for display.
    print(f" Transcription: {text[:200]}{'...' if len(text) > 200 else ''}")


# ---------------------------------------------------------------------------
# 5. Translated TTS via CambPlugin
# ---------------------------------------------------------------------------
async def example_translated_tts() -> None:
    """5. Translated TTS: translate and speak in one step."""
    plugin = CambPlugin(api_key=API_KEY)

    result = await plugin.translated_tts(
        text="Hello, how are you doing today?",
        source_language=1,  # English
        target_language=76,  # French
        voice_id=147320,
    )

    parsed = json.loads(result)
    print(f" Run ID: {parsed['run_id']}")
    print(f" Content type: {parsed['content_type']}")
    save_and_play_b64(parsed["audio_base64"])


# ---------------------------------------------------------------------------
# 6. Voice Cloning via CambPlugin
# ---------------------------------------------------------------------------
async def example_clone_voice() -> None:
    """6. Voice Clone: clone a voice from audio sample and speak with it."""
    plugin = CambPlugin(api_key=API_KEY)

    # Clone the voice
    clone_result = await plugin.clone_voice(
        voice_name="sk_test_sabrina",
        audio_file_path=AUDIO_SAMPLE,
        gender=2,  # female
    )
    parsed = json.loads(clone_result)
    voice_id = parsed["voice_id"]
    print(f" Cloned voice ID: {voice_id}, name: {parsed['voice_name']}")

    # Speak with the cloned voice using the TTS service
    tts = CambTextToAudio(api_key=API_KEY)
    settings = CambTextToAudioExecutionSettings(
        voice_id=voice_id,
        language="en-us",
        output_format="wav",
    )

    results = await tts.get_audio_contents(
        "Hello! This is a cloned voice speaking through Semantic Kernel and camb dot A I.",
        settings=settings,
    )
    print(" Speaking with cloned voice...")
    save_and_play(results[0].data)


# ---------------------------------------------------------------------------
# 7. Text-to-Sound via CambPlugin
# ---------------------------------------------------------------------------
async def example_text_to_sound() -> None:
    """7. Text-to-Sound: generate sound effects from a text description."""
    plugin = CambPlugin(api_key=API_KEY)

    result = await plugin.text_to_sound(prompt="gentle rain on a rooftop with distant thunder")

    parsed = json.loads(result)
    print(f" Run ID: {parsed['run_id']}")
    print(f" Audio length: {len(parsed['audio_base64'])} chars (base64)")
    save_and_play_b64(parsed["audio_base64"])


# ---------------------------------------------------------------------------
# 8. Audio Separation via CambPlugin
# ---------------------------------------------------------------------------
async def example_audio_separation() -> None:
    """8. Audio Separation: separate vocals from background in audio."""
    plugin = CambPlugin(api_key=API_KEY)

    print(f" Separating {Path(AUDIO_SAMPLE).name}... (this may take a minute)")
    result = await plugin.separate_audio(audio_file_path=AUDIO_SAMPLE)

    parsed = json.loads(result)
    print(f" Run ID: {parsed['run_id']}")
    print(f" Vocals URL: {parsed['vocals_url']}")
    print(f" Background URL: {parsed['background_url']}")


# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------
async def main() -> None:
    """Run every example in order, reporting PASSED or FAILED for each."""
    examples = [
        example_tts,
        example_list_voices,
        example_translation,
        example_transcription,
        example_translated_tts,
        example_clone_voice,
        example_text_to_sound,
        example_audio_separation,
    ]

    for fn in examples:
        print(f"\n--- {fn.__doc__} ---")
        try:
            await fn()
            print(" PASSED")
        except Exception as e:  # noqa: BLE001
            # Deliberately broad: one failing feature must not stop the
            # remaining demonstrations from running.
            print(f" FAILED: {e}")


if __name__ == "__main__":
    asyncio.run(main())
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import base64
import json
import logging
import os
from typing import Annotated, Any

from semantic_kernel.exceptions import FunctionExecutionException
from semantic_kernel.functions.kernel_function_decorator import kernel_function
from semantic_kernel.kernel_pydantic import KernelBaseModel

logger: logging.Logger = logging.getLogger(__name__)

_POLL_INTERVAL_SECONDS = 2.0
_MAX_POLL_ATTEMPTS = 120

# Task states are compared upper-cased, so "completed" and "COMPLETED" are
# treated the same way.
_SUCCESS_STATES = frozenset({"SUCCESS", "COMPLETED"})
_FAILURE_STATES = frozenset({"FAILED", "ERROR"})


def _normalized_state(status: Any) -> str | None:
    """Return the upper-cased task state of a status object, or None if it has none."""
    raw = getattr(status, "status", None)
    if raw is None:
        return None
    # Unwrap enum-like values (objects exposing ``.value``) before comparing.
    return str(getattr(raw, "value", raw)).upper()


def _voice_attr(voice: Any, key: str) -> Any:
    """Read ``key`` from a voice given either as a dict or as an object."""
    if isinstance(voice, dict):
        return voice.get(key)
    return getattr(voice, key, None)


class CambPlugin(KernelBaseModel):
    """A plugin that provides camb.ai audio and translation functionality.

    Exposes six kernel functions for features that don't have existing
    Semantic Kernel base classes: translate, translated_tts, clone_voice,
    list_voices, text_to_sound, and separate_audio.

    Usage:
        kernel.add_plugin(CambPlugin(api_key="your-key"), "camb")
    """

    # The camb.ai async client every kernel function talks to.
    async_client: Any = None

    def __init__(
        self,
        api_key: str | None = None,
        async_client: Any | None = None,
    ) -> None:
        """Initialize the CambPlugin.

        Args:
            api_key: The camb.ai API key. If not provided, reads from CAMB_API_KEY env var.
            async_client: An optional pre-configured AsyncCambAI client. When
                given, no API key is looked up and the camb package is not imported.

        Raises:
            FunctionExecutionException: If no client is supplied and either no
                API key can be resolved or the camb package is not installed.
        """
        if async_client is None:
            resolved_key = api_key or os.environ.get("CAMB_API_KEY")
            if not resolved_key:
                raise FunctionExecutionException(
                    "A camb.ai API key is required. Provide api_key or set CAMB_API_KEY env var."
                )
            try:
                from camb.client import AsyncCambAI
            except ImportError as ex:
                # Chain the ImportError so the traceback shows what failed to import.
                raise FunctionExecutionException(
                    "The camb package is required. Install it with: pip install semantic-kernel[camb]"
                ) from ex
            async_client = AsyncCambAI(api_key=resolved_key)

        super().__init__(async_client=async_client)

    async def _poll_task_status(self, get_status_fn: Any, task_id: str) -> str:
        """Poll an async task until completion.

        Args:
            get_status_fn: An async callable that takes task_id and returns a status object.
            task_id: The task ID to poll.

        Returns:
            The run_id of the completed task.

        Raises:
            FunctionExecutionException: If the task fails or times out.
        """
        for _ in range(_MAX_POLL_ATTEMPTS):
            status = await get_status_fn(task_id)
            state = _normalized_state(status)
            if state in _SUCCESS_STATES:
                return status.run_id
            if state in _FAILURE_STATES:
                raise FunctionExecutionException(f"Camb.ai task failed: {status}")
            # Any other state (or a status object without one) means "still running".
            await asyncio.sleep(_POLL_INTERVAL_SECONDS)

        raise FunctionExecutionException("Camb.ai task timed out.")

    @kernel_function(description="Translate text between 140+ languages using camb.ai", name="translate")
    async def translate(
        self,
        text: Annotated[str, "The text to translate."],
        source_language: Annotated[int, "Source language ID (e.g. 1=English, 2=Spanish)."],
        target_language: Annotated[int, "Target language ID (e.g. 1=English, 2=Spanish)."],
        formality: Annotated[int | None, "Formality level (optional)."] = None,
    ) -> str:
        """Translate text between languages using camb.ai.

        Args:
            text: The text to translate.
            source_language: Source language ID (e.g. 1=English, 2=Spanish).
            target_language: Target language ID.
            formality: Optional formality level; only sent when not None.

        Returns:
            The translated text.

        Raises:
            FunctionExecutionException: If the translation call fails.
        """
        kwargs: dict[str, Any] = {
            "text": text,
            "source_language": source_language,
            "target_language": target_language,
        }
        if formality is not None:
            kwargs["formality"] = formality

        try:
            result = await self.async_client.translation.translation_stream(**kwargs)
            return str(result)
        except Exception as e:
            # The SDK has a known bug where successful 200 responses may raise ApiError
            if hasattr(e, "status_code") and e.status_code == 200 and hasattr(e, "body") and e.body:
                return str(e.body)
            raise FunctionExecutionException(f"Camb.ai translation failed: {e}") from e

    @kernel_function(
        description="Translate text and generate speech in one operation using camb.ai",
        name="translated_tts",
    )
    async def translated_tts(
        self,
        text: Annotated[str, "The text to translate and speak."],
        source_language: Annotated[int, "Source language ID."],
        target_language: Annotated[int, "Target language ID."],
        voice_id: Annotated[int, "The voice ID to use for speech synthesis."],
    ) -> str:
        """Translate text and generate speech in one operation.

        Args:
            text: The text to translate and speak.
            source_language: Source language ID.
            target_language: Target language ID.
            voice_id: The voice ID to use.

        Returns:
            JSON string with keys audio_base64, content_type and run_id.

        Raises:
            FunctionExecutionException: If the task fails or times out.
        """
        import httpx

        task_result = await self.async_client.translated_tts.create_translated_tts(
            text=text,
            source_language=source_language,
            target_language=target_language,
            voice_id=voice_id,
        )

        run_id = await self._poll_task_status(
            self.async_client.translated_tts.get_translated_tts_task_status,
            task_result.task_id,
        )

        # Fetch audio via HTTP endpoint. The key is read back from the client's
        # private attributes: first the client wrapper, then ``_api_key`` if the
        # wrapper did not yield one.
        wrapper = getattr(self.async_client, "_client_wrapper", None)
        api_key = getattr(wrapper, "api_key", "") if wrapper is not None else ""
        if not api_key:
            api_key = getattr(self.async_client, "_api_key", "") or ""

        async with httpx.AsyncClient() as http_client:
            response = await http_client.get(
                f"https://client.camb.ai/apis/tts-result/{run_id}",
                headers={"x-api-key": api_key},
            )
            response.raise_for_status()
            audio_data = response.content

        content_type = response.headers.get("content-type", "audio/wav")
        audio_b64 = base64.b64encode(audio_data).decode("utf-8")

        return json.dumps({"audio_base64": audio_b64, "content_type": content_type, "run_id": run_id})

    @kernel_function(description="Create a custom voice clone from an audio sample using camb.ai", name="clone_voice")
    async def clone_voice(
        self,
        voice_name: Annotated[str, "Name for the cloned voice."],
        audio_file_path: Annotated[str, "Path to the audio file to clone from."],
        gender: Annotated[int, "Gender of the voice (1=male, 2=female)."],
        description: Annotated[str | None, "Optional description of the voice."] = None,
        age: Annotated[int | None, "Optional age of the voice."] = None,
    ) -> str:
        """Create a custom voice clone from an audio sample.

        Args:
            voice_name: Name for the cloned voice.
            audio_file_path: Path to the audio file.
            gender: Gender (1=male, 2=female).
            description: Optional description; only sent when not None.
            age: Optional age; only sent when not None.

        Returns:
            JSON string with voice_id, voice_name, and status.
        """
        kwargs: dict[str, Any] = {
            "voice_name": voice_name,
            "gender": gender,
        }
        if description is not None:
            kwargs["description"] = description
        if age is not None:
            kwargs["age"] = age

        # The file handle must stay open for the duration of the upload call.
        with open(audio_file_path, "rb") as f:
            kwargs["file"] = f
            result = await self.async_client.voice_cloning.create_custom_voice(**kwargs)

        return json.dumps({
            "voice_id": getattr(result, "voice_id", None) or getattr(result, "id", None),
            "voice_name": getattr(result, "voice_name", None) or getattr(result, "name", None),
            "status": getattr(result, "status", "created"),
        })

    @kernel_function(description="List available voices from camb.ai", name="list_voices")
    async def list_voices(self) -> str:
        """List all available voices.

        Each voice may be returned by the client as a dict or as an object;
        both forms are read the same way, with ``name`` used when
        ``voice_name`` is missing or empty.

        Returns:
            JSON array of voice objects with id, name, gender, age, and language.
        """
        result = await self.async_client.voice_cloning.list_voices()

        voices = [
            {
                "id": _voice_attr(voice, "id"),
                "name": _voice_attr(voice, "voice_name") or _voice_attr(voice, "name"),
                "gender": _voice_attr(voice, "gender"),
                "age": _voice_attr(voice, "age"),
                "language": _voice_attr(voice, "language"),
            }
            for voice in result
        ]

        return json.dumps(voices)

    @kernel_function(
        description="Generate sounds or music from a text description using camb.ai", name="text_to_sound"
    )
    async def text_to_sound(
        self,
        prompt: Annotated[str, "Text description of the sound or music to generate."],
        duration: Annotated[float | None, "Duration in seconds (optional)."] = None,
        audio_type: Annotated[str | None, "Type of audio: 'music' or 'sound' (optional)."] = None,
    ) -> str:
        """Generate sounds or music from a text description.

        Args:
            prompt: Description of the sound/music to generate.
            duration: Optional duration in seconds; only sent when not None.
            audio_type: Optional type ("music" or "sound"); only sent when not None.

        Returns:
            JSON string with keys audio_base64 and run_id.

        Raises:
            FunctionExecutionException: If the task fails or times out.
        """
        kwargs: dict[str, Any] = {"prompt": prompt}
        if duration is not None:
            kwargs["duration"] = duration
        if audio_type is not None:
            kwargs["audio_type"] = audio_type

        task_result = await self.async_client.text_to_audio.create_text_to_audio(**kwargs)

        run_id = await self._poll_task_status(
            self.async_client.text_to_audio.get_text_to_audio_status,
            task_result.task_id,
        )

        # Collect streaming audio result
        audio_chunks: list[bytes] = [
            chunk async for chunk in self.async_client.text_to_audio.get_text_to_audio_result(run_id)
        ]

        audio_data = b"".join(audio_chunks)
        audio_b64 = base64.b64encode(audio_data).decode("utf-8")

        return json.dumps({"audio_base64": audio_b64, "run_id": run_id})

    @kernel_function(
        description="Separate audio into vocals and background tracks using camb.ai", name="separate_audio"
    )
    async def separate_audio(
        self,
        audio_file_path: Annotated[str, "Path to the audio file to separate."],
    ) -> str:
        """Separate audio into vocals and background tracks.

        Args:
            audio_file_path: Path to the audio file.

        Returns:
            JSON string with vocals_url, background_url and run_id.

        Raises:
            FunctionExecutionException: If the task fails or times out.
        """
        # The file only needs to be open while the task is being created.
        with open(audio_file_path, "rb") as f:
            task_result = await self.async_client.audio_separation.create_audio_separation(media_file=f)

        run_id = await self._poll_task_status(
            self.async_client.audio_separation.get_audio_separation_status,
            task_result.task_id,
        )

        result = await self.async_client.audio_separation.get_audio_separation_run_info(run_id)

        return json.dumps({
            "vocals_url": getattr(result, "vocals_url", None),
            "background_url": getattr(result, "background_url", None),
            "run_id": run_id,
        })
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import logging
import os
import sys
from typing import Any

if sys.version_info >= (3, 12):
    from typing import override  # pragma: no cover
else:
    from typing_extensions import override  # pragma: no cover

from pydantic import ValidationError

from semantic_kernel.connectors.ai.audio_to_text_client_base import AudioToTextClientBase
from semantic_kernel.connectors.ai.camb.camb_prompt_execution_settings import CambAudioToTextExecutionSettings
from semantic_kernel.connectors.ai.camb.settings.camb_settings import CambSettings
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.contents.audio_content import AudioContent
from semantic_kernel.contents.text_content import TextContent
from semantic_kernel.exceptions.service_exceptions import (
    ServiceInitializationError,
    ServiceInvalidRequestError,
    ServiceInvalidResponseError,
)

logger: logging.Logger = logging.getLogger(__name__)

_POLL_INTERVAL_SECONDS = 2.0
_MAX_POLL_ATTEMPTS = 120

# Task states are compared upper-cased, so "completed" and "COMPLETED" are
# treated the same way.
_SUCCESS_STATES = frozenset({"SUCCESS", "COMPLETED"})
_FAILURE_STATES = frozenset({"FAILED", "ERROR"})


class CambAudioToText(AudioToTextClientBase):
    """Camb.ai audio-to-text (transcription) service."""

    # The camb.ai async client used for every transcription call.
    async_client: Any = None

    def __init__(
        self,
        api_key: str | None = None,
        ai_model_id: str | None = None,
        service_id: str | None = None,
        async_client: Any | None = None,
        env_file_path: str | None = None,
        env_file_encoding: str | None = None,
    ) -> None:
        """Initialize the CambAudioToText service.

        Args:
            api_key: The camb.ai API key. If not provided, reads from CAMB_API_KEY env var.
            ai_model_id: The model identifier. Defaults to "camb-transcription".
            service_id: The service ID. Defaults to ai_model_id.
            async_client: An optional pre-configured AsyncCambAI client.
            env_file_path: Path to a .env file for settings.
            env_file_encoding: Encoding of the .env file.

        Raises:
            ServiceInitializationError: If the settings fail validation, or if
                no client is supplied and the camb package is not installed.
        """
        # NOTE(review): settings are validated even when async_client is
        # supplied, so an API key appears to be needed in that case too.
        try:
            camb_settings = CambSettings(
                api_key=api_key,
                env_file_path=env_file_path,
                env_file_encoding=env_file_encoding,
            )
        except ValidationError as ex:
            raise ServiceInitializationError("Failed to create camb.ai settings.", ex) from ex

        resolved_model_id = ai_model_id or "camb-transcription"

        if async_client is None:
            try:
                from camb.client import AsyncCambAI
            except ImportError as ex:
                # Chain the ImportError so the traceback shows what failed to import.
                raise ServiceInitializationError(
                    "The camb package is required. Install it with: pip install semantic-kernel[camb]"
                ) from ex
            async_client = AsyncCambAI(api_key=camb_settings.api_key.get_secret_value())

        super().__init__(
            service_id=service_id or resolved_model_id,
            ai_model_id=resolved_model_id,
            async_client=async_client,
        )

    @override
    async def get_text_contents(
        self,
        audio_content: AudioContent,
        settings: PromptExecutionSettings | None = None,
        **kwargs: Any,
    ) -> list[TextContent]:
        """Transcribe audio to text using camb.ai.

        Args:
            audio_content: The audio content to transcribe. Must have a uri (file path) or data (bytes).
            settings: Optional execution settings (language, etc.).
            **kwargs: Additional keyword arguments.

        Returns:
            A list containing a single TextContent with the transcription.

        Raises:
            ServiceInvalidRequestError: If the audio content has neither a uri nor data.
            ServiceInvalidResponseError: If the transcription task fails or times out.
        """
        if not settings:
            settings = CambAudioToTextExecutionSettings()
        elif not isinstance(settings, CambAudioToTextExecutionSettings):
            settings = self.get_prompt_execution_settings_from_settings(settings)

        assert isinstance(settings, CambAudioToTextExecutionSettings)  # nosec

        # Build kwargs for create_transcription
        create_kwargs: dict[str, Any] = {}

        # language is required by the API; 1 is used when none is configured
        create_kwargs["language"] = settings.language if settings.language is not None else 1

        # Get audio file — pass as tuple with filename for proper content-type detection
        if audio_content.uri and isinstance(audio_content.uri, str):
            filename = os.path.basename(audio_content.uri)
            with open(audio_content.uri, "rb") as f:
                audio_bytes = f.read()
            create_kwargs["media_file"] = (filename, audio_bytes)
        elif audio_content.data:
            audio_bytes = audio_content.data if isinstance(audio_content.data, bytes) else bytes(audio_content.data)
            # In-memory audio has no filename, so a fixed placeholder name is sent.
            create_kwargs["media_file"] = ("audio.wav", audio_bytes)
        else:
            raise ServiceInvalidRequestError("Audio content must have a uri (file path) or data (bytes).")

        # Create transcription task
        task_result = await self.async_client.transcription.create_transcription(**create_kwargs)
        task_id = task_result.task_id

        # Poll for completion
        run_id = await self._poll_task_status(task_id)

        # Fetch transcription result
        result = await self.async_client.transcription.get_transcription_result(run_id)

        # TranscriptionResult has a .transcript list of Transcript objects, each with .text
        if hasattr(result, "transcript") and isinstance(result.transcript, list):
            full_text = " ".join(seg.text for seg in result.transcript if hasattr(seg, "text"))
        elif hasattr(result, "text"):
            full_text = result.text
        else:
            # Unknown result shape: fall back to its string form.
            full_text = str(result)

        return [
            TextContent(
                ai_model_id=self.ai_model_id,
                text=full_text,
                inner_content=result,
            )
        ]

    async def _poll_task_status(self, task_id: str) -> str:
        """Poll the transcription task until completion.

        Args:
            task_id: The task ID to poll.

        Returns:
            The run_id of the completed task.

        Raises:
            ServiceInvalidResponseError: If polling times out or the task fails.
        """
        for _ in range(_MAX_POLL_ATTEMPTS):
            status = await self.async_client.transcription.get_transcription_task_status(task_id)
            raw_state = getattr(status, "status", None)
            # Unwrap enum-like values and ignore case before comparing.
            state = None if raw_state is None else str(getattr(raw_state, "value", raw_state)).upper()
            if state in _SUCCESS_STATES:
                return status.run_id
            if state in _FAILURE_STATES:
                raise ServiceInvalidResponseError(f"Camb.ai transcription task failed: {status}")
            # Any other state (or a status object without one) means "still running".
            await asyncio.sleep(_POLL_INTERVAL_SECONDS)

        raise ServiceInvalidResponseError("Camb.ai transcription task timed out.")

    @override
    def get_prompt_execution_settings_class(self) -> type[PromptExecutionSettings]:
        """Get the request settings class."""
        return CambAudioToTextExecutionSettings
# Copyright (c) Microsoft. All rights reserved.

import logging
import sys
from typing import Any

if sys.version_info >= (3, 12):
    from typing import override  # pragma: no cover
else:
    from typing_extensions import override  # pragma: no cover

from pydantic import ValidationError

from semantic_kernel.connectors.ai.camb.camb_prompt_execution_settings import CambTextToAudioExecutionSettings
from semantic_kernel.connectors.ai.camb.settings.camb_settings import CambSettings
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.connectors.ai.text_to_audio_client_base import TextToAudioClientBase
from semantic_kernel.contents.audio_content import AudioContent
from semantic_kernel.exceptions.service_exceptions import ServiceInitializationError

logger: logging.Logger = logging.getLogger(__name__)

# Mime type mapping for audio formats
_FORMAT_MIME_TYPES: dict[str, str] = {
    "wav": "audio/wav",
    "mp3": "audio/mpeg",
    "flac": "audio/flac",
    "ogg": "audio/ogg",
    "pcm_s16le": "audio/pcm",
    "mulaw": "audio/basic",
}


class CambTextToAudio(TextToAudioClientBase):
    """Camb.ai text-to-speech service using the MARS TTS models."""

    # The camb.ai async client used for every synthesis call.
    async_client: Any = None

    def __init__(
        self,
        api_key: str | None = None,
        ai_model_id: str | None = None,
        service_id: str | None = None,
        async_client: Any | None = None,
        env_file_path: str | None = None,
        env_file_encoding: str | None = None,
    ) -> None:
        """Initialize the CambTextToAudio service.

        Args:
            api_key: The camb.ai API key. If not provided, reads from CAMB_API_KEY env var.
            ai_model_id: The TTS model ID (e.g. "mars-flash", "mars-pro"). Defaults to "mars-flash".
            service_id: The service ID. Defaults to ai_model_id.
            async_client: An optional pre-configured AsyncCambAI client.
            env_file_path: Path to a .env file for settings.
            env_file_encoding: Encoding of the .env file.

        Raises:
            ServiceInitializationError: If the settings fail validation, or if
                no client is supplied and the camb package is not installed.
        """
        # NOTE(review): settings are validated even when async_client is
        # supplied, so an API key appears to be needed in that case too.
        try:
            camb_settings = CambSettings(
                api_key=api_key,
                text_to_audio_model_id=ai_model_id,
                env_file_path=env_file_path,
                env_file_encoding=env_file_encoding,
            )
        except ValidationError as ex:
            raise ServiceInitializationError("Failed to create camb.ai settings.", ex) from ex

        # Precedence: explicit argument, then the settings value, then the default.
        resolved_model_id = ai_model_id or camb_settings.text_to_audio_model_id or "mars-flash"

        if async_client is None:
            try:
                from camb.client import AsyncCambAI
            except ImportError as ex:
                # Chain the ImportError so the traceback shows what failed to import.
                raise ServiceInitializationError(
                    "The camb package is required. Install it with: pip install semantic-kernel[camb]"
                ) from ex
            async_client = AsyncCambAI(api_key=camb_settings.api_key.get_secret_value())

        super().__init__(
            service_id=service_id or resolved_model_id,
            ai_model_id=resolved_model_id,
            async_client=async_client,
        )

    @override
    async def get_audio_contents(
        self,
        text: str,
        settings: PromptExecutionSettings | None = None,
        **kwargs: Any,
    ) -> list[AudioContent]:
        """Generate audio from text using camb.ai TTS.

        Args:
            text: The text to synthesize into speech.
            settings: Optional execution settings (voice_id, language, etc.).
            **kwargs: Additional keyword arguments.

        Returns:
            A list containing a single AudioContent with the generated audio.
        """
        if not settings:
            settings = CambTextToAudioExecutionSettings()
        elif not isinstance(settings, CambTextToAudioExecutionSettings):
            settings = self.get_prompt_execution_settings_from_settings(settings)

        assert isinstance(settings, CambTextToAudioExecutionSettings)  # nosec

        tts_kwargs: dict[str, Any] = {
            "text": text,
            "speech_model": self.ai_model_id,
        }

        # Optional fields are only sent when they were explicitly configured.
        if settings.voice_id is not None:
            tts_kwargs["voice_id"] = settings.voice_id
        if settings.language is not None:
            tts_kwargs["language"] = settings.language

        output_format = settings.output_format or "wav"

        # Use the SDK's typed configuration when it can be imported; otherwise
        # pass the same information as a plain dict.
        try:
            from camb import StreamTtsOutputConfiguration

            tts_kwargs["output_configuration"] = StreamTtsOutputConfiguration(format=output_format)
        except ImportError:
            tts_kwargs["output_configuration"] = {"format": output_format}

        if settings.user_instructions is not None:
            tts_kwargs["user_instructions"] = settings.user_instructions

        # The TTS endpoint streams the audio; gather the chunks into one buffer.
        audio_chunks: list[bytes] = [chunk async for chunk in self.async_client.text_to_speech.tts(**tts_kwargs)]

        audio_data = b"".join(audio_chunks)
        mime_type = _FORMAT_MIME_TYPES.get(output_format, "audio/wav")

        return [
            AudioContent(
                ai_model_id=self.ai_model_id,
                data=audio_data,
                data_format="base64",
                mime_type=mime_type,
            )
        ]

    @override
    def get_prompt_execution_settings_class(self) -> type[PromptExecutionSettings]:
        """Get the request settings class."""
        return CambTextToAudioExecutionSettings
# Copyright (c) Microsoft. All rights reserved.

from typing import ClassVar

from pydantic import SecretStr

from semantic_kernel.kernel_pydantic import KernelBaseSettings


class CambSettings(KernelBaseSettings):
    """Configuration for the camb.ai connector.

    Values are looked up in environment variables carrying the 'CAMB_'
    prefix first. Anything not found there can be supplied through a
    .env file (encoding 'utf-8'). A required value that is found in
    neither place causes validation to fail, reporting the missing
    setting.

    Required settings for prefix 'CAMB_' are:
    - api_key: SecretStr - The camb.ai API key. (Env var CAMB_API_KEY)

    Optional settings for prefix 'CAMB_' are:
    - text_to_audio_model_id: str - The TTS model ID. (Env var CAMB_TEXT_TO_AUDIO_MODEL_ID)
    """

    # Prefix prepended to each field name to form its environment variable.
    env_prefix: ClassVar[str] = "CAMB_"

    api_key: SecretStr
    text_to_audio_model_id: str | None = None
import pytest


@pytest.fixture()
def camb_unit_test_env(monkeypatch, exclude_list):
    """Populate the CAMB_* environment variables that CambSettings reads.

    Any variable named in ``exclude_list`` is removed from the environment
    instead of being set. The full name-to-value mapping is returned either
    way so tests can compare against it.
    """
    skipped = [] if exclude_list is None else exclude_list

    env_vars = {
        "CAMB_API_KEY": "test_api_key",
        "CAMB_TEXT_TO_AUDIO_MODEL_ID": "mars-flash",
    }

    for name in env_vars:
        if name in skipped:
            # raising=False: the variable may already be absent.
            monkeypatch.delenv(name, raising=False)
        else:
            monkeypatch.setenv(name, env_vars[name])

    return env_vars
from unittest.mock import AsyncMock, MagicMock

import pytest

from semantic_kernel.connectors.ai.camb.camb_prompt_execution_settings import CambAudioToTextExecutionSettings
from semantic_kernel.connectors.ai.camb.services.camb_audio_to_text import CambAudioToText
from semantic_kernel.contents.audio_content import AudioContent
from semantic_kernel.exceptions.service_exceptions import (
    ServiceInitializationError,
    ServiceInvalidRequestError,
    ServiceInvalidResponseError,
)


def _transcription_client(task_id, status, run_id=None, text=None):
    """Build a mocked SDK client for the create -> poll -> fetch transcription flow.

    ``run_id`` and ``text`` are only wired up when given, so a failure
    scenario can leave the result endpoint as a plain MagicMock.
    """
    client = MagicMock()

    created_task = MagicMock()
    created_task.task_id = task_id
    client.transcription.create_transcription = AsyncMock(return_value=created_task)

    task_status = MagicMock()
    task_status.status = status
    if run_id is not None:
        task_status.run_id = run_id
    client.transcription.get_transcription_task_status = AsyncMock(return_value=task_status)

    if text is not None:
        transcription = MagicMock()
        transcription.text = text
        client.transcription.get_transcription_result = AsyncMock(return_value=transcription)

    return client


def test_init(camb_unit_test_env):
    """The default model id is used and the injected client is kept as-is."""
    client = MagicMock()
    service = CambAudioToText(async_client=client)
    assert service.ai_model_id == "camb-transcription"
    assert service.async_client is client


def test_init_custom_model(camb_unit_test_env):
    """An explicit ai_model_id overrides the default."""
    service = CambAudioToText(ai_model_id="custom-model", async_client=MagicMock())
    assert service.ai_model_id == "custom-model"


def test_init_custom_service_id(camb_unit_test_env):
    """An explicit service_id is stored on the service."""
    service = CambAudioToText(service_id="my-stt-service", async_client=MagicMock())
    assert service.service_id == "my-stt-service"


@pytest.mark.parametrize("exclude_list", [["CAMB_API_KEY"]], indirect=True)
def test_init_missing_api_key(camb_unit_test_env):
    """Construction fails when no API key is available."""
    with pytest.raises(ServiceInitializationError, match="Failed to create camb.ai settings."):
        CambAudioToText()


def test_prompt_execution_settings_class(camb_unit_test_env):
    """The service advertises the camb audio-to-text settings class."""
    service = CambAudioToText(async_client=MagicMock())
    assert service.get_prompt_execution_settings_class() == CambAudioToTextExecutionSettings


async def test_get_text_contents(camb_unit_test_env, tmp_path):
    """Audio referenced by file path goes through create, poll and fetch."""
    client = _transcription_client(
        task_id="task-123",
        status="SUCCESS",
        run_id="run-456",
        text="Hello from camb.ai transcription!",
    )
    service = CambAudioToText(async_client=client)

    # Create a temporary audio file
    audio_path = tmp_path / "test.wav"
    audio_path.write_bytes(b"fake audio data")

    results = await service.get_text_contents(
        AudioContent(uri=str(audio_path)),
        settings=CambAudioToTextExecutionSettings(language=1),
    )

    assert len(results) == 1
    assert results[0].text == "Hello from camb.ai transcription!"
    assert results[0].ai_model_id == "camb-transcription"

    transcription_api = client.transcription
    transcription_api.create_transcription.assert_awaited_once()
    transcription_api.get_transcription_task_status.assert_awaited_once_with("task-123")
    transcription_api.get_transcription_result.assert_awaited_once_with("run-456")


async def test_get_text_contents_with_bytes(camb_unit_test_env):
    """Audio supplied as in-memory bytes is transcribed as well."""
    client = _transcription_client(
        task_id="task-789",
        status="SUCCESS",
        run_id="run-012",
        text="Transcribed from bytes",
    )
    service = CambAudioToText(async_client=client)

    results = await service.get_text_contents(AudioContent(data=b"raw audio bytes", data_format="base64"))

    assert len(results) == 1
    assert results[0].text == "Transcribed from bytes"


async def test_get_text_contents_no_audio_data(camb_unit_test_env):
    """An AudioContent with neither uri nor data is rejected."""
    service = CambAudioToText(async_client=MagicMock())

    with pytest.raises(ServiceInvalidRequestError, match="Audio content must have a uri"):
        await service.get_text_contents(AudioContent())


async def test_poll_task_status_failure(camb_unit_test_env):
    """A FAILED task status surfaces as ServiceInvalidResponseError."""
    service = CambAudioToText(async_client=_transcription_client(task_id="task-fail", status="FAILED"))

    with pytest.raises(ServiceInvalidResponseError, match="task failed"):
        await service._poll_task_status("task-fail")
from typing import Any
from unittest.mock import MagicMock

import pytest

from semantic_kernel.connectors.ai.camb.camb_prompt_execution_settings import CambTextToAudioExecutionSettings
from semantic_kernel.connectors.ai.camb.services.camb_text_to_audio import CambTextToAudio
from semantic_kernel.exceptions.service_exceptions import ServiceInitializationError


def _streaming_client(*chunks: bytes) -> tuple[MagicMock, dict[str, Any]]:
    """Return a mocked SDK client whose TTS call streams ``chunks``.

    The second element is a dict that receives the keyword arguments of the
    TTS call, so tests can check what the service forwarded to the SDK.
    """
    captured: dict[str, Any] = {}

    async def fake_tts(**kwargs):
        captured.update(kwargs)
        for chunk in chunks:
            yield chunk

    client = MagicMock()
    client.text_to_speech.tts = fake_tts
    return client, captured


def test_init(camb_unit_test_env):
    """The model id comes from the environment and the injected client is kept."""
    mock_client = MagicMock()
    tts = CambTextToAudio(async_client=mock_client)
    assert tts.ai_model_id == "mars-flash"
    assert tts.async_client is mock_client


def test_init_custom_model(camb_unit_test_env):
    """An explicit ai_model_id wins over the environment value."""
    mock_client = MagicMock()
    tts = CambTextToAudio(ai_model_id="mars-pro", async_client=mock_client)
    assert tts.ai_model_id == "mars-pro"


def test_init_custom_service_id(camb_unit_test_env):
    """An explicit service_id is stored on the service."""
    mock_client = MagicMock()
    tts = CambTextToAudio(service_id="my-tts-service", async_client=mock_client)
    assert tts.service_id == "my-tts-service"


@pytest.mark.parametrize("exclude_list", [["CAMB_API_KEY"]], indirect=True)
def test_init_missing_api_key(camb_unit_test_env):
    """Construction fails when no API key is available."""
    with pytest.raises(ServiceInitializationError, match="Failed to create camb.ai settings."):
        # Point at a non-existent env file so a developer's local .env cannot
        # supply the key and mask the failure under test.
        CambTextToAudio(env_file_path="test.env")


def test_prompt_execution_settings_class(camb_unit_test_env):
    """The service advertises the camb text-to-audio settings class."""
    mock_client = MagicMock()
    tts = CambTextToAudio(async_client=mock_client)
    assert tts.get_prompt_execution_settings_class() == CambTextToAudioExecutionSettings


async def test_get_audio_contents(camb_unit_test_env):
    """Explicit settings are forwarded to the SDK and a WAV result is returned."""
    mock_client, sent = _streaming_client(b"audio_chunk_1", b"audio_chunk_2")

    tts = CambTextToAudio(async_client=mock_client)
    settings = CambTextToAudioExecutionSettings(voice_id=147320, language="en-us")

    results = await tts.get_audio_contents("Hello World!", settings=settings)

    assert len(results) == 1
    assert results[0].ai_model_id == "mars-flash"
    assert results[0].mime_type == "audio/wav"

    # The request must carry the text, the resolved model and the chosen voice/language.
    assert sent["text"] == "Hello World!"
    assert sent["speech_model"] == "mars-flash"
    assert sent["voice_id"] == 147320
    assert sent["language"] == "en-us"
    assert "output_configuration" in sent
    assert "user_instructions" not in sent


async def test_get_audio_contents_default_settings(camb_unit_test_env):
    """Without settings, optional fields are left out of the SDK request."""
    mock_client, sent = _streaming_client(b"audio_data")

    tts = CambTextToAudio(async_client=mock_client)

    results = await tts.get_audio_contents("Hello World!")

    assert len(results) == 1
    assert results[0].ai_model_id == "mars-flash"

    assert sent["text"] == "Hello World!"
    assert "voice_id" not in sent
    assert "language" not in sent
    assert "user_instructions" not in sent


async def test_get_audio_contents_mp3_format(camb_unit_test_env):
    """Requesting mp3 output is reflected in the returned MIME type."""
    mock_client, _ = _streaming_client(b"mp3_data")

    tts = CambTextToAudio(async_client=mock_client)
    settings = CambTextToAudioExecutionSettings(output_format="mp3")

    results = await tts.get_audio_contents("Hello!", settings=settings)

    assert results[0].mime_type == "audio/mpeg"
import json
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from semantic_kernel.connectors.ai.camb.camb_plugin import CambPlugin
from semantic_kernel.exceptions import FunctionExecutionException


@pytest.fixture()
def mock_async_client():
    """A bare MagicMock standing in for the camb SDK async client."""
    return MagicMock()


@pytest.fixture()
def plugin(mock_async_client):
    """A CambPlugin wired to the mocked client."""
    return CambPlugin(async_client=mock_async_client)


def test_init_with_client(mock_async_client):
    """An injected client is stored unchanged."""
    plugin = CambPlugin(async_client=mock_async_client)
    assert plugin.async_client is mock_async_client


def test_init_with_api_key(monkeypatch):
    """Without an injected client, the SDK client is built from CAMB_API_KEY."""
    monkeypatch.setenv("CAMB_API_KEY", "test-key")
    mock_client_cls = MagicMock()
    mock_client_cls.return_value = MagicMock()
    # Stub the optional SDK modules so the test runs without camb installed.
    with patch.dict("sys.modules", {"camb": MagicMock(), "camb.client": MagicMock(AsyncCambAI=mock_client_cls)}):
        CambPlugin()
        mock_client_cls.assert_called_once_with(api_key="test-key")


def test_init_missing_api_key(monkeypatch):
    """Construction fails when neither a client nor an API key is available."""
    monkeypatch.delenv("CAMB_API_KEY", raising=False)
    with pytest.raises(FunctionExecutionException, match="API key is required"):
        CambPlugin()


async def test_translate(plugin, mock_async_client):
    """translate returns the text produced by the SDK."""
    mock_async_client.translation.translation_stream = AsyncMock(return_value="Hola mundo")

    result = await plugin.translate(text="Hello world", source_language=1, target_language=2)

    assert result == "Hola mundo"
    mock_async_client.translation.translation_stream.assert_awaited_once()


async def test_translate_with_formality(plugin, mock_async_client):
    """The formality option is forwarded to the SDK call."""
    mock_async_client.translation.translation_stream = AsyncMock(return_value="Buenos dias")

    result = await plugin.translate(text="Good morning", source_language=1, target_language=2, formality=1)

    assert result == "Buenos dias"
    call_kwargs = mock_async_client.translation.translation_stream.call_args[1]
    assert call_kwargs["formality"] == 1


async def test_translate_api_error_200(plugin, mock_async_client):
    """Test handling of SDK bug where 200 responses raise ApiError."""

    class MockApiError(Exception):
        def __init__(self, status_code, body):
            self.status_code = status_code
            self.body = body
            super().__init__(f"ApiError {status_code}")

    error = MockApiError(status_code=200, body="Translated text from error body")
    mock_async_client.translation.translation_stream = AsyncMock(side_effect=error)

    result = await plugin.translate(text="Hello", source_language=1, target_language=2)

    assert result == "Translated text from error body"


async def test_translated_tts(plugin, mock_async_client):
    """translated_tts polls the task, downloads the audio and returns it as JSON."""
    mock_task = MagicMock()
    mock_task.task_id = "task-tts-1"
    mock_async_client.translated_tts.create_translated_tts = AsyncMock(return_value=mock_task)

    mock_status = MagicMock()
    mock_status.status = "SUCCESS"
    mock_status.run_id = "run-tts-1"
    mock_async_client.translated_tts.get_translated_tts_task_status = AsyncMock(return_value=mock_status)

    mock_async_client._api_key = "test-key"

    mock_response = MagicMock()
    mock_response.content = b"fake-audio-data"
    mock_response.headers = {"content-type": "audio/wav"}
    mock_response.raise_for_status = MagicMock()

    # Async context manager stand-in for httpx.AsyncClient.
    mock_http_client = AsyncMock()
    mock_http_client.get = AsyncMock(return_value=mock_response)
    mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client)
    mock_http_client.__aexit__ = AsyncMock(return_value=False)

    # patch() imports the "httpx" target module itself, so no explicit import is needed.
    with patch("httpx.AsyncClient", return_value=mock_http_client):
        result = await plugin.translated_tts(text="Hello", source_language=1, target_language=2, voice_id=147320)

    parsed = json.loads(result)
    assert "audio_base64" in parsed
    assert parsed["content_type"] == "audio/wav"
    assert parsed["run_id"] == "run-tts-1"


async def test_clone_voice(plugin, mock_async_client, tmp_path):
    """clone_voice uploads the sample file and reports the created voice."""
    mock_result = MagicMock()
    mock_result.voice_id = 12345
    mock_result.voice_name = "My Voice"
    mock_result.status = "created"
    mock_async_client.voice_cloning.create_custom_voice = AsyncMock(return_value=mock_result)

    audio_file = tmp_path / "sample.wav"
    audio_file.write_bytes(b"fake audio for cloning")

    result = await plugin.clone_voice(voice_name="My Voice", audio_file_path=str(audio_file), gender=1)

    parsed = json.loads(result)
    assert parsed["voice_id"] == 12345
    assert parsed["voice_name"] == "My Voice"
    assert parsed["status"] == "created"


async def test_list_voices(plugin, mock_async_client):
    """list_voices returns one JSON entry per SDK voice, exposing its name."""
    mock_voice1 = {"id": 1, "voice_name": "Voice A", "gender": "male", "age": 30, "language": "en-us"}
    mock_voice2 = {"id": 2, "voice_name": "Voice B", "gender": "female", "age": 25, "language": "es-es"}

    mock_async_client.voice_cloning.list_voices = AsyncMock(return_value=[mock_voice1, mock_voice2])

    result = await plugin.list_voices()

    parsed = json.loads(result)
    assert len(parsed) == 2
    assert parsed[0]["name"] == "Voice A"
    assert parsed[1]["name"] == "Voice B"


async def test_text_to_sound(plugin, mock_async_client):
    """text_to_sound polls the task and returns the streamed audio as JSON."""
    mock_task = MagicMock()
    mock_task.task_id = "task-sound-1"
    mock_async_client.text_to_audio.create_text_to_audio = AsyncMock(return_value=mock_task)

    mock_status = MagicMock()
    mock_status.status = "SUCCESS"
    mock_status.run_id = "run-sound-1"
    mock_async_client.text_to_audio.get_text_to_audio_status = AsyncMock(return_value=mock_status)

    async def mock_stream(run_id):
        yield b"sound-chunk-1"
        yield b"sound-chunk-2"

    mock_async_client.text_to_audio.get_text_to_audio_result = mock_stream

    result = await plugin.text_to_sound(prompt="A thunderstorm in the distance")

    parsed = json.loads(result)
    assert "audio_base64" in parsed
    assert parsed["run_id"] == "run-sound-1"


async def test_separate_audio(plugin, mock_async_client, tmp_path):
    """separate_audio returns the vocals/background URLs from the run info."""
    mock_task = MagicMock()
    mock_task.task_id = "task-sep-1"
    mock_async_client.audio_separation.create_audio_separation = AsyncMock(return_value=mock_task)

    mock_status = MagicMock()
    mock_status.status = "SUCCESS"
    mock_status.run_id = "run-sep-1"
    mock_async_client.audio_separation.get_audio_separation_status = AsyncMock(return_value=mock_status)

    mock_result = MagicMock()
    mock_result.vocals_url = "https://example.com/vocals.wav"
    mock_result.background_url = "https://example.com/background.wav"
    mock_async_client.audio_separation.get_audio_separation_run_info = AsyncMock(return_value=mock_result)

    audio_file = tmp_path / "mixed.wav"
    audio_file.write_bytes(b"mixed audio data")

    result = await plugin.separate_audio(audio_file_path=str(audio_file))

    parsed = json.loads(result)
    assert parsed["vocals_url"] == "https://example.com/vocals.wav"
    assert parsed["background_url"] == "https://example.com/background.wav"
    assert parsed["run_id"] == "run-sep-1"


async def test_poll_task_status_timeout(plugin):
    """A task that never leaves PENDING ends in a timeout error."""
    mock_status = MagicMock()
    mock_status.status = "PENDING"

    mock_get_status = AsyncMock(return_value=mock_status)

    # Shrink the polling budget so the timeout path runs in milliseconds.
    with (
        patch("semantic_kernel.connectors.ai.camb.camb_plugin._MAX_POLL_ATTEMPTS", 2),
        patch("semantic_kernel.connectors.ai.camb.camb_plugin._POLL_INTERVAL_SECONDS", 0.01),
        pytest.raises(FunctionExecutionException, match="timed out"),
    ):
        await plugin._poll_task_status(mock_get_status, "task-timeout")


async def test_poll_task_status_failure(plugin):
    """A FAILED task status surfaces as FunctionExecutionException."""
    mock_status = MagicMock()
    mock_status.status = "FAILED"

    mock_get_status = AsyncMock(return_value=mock_status)

    with pytest.raises(FunctionExecutionException, match="task failed"):
        await plugin._poll_task_status(mock_get_status, "task-fail")
import pytest
from pydantic import ValidationError

from semantic_kernel.connectors.ai.camb.camb_prompt_execution_settings import (
    CambAudioToTextExecutionSettings,
    CambTextToAudioExecutionSettings,
)


def test_tts_settings_defaults():
    """Every TTS option is unset on a freshly constructed settings object."""
    tts_settings = CambTextToAudioExecutionSettings()
    for field_name in ("voice_id", "language", "output_format", "user_instructions"):
        assert getattr(tts_settings, field_name) is None


def test_tts_settings_custom_values():
    """Values passed to the constructor are readable back unchanged."""
    expected = {
        "voice_id": 147320,
        "language": "en-us",
        "output_format": "wav",
        "user_instructions": "Speak slowly and clearly.",
    }
    tts_settings = CambTextToAudioExecutionSettings(**expected)
    for field_name, value in expected.items():
        assert getattr(tts_settings, field_name) == value


def test_tts_settings_invalid_output_format():
    """An unrecognized output format is rejected at validation time."""
    bad_format = "invalid_format"
    with pytest.raises(ValidationError):
        CambTextToAudioExecutionSettings(output_format=bad_format)


def test_tts_settings_user_instructions_too_long():
    """A 1001-character instruction string is rejected at validation time."""
    oversized = "x" * 1001
    with pytest.raises(ValidationError):
        CambTextToAudioExecutionSettings(user_instructions=oversized)


def test_stt_settings_defaults():
    """The STT language is unset by default."""
    stt_settings = CambAudioToTextExecutionSettings()
    assert stt_settings.language is None


def test_stt_settings_custom_values():
    """An explicit STT language is stored as given."""
    stt_settings = CambAudioToTextExecutionSettings(language=1)
    assert stt_settings.language == 1
import pytest
from pydantic import ValidationError

from semantic_kernel.connectors.ai.camb.settings.camb_settings import CambSettings

# Name of an env file that is not expected to exist. Passing it keeps a
# developer's local .env from leaking values into these tests.
_NO_ENV_FILE = "test.env"


def test_settings_from_env(camb_unit_test_env):
    """Both settings are picked up from the CAMB_* environment variables."""
    settings = CambSettings(env_file_path=_NO_ENV_FILE)
    assert settings.api_key.get_secret_value() == camb_unit_test_env["CAMB_API_KEY"]
    assert settings.text_to_audio_model_id == camb_unit_test_env["CAMB_TEXT_TO_AUDIO_MODEL_ID"]


def test_settings_with_explicit_values():
    """Constructor arguments are stored as given."""
    settings = CambSettings(api_key="my-key", text_to_audio_model_id="mars-pro", env_file_path=_NO_ENV_FILE)
    assert settings.api_key.get_secret_value() == "my-key"
    assert settings.text_to_audio_model_id == "mars-pro"


def test_settings_optional_model_id(monkeypatch):
    """The model id stays None when it is supplied nowhere."""
    # Clear any ambient value; otherwise the environment would fill the field
    # and the None assertion would fail for reasons unrelated to the code.
    monkeypatch.delenv("CAMB_TEXT_TO_AUDIO_MODEL_ID", raising=False)
    settings = CambSettings(api_key="my-key", env_file_path=_NO_ENV_FILE)
    assert settings.text_to_audio_model_id is None


@pytest.mark.parametrize("exclude_list", [["CAMB_API_KEY"]], indirect=True)
def test_settings_missing_api_key(camb_unit_test_env):
    """Validation fails when the required API key is absent."""
    with pytest.raises(ValidationError):
        CambSettings(env_file_path=_NO_ENV_FILE)
sys_platform == 'win32'" }, + { name = "websockets", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6c/f9/4d3f62909f62f98556e09958f40934abf226289f55a43e149dfc426dc1cf/camb_sdk-1.5.8.tar.gz", hash = "sha256:4ace563accb6aab35d2a4dce53789c98d8809a8c48806a69d0873fc8b0361300", size = 83508, upload-time = "2026-01-27T14:55:49.16Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1d/2d/e7aeef5d5f48205020d153f4a6ffb39d8971fca78b2cc64fdf0a36ceeb12/camb_sdk-1.5.8-py3-none-any.whl", hash = "sha256:7e1a4764376791ab7cccc27014cdfb691b8c73eecdcaeb01457f506ffd3425be", size = 152371, upload-time = "2026-01-27T14:55:45.637Z" }, +] + [[package]] name = "certifi" version = "2026.1.4" @@ -6322,6 +6338,9 @@ azure = [ { name = "azure-cosmos", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "azure-search-documents", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] +camb = [ + { name = "camb-sdk", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] chroma = [ { name = "chromadb", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] @@ -6436,6 +6455,7 @@ requires-dist = [ { name = "azure-identity", specifier = ">=1.13" }, { name = "azure-search-documents", marker = "extra == 'azure'", specifier = ">=11.6.0b4" }, { name = "boto3", marker = "extra == 'aws'", specifier = ">=1.36.4,<1.41.0" }, + { name = "camb-sdk", marker = "extra == 'camb'", specifier = ">=0.0.2" }, { name = "chromadb", marker = "extra == 'chroma'", specifier = ">=0.5,<1.1" }, { name = "cloudevents", specifier = "~=1.0" }, { name = "dapr", marker = "extra == 'dapr'", specifier = ">=1.14.0" }, @@ -6491,7 +6511,7 @@ requires-dist = [ { name = "websockets", specifier = ">=13,<16" }, { name = "websockets", marker = "extra == 
'realtime'", specifier = ">=13,<16" }, ] -provides-extras = ["anthropic", "autogen", "aws", "azure", "chroma", "copilotstudio", "dapr", "faiss", "google", "hugging-face", "mcp", "milvus", "mistralai", "mongo", "notebooks", "ollama", "onnx", "oracledb", "pandas", "pinecone", "postgres", "qdrant", "realtime", "redis", "sql", "usearch", "weaviate"] +provides-extras = ["anthropic", "autogen", "aws", "azure", "camb", "chroma", "copilotstudio", "dapr", "faiss", "google", "hugging-face", "mcp", "milvus", "mistralai", "mongo", "notebooks", "ollama", "onnx", "oracledb", "pandas", "pinecone", "postgres", "qdrant", "realtime", "redis", "sql", "usearch", "weaviate"] [package.metadata.requires-dev] dev = [