From 94fca72f4e714dfdebbcc7071a4759ab0a0121c1 Mon Sep 17 00:00:00 2001 From: jayy-77 <1427jay@gmail.com> Date: Wed, 11 Feb 2026 03:45:18 +0530 Subject: [PATCH] Implement per-session compaction control and plugin hooks --- src/google/adk/apps/app.py | 25 +++++++++++++++++++++ src/google/adk/apps/compaction.py | 31 +++++++++++++++++++++++++++ src/google/adk/plugins/base_plugin.py | 20 +++++++++++++++++ src/google/adk/runners.py | 18 +++++++++++++++- src/google/adk/sessions/session.py | 25 +++++++++++++++++++++ 5 files changed, 118 insertions(+), 1 deletion(-) diff --git a/src/google/adk/apps/app.py b/src/google/adk/apps/app.py index 71ea5ce5aa..001591b6ba 100644 --- a/src/google/adk/apps/app.py +++ b/src/google/adk/apps/app.py @@ -13,7 +13,9 @@ # limitations under the License. from __future__ import annotations +from typing import Callable from typing import Optional +from typing import TYPE_CHECKING from pydantic import BaseModel from pydantic import ConfigDict @@ -26,6 +28,9 @@ from ..plugins.base_plugin import BasePlugin from ..utils.feature_decorator import experimental +if TYPE_CHECKING: + from ..sessions.session import Session + def validate_app_name(name: str) -> None: """Ensures the provided application name is safe and intuitive.""" @@ -80,6 +85,26 @@ class EventsCompactionConfig(BaseModel): end of the last compacted range. This creates an overlap between consecutive compacted summaries, maintaining context.""" + compaction_predicate: Optional[Callable[[Session], bool]] = None + """Optional callable to dynamically determine if compaction should run for a + specific session. + + This enables per-session compaction control based on user metadata, session + state, or other runtime conditions. If provided, this predicate is evaluated + before each compaction. Return True to allow compaction, False to skip. + + Example: + def should_compact(session: Session) -> bool: + # Only compact for non-premium users + return session.state.get('user_tier') != 'premium' + + config = EventsCompactionConfig( + compaction_interval=5, + overlap_size=1, + compaction_predicate=should_compact + ) + """ + class App(BaseModel): """Represents an LLM-backed agentic application. diff --git a/src/google/adk/apps/compaction.py b/src/google/adk/apps/compaction.py index 5f3edd92e7..e27a042132 100644 --- a/src/google/adk/apps/compaction.py +++ b/src/google/adk/apps/compaction.py @@ -24,6 +24,37 @@ logger = logging.getLogger('google_adk.' + __name__) +def should_compact_session(app: App, session: Session) -> bool: + """Determines if event compaction should run for a given session. + + This function evaluates per-session overrides and app-level predicates + to decide if compaction should proceed. + + Args: + app: The application instance with compaction configuration. + session: The session to evaluate. + + Returns: + True if compaction should run, False otherwise. + """ + if not app.events_compaction_config: + return False + + config = app.events_compaction_config + + # Check session-level override first (highest priority) + compaction_enabled = session.is_compaction_enabled() + if compaction_enabled is not None: + return compaction_enabled + + # Check app-level predicate + if config.compaction_predicate: + return config.compaction_predicate(session) + + # Default: compaction is enabled + return True + + async def _run_compaction_for_sliding_window( app: App, session: Session, session_service: BaseSessionService ): diff --git a/src/google/adk/plugins/base_plugin.py b/src/google/adk/plugins/base_plugin.py index 3639f61aa2..003565ffba 100644 --- a/src/google/adk/plugins/base_plugin.py +++ b/src/google/adk/plugins/base_plugin.py @@ -31,6 +31,7 @@ if TYPE_CHECKING: from ..agents.invocation_context import InvocationContext + from ..sessions.session import Session from ..tools.tool_context import ToolContext @@ -152,6 +153,25 @@ async def before_run_callback( """ pass + async def before_session_start(self, *, session: Session) -> None: + """Callback executed when a session is loaded/created before invocation. + + This callback allows plugins to inspect and modify session settings + dynamically based on session state or metadata. It's ideal for per-session + configuration like enabling/disabling compaction for specific users. + + Args: + session: The session being prepared for the invocation. + + Example: + >>> class PremiumUserPlugin(BasePlugin): + .. async def before_session_start(self, *, session: Session) -> None: + .. # Disable compaction for premium users + .. if session.state.get('user_tier') == 'premium': + .. session.set_compaction_enabled(False) + """ + pass + async def on_event_callback( self, *, invocation_context: InvocationContext, event: Event ) -> Optional[Event]: diff --git a/src/google/adk/runners.py b/src/google/adk/runners.py index 545a0e83e6..90e3dc09f0 100644 --- a/src/google/adk/runners.py +++ b/src/google/adk/runners.py @@ -29,6 +29,7 @@ import warnings from google.adk.apps.compaction import _run_compaction_for_sliding_window +from google.adk.apps.compaction import should_compact_session from google.adk.artifacts import artifact_util from google.genai import types @@ -497,6 +498,11 @@ async def _run_with_trace( session = await self._get_or_create_session( user_id=user_id, session_id=session_id ) + + # Call before_session_start plugin hooks + for plugin in self.plugins: + await plugin.before_session_start(session=session) + if not invocation_id and not new_message: raise ValueError( 'Running an agent requires either a new_message or an ' @@ -552,7 +558,7 @@ async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]: # Run compaction after all events are yielded from the agent. # (We don't compact in the middle of an invocation, we only compact at # the end of an invocation.) - if self.app and self.app.events_compaction_config: + if should_compact_session(self.app, session): logger.debug('Running event compactor.') await _run_compaction_for_sliding_window( self.app, session, self.session_service @@ -573,6 +579,11 @@ async def rewind_async( session = await self._get_or_create_session( user_id=user_id, session_id=session_id ) + + # Call before_session_start plugin hooks + for plugin in self.plugins: + await plugin.before_session_start(session=session) + rewind_event_index = -1 for i, event in enumerate(session.events): if event.invocation_id == rewind_before_invocation_id: @@ -1004,6 +1015,11 @@ async def run_live( session = await self._get_or_create_session( user_id=user_id, session_id=session_id ) + + # Call before_session_start plugin hooks + for plugin in self.plugins: + await plugin.before_session_start(session=session) + invocation_context = self._new_invocation_context_for_live( session, live_request_queue=live_request_queue, diff --git a/src/google/adk/sessions/session.py b/src/google/adk/sessions/session.py index 89af1d4167..bb2c362465 100644 --- a/src/google/adk/sessions/session.py +++ b/src/google/adk/sessions/session.py @@ -48,3 +48,28 @@ class Session(BaseModel): call/response, etc.""" last_update_time: float = 0.0 """The last update time of the session.""" + + def set_compaction_enabled(self, enabled: bool) -> None: + """Controls whether event compaction is enabled for this session. + + This setting overrides the app-level compaction configuration, allowing + per-session control of context compression. + + Args: + enabled: True to enable compaction, False to disable it. + + Example: + session.set_compaction_enabled(False) # Disable for premium users + """ + self.state['_adk_disable_compaction'] = not enabled + + def is_compaction_enabled(self) -> Optional[bool]: + """Returns whether compaction is explicitly enabled/disabled for this session. + + Returns: + True if explicitly enabled, False if explicitly disabled, None if not set + (will use app-level default). + """ + if '_adk_disable_compaction' in self.state: + return not self.state['_adk_disable_compaction'] + return None