Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 91 additions & 25 deletions src/strands/agent/a2a_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
"""

import logging
import warnings
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

import httpx
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
from a2a.client import A2ACardResolver, Client, ClientConfig, ClientFactory
from a2a.types import AgentCard, Message, TaskArtifactUpdateEvent, TaskState, TaskStatusUpdateEvent

from .._async import run_async
Expand All @@ -38,6 +38,7 @@ def __init__(
name: str | None = None,
description: str | None = None,
timeout: int = _DEFAULT_TIMEOUT,
client_config: ClientConfig | None = None,
a2a_client_factory: ClientFactory | None = None,
):
"""Initialize A2A agent.
Expand All @@ -47,17 +48,33 @@ def __init__(
name: Agent name. If not provided, will be populated from agent card.
description: Agent description. If not provided, will be populated from agent card.
timeout: Timeout for HTTP operations in seconds (defaults to 300).
a2a_client_factory: Optional pre-configured A2A ClientFactory. If provided,
it will be used to create the A2A client after discovering the agent card.
Note: When providing a custom factory, you are responsible for managing
the lifecycle of any httpx client it uses.
client_config: Optional A2A ClientConfig for authentication and transport settings.
When provided, the config (including any authenticated httpx client) is used
for both agent card discovery and A2A message sending. This is the recommended
way to configure authentication (e.g. SigV4, OAuth bearer tokens).
a2a_client_factory: Deprecated. Use ``client_config`` instead. Optional pre-configured
A2A ClientFactory. When provided, ``factory.create()`` is used for client creation,
preserving any configured interceptors, consumers, and custom transports. For card
resolution, ``client_config`` is preferred if provided; otherwise the factory's
internal config is used as a fallback. The caller is responsible for managing the
lifecycle of any httpx client configured in the factory.
"""
if a2a_client_factory is not None:
warnings.warn(
"a2a_client_factory is deprecated, use client_config instead. "
"a2a_client_factory will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)

self.endpoint = endpoint
self.name = name
self.description = description
self.timeout = timeout
self._agent_card: AgentCard | None = None
self._client_config = client_config
self._a2a_client_factory: ClientFactory | None = a2a_client_factory
self._agent_card: AgentCard | None = None
self._a2a_client: Client | None = None

def __call__(
self,
Expand Down Expand Up @@ -164,15 +181,23 @@ async def get_agent_card(self) -> AgentCard:
populating name and description if not already set. The card is cached
after the first fetch.

When ``client_config`` is provided, its httpx client is used for card resolution
so that any pre-configured authentication (SigV4, OAuth bearer tokens, etc.) is applied.

Returns:
The remote agent's AgentCard containing name, description, capabilities, skills, etc.
"""
if self._agent_card is not None:
return self._agent_card

async with httpx.AsyncClient(timeout=self.timeout) as client:
resolver = A2ACardResolver(httpx_client=client, base_url=self.endpoint)
config = self._resolve_client_config()
if config is not None and config.httpx_client is not None:
resolver = A2ACardResolver(httpx_client=config.httpx_client, base_url=self.endpoint)
self._agent_card = await resolver.get_agent_card()
else:
async with httpx.AsyncClient(timeout=self.timeout) as client:
resolver = A2ACardResolver(httpx_client=client, base_url=self.endpoint)
self._agent_card = await resolver.get_agent_card()

# Populate name from card if not set
if self.name is None and self._agent_card.name:
Expand All @@ -185,25 +210,66 @@ async def get_agent_card(self) -> AgentCard:
logger.debug("agent=<%s>, endpoint=<%s> | discovered agent card", self.name, self.endpoint)
return self._agent_card

@asynccontextmanager
async def _get_a2a_client(self) -> AsyncIterator[Any]:
"""Get A2A client for sending messages.
def _resolve_client_config(self) -> ClientConfig | None:
"""Resolve the effective client config for card resolution and client creation.

If a custom factory was provided, uses that (caller manages httpx lifecycle).
Otherwise creates a per-call httpx client with proper cleanup.
Precedence:
1. Explicit ``client_config`` parameter (always preferred)
2. Factory's internal config (fallback for deprecated factory path)
3. None (use defaults)

Yields:
Configured A2A client instance.
Returns:
Resolved ClientConfig, or None if no config is available.
"""
agent_card = await self.get_agent_card()
if self._client_config is not None:
return self._client_config

if self._a2a_client_factory is not None:
config = getattr(self._a2a_client_factory, "_config", None)
if config is None:
logger.warning(
"endpoint=<%s> | could not access factory client config, "
"falling back to default config for card resolution",
self.endpoint,
)
return config

return None

async def _get_or_create_client(self) -> Client:
"""Get or create an A2A client for communicating with the remote agent.

When a deprecated factory is provided, ``factory.create()`` is used for client creation
to preserve interceptors, consumers, and custom transports. The resulting client is cached.

When ``client_config`` is provided without a factory, ``ClientFactory.connect()`` is used
with the config for both card resolution and client creation. The client is cached.

When neither is provided, a transient client is created per call to avoid long-lived
httpx connections which can cause connection breakdown and deadlocks on Windows.

Returns:
Configured A2A Client instance.
"""
# Deprecated factory path: use factory.create() to preserve interceptors/consumers/transports
if self._a2a_client_factory is not None:
yield self._a2a_client_factory.create(agent_card)
return
if self._a2a_client is not None:
return self._a2a_client

agent_card = await self.get_agent_card()
self._a2a_client = self._a2a_client_factory.create(agent_card)
return self._a2a_client

# client_config path: cache the client
if self._client_config is not None:
if self._a2a_client is not None:
return self._a2a_client

self._a2a_client = await ClientFactory.connect(self.endpoint, client_config=self._client_config)
return self._a2a_client

async with httpx.AsyncClient(timeout=self.timeout) as httpx_client:
config = ClientConfig(httpx_client=httpx_client, streaming=True)
yield ClientFactory(config).create(agent_card)
# No factory or config: create transient client per call
return await ClientFactory.connect(self.endpoint)

async def _send_message(self, prompt: AgentInput) -> AsyncIterator[A2AResponse]:
"""Send message to A2A agent.
Expand All @@ -223,9 +289,9 @@ async def _send_message(self, prompt: AgentInput) -> AsyncIterator[A2AResponse]:
message = convert_input_to_message(prompt)
logger.debug("agent=<%s>, endpoint=<%s> | sending message", self.name, self.endpoint)

async with self._get_a2a_client() as client:
async for event in client.send_message(message):
yield event
client = await self._get_or_create_client()
async for event in client.send_message(message):
yield event

def _is_complete_event(self, event: A2AResponse) -> bool:
"""Check if an A2A event represents a complete response.
Expand Down
Loading
Loading