Add BaseAIHook and Update usages #67438
Open
gopidesupavan wants to merge 2 commits into
Pull request overview
This PR introduces a new BaseAIHook contract in the common-ai provider to make multi-turn agent execution backend-neutral. Operators/decorators now construct an AgentRunRequest, resolve the runtime hook from the connection conn_type, and delegate agent lifecycle/tool resolution/durable execution to the hook implementation (starting with PydanticAIHook).
Changes:
- Add BaseAIHook + shared request/response/tool abstractions (AgentRunRequest, AgentRunResult, ToolSpec, BaseToolset) and shared helper logic (tool resolution, logging, caching).
- Refactor AgentOperator, LLM operators/decorators, and LLMRetryPolicy to use get_agent_hook() and the shared create_agent(request) / run_agent(agent, request) flow.
- Migrate SQLToolset to the framework-agnostic BaseToolset interface and update logging utilities, docs, examples, and tests accordingly.
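The tool abstraction named in the changes above can be pictured roughly as follows. This is a minimal sketch, not the provider's code: the ToolSpec fields (name, description, parameters, fn) follow the snippet quoted later in this review, while `EchoToolset` and the standalone `resolve_tools` function are hypothetical stand-ins, and pass-through of native tool objects is left out.

```python
from __future__ import annotations

import inspect
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ToolSpec:
    """Framework-neutral tool descriptor (simplified stand-in)."""

    name: str
    description: str
    parameters: dict[str, Any]  # JSON schema for the tool arguments
    fn: Callable[..., Any]


class BaseToolset(ABC):
    """Anything that can describe its tools as ToolSpec objects."""

    @abstractmethod
    def as_tools(self) -> list[ToolSpec]: ...


class EchoToolset(BaseToolset):
    """Hypothetical toolset exposing a single tool."""

    def as_tools(self) -> list[ToolSpec]:
        def echo(text: str) -> str:
            return text

        schema = {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        }
        return [ToolSpec("echo", "Return the text unchanged.", schema, echo)]


def resolve_tools(toolsets: list[Any]) -> list[ToolSpec]:
    """Flatten BaseToolset instances and plain functions into ToolSpec objects."""
    specs: list[ToolSpec] = []
    for ts in toolsets:
        if isinstance(ts, BaseToolset):
            specs.extend(ts.as_tools())
        elif inspect.isfunction(ts):
            # Plain functions carry no schema here, so parameters stays empty.
            specs.append(ToolSpec(ts.__name__, ts.__doc__ or "", {}, ts))
        else:
            raise TypeError(f"Unsupported toolset entry: {type(ts).__name__}")
    return specs
```

A backend would then map each resolved ToolSpec to its own native tool type, which is the role the PR assigns to _tool_spec_to_native(spec).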
Reviewed changes
Copilot reviewed 34 out of 35 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| uv.lock | Lockfile update (adds additional jpype1 wheel entries). |
| providers/common/ai/tests/unit/common/ai/utils/test_logging.py | Update logging tests to validate AgentRunResult-based summaries. |
| providers/common/ai/tests/unit/common/ai/toolsets/test_sql.py | Rewrite SQLToolset tests for BaseToolset.as_tools() + direct tool callables. |
| providers/common/ai/tests/unit/common/ai/policies/test_retry.py | Update retry policy tests to mock BaseAIHook.get_agent_hook() + request forwarding. |
| providers/common/ai/tests/unit/common/ai/operators/test_llm.py | Update LLMOperator tests to assert AgentRunRequest construction and run_agent usage. |
| providers/common/ai/tests/unit/common/ai/operators/test_llm_sql.py | Update LLMSQL operator tests for request-based hook invocation. |
| providers/common/ai/tests/unit/common/ai/operators/test_llm_schema_compare.py | Update schema compare operator tests to validate request contents + run_agent call. |
| providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py | Update file analysis operator tests for BaseAIHook + request flow. |
| providers/common/ai/tests/unit/common/ai/operators/test_llm_branch.py | Update branch operator tests to use BaseAIHook and request-based execution. |
| providers/common/ai/tests/unit/common/ai/operators/test_agent.py | Update AgentOperator tests for capability validation + request/durable context forwarding. |
| providers/common/ai/tests/unit/common/ai/hooks/test_pydantic_ai.py | Expand PydanticAIHook tests for BaseAIHook contract, tool routing, and durable behavior. |
| providers/common/ai/tests/unit/common/ai/hooks/test_base_ai.py | Add new unit tests covering BaseAIHook dataclasses and tool/log/cache helpers. |
| providers/common/ai/tests/unit/common/ai/decorators/test_llm.py | Update @task.llm tests to mock BaseAIHook.get_agent_hook() and validate request prompt. |
| providers/common/ai/tests/unit/common/ai/decorators/test_llm_sql.py | Update @task.llm_sql tests for request-based hook execution. |
| providers/common/ai/tests/unit/common/ai/decorators/test_llm_schema_compare.py | Update schema compare decorator tests for BaseAIHook.get_agent_hook() flow. |
| providers/common/ai/tests/unit/common/ai/decorators/test_llm_file_analysis.py | Update file analysis decorator tests for request-based execution. |
| providers/common/ai/tests/unit/common/ai/decorators/test_llm_branch.py | Update branch decorator tests to mock BaseAIHook.get_agent_hook() and validate behavior. |
| providers/common/ai/tests/unit/common/ai/decorators/test_agent.py | Update @task.agent tests for request forwarding and toolset passthrough. |
| providers/common/ai/src/airflow/providers/common/ai/utils/logging.py | Make logging backend-neutral by consuming AgentRunResult directly. |
| providers/common/ai/src/airflow/providers/common/ai/toolsets/sql.py | Convert SQLToolset from pydantic-ai AbstractToolset to framework-neutral BaseToolset. |
| providers/common/ai/src/airflow/providers/common/ai/policies/retry.py | Migrate LLMRetryPolicy to use BaseAIHook + AgentRunRequest and run_agent. |
| providers/common/ai/src/airflow/providers/common/ai/operators/llm.py | Refactor LLMOperator to build AgentRunRequest and call hook create_agent/run_agent. |
| providers/common/ai/src/airflow/providers/common/ai/operators/llm_sql.py | Refactor LLMSQL operator to use AgentRunRequest and hook execution. |
| providers/common/ai/src/airflow/providers/common/ai/operators/llm_schema_compare.py | Refactor schema compare operator to request-based hook execution. |
| providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py | Refactor file analysis operator to request-based hook execution. |
| providers/common/ai/src/airflow/providers/common/ai/operators/llm_branch.py | Refactor branch operator to request-based hook execution. |
| providers/common/ai/src/airflow/providers/common/ai/operators/agent.py | Thin AgentOperator: capability validation + request building + hook-driven execution/durable/tool logging. |
| providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py | Implement BaseAIHook contract for pydantic-ai, including tool routing and durable execution support. |
| providers/common/ai/src/airflow/providers/common/ai/hooks/base_ai.py | Add new BaseAIHook contract, request/result dataclasses, tool abstraction, and shared helpers. |
| providers/common/ai/src/airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py | Update example DAG to use BaseAIHook.get_agent_hook() + AgentRunRequest. |
| providers/common/ai/docs/toolsets.rst | Update toolset docs to describe mixed toolset routing and BaseToolset usage. |
| providers/common/ai/docs/operators/agent.rst | Update AgentOperator docs to explain backend selection via connection conn_type and new toolset shapes. |
| providers/common/ai/docs/hooks/index.rst | Update hook selection docs to reflect conn_type-driven backend selection for agents. |
| providers/common/ai/docs/changelog.rst | Add changelog entry for BaseAIHook contract introduction. |
| providers/common/ai/AGENTS.md | Update contributor guidance to describe BaseAIHook and backend-neutral agent design. |
Comment on lines +269 to +273

    if self._durable_storage is not None and self._durable_counter is not None:
        from airflow.providers.common.ai.durable.caching_model import CachingModel

        resolved_model = infer_model(agent.model)
        caching_model = CachingModel(
Comment on lines +287 to +288

    elif inspect.isfunction(ts):
        specs = [ToolSpec(name=ts.__name__, description=ts.__doc__ or "", parameters={}, fn=ts)]
Comment on lines 85 to +88

      the durable execution (step-level caching with retry replay), HITL review
    - integration, and automatic tool call logging that ``AgentOperator`` provides.
    + integration, and the automatic tool call logging and routing that
    + ``AgentOperator`` provides via
    + :class:`~airflow.providers.common.ai.toolsets.logging.LoggingToolset`.
Comment on lines +327 to +343

    storage = MagicMock()
    counter = MagicMock()
    counter.next_step.return_value = 1
    storage.load_tool_result.return_value = (True, "cached_value")

    calls = []

    def fn():
        calls.append(1)
        return "computed"

    wrapped = BaseAIHook._cached_callable(fn, storage, counter)
    result = wrapped()

    assert result == "cached_value"
    assert calls == []
    counter.replayed_tool += 1
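For context, the behaviour this test exercises (a cache hit returns the stored value without calling the tool) could be sketched like this. It is an illustration under assumptions rather than the PR's implementation: `load_tool_result`, `next_step`, and `replayed_tool` are taken from the snippet above, while `InMemoryStorage`, `StepCounter`, `save_tool_result`, and the step-keyed lookup are hypothetical.

```python
from __future__ import annotations

import functools
from typing import Any, Callable


class InMemoryStorage:
    """Hypothetical stand-in for the durable storage mocked in the test."""

    def __init__(self) -> None:
        self._results: dict[int, Any] = {}

    def load_tool_result(self, step: int) -> tuple[bool, Any]:
        # Same (found, value) shape the mocked storage returns above.
        if step in self._results:
            return True, self._results[step]
        return False, None

    def save_tool_result(self, step: int, value: Any) -> None:  # assumed name
        self._results[step] = value


class StepCounter:
    """Hypothetical counter handing out step numbers and tracking replays."""

    def __init__(self) -> None:
        self._step = 0
        self.replayed_tool = 0

    def next_step(self) -> int:
        self._step += 1
        return self._step


def cached_callable(fn: Callable[..., Any], storage: Any, counter: Any) -> Callable[..., Any]:
    """Wrap fn so a previously stored step result is replayed instead of recomputed."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        step = counter.next_step()
        found, value = storage.load_tool_result(step)
        if found:
            counter.replayed_tool += 1  # record the replay, skip the real call
            return value
        result = fn(*args, **kwargs)
        storage.save_tool_result(step, result)
        return result

    return wrapper
```

In this sketch a retry that starts with a fresh counter but the same storage replays step 1 from the cache, so the wrapped function is not called again.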
1aef4ec to 149dbfd
Add BaseAIHook and Update usage
Summary
Introduce BaseAIHook, a backend-neutral contract for multi-turn LLM agents in the common-ai provider. AgentOperator and @task.agent now resolve the agent runtime from the connection conn_type (for example pydanticai, pydanticai-bedrock, pydanticai-azure) and delegate all framework-specific work to the hook. PydanticAIHook is the first implementation. All LLM operators and LLMRetryPolicy are migrated to a shared AgentRunRequest / run_agent API. SQLToolset is migrated to the new framework-agnostic BaseToolset interface.
This lays the foundation for additional agent backends without adding parallel operator classes per framework. A follow-up PR will add AWS Strands as the next hook implementation; this contract also opens the door for Google ADK and other agent runtimes behind the same AgentOperator / @task.agent surface.
Motivation
Before this change:
- AgentOperator contained pydantic-ai-specific logic (tool wrapping, durable caching, agent construction).
- PydanticAIHook.create_agent() / run_agent() used ad-hoc keyword arguments.
- LoggingToolset / CachingToolset wrappers.
The operator should stay framework-agnostic. Hooks should own agent lifecycle, tool resolution, durable execution, and normalized results.
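As a rough picture of that split, the sketch below keeps the calling code free of backend details and pushes everything else behind an abstract hook. The classes are simplified stand-ins that borrow the PR's names, not the provider classes themselves: the request carries only a few fields, `EchoHook` and the `HOOKS_BY_CONN_TYPE` registry are hypothetical, and `get_agent_hook` takes a conn_type directly instead of looking up a connection.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AgentRunRequest:
    """Simplified stand-in carrying only a few request fields."""

    prompt: str
    instructions: str = ""
    toolsets: list[Any] = field(default_factory=list)


@dataclass
class AgentRunResult:
    """Simplified normalized result."""

    output: Any
    model_name: str


class BaseAIHook(ABC):
    """Stand-in contract: the hook owns agent construction and execution."""

    @abstractmethod
    def create_agent(self, request: AgentRunRequest) -> Any: ...

    @abstractmethod
    def run_agent(self, agent: Any, request: AgentRunRequest) -> AgentRunResult: ...


class EchoHook(BaseAIHook):
    """Hypothetical backend that upper-cases the prompt instead of calling an LLM."""

    def create_agent(self, request: AgentRunRequest) -> Any:
        return {"instructions": request.instructions}

    def run_agent(self, agent: Any, request: AgentRunRequest) -> AgentRunResult:
        return AgentRunResult(output=request.prompt.upper(), model_name="echo")


# Hypothetical registry; the PR resolves the hook from the connection's conn_type.
HOOKS_BY_CONN_TYPE: dict[str, type[BaseAIHook]] = {"echo": EchoHook}


def get_agent_hook(conn_type: str) -> BaseAIHook:
    try:
        return HOOKS_BY_CONN_TYPE[conn_type]()
    except KeyError:
        raise ValueError(f"No agent hook registered for conn_type {conn_type!r}") from None


def execute(conn_type: str, prompt: str) -> AgentRunResult:
    """Operator-side flow: build a request, pick the hook, delegate everything else."""
    request = AgentRunRequest(prompt=prompt)
    hook = get_agent_hook(conn_type)
    agent = hook.create_agent(request)
    return hook.run_agent(agent, request)
```

Under this shape, supporting another runtime means registering another hook class; the `execute` function does not change.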
Design
BaseAIHook contract
New abstract hook with:
- get_model()
- get_conn(): delegates to get_model()
- create_agent(request)
- run_agent(agent, request): returns AgentRunResult
- _tool_spec_to_native(spec): ToolSpec → native tool representation
- get_agent_hook(conn_id): resolves the hook from the connection conn_type
Capability flags: supports_toolsets, supports_durable, supports_usage_limits.
Parameter objects
- AgentRunRequest: prompt, output type, instructions, toolsets, usage limits, message history, durable context, agent params
- AgentRunResult: output, message history, model name, usage, tool names, durable stats
- ToolSpec: framework-neutral tool descriptor (name, description, JSON schema, callable)
- BaseToolset: abstract as_tools() → list[ToolSpec]
- DurableContext / DurableStats: durable execution identity and cache statistics
Shared hook helpers
Moved into BaseAIHook:
- _resolve_tools(): converts BaseToolset, plain callables, and native tool objects
- _logged_callable(): per-tool real-time logging
- _cached_callable(): per-tool durable step caching
- _init_durable(): DurableStorage / DurableStepCounter setup
PydanticAIHook implementation
- Implements the BaseAIHook contract
- AbstractToolset (HookToolset, MCPToolset, DataFusionToolset, third-party) → Agent(toolsets=[...]) with LoggingToolset / CachingToolset wrapping when enabled
- BaseToolset / callables / native Tool → resolved via _resolve_tools → Agent(tools=[...])
- CachingModel in run_agent
- get_model() replaces direct get_conn() usage; get_conn() delegates for backward compatibility
AgentOperator thinning
Operator execution is now:
No pydantic-ai imports at runtime (except UsageLimits under TYPE_CHECKING).
Early validation via _validate_hook_capabilities() checks hook support for toolsets, durable, and usage limits.
SQLToolset → BaseToolset
SQLToolset no longer implements pydantic-ai's AbstractToolset. It implements BaseToolset.as_tools() returning four ToolSpec objects with JSON schemas (list_tables, get_schema, query, check_query).
HookToolset, MCPToolset, and DataFusionToolset remain AbstractToolset and continue to work unchanged through the pydantic-ai routing path.
Other changes
All LLM operators migrated
These now use BaseAIHook.get_agent_hook() and AgentRunRequest:
- LLMOperator
- LLMBranchOperator
- LLMSQLOperator
- LLMSchemaCompareOperator
- LLMFileAnalysisOperator
- LLMRetryPolicy
Logging utilities
- log_run_summary() now accepts AgentRunResult directly
- Removed wrap_toolsets_for_logging() from the operator path; logging is handled in the hook layer
Examples and docs
- Updated example_pydantic_ai_hook.py to use BaseAIHook.get_agent_hook() + AgentRunRequest
- docs/operators/agent.rst, docs/toolsets.rst, AGENTS.md
Tests
- test_base_ai.py: dataclasses, _resolve_tools, logging/caching wrappers
- test_pydantic_ai.py: contract, durable init, AbstractToolset routing/wrapping
- Operator and decorator tests mock BaseAIHook and assert AgentRunRequest forwarding
- Rewrote test_sql.py for BaseToolset.as_tools() API
Breaking changes
PydanticAIHook API
Before:
After:
get_conn() still works (delegates to get_model()).
SQLToolset direct pydantic-ai usage
Before: pass SQLToolset(...) directly to pydantic-ai Agent(toolsets=[...]).
After: use via AgentOperator / @task.agent, or build through the hook:
SQLToolset is now a BaseToolset, not an AbstractToolset.
Migration guide
Custom code calling PydanticAIHook directly
Replace kwargs-style create_agent / run_agent with AgentRunRequest:
DAG authors using operators / decorators
No DAG changes required for:
- AgentOperator / @task.agent
- LLMOperator / @task.llm
Connection conn_type continues to select the backend.
Adding a new agent backend
Subclass BaseAIHook and implement:
- get_model()
- create_agent(request)
- run_agent(agent, request)
- _tool_spec_to_native(spec)
Register the hook in provider.yaml. Reuse shared helpers (_resolve_tools, _logged_callable, _cached_callable, _init_durable) where applicable.
Known limitations / follow-ups
- HookToolset / DataFusionToolset could be migrated to BaseToolset in a follow-up; they work today via the AbstractToolset pass-through path.
Test plan
- tests/unit/common/ai/hooks/test_base_ai.py
- tests/unit/common/ai/hooks/test_pydantic_ai.py
- tests/unit/common/ai/operators/test_agent.py
- tests/unit/common/ai/operators/test_llm.py
- tests/unit/common/ai/operators/test_llm_branch.py
- tests/unit/common/ai/operators/test_llm_sql.py
- tests/unit/common/ai/operators/test_llm_schema_compare.py
- tests/unit/common/ai/operators/test_llm_file_analysis.py
- tests/unit/common/ai/decorators/test_agent.py
- tests/unit/common/ai/decorators/test_llm*.py
- tests/unit/common/ai/policies/test_retry.py
- tests/unit/common/ai/toolsets/test_sql.py
- tests/unit/common/ai/utils/test_logging.py
- Follow-up: AWS Strands agent hook (StrandsAIHook implementing BaseAIHook)
- Follow-up: Google ADK agent hook (same contract, new conn_type registration)
- Follow-up: migrate HookToolset, MCPToolset, and DataFusionToolset from pydantic-ai AbstractToolset to BaseToolset
Was generative AI tooling used to co-author this PR?