From b8b75a942fa616afad63d1c532ba3c5862aabc5d Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 18 Feb 2026 17:42:37 +0000 Subject: [PATCH 1/7] s2s docs --- .../voice-agents/speechtospeech.mdx | 781 ++++++++++++++++++ 1 file changed, 781 insertions(+) create mode 100644 fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx diff --git a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx new file mode 100644 index 000000000..595d9fd7b --- /dev/null +++ b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx @@ -0,0 +1,781 @@ +--- +title: "Speech-to-Speech" +description: "Build real-time voice agents with a single WebSocket connection. Stream audio in, get intelligent spoken responses back." +--- + +import { AgentGenerator } from "../../../../assets/components/AgentGenerator"; + +Build voice agents with a single WebSocket connection. Stream audio in, get intelligent spoken responses back — with built-in transcription, turn detection, and function calling. The API is compatible with the [OpenAI Realtime API](https://platform.openai.com/docs/guides/realtime), so you can use the OpenAI SDK or any OpenAI-compatible framework like LiveKit. + +## Quickstart + +Install dependencies and talk to your agent in under a minute. 
+ +```bash +pip install websockets sounddevice +``` + +```python +import asyncio, base64, json, threading +import sounddevice as sd +import websockets + +API_KEY = "YOUR_ASSEMBLYAI_API_KEY" +WS_URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" +SAMPLE_RATE = 24000 + + +class AudioPlayer: + """Buffers and plays PCM16 audio in real time.""" + def __init__(self): + self._buf = bytearray() + self._lock = threading.Lock() + self._out = sd.RawOutputStream( + samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, latency="low", + ) + self._out.start() + + def play(self, pcm: bytes): + with self._lock: + self._buf.extend(pcm) + while len(self._buf) >= 960: + self._out.write(bytes(self._buf[:960])) + del self._buf[:960] + + def close(self): + self._out.stop() + self._out.close() + + +async def main(): + player = AudioPlayer() + q = asyncio.Queue() + + def mic_cb(data, frames, ti, status): + q.put_nowait(bytes(data)) + + mic = sd.RawInputStream( + samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, + callback=mic_cb, latency="low", + ) + mic.start() + + ws = await websockets.connect(WS_URL, additional_headers={"Authorization": f"Bearer {API_KEY}"}) + + await ws.send(json.dumps({"type": "session.update", "session": { + "input_audio_format": "pcm16", "input_audio_sample_rate": SAMPLE_RATE, + "output_audio_format": "pcm16", "output_audio_sample_rate": SAMPLE_RATE, + "input_audio_transcription": {"model": "universal-streaming"}, + "turn_detection": {"type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 200}, + "output_modalities": ["audio", "text"], + "instructions": "You are a helpful voice assistant. 
Keep responses brief.", + "voice": "sage", + }})) + + async def stream_mic(): + while True: + try: + pcm = await asyncio.wait_for(q.get(), timeout=0.1) + await ws.send(json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(pcm).decode()})) + except asyncio.TimeoutError: + pass + + async def handle_events(): + async for raw in ws: + e = json.loads(raw) + et = e.get("type", "") + if et == "response.output_audio.delta": + player.play(base64.b64decode(e["delta"])) + elif et == "response.output_audio_transcript.done": + print(f"Agent: {e.get('transcript', '')}") + elif et == "conversation.item.input_audio_transcription.completed": + print(f"You: {e.get('transcript', '')}") + + print("Listening — start talking.\n") + try: + await asyncio.gather(stream_mic(), handle_events()) + except KeyboardInterrupt: + pass + finally: + mic.stop(); mic.close(); player.close(); await ws.close() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Replace `YOUR_ASSEMBLYAI_API_KEY` with your key from the [AssemblyAI dashboard](https://www.assemblyai.com/dashboard/signup), run the script, and start talking. + +--- + +## How it works + +``` +Client Server + | | + |--- WebSocket connect -------------------->| + |--- session.update (config) -------------->| + |--- input_audio_buffer.append ------------>| stream mic audio + | | + |<------------ session.created -------------| + |<------------ speech_started --------------| user is talking + |<------------ speech_stopped --------------| user finished + |<------------ transcription.completed -----| what the user said + |<------------ response.output_audio.delta -| agent speaks back + |<------------ response.done ---------------| + | | +``` + +1. **Connect** — Open a WebSocket to `wss://speech-to-speech.assemblyai.com/v1/realtime` with your API key in the `Authorization: Bearer` header. +2. **Configure** — Send a `session.update` with your voice, instructions, turn detection settings, and any tools. +3.
**Stream audio** — Send base64-encoded PCM16 audio chunks. The server detects when the user starts and stops speaking. +4. **Receive responses** — The server transcribes the user's speech, generates a response, and streams back audio and text in real time. + +The API is fully compatible with the OpenAI Realtime protocol, so the [OpenAI Python SDK](https://github.com/openai/openai-python), [LiveKit Agents](https://docs.livekit.io/agents/), and any OpenAI-compatible client work out of the box — just point them at `wss://speech-to-speech.assemblyai.com/v1`. + +--- + +## Agent generator + +Describe your agent and we'll generate the complete code — system prompt, tool definitions, and a runnable script. + +<AgentGenerator /> + +--- + +## Configuration + +Configure your session by sending a `session.update` event after connecting. The API accepts two session formats depending on your integration approach. + +### Flat format (Raw WebSocket) + +```json +{ + "type": "session.update", + "session": { + "instructions": "You are a helpful voice assistant.", + "voice": "sage", + "input_audio_format": "pcm16", + "input_audio_sample_rate": 24000, + "output_audio_format": "pcm16", + "output_audio_sample_rate": 24000, + "input_audio_transcription": {"model": "universal-streaming"}, + "output_modalities": ["audio", "text"], + "turn_detection": { + "type": "server_vad", + "threshold": 0.5, + "prefix_padding_ms": 300, + "silence_duration_ms": 200, + "create_response": true + }, + "tools": [], + "tool_choice": "auto" + } +} +``` + +### Nested format (OpenAI SDK / LiveKit) + +The OpenAI GA SDK and LiveKit plugin use a nested session format.
+ +```json +{ + "type": "session.update", + "session": { + "instructions": "You are a helpful voice assistant.", + "output_modalities": ["audio", "text"], + "audio": { + "input": { + "format": {"type": "audio/pcm", "rate": 24000}, + "transcription": {"model": "universal-streaming"}, + "turn_detection": { + "type": "server_vad", + "threshold": 0.5, + "prefix_padding_ms": 300, + "silence_duration_ms": 200, + "create_response": true + } + }, + "output": { + "format": {"type": "audio/pcm", "rate": 24000}, + "voice": "sage" + } + }, + "tools": [], + "tool_choice": "auto" + } +} +``` + +### Session parameters + + + System prompt for the AI agent. Defines personality, behavior, and constraints. + + + + Voice for agent audio responses. One of: `sage`, `ember`, `breeze`, `cascade`. + + + + Input audio encoding. Use `pcm16` (signed 16-bit little-endian). + + + + Input audio sample rate in Hz. + + + + Output audio encoding. Use `pcm16` (signed 16-bit little-endian). + + + + Output audio sample rate in Hz. + + + + What the agent returns. Include `"audio"` for spoken responses and `"text"` for transcripts. + + + + Enables real-time transcription of user speech. Set `model` to `"universal-streaming"`. + + + + Server-side voice activity detection. See [Turn detection](#turn-detection). + + + + Functions the agent can call. See [Tool calling](#tool-calling). + + + + When to use tools. `"auto"` lets the model decide. + + +### Audio format + +All audio is **PCM16** (signed 16-bit integer, little-endian), **mono**, **24,000 Hz**. Audio is base64-encoded inside JSON messages. Each chunk should be approximately 20 ms (480 samples, 960 bytes). + +### Voices + +| Voice | ID | +|-------|----| +| Sage | `sage` | +| Ember | `ember` | +| Breeze | `breeze` | +| Cascade | `cascade` | + +### Turn detection + +The server automatically detects when the user starts and stops speaking using voice activity detection (VAD). When the user finishes a turn, the agent responds automatically. 
+ +```json +"turn_detection": { + "type": "server_vad", + "threshold": 0.5, + "prefix_padding_ms": 300, + "silence_duration_ms": 200, + "create_response": true +} +``` + + + Set to `"server_vad"` for server-side voice activity detection. + + + + Speech detection sensitivity (0.0 to 1.0). Lower values detect quieter speech. + + + + Audio to preserve before speech onset, in milliseconds. Prevents clipping the start of a sentence. + + + + How long the user must pause before the server considers them done speaking, in milliseconds. + + + + Automatically generate an agent response when the user finishes speaking. + + +--- + +## Tool calling + +Give your agent the ability to call functions in your application — look up data, take actions, or call external APIs — then continue the conversation with the result. + +### Define tools in your session config + +```json +"tools": [{ + "type": "function", + "name": "get_weather", + "description": "Get the current weather for a location", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "City name"} + }, + "required": ["location"] + } +}], +"tool_choice": "auto" +``` + +### Handle tool calls + +When the agent decides to call a function, the server sends `response.function_call_arguments.done` while the response is still in progress. Start executing the function immediately — you don't need to wait. When `response.done` arrives, send the result back. 
+ +```python +pending_tasks = {} + +async for raw in ws: + e = json.loads(raw) + et = e.get("type", "") + + if et == "response.function_call_arguments.done": + # Start executing immediately — don't wait for response.done + args = json.loads(e["arguments"]) + pending_tasks[e["call_id"]] = asyncio.create_task(run_tool(e["name"], args)) + + elif et == "response.done" and pending_tasks: + # Response is complete — send back the results + for call_id, task in pending_tasks.items(): + result = await task + await ws.send(json.dumps({ + "type": "conversation.item.create", + "item": { + "type": "function_call_output", + "call_id": call_id, + "output": json.dumps(result), + }, + })) + pending_tasks.clear() + + elif et == "response.output_audio.delta": + player.play(base64.b64decode(e["delta"])) +``` + +The pattern is: **receive the call** → **start executing immediately** → **send the result when `response.done` arrives**. Your function runs concurrently while the response completes, so there's no wasted time. 
+ +--- + +## Events reference + +### Client → Server + +| Event | Description | Key fields | +|-------|-------------|------------| +| `session.update` | Configure the session | `session`: configuration object | +| `input_audio_buffer.append` | Stream an audio chunk | `audio`: base64-encoded PCM16 | +| `input_audio_buffer.commit` | Commit buffered audio as a user turn | — | +| `input_audio_buffer.clear` | Discard buffered audio | — | +| `conversation.item.create` | Add a message or tool result | `item`: conversation item | +| `conversation.item.delete` | Remove a conversation item | `item_id`: ID to remove | +| `response.create` | Trigger the agent to respond | — | + +### Server → Client + +| Event | Description | Key fields | +|-------|-------------|------------| +| `session.created` | Session initialized | `session.id` | +| `input_audio_buffer.speech_started` | User started speaking | `audio_start_ms` | +| `input_audio_buffer.speech_stopped` | User stopped speaking | `audio_end_ms` | +| `input_audio_buffer.committed` | Audio committed as a turn | — | +| `conversation.item.created` | New conversation item added | `item` | +| `conversation.item.input_audio_transcription.completed` | User speech transcribed | `transcript` | +| `response.created` | Agent started generating a response | — | +| `response.output_audio.delta` | Agent audio chunk | `delta`: base64 PCM16 | +| `response.output_audio.done` | Agent audio complete | — | +| `response.output_audio_transcript.delta` | Agent text (streaming) | `delta` | +| `response.output_audio_transcript.done` | Agent text (final) | `transcript` | +| `response.function_call_arguments.done` | Agent requesting a tool call | `call_id`, `name`, `arguments` | +| `response.done` | Response complete | `response.status`: `completed` or `cancelled` | +| `error` | Error occurred | `error.message` | + +--- + +## Complete examples + +Production-ready examples for three integration approaches. 
Each handles microphone input, speaker output, turn detection, transcription, and tool calling. + +### Raw WebSocket + +Direct WebSocket control using the `websockets` library. + +```bash +pip install websockets sounddevice +``` + +```python +import asyncio, base64, json, threading, time +import sounddevice as sd +import websockets + +API_KEY = "YOUR_ASSEMBLYAI_API_KEY" +WS_URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" +SAMPLE_RATE = 24000 + +TOOLS = [{ + "type": "function", + "name": "get_weather", + "description": "Get the current weather for a location", + "parameters": { + "type": "object", + "properties": {"location": {"type": "string", "description": "City name"}}, + "required": ["location"], + }, +}] + + +async def run_tool(name, args): + """Replace with your own tool implementations.""" + if name == "get_weather": + return {"temperature": 72, "condition": "sunny", "location": args["location"]} + return {"error": f"Unknown tool: {name}"} + + +class AudioPlayer: + def __init__(self): + self._buf = bytearray() + self._lock = threading.Lock() + self._out = sd.RawOutputStream( + samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, latency="low", + ) + self._out.start() + + def play(self, pcm: bytes): + with self._lock: + self._buf.extend(pcm) + while len(self._buf) >= 960: + self._out.write(bytes(self._buf[:960])) + del self._buf[:960] + + def close(self): + self._out.stop() + self._out.close() + + +async def main(): + player = AudioPlayer() + q = asyncio.Queue() + + def mic_cb(data, frames, ti, status): + q.put_nowait(bytes(data)) + + mic = sd.RawInputStream( + samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, + callback=mic_cb, latency="low", + ) + mic.start() + + try: + ws = await websockets.connect(WS_URL, extra_headers={"Authorization": f"Bearer {API_KEY}"}) + except TypeError: + ws = await websockets.connect(WS_URL, additional_headers={"Authorization": f"Bearer {API_KEY}"}) + + await ws.send(json.dumps({"type": 
"session.update", "session": { + "input_audio_format": "pcm16", "input_audio_sample_rate": SAMPLE_RATE, + "output_audio_format": "pcm16", "output_audio_sample_rate": SAMPLE_RATE, + "input_audio_transcription": {"model": "universal-streaming"}, + "turn_detection": {"type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 200}, + "output_modalities": ["audio", "text"], + "instructions": "You are a helpful voice assistant. Keep responses brief.", + "voice": "sage", + "tools": TOOLS, + "tool_choice": "auto", + }})) + + pending_tasks = {} + + async def stream_mic(): + while True: + try: + pcm = await asyncio.wait_for(q.get(), timeout=0.1) + await ws.send(json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(pcm).decode()})) + except asyncio.TimeoutError: + pass + + async def handle_events(): + async for raw in ws: + e = json.loads(raw) + et = e.get("type", "") + t = time.strftime("%H:%M:%S") + + if et == "session.created": + print(f"[{t}] Connected — session {e['session']['id']}") + elif et == "input_audio_buffer.speech_started": + print(f"[{t}] You started speaking") + elif et == "input_audio_buffer.speech_stopped": + print(f"[{t}] You stopped speaking") + elif et == "conversation.item.input_audio_transcription.completed": + print(f"[{t}] You: {e.get('transcript', '')}") + elif et == "response.output_audio.delta": + player.play(base64.b64decode(e["delta"])) + elif et == "response.output_audio_transcript.done": + print(f"[{t}] Agent: {e.get('transcript', '')}") + elif et == "response.function_call_arguments.done": + args = json.loads(e["arguments"]) + pending_tasks[e["call_id"]] = asyncio.create_task(run_tool(e["name"], args)) + print(f"[{t}] Tool: {e['name']}({e['arguments']})") + elif et == "response.done": + s = e.get("response", {}).get("status", "?") + print(f"[{t}] Done ({s})") + if pending_tasks and s == "completed": + for cid, task in pending_tasks.items(): + result = await task + await ws.send(json.dumps({ + 
"type": "conversation.item.create", + "item": {"type": "function_call_output", "call_id": cid, "output": json.dumps(result)}, + })) + pending_tasks.clear() + + print("Listening — start talking.\n") + try: + await asyncio.gather(stream_mic(), handle_events()) + except KeyboardInterrupt: + pass + finally: + mic.stop(); mic.close(); player.close(); await ws.close() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### OpenAI Python SDK + +Uses the OpenAI GA Realtime API. Note the differences from the beta API: `websocket_base_url` instead of `base_url`, `client.realtime.connect()` instead of `client.beta.realtime.connect()`, and the nested session format. + +```bash +pip install openai sounddevice +``` + +```python +import asyncio, base64, json, threading, time +import sounddevice as sd +from openai import AsyncOpenAI + +API_KEY = "YOUR_ASSEMBLYAI_API_KEY" +SAMPLE_RATE = 24000 + +client = AsyncOpenAI( + api_key=API_KEY, + websocket_base_url="wss://speech-to-speech.assemblyai.com/v1", +) + +TOOLS = [{ + "type": "function", + "name": "get_weather", + "description": "Get the current weather for a location", + "parameters": { + "type": "object", + "properties": {"location": {"type": "string", "description": "City name"}}, + "required": ["location"], + }, +}] + + +async def run_tool(name, args): + """Replace with your own tool implementations.""" + if name == "get_weather": + return {"temperature": 72, "condition": "sunny", "location": args["location"]} + return {"error": f"Unknown tool: {name}"} + + +class AudioPlayer: + def __init__(self): + self._buf = bytearray() + self._lock = threading.Lock() + self._out = sd.RawOutputStream( + samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, latency="low", + ) + self._out.start() + + def play(self, pcm: bytes): + with self._lock: + self._buf.extend(pcm) + while len(self._buf) >= 960: + self._out.write(bytes(self._buf[:960])) + del self._buf[:960] + + def close(self): + self._out.stop() + 
self._out.close() + + +async def main(): + player = AudioPlayer() + q = asyncio.Queue() + + def mic_cb(data, frames, ti, status): + q.put_nowait(bytes(data)) + + mic = sd.RawInputStream( + samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, + callback=mic_cb, latency="low", + ) + mic.start() + + connection = await client.realtime.connect( + model="universal-streaming", + websocket_connection_options={"compression": None}, + ).enter() + + print("Listening — start talking.\n") + + async def send_config(): + await connection.session.update(session={ + "instructions": "You are a helpful voice assistant. Keep responses brief.", + "output_modalities": ["audio", "text"], + "audio": { + "input": { + "format": {"type": "audio/pcm", "rate": 24000}, + "transcription": {"model": "universal-streaming"}, + "turn_detection": { + "type": "server_vad", "threshold": 0.5, + "prefix_padding_ms": 300, "silence_duration_ms": 200, + }, + }, + "output": { + "format": {"type": "audio/pcm", "rate": 24000}, + "voice": "sage", + }, + }, + "tools": TOOLS, + "tool_choice": "auto", + }) + + async def stream_mic(): + while True: + pcm = await q.get() + await connection.input_audio_buffer.append(audio=base64.b64encode(pcm).decode()) + + async def handle_events(): + pending_tasks = {} + while True: + data = await connection.recv_bytes() + e = json.loads(data.decode("utf-8")) + et = e.get("type", "") + t = time.strftime("%H:%M:%S") + + if et == "session.created": + print(f"[{t}] Connected — session {e['session']['id']}") + elif et == "input_audio_buffer.speech_started": + print(f"[{t}] You started speaking") + elif et == "input_audio_buffer.speech_stopped": + print(f"[{t}] You stopped speaking") + elif et == "conversation.item.input_audio_transcription.completed": + txt = e.get("transcript", "") + if txt: + print(f"[{t}] You: {txt}") + elif et == "response.output_audio.delta": + player.play(base64.b64decode(e["delta"])) + elif et == "response.output_audio_transcript.done": + 
print(f"[{t}] Agent: {e.get('transcript', '')}") + elif et == "response.function_call_arguments.done": + args = json.loads(e["arguments"]) + pending_tasks[e["call_id"]] = asyncio.create_task(run_tool(e["name"], args)) + print(f"[{t}] Tool: {e['name']}({e['arguments']})") + elif et == "response.done": + s = e.get("response", {}).get("status", "?") + print(f"[{t}] Done ({s})") + if pending_tasks and s == "completed": + for cid, task in pending_tasks.items(): + result = await task + await connection.conversation.item.create(item={ + "type": "function_call_output", "call_id": cid, + "output": json.dumps(result)}) + pending_tasks.clear() + elif et == "error": + print(f"[{t}] Error: {e.get('error', {})}") + + try: + await asyncio.gather(send_config(), stream_mic(), handle_events()) + except KeyboardInterrupt: + pass + finally: + mic.stop(); mic.close(); player.close() + await connection.close() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### LiveKit Agents + +Uses the [LiveKit Agents framework](https://docs.livekit.io/agents/) with the OpenAI Realtime plugin. LiveKit handles audio transport, room management, and client connections — you define the agent behavior. + +```bash +pip install "livekit-agents[openai,silero]" python-dotenv +``` + +```python +import asyncio, os +from dotenv import load_dotenv +from livekit.agents import Agent, AgentServer, AgentSession, JobContext, JobProcess, RunContext, cli, function_tool +from livekit.plugins import openai, silero +from openai.types.beta.realtime.session import TurnDetection +from openai.types.realtime import AudioTranscription + +load_dotenv() + + +class VoiceAgent(Agent): + def __init__(self): + super().__init__(instructions="You are a helpful voice assistant. Keep responses brief.") + + @function_tool + async def get_weather(self, context: RunContext, location: str): + """Get the current weather for a location. + + Args: + location: City name + """ + return f"72 degrees and sunny in {location}." 
+ + +server = AgentServer() + + +def prewarm(proc: JobProcess): + proc.userdata["vad"] = silero.VAD.load() + + +server.setup_fnc = prewarm + + +@server.rtc_session() +async def entrypoint(ctx: JobContext): + session = AgentSession( + llm=openai.realtime.RealtimeModel( + base_url="wss://speech-to-speech.assemblyai.com/v1", + api_key=os.environ["ASSEMBLYAI_API_KEY"], + model="universal-streaming", + voice="sage", + input_audio_transcription=AudioTranscription(model="universal-streaming"), + turn_detection=TurnDetection( + type="server_vad", + threshold=0.5, + prefix_padding_ms=300, + silence_duration_ms=200, + create_response=True, + ), + ) + ) + await session.start(agent=VoiceAgent(), room=ctx.room) + await ctx.connect() + + +if __name__ == "__main__": + cli.run_app(server) +``` + +Run with: + +```bash +python agent.py console +``` From c1b0b17aafb40fd45c3098f94472ef44105b9ab9 Mon Sep 17 00:00:00 2001 From: dan-ince-aai Date: Wed, 18 Feb 2026 19:10:42 +0000 Subject: [PATCH 2/7] Add interactive agent generator component to S2S docs Adds an AgentGenerator React component that lets users describe their voice agent in natural language, pick an output format (Python/JS/config), and opens Claude/ChatGPT/Gemini with the full S2S API reference pre-loaded to generate a complete agent implementation. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- fern/assets/components/AgentGenerator.tsx | 453 ++++++++++++++++++++++ 1 file changed, 453 insertions(+) create mode 100644 fern/assets/components/AgentGenerator.tsx diff --git a/fern/assets/components/AgentGenerator.tsx b/fern/assets/components/AgentGenerator.tsx new file mode 100644 index 000000000..1c09db552 --- /dev/null +++ b/fern/assets/components/AgentGenerator.tsx @@ -0,0 +1,453 @@ +"use client"; +import * as React from "react"; + +const DOCS_URL = + "https://www.assemblyai.com/docs/speech-to-text/voice-agents/speechtospeech"; + +type OutputFormat = "python" | "javascript" | "config"; + +const LLM_CONTEXT = `You are an expert at building real-time voice agents using the AssemblyAI Speech-to-Speech API. Based on the user's description, generate a complete voice agent implementation. + +## AssemblyAI Speech-to-Speech API Reference + +Endpoint: wss://speech-to-speech.assemblyai.com/v1/realtime +Auth: Authorization: Bearer YOUR_ASSEMBLYAI_API_KEY header on WebSocket connect +Audio: PCM16 (signed 16-bit little-endian), mono, 24000 Hz, base64-encoded in JSON +Voices: sage (default), ember, breeze, cascade + +### Session config (flat format, for raw WebSocket): +{ + "type": "session.update", + "session": { + "instructions": "System prompt here", + "voice": "sage", + "input_audio_format": "pcm16", + "input_audio_sample_rate": 24000, + "output_audio_format": "pcm16", + "output_audio_sample_rate": 24000, + "input_audio_transcription": {"model": "universal-streaming"}, + "output_modalities": ["audio", "text"], + "turn_detection": {"type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 200}, + "tools": [], + "tool_choice": "auto" + } +} + +### Tool definition schema: +{ + "type": "function", + "name": "tool_name", + "description": "What this tool does", + "parameters": { + "type": "object", + "properties": { + "param_name": {"type": "string", "description": "Param description"} + }, + 
"required": ["param_name"] + } +} + +### Tool calling pattern: +- On "response.function_call_arguments.done": Start executing the function immediately (use asyncio.create_task) +- On "response.done": Send results back via "conversation.item.create" with type "function_call_output" +- Do NOT send "response.create" after tool results — the server continues automatically +- Interruptions are handled automatically by server-side VAD — no client logic needed + +### Key events: +Client sends: session.update, input_audio_buffer.append (base64 audio) +Server sends: session.created, input_audio_buffer.speech_started, input_audio_buffer.speech_stopped, conversation.item.input_audio_transcription.completed, response.output_audio.delta (base64 audio), response.output_audio_transcript.done, response.function_call_arguments.done, response.done + +### Python quickstart template (raw WebSocket with websockets + sounddevice): +\`\`\`python +import asyncio, base64, json, threading, time +import sounddevice as sd +import websockets + +API_KEY = "YOUR_ASSEMBLYAI_API_KEY" +WS_URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" +SAMPLE_RATE = 24000 + +TOOLS = [] # Add tool definitions here + +async def run_tool(name, args): + # Implement tool logic here + return {"error": f"Unknown tool: {name}"} + +class AudioPlayer: + def __init__(self): + self._buf = bytearray() + self._lock = threading.Lock() + self._out = sd.RawOutputStream(samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, latency="low") + self._out.start() + def play(self, pcm): + with self._lock: + self._buf.extend(pcm) + while len(self._buf) >= 960: + self._out.write(bytes(self._buf[:960])) + del self._buf[:960] + def close(self): + self._out.stop(); self._out.close() + +async def main(): + player = AudioPlayer() + q = asyncio.Queue() + def mic_cb(data, frames, ti, status): + q.put_nowait(bytes(data)) + mic = sd.RawInputStream(samplerate=SAMPLE_RATE, channels=1, dtype="int16", blocksize=480, 
callback=mic_cb, latency="low") + mic.start() + + ws = await websockets.connect(WS_URL, additional_headers={"Authorization": f"Bearer {API_KEY}"}) + await ws.send(json.dumps({"type": "session.update", "session": { + "input_audio_format": "pcm16", "input_audio_sample_rate": SAMPLE_RATE, + "output_audio_format": "pcm16", "output_audio_sample_rate": SAMPLE_RATE, + "input_audio_transcription": {"model": "universal-streaming"}, + "turn_detection": {"type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 200}, + "output_modalities": ["audio", "text"], + "instructions": "SYSTEM_PROMPT_HERE", + "voice": "sage", + "tools": TOOLS, + "tool_choice": "auto", + }})) + pending_tasks = {} + async def stream_mic(): + while True: + try: + pcm = await asyncio.wait_for(q.get(), timeout=0.1) + await ws.send(json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(pcm).decode()})) + except asyncio.TimeoutError: + pass + async def handle_events(): + async for raw in ws: + e = json.loads(raw) + et = e.get("type", "") + t = time.strftime("%H:%M:%S") + if et == "session.created": + print(f"[{t}] Connected — session {e['session']['id']}") + elif et == "conversation.item.input_audio_transcription.completed": + print(f"[{t}] You: {e.get('transcript', '')}") + elif et == "response.output_audio.delta": + player.play(base64.b64decode(e["delta"])) + elif et == "response.output_audio_transcript.done": + print(f"[{t}] Agent: {e.get('transcript', '')}") + elif et == "response.function_call_arguments.done": + args = json.loads(e["arguments"]) + pending_tasks[e["call_id"]] = asyncio.create_task(run_tool(e["name"], args)) + print(f"[{t}] Tool: {e['name']}({e['arguments']})") + elif et == "response.done": + if pending_tasks and e.get("response", {}).get("status") == "completed": + for cid, task in pending_tasks.items(): + result = await task + await ws.send(json.dumps({"type": "conversation.item.create", "item": {"type": "function_call_output", 
"call_id": cid, "output": json.dumps(result)}})) + pending_tasks.clear() + print("Listening — start talking.\\n") + try: + await asyncio.gather(stream_mic(), handle_events()) + except KeyboardInterrupt: + pass + finally: + mic.stop(); mic.close(); player.close(); await ws.close() + +if __name__ == "__main__": + asyncio.run(main()) +\`\`\` + +### JavaScript quickstart template (raw WebSocket in Node.js): +\`\`\`javascript +// Requires: npm install ws +const WebSocket = require("ws"); +const API_KEY = "YOUR_ASSEMBLYAI_API_KEY"; +const WS_URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"; + +const TOOLS = []; // Add tool definitions here + +function runTool(name, args) { + // Implement tool logic here + return { error: "Unknown tool: " + name }; +} + +const ws = new WebSocket(WS_URL, { headers: { Authorization: "Bearer " + API_KEY } }); +const pendingTasks = new Map(); + +ws.on("open", () => { + ws.send(JSON.stringify({ type: "session.update", session: { + input_audio_format: "pcm16", input_audio_sample_rate: 24000, + output_audio_format: "pcm16", output_audio_sample_rate: 24000, + input_audio_transcription: { model: "universal-streaming" }, + turn_detection: { type: "server_vad", threshold: 0.5, prefix_padding_ms: 300, silence_duration_ms: 200 }, + output_modalities: ["audio", "text"], + instructions: "SYSTEM_PROMPT_HERE", + voice: "sage", + tools: TOOLS, + tool_choice: "auto", + }})); + // Start streaming mic audio as base64 PCM16 via input_audio_buffer.append +}); + +ws.on("message", async (raw) => { + const e = JSON.parse(raw); + if (e.type === "response.output_audio.delta") { + // Play base64-decoded PCM16 audio: Buffer.from(e.delta, "base64") + } else if (e.type === "response.output_audio_transcript.done") { + console.log("Agent:", e.transcript); + } else if (e.type === "response.function_call_arguments.done") { + pendingTasks.set(e.call_id, runTool(e.name, JSON.parse(e.arguments))); + } else if (e.type === "response.done" && pendingTasks.size > 0) { + 
for (const [callId, resultPromise] of pendingTasks) { + const result = await resultPromise; + ws.send(JSON.stringify({ type: "conversation.item.create", item: { type: "function_call_output", call_id: callId, output: JSON.stringify(result) } })); + } + pendingTasks.clear(); + } +}); +\`\`\` + +Full documentation: ${DOCS_URL}`; + +const FORMAT_INSTRUCTIONS: Record = { + python: `Generate a COMPLETE, RUNNABLE Python script using the raw WebSocket template above. Include: +1. A detailed system prompt in the instructions field tailored to the agent's purpose +2. All tool definitions in the TOOLS array with proper JSON schemas +3. Full run_tool() implementation with realistic mock data for each tool +4. All imports, the AudioPlayer class, mic handling, and event loop — everything needed to pip install and run +Choose an appropriate voice from: sage, ember, breeze, cascade.`, + + javascript: `Generate a COMPLETE, RUNNABLE JavaScript/Node.js script using the JS WebSocket template above. Include: +1. A detailed system prompt in the instructions field tailored to the agent's purpose +2. All tool definitions in the TOOLS array with proper JSON schemas +3. Full runTool() implementation with realistic mock data for each tool +4. All requires, WebSocket setup, and event handling — everything needed to npm install and run +Choose an appropriate voice from: sage, ember, breeze, cascade. +For audio I/O, use a comment placeholder since Node.js audio libraries vary.`, + + config: `Generate ONLY the session configuration JSON (the session.update payload) with: +1. A detailed system prompt in the instructions field tailored to the agent's purpose +2. All tool definitions in the tools array with proper JSON schemas +3. An appropriate voice chosen from: sage, ember, breeze, cascade +4. 
/**
 * Shorten `text` to at most `maxLength` UTF-16 code units, preferring to cut
 * at the last space inside the limit so a word is not split in half.
 *
 * The space-aligned cut is used only when that space sits past index 50;
 * otherwise the text is cut hard at `maxLength`. A hard cut never ends on an
 * unpaired high surrogate, because callers in this file feed the result to
 * `encodeURIComponent`, which throws `URIError` on a lone surrogate.
 *
 * @param text - The string to shorten.
 * @param maxLength - Maximum length of the result, in UTF-16 code units.
 *   Values <= 0 yield an empty string for any non-empty `text`.
 * @returns `text` unchanged when it already fits, otherwise a prefix of it.
 */
const truncateAtWordBoundary = (text: string, maxLength: number): string => {
  if (text.length <= maxLength) return text;

  const truncated = text.substring(0, maxLength);
  const lastSpace = truncated.lastIndexOf(" ");
  // Snap back to a word boundary only when a reasonable amount of text
  // survives (presumably the intent of the 50-character floor).
  if (lastSpace > 50) return truncated.substring(0, lastSpace);

  // Hard cut: if the limit fell between the two halves of a surrogate pair
  // (e.g. an emoji), drop the dangling high surrogate so the result stays
  // well-formed. charCodeAt on an empty string is NaN, which fails both
  // comparisons and falls through to the plain return.
  const lastUnit = truncated.charCodeAt(truncated.length - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) return truncated.slice(0, -1);

  return truncated;
};
/**
 * Open Google AI Studio in a new tab with the generated prompt pre-filled
 * through the `prompt` query parameter.
 *
 * The description is first capped to the length returned by
 * `getMaxContentLength` for this base URL, then the whole prompt is
 * URI-encoded and appended to the base URL.
 */
const openInGemini = () => {
  const baseUrl = "https://aistudio.google.com/prompts/new_chat?prompt=";
  const maxLen = getMaxContentLength(baseUrl);
  const prompt = encodeURIComponent(buildPrompt(maxLen));
  // "noopener,noreferrer" stops the third-party page from reaching back into
  // this tab through window.opener; the return value was never used, so the
  // null that window.open returns in this mode is harmless.
  window.open(`${baseUrl}${prompt}`, "_blank", "noopener,noreferrer");
};
"0 1px 2px rgba(0,0,0,0.08)" : "none", + transition: "all 0.15s ease", + }); + + const buttonBaseStyle: React.CSSProperties = { + display: "inline-flex", + alignItems: "center", + gap: "8px", + padding: "10px 20px", + border: "none", + borderRadius: "6px", + fontSize: "14px", + fontWeight: 500, + cursor: "pointer", + color: "#ffffff", + }; + + const helpTextStyle: React.CSSProperties = { + marginTop: "12px", + fontSize: "13px", + color: "var(--grayscale-11, #6b7280)", + }; + + return ( +
+
+
+ +