From 931933d167e221cadee22eed63195deeed5b027c Mon Sep 17 00:00:00 2001 From: Jordan Cullen Date: Wed, 1 Apr 2026 16:14:26 -0700 Subject: [PATCH 1/2] fix(live): resolve protocol 1007 collisions during audio streaming --- .../adk/models/gemini_llm_connection.py | 42 ++++++++++------ src/google/adk/models/google_llm.py | 1 + .../models/test_gemini_llm_connection.py | 49 +++++++++++++++++-- tests/unittests/models/test_google_llm.py | 1 + 4 files changed, 74 insertions(+), 19 deletions(-) diff --git a/src/google/adk/models/gemini_llm_connection.py b/src/google/adk/models/gemini_llm_connection.py index 4cf28acdfc..87adaf1080 100644 --- a/src/google/adk/models/gemini_llm_connection.py +++ b/src/google/adk/models/gemini_llm_connection.py @@ -44,6 +44,7 @@ def __init__( gemini_session: live.AsyncSession, api_backend: GoogleLLMVariant = GoogleLLMVariant.VERTEX_AI, model_version: str | None = None, + live_config: types.LiveConnectConfig | None = None, ): self._gemini_session = gemini_session self._input_transcription_text: str = '' @@ -51,6 +52,10 @@ def __init__( self._api_backend = api_backend self._model_version = model_version + self._audio_active = False + if live_config and getattr(live_config, 'response_modalities', None): + self._audio_active = 'AUDIO' in live_config.response_modalities + async def send_history(self, history: list[types.Content]): """Sends the conversation history to the gemini model. @@ -111,25 +116,27 @@ async def send_content(self, content: types.Content): ), ) else: - logger.debug('Sending LLM new content %s', content) is_gemini_31 = model_name_utils.is_gemini_3_1_flash_live( self._model_version ) is_gemini_api = self._api_backend == GoogleLLMVariant.GEMINI_API - # As of now, Gemini 3.1 Flash Live is only available in Gemini API, not - # Vertex AI. - if ( - is_gemini_31 - and is_gemini_api - and len(content.parts) == 1 - and content.parts[0].text - ): - logger.debug('Using send_realtime_input for Gemini 3.1 text input') - await self._gemini_session.send_realtime_input( - text=content.parts[0].text + # Route via send_realtime_input if audio is active OR if targeting 3.1 API + if self._audio_active or (is_gemini_31 and is_gemini_api): + logger.debug( + 'Routing text via send_realtime_input %s', + content, ) + has_text = False + for part in content.parts: + if isinstance(part.text, str): + await self._gemini_session.send_realtime_input(text=part.text) + has_text = True + + if not has_text: + logger.warning('Encountered unsupported content in send_content') else: + logger.debug('Sending LLM new content %s', content) await self._gemini_session.send( input=types.LiveClientContent( turns=[content], @@ -154,9 +161,9 @@ async def send_realtime(self, input: RealtimeInput): # As of now, Gemini 3.1 Flash Live is only available in Gemini API, not # Vertex AI. if is_gemini_31 and is_gemini_api: - if input.mime_type and input.mime_type.startswith('audio/'): + if isinstance(input.mime_type, str) and input.mime_type.startswith('audio/'): await self._gemini_session.send_realtime_input(audio=input) - elif input.mime_type and input.mime_type.startswith('image/'): + elif isinstance(input.mime_type, str) and input.mime_type.startswith('image/'): await self._gemini_session.send_realtime_input(video=input) else: logger.warning( @@ -165,7 +172,12 @@ async def send_realtime(self, input: RealtimeInput): input.mime_type, ) else: - await self._gemini_session.send_realtime_input(media=input) + if isinstance(input.mime_type, str) and input.mime_type.startswith('video/'): + await self._gemini_session.send_realtime_input(video=input) + elif isinstance(input.mime_type, str) and input.mime_type.startswith('audio/'): + await self._gemini_session.send_realtime_input(audio=input) + else: + await self._gemini_session.send_realtime_input(media=input) elif isinstance(input, types.ActivityStart): logger.debug('Sending LLM activity start signal.') diff --git a/src/google/adk/models/google_llm.py b/src/google/adk/models/google_llm.py index 609de3d3d3..3eaec180b2 100644 --- a/src/google/adk/models/google_llm.py +++ b/src/google/adk/models/google_llm.py @@ -410,6 +410,7 @@ async def connect(self, llm_request: LlmRequest) -> BaseLlmConnection: live_session, api_backend=self._api_backend, model_version=llm_request.model, + live_config=llm_request.live_connect_config, ) async def _adapt_computer_use_tool(self, llm_request: LlmRequest) -> None: diff --git a/tests/unittests/models/test_gemini_llm_connection.py b/tests/unittests/models/test_gemini_llm_connection.py index 7b580c6fc0..dfded0e4bd 100644 --- a/tests/unittests/models/test_gemini_llm_connection.py +++ b/tests/unittests/models/test_gemini_llm_connection.py @@ -54,16 +54,37 @@ def test_blob(): return types.Blob(data=b'\x00\xFF\x00\xFF', mime_type='audio/pcm') +@pytest.fixture +def test_fallback_blob(): + """Test blob for unknown media data.""" + return types.Blob(data=b'\x01\x02', mime_type='application/pdf') + + @pytest.mark.asyncio -async def test_send_realtime_default_behavior( +async def test_send_realtime_audio_routing( gemini_connection, mock_gemini_session, test_blob ): - """Test send_realtime with default automatic_activity_detection value (True).""" + """Test send_realtime explicitly routing audio mimetypes to the audio parameter.""" await gemini_connection.send_realtime(test_blob) # Should call send once mock_gemini_session.send_realtime_input.assert_called_once_with( - media=test_blob + audio=test_blob + ) + # Should not call .send function + mock_gemini_session.send.assert_not_called() + + +@pytest.mark.asyncio +async def test_send_realtime_media_fallback_routing( + gemini_connection, mock_gemini_session, test_fallback_blob +): + """Test send_realtime falling back to media for non-audio/video mimetypes.""" + await gemini_connection.send_realtime(test_fallback_blob) + + # Should call send once + mock_gemini_session.send_realtime_input.assert_called_once_with( + media=test_fallback_blob ) # Should not call .send function mock_gemini_session.send.assert_not_called() @@ -90,7 +111,12 @@ async def test_send_history(gemini_connection, mock_gemini_session): @pytest.mark.asyncio async def test_send_content_text(gemini_connection, mock_gemini_session): - """Test send_content with text content.""" + """Test send_content with text content when audio is inactive. + + Note: gemini_connection._audio_active is False by default. + """ + assert gemini_connection._audio_active is False + content = types.Content( role='user', parts=[types.Part.from_text(text='Hello')] ) @@ -104,6 +130,21 @@ async def test_send_content_text(gemini_connection, mock_gemini_session): assert call_args['input'].turn_complete is True +@pytest.mark.asyncio +async def test_send_content_text_audio_active(gemini_connection, mock_gemini_session): + """Test send_content routes to send_realtime_input when audio is active.""" + gemini_connection._audio_active = True + + content = types.Content( + role='user', parts=[types.Part.from_text(text='Hello')] + ) + + await gemini_connection.send_content(content) + + mock_gemini_session.send_realtime_input.assert_called_once_with(text='Hello') + mock_gemini_session.send.assert_not_called() + + @pytest.mark.asyncio async def test_send_content_function_response( gemini_connection, mock_gemini_session diff --git a/tests/unittests/models/test_google_llm.py b/tests/unittests/models/test_google_llm.py index 70aa01b69d..b877b9a6fa 100644 --- a/tests/unittests/models/test_google_llm.py +++ b/tests/unittests/models/test_google_llm.py @@ -726,6 +726,7 @@ async def __aexit__(self, *args): mock_live_session, api_backend=gemini_llm._api_backend, model_version=llm_request.model, + live_config=llm_request.live_connect_config, ) From ecc31c03aa6e775ad1e3fa1f4e4af8656fc6b29c Mon Sep 17 00:00:00 2001 From: Jordan Cullen Date: Tue, 14 Apr 2026 03:41:17 -0700 Subject: [PATCH 2/2] run autoformatter per pr comment --- .../adk/models/gemini_llm_connection.py | 28 ++++++++------- .../models/test_gemini_llm_connection.py | 35 ++++++++++++++++--- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/src/google/adk/models/gemini_llm_connection.py b/src/google/adk/models/gemini_llm_connection.py index 3fae36b1cd..e5af62dee4 100644 --- a/src/google/adk/models/gemini_llm_connection.py +++ b/src/google/adk/models/gemini_llm_connection.py @@ -122,19 +122,15 @@ async def send_content(self, content: types.Content): is_gemini_api = self._api_backend == GoogleLLMVariant.GEMINI_API # Route via send_realtime_input if audio is active OR if targeting 3.1 API - if self._audio_active or (is_gemini_31 and is_gemini_api): + if (self._audio_active or (is_gemini_31 and is_gemini_api)) and all( + isinstance(part.text, str) for part in content.parts + ): logger.debug( 'Routing text via send_realtime_input %s', content, ) - has_text = False for part in content.parts: - if isinstance(part.text, str): - await self._gemini_session.send_realtime_input(text=part.text) - has_text = True - - if not has_text: - logger.warning('Encountered unsupported content in send_content') + await self._gemini_session.send_realtime_input(text=part.text) else: logger.debug('Sending LLM new content %s', content) await self._gemini_session.send( @@ -161,9 +157,13 @@ async def send_realtime(self, input: RealtimeInput): # As of now, Gemini 3.1 Flash Live is only available in Gemini API, not # Vertex AI. if is_gemini_31 and is_gemini_api: - if isinstance(input.mime_type, str) and input.mime_type.startswith('audio/'): + if isinstance(input.mime_type, str) and input.mime_type.startswith( + 'audio/' + ): await self._gemini_session.send_realtime_input(audio=input) - elif isinstance(input.mime_type, str) and input.mime_type.startswith('image/'): + elif isinstance(input.mime_type, str) and input.mime_type.startswith( + 'image/' + ): await self._gemini_session.send_realtime_input(video=input) else: logger.warning( @@ -172,9 +172,13 @@ async def send_realtime(self, input: RealtimeInput): input.mime_type, ) else: - if isinstance(input.mime_type, str) and input.mime_type.startswith('video/'): + if isinstance(input.mime_type, str) and input.mime_type.startswith( + 'video/' + ): await self._gemini_session.send_realtime_input(video=input) - elif isinstance(input.mime_type, str) and input.mime_type.startswith('audio/'): + elif isinstance(input.mime_type, str) and input.mime_type.startswith( + 'audio/' + ): await self._gemini_session.send_realtime_input(audio=input) else: await self._gemini_session.send_realtime_input(media=input) diff --git a/tests/unittests/models/test_gemini_llm_connection.py b/tests/unittests/models/test_gemini_llm_connection.py index 8bf99a1d8c..b66b1ee2fa 100644 --- a/tests/unittests/models/test_gemini_llm_connection.py +++ b/tests/unittests/models/test_gemini_llm_connection.py @@ -112,11 +112,11 @@ async def test_send_history(gemini_connection, mock_gemini_session): @pytest.mark.asyncio async def test_send_content_text(gemini_connection, mock_gemini_session): """Test send_content with text content when audio is inactive. - + Note: gemini_connection._audio_active is False by default. """ assert gemini_connection._audio_active is False - + content = types.Content( role='user', parts=[types.Part.from_text(text='Hello')] ) @@ -131,10 +131,12 @@ async def test_send_content_text(gemini_connection, mock_gemini_session): @pytest.mark.asyncio -async def test_send_content_text_audio_active(gemini_connection, mock_gemini_session): +async def test_send_content_text_audio_active( + gemini_connection, mock_gemini_session +): """Test send_content routes to send_realtime_input when audio is active.""" gemini_connection._audio_active = True - + content = types.Content( role='user', parts=[types.Part.from_text(text='Hello')] ) @@ -145,6 +147,31 @@ async def test_send_content_text_audio_active(gemini_connection, mock_gemini_ses mock_gemini_session.send.assert_not_called() +@pytest.mark.asyncio +async def test_send_content_mixed_audio_active( + gemini_connection, mock_gemini_session, test_blob +): + """Test send_content falls back to LiveClientContent for mixed modalities.""" + gemini_connection._audio_active = True + + content = types.Content( + role='user', + parts=[ + types.Part.from_text(text='Hello'), + types.Part(inline_data=test_blob) + ] + ) + + await gemini_connection.send_content(content) + + mock_gemini_session.send.assert_called_once() + call_args = mock_gemini_session.send.call_args[1] + assert 'input' in call_args + assert call_args['input'].turns == [content] + assert call_args['input'].turn_complete is True + mock_gemini_session.send_realtime_input.assert_not_called() + + @pytest.mark.asyncio async def test_send_content_function_response( gemini_connection, mock_gemini_session