From 073da993dfe0afcd5fd427095e2bd305813d14b2 Mon Sep 17 00:00:00 2001 From: surfai <166616086+surfai@users.noreply.github.com> Date: Wed, 15 Apr 2026 01:44:10 +0800 Subject: [PATCH] fix(runners): dispatch run_after_run_callback in _run_node_async (#5282) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Runner._run_node_async had a TODO at runners.py:427 for node runtime plugin lifecycle, and never dispatched plugin_manager.run_after_run_callback. Because both Workflow(BaseNode) and (post a3 refactor) LlmAgent roots funnel through _run_node_async, any BasePlugin subclass that overrides after_run_callback silently no-op'd — breaking the canonical pattern for memory persistence, metrics emission, and post-run audit hooks. This wires one dispatch call after _consume_event_queue drains, mirroring the legacy path in _exec_with_plugin at runners.py:1230 exactly: outside the finally block, so semantics match legacy (fires on natural drain only, skipped on error in the loop, skipped on caller-break via GeneratorExit). Also: - Narrows the TODO at runners.py:427 from "tracing and plugin lifecycle" to "tracing" since plugin lifecycle is now wired. - Incidental pyink reformat at runners.py:1451: pre-existing 82-char line on v2 HEAD, unrelated to #5282 but required by the pyink CI gate. Companion to test-only PR #5301 (which established the regression anchor). This PR ships the fix plus flips the anchor by replacing the WorkaroundRunner scaffolding with a direct positive assertion. Alternatives considered and rejected: - Wrap in finally to fire on error/break — changes legacy contract. - Lift dispatch to run_async call sites — requires plumbing ic out of _run_node_async, bigger blast radius. - Extract a shared _with_plugin_lifecycle context manager — right long-term shape, but refactors the legacy path too and expands scope. Fixes #5282 --- src/google/adk/runners.py | 9 +- tests/unittests/runners/test_issue_5282.py | 161 +++++++++++++++++++++ 2 files changed, 168 insertions(+), 2 deletions(-) create mode 100644 tests/unittests/runners/test_issue_5282.py diff --git a/src/google/adk/runners.py b/src/google/adk/runners.py index 25722f73db..4eba0a70a9 100644 --- a/src/google/adk/runners.py +++ b/src/google/adk/runners.py @@ -424,7 +424,7 @@ async def _run_node_async( Events flow through ic.event_queue via NodeRunner. - TODO: Add tracing and plugin lifecycle for the node runtime path. + TODO: Add tracing for the node runtime path. """ from .workflow._node_runner_class import NodeRunner @@ -505,6 +505,9 @@ async def _drive_root_node(): try: async for event in self._consume_event_queue(ic, done_sentinel): yield event + # 5. Run the after_run callbacks. Mirrors _exec_with_plugin:1230. + # This does NOT emit any event. + await ic.plugin_manager.run_after_run_callback(invocation_context=ic) finally: await self._cleanup_root_task(task, self.agent.name) @@ -1448,7 +1451,9 @@ def _find_agent_to_run( # type of the agent. e.g. a remote a2a agent may surface a credential # request as a special long-running function tool call. event = find_matching_function_call(session.events) - is_resumable = self.resumability_config and self.resumability_config.is_resumable + is_resumable = ( + self.resumability_config and self.resumability_config.is_resumable + ) # Only route based on a past function response if resumability is enabled. # In non-resumable scenarios, a turn ending with function call response # shouldn't trap the next turn on that same agent if it's not transferable. diff --git a/tests/unittests/runners/test_issue_5282.py b/tests/unittests/runners/test_issue_5282.py new file mode 100644 index 0000000000..5245484e92 --- /dev/null +++ b/tests/unittests/runners/test_issue_5282.py @@ -0,0 +1,161 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Regression test for google/adk-python#5282. + +Before the fix, Runner._run_node_async never dispatched +plugin_manager.run_after_run_callback on the BaseNode path (runners.py:427 +TODO), so Plugin.after_run_callback silently no-op'd for Workflow roots. + +These tests pin the post-fix behavior: both the pre-run/event hooks AND +after_run_callback must fire on a Workflow root. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from google.adk import Event +from google.adk import Workflow +from google.adk.agents.invocation_context import InvocationContext +from google.adk.apps.app import App +from google.adk.memory.in_memory_memory_service import InMemoryMemoryService +from google.adk.plugins.base_plugin import BasePlugin +from google.adk.runners import Runner +from google.adk.sessions.in_memory_session_service import InMemorySessionService +from google.genai import types +import pytest + + +@dataclass +class CallbackCounts: + on_user_message_callback: int = 0 + before_run_callback: int = 0 + on_event_callback: int = 0 + after_run_callback: int = 0 + + +class TracerPlugin(BasePlugin): + """Counts every Plugin lifecycle callback the Runner actually dispatches.""" + + def __init__(self) -> None: + super().__init__(name='tracer') + self.counts = CallbackCounts() + + async def on_user_message_callback( + self, + *, + invocation_context: InvocationContext, + user_message: types.Content, + ) -> Optional[types.Content]: + del invocation_context, user_message + self.counts.on_user_message_callback += 1 + return None + + async def before_run_callback( + self, *, invocation_context: InvocationContext + ) -> Optional[types.Content]: + del invocation_context + self.counts.before_run_callback += 1 + return None + + async def on_event_callback( + self, *, invocation_context: InvocationContext, event: Event + ) -> Optional[Event]: + del invocation_context, event + self.counts.on_event_callback += 1 + return None + + async def after_run_callback( + self, *, invocation_context: InvocationContext + ) -> None: + del invocation_context + self.counts.after_run_callback += 1 + return None + + +async def _terminal_node(ctx) -> Event: + """A single terminal node that yields a content-bearing Event. + + Using content (not just state) ensures _consume_event_queue actually runs + the on_event_callback path -- state-only events still flow through the + queue, but content is the canonical case the plugin hook was designed for. + """ + del ctx + return Event( + content=types.Content( + parts=[types.Part(text='done')], + role='model', + ) + ) + + +def _build_runner(plugin: TracerPlugin) -> Runner: + workflow = Workflow( + name='Issue5282Repro', + edges=[('START', _terminal_node)], + ) + app = App(name='issue_5282_repro', root_agent=workflow, plugins=[plugin]) + return Runner( + app_name='issue_5282_repro', + app=app, + session_service=InMemorySessionService(), + memory_service=InMemoryMemoryService(), + ) + + +async def _drive_one_invocation(runner: Runner) -> None: + session = await runner.session_service.create_session( + app_name='issue_5282_repro', user_id='u1' + ) + async for _ in runner.run_async( + user_id='u1', + session_id=session.id, + new_message=types.Content(parts=[types.Part(text='hi')], role='user'), + ): + pass + + +@pytest.mark.asyncio +async def test_workflow_root_dispatches_pre_run_and_event_hooks(): + """Baseline: pre-run and per-event hooks fire on a Workflow root.""" + plugin = TracerPlugin() + runner = _build_runner(plugin) + + await _drive_one_invocation(runner) + + assert plugin.counts.on_user_message_callback == 1 + assert plugin.counts.before_run_callback == 1 + assert plugin.counts.on_event_callback >= 1, ( + 'on_event_callback should fire at least once via _consume_event_queue ' + 'for the content-bearing terminal event' + ) + + +@pytest.mark.asyncio +async def test_workflow_root_dispatches_after_run_callback(): + """Regression anchor for #5282: after_run_callback fires on Workflow roots. + + Pre-fix, _run_node_async never dispatched run_after_run_callback on the + BaseNode path, so this would have asserted 0 and been xfail-anchored. + Post-fix, the dispatch mirrors _exec_with_plugin's legacy path and fires + exactly once per successful invocation. + """ + plugin = TracerPlugin() + runner = _build_runner(plugin) + + await _drive_one_invocation(runner) + + assert plugin.counts.after_run_callback == 1