Introduce BaseAIHook to common-ai provider #67373

Draft
gopidesupavan wants to merge 1 commit into apache:main from gopidesupavan:introduce-baseaihook
@gopidesupavan
Member

Introduces BaseAIHook, an abstract hook that defines the contract every
agent-framework backend must implement. AgentOperator (and the @task.agent
decorator) now selects the backend at runtime from the Airflow connection's
conn_type — no new operator class is needed when adding another framework.

What changed

New: BaseAIHook contract (hooks/base_ai.py)

  • BaseAIHook(BaseHook) — abstract base with three abstract methods:
    • get_conn() — return the backend model/client
    • create_agent(output_type, instructions, **kwargs) — build an agent
    • run_agent(agent, *, prompt, usage_limits, message_history) → AgentRunResult
  • AgentRunResult dataclass — backend-neutral result:
    output, message_history, model_name, usage, tool_names
  • AgentUsage dataclass — normalized token/request counters:
    requests, tool_calls, input_tokens, output_tokens, total_tokens
  • Class-level capability flags: supports_toolsets, supports_durable,
    supports_usage_limits
  • BaseAIHook.get_agent_hook(conn_id) — resolves the registered hook from
    conn_type and raises TypeError if it is not a BaseAIHook
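
As a rough illustration of the contract listed above, the sketch below mirrors the method and field names from this description without importing Airflow. `BaseAIHookSketch` and `EchoHook` are hypothetical stand-ins, and the default values and type hints are assumptions rather than the code in hooks/base_ai.py.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AgentUsage:
    # Field names follow the description; the zero defaults are an assumption.
    requests: int = 0
    tool_calls: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0


@dataclass
class AgentRunResult:
    # Backend-neutral result; only `output` is required in this sketch.
    output: Any
    message_history: list = field(default_factory=list)
    model_name: Optional[str] = None
    usage: AgentUsage = field(default_factory=AgentUsage)
    tool_names: list = field(default_factory=list)


class BaseAIHookSketch(ABC):
    """Stand-in for BaseAIHook(BaseHook); it does not subclass Airflow's BaseHook."""

    # Class-level capability flags (default values assumed).
    supports_toolsets = False
    supports_durable = False
    supports_usage_limits = False

    @abstractmethod
    def get_conn(self) -> Any:
        """Return the backend model/client."""

    @abstractmethod
    def create_agent(self, output_type: Any = str, instructions: str = "", **kwargs: Any) -> Any:
        """Build an agent."""

    @abstractmethod
    def run_agent(self, agent: Any, *, prompt: str, usage_limits: Any = None,
                  message_history: Optional[list] = None) -> AgentRunResult:
        """Run the agent and return a backend-neutral result."""


class EchoHook(BaseAIHookSketch):
    """Hypothetical toy backend that upper-cases the prompt."""

    def get_conn(self) -> Any:
        return "echo-model"

    def create_agent(self, output_type: Any = str, instructions: str = "", **kwargs: Any) -> Any:
        return {"model": self.get_conn(), "instructions": instructions}

    def run_agent(self, agent: Any, *, prompt: str, usage_limits: Any = None,
                  message_history: Optional[list] = None) -> AgentRunResult:
        history = list(message_history or []) + [prompt]
        return AgentRunResult(
            output=prompt.upper(),
            message_history=history,
            model_name=agent["model"],
            usage=AgentUsage(requests=1),
        )
```

Because all three methods are abstract, instantiating the base class directly raises TypeError, while a backend that implements them can be used through the shared interface.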

Refactor: PydanticAIHook(BaseAIHook)

PydanticAIHook (and its subclasses PydanticAIAzureHook,
PydanticAIBedrockHook, PydanticAIVertexHook) now subclass BaseAIHook.
run_agent() wraps the pydantic-ai RunResult into AgentRunResult, so all
callers receive a backend-neutral object.
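
The wrapping step might look roughly like the following sketch. `FakeRunResult`, `FakeResponse`, and `FakeUsage` are hypothetical stand-ins that mimic only the attributes assumed here (`output`, `response.model_name`, a `usage()` callable, and `all_messages()`); `to_agent_run_result` is an illustrative helper name, not a function from this PR, and tool-name extraction is left out.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AgentUsage:
    requests: int = 0
    tool_calls: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0


@dataclass
class AgentRunResult:
    output: Any
    message_history: list = field(default_factory=list)
    model_name: Optional[str] = None
    usage: AgentUsage = field(default_factory=AgentUsage)
    tool_names: list = field(default_factory=list)


# Hypothetical stand-ins for a backend-specific result object.
@dataclass
class FakeUsage:
    requests: int
    input_tokens: int
    output_tokens: int


@dataclass
class FakeResponse:
    model_name: str


@dataclass
class FakeRunResult:
    output: Any
    response: FakeResponse
    _usage: FakeUsage
    _messages: list

    def usage(self) -> FakeUsage:
        # Usage is exposed through a callable, as noted in this description.
        return self._usage

    def all_messages(self) -> list:
        return self._messages


def to_agent_run_result(raw: Any) -> AgentRunResult:
    """Copy backend-specific fields into the backend-neutral dataclass."""
    used = raw.usage()
    return AgentRunResult(
        output=raw.output,
        message_history=list(raw.all_messages()),
        model_name=raw.response.model_name,
        usage=AgentUsage(
            requests=used.requests,
            input_tokens=used.input_tokens,
            output_tokens=used.output_tokens,
            # Total derived here for the sketch; a real backend may report it directly.
            total_tokens=used.input_tokens + used.output_tokens,
        ),
    )
```

Callers then read plain attributes such as `result.usage.total_tokens` and never touch the backend's own result type.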

AgentOperator dispatch via get_agent_hook()

AgentOperator.execute() calls BaseAIHook.get_agent_hook(conn_id) instead of
importing PydanticAIHook directly. This is the main extensibility seam:
a future hook registered under a new conn_type is picked up with no operator
changes.
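
A minimal sketch of the dispatch idea, assuming a simple in-memory registry in place of Airflow's connection and provider lookup. The `CONNECTIONS` and `HOOKS_BY_CONN_TYPE` tables, the conn_type strings, and every class name ending in `Sketch` are hypothetical; only the behavior (resolve by conn_type, raise TypeError for a non-agent hook) comes from this description.

```python
class BaseHookSketch:
    """Stand-in for Airflow's BaseHook."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id


class BaseAIHookSketch(BaseHookSketch):
    """Stand-in for BaseAIHook with the lookup classmethod."""

    @classmethod
    def get_agent_hook(cls, conn_id: str) -> "BaseAIHookSketch":
        conn_type = CONNECTIONS[conn_id]          # conn_id -> conn_type
        hook_cls = HOOKS_BY_CONN_TYPE[conn_type]  # conn_type -> registered hook class
        hook = hook_cls(conn_id)
        if not isinstance(hook, BaseAIHookSketch):
            raise TypeError(
                f"Hook for conn_type {conn_type!r} is {type(hook).__name__}, "
                "which is not an agent hook"
            )
        return hook


class PydanticLikeHookSketch(BaseAIHookSketch):
    """Hypothetical agent backend."""


class OtherFrameworkHookSketch(BaseAIHookSketch):
    """Hypothetical second backend, added without touching the operator."""


class PostgresLikeHookSketch(BaseHookSketch):
    """Hypothetical non-agent hook."""


# Hypothetical registries standing in for Airflow's connection and provider metadata.
HOOKS_BY_CONN_TYPE = {
    "pydantic_like": PydanticLikeHookSketch,
    "other_framework": OtherFrameworkHookSketch,
    "postgres_like": PostgresLikeHookSketch,
}
CONNECTIONS = {
    "llm_default": "pydantic_like",
    "llm_other": "other_framework",
    "db_default": "postgres_like",
}


def execute_sketch(conn_id: str) -> str:
    """What an operator's execute() might do first: resolve the hook by connection."""
    hook = BaseAIHookSketch.get_agent_hook(conn_id)
    return type(hook).__name__
```

In this sketch, supporting a new framework only requires a new entry in the registry; `execute_sketch` itself does not change.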

LLMOperator / LLMFileAnalysisOperator aligned

Both operators previously called agent.run_sync() directly, bypassing the hook
abstraction and receiving a pydantic-ai RunResult rather than an
AgentRunResult. They now call self.llm_hook.run_agent(agent, prompt=…),
which means:

  • log_run_summary() receives a consistent AgentRunResult from all operators
  • Both operators are now backend-agnostic, in line with AgentOperator

logging.py backend-agnostic

log_run_summary() now reads result.model_name, result.usage.*, and
result.tool_names from AgentRunResult directly — no more pydantic-ai
result.response.model_name or result.usage() callable.
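
To make the attribute access concrete, here is a sketch of a summary logger that reads only the neutral fields. The function signature, the message format, and the returned string are assumptions for illustration; the real log_run_summary() in logging.py may differ.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AgentUsage:
    requests: int = 0
    tool_calls: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0


@dataclass
class AgentRunResult:
    output: Any
    message_history: list = field(default_factory=list)
    model_name: Optional[str] = None
    usage: AgentUsage = field(default_factory=AgentUsage)
    tool_names: list = field(default_factory=list)


def log_run_summary(log: logging.Logger, result: AgentRunResult) -> str:
    """Log a one-line summary using plain attributes only (no backend-specific calls)."""
    summary = (
        f"model={result.model_name} "
        f"requests={result.usage.requests} "
        f"tool_calls={result.usage.tool_calls} "
        f"tokens={result.usage.input_tokens}in/{result.usage.output_tokens}out/"
        f"{result.usage.total_tokens}total "
        f"tools={','.join(result.tool_names) or '-'}"
    )
    log.info(summary)
    # Returning the string is a convenience for this sketch, not part of the PR.
    return summary
```

Since `usage` is a plain dataclass here rather than a callable, the same function works for any backend that fills in AgentRunResult.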

Tests

  • New tests/unit/common/ai/hooks/test_base_ai.py covering the contract,
    get_agent_hook() dispatch, and TypeError on non-agent hook
  • Updated test_pydantic_ai.py, test_agent.py, test_llm.py,
    test_llm_file_analysis.py to mock hook.run_agent() returning
    AgentRunResult instead of a raw pydantic-ai RunResult
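
The mocking pattern described in the test updates could be sketched as follows. `OperatorSketch` is a hypothetical stand-in for an operator that delegates to its hook, and the trimmed `AgentRunResult` here carries only the fields the example needs; neither is the actual test code from this PR.

```python
from dataclasses import dataclass, field
from typing import Any, Optional
from unittest.mock import MagicMock


@dataclass
class AgentRunResult:
    # Trimmed to the fields this example uses.
    output: Any
    model_name: Optional[str] = None
    tool_names: list = field(default_factory=list)


class OperatorSketch:
    """Hypothetical operator that goes through the hook instead of calling the agent."""

    def __init__(self, llm_hook: Any, prompt: str) -> None:
        self.llm_hook = llm_hook
        self.prompt = prompt

    def execute(self) -> Any:
        agent = self.llm_hook.create_agent(output_type=str, instructions="")
        result = self.llm_hook.run_agent(agent, prompt=self.prompt)
        return result.output


def test_execute_uses_hook_run_agent() -> None:
    hook = MagicMock()
    # The mock returns the neutral dataclass, not a backend-specific result.
    hook.run_agent.return_value = AgentRunResult(output="hello", model_name="test-model")

    op = OperatorSketch(llm_hook=hook, prompt="say hello")

    assert op.execute() == "hello"
    hook.run_agent.assert_called_once_with(
        hook.create_agent.return_value, prompt="say hello"
    )
```

Mocking at the `run_agent()` boundary keeps the operator tests independent of whichever agent framework sits behind the hook.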

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

