Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions src/google/adk/plugins/bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1649,7 +1649,11 @@ def _batch_processor_prop(self) -> Optional["BatchProcessor"]:
try:
loop = asyncio.get_running_loop()
if loop in self._loop_state_by_loop:
return self._loop_state_by_loop[loop].batch_processor
state = self._loop_state_by_loop[loop]
# Validate loop identity to prevent reuse of closed loops
if state.batch_processor._queue._loop is not loop:
return None
return state.batch_processor
Comment on lines +1652 to +1656
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with the changes in _get_loop_state and flush, it would be better to also log a warning and clean up the stale state from _loop_state_by_loop here. Currently, a stale state is detected but not removed, which means subsequent calls to this property will repeatedly find the stale state until it's cleaned up by one of the other methods.

Suggested change
state = self._loop_state_by_loop[loop]
# Validate loop identity to prevent reuse of closed loops
if state.batch_processor._queue._loop is not loop:
return None
return state.batch_processor
state = self._loop_state_by_loop[loop]
# Validate loop identity to prevent reuse of closed loops
if state.batch_processor._queue._loop is not loop:
logger.warning(
"Detected stale loop state for loop %s (id=%s) in property, clearing.",
loop,
id(loop),
)
del self._loop_state_by_loop[loop]
return None
return state.batch_processor

except RuntimeError:
pass
return None
Expand Down Expand Up @@ -1701,7 +1705,23 @@ async def _get_loop_state(self) -> _LoopState:
"""
loop = asyncio.get_running_loop()
if loop in self._loop_state_by_loop:
return self._loop_state_by_loop[loop]
state = self._loop_state_by_loop[loop]
# Robustness Check: Ensure the stored state actually belongs to THIS loop instance.
# Python's asyncio loops in different threads/contexts might reuse memory addresses
# if one is closed and another created, confusing the dict lookup.
# asyncio.Queue bound to a closed loop will raise RuntimeError if used here.
if state.batch_processor._queue._loop is not loop:
logger.warning(
"Detected stale loop state for loop %s (id=%s). Clearing and"
" recreating.",
loop,
id(loop),
)
# Clean up old clients if possible (though loop likely closed)
# We don't await here to avoid blocking or errors, just drop the reference.
del self._loop_state_by_loop[loop]
else:
return state

# We DO NOT use the global client approach for multi-loop safety simpler
# or we must ensure _GLOBAL_WRITE_CLIENT usage is safe.
Expand Down Expand Up @@ -1772,7 +1792,16 @@ async def flush(self) -> None:
try:
loop = asyncio.get_running_loop()
if loop in self._loop_state_by_loop:
await self._loop_state_by_loop[loop].batch_processor.flush()
state = self._loop_state_by_loop[loop]
# Validate loop matched before flushing
if state.batch_processor._queue._loop is loop:
await state.batch_processor.flush()
else:
logger.warning(
"Skipping flush for loop %s due to loop mismatch (stale state).",
loop,
)
del self._loop_state_by_loop[loop]
except RuntimeError:
# No running loop or other issue
pass
Expand Down
Loading