From ccb251162330136bc2cee5115981a032f7310d7a Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Wed, 11 Feb 2026 15:56:04 -0800 Subject: [PATCH] Enhance loop state validation in BigQuery plugin Added robustness checks to ensure loop identity before accessing batch processor and flushing. --- .../bigquery_agent_analytics_plugin.py | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 7cbf931ca9..8d438a926e 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -1649,7 +1649,11 @@ def _batch_processor_prop(self) -> Optional["BatchProcessor"]: try: loop = asyncio.get_running_loop() if loop in self._loop_state_by_loop: - return self._loop_state_by_loop[loop].batch_processor + state = self._loop_state_by_loop[loop] + # Validate loop identity to prevent reuse of closed loops + if state.batch_processor._queue._loop is not loop: + return None + return state.batch_processor except RuntimeError: pass return None @@ -1701,7 +1705,23 @@ async def _get_loop_state(self) -> _LoopState: """ loop = asyncio.get_running_loop() if loop in self._loop_state_by_loop: - return self._loop_state_by_loop[loop] + state = self._loop_state_by_loop[loop] + # Robustness Check: Ensure the stored state actually belongs to THIS loop instance. + # Python's asyncio loops in different threads/contexts might reuse memory addresses + # if one is closed and another created, confusing the dict lookup. + # asyncio.Queue bound to a closed loop will raise RuntimeError if used here. + if state.batch_processor._queue._loop is not loop: + logger.warning( + "Detected stale loop state for loop %s (id=%s). Clearing and" + " recreating.", + loop, + id(loop), + ) + # Clean up old clients if possible (though loop likely closed) + # We don't await here to avoid blocking or errors, just drop the reference. + del self._loop_state_by_loop[loop] + else: + return state # We DO NOT use the global client approach for multi-loop safety simpler # or we must ensure _GLOBAL_WRITE_CLIENT usage is safe. @@ -1772,7 +1792,16 @@ async def flush(self) -> None: try: loop = asyncio.get_running_loop() if loop in self._loop_state_by_loop: - await self._loop_state_by_loop[loop].batch_processor.flush() + state = self._loop_state_by_loop[loop] + # Validate loop matched before flushing + if state.batch_processor._queue._loop is loop: + await state.batch_processor.flush() + else: + logger.warning( + "Skipping flush for loop %s due to loop mismatch (stale state).", + loop, + ) + del self._loop_state_by_loop[loop] except RuntimeError: # No running loop or other issue pass