Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 53 additions & 39 deletions packages/a2a/src/keycardai/a2a/server/delegation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Any

import httpx
from a2a.utils import constants

from keycardai.oauth import AsyncClient as AsyncOAuthClient
from keycardai.oauth import Client as SyncOAuthClient
Expand All @@ -22,13 +23,16 @@
logger = logging.getLogger(__name__)


def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]:
"""Wrap a task in an A2A JSONRPC ``message/send`` envelope.
def _build_jsonrpc_send_message(task: dict[str, Any] | str) -> dict[str, Any]:
"""Wrap a task in an A2A 1.x JSONRPC ``SendMessage`` envelope.

``task`` may be a plain string, a dict carrying a ``"task"`` string under
that key (legacy shape preserved for the keycardai-agents CrewAI
integration), or any other dict (serialized to JSON for the message
``task`` may be a plain string, a dict carrying a ``"task"`` string at the
``"task"`` key, or any other dict (serialized to JSON for the message
text).

The shape mirrors ``a2a.types.SendMessageRequest`` after JSON-marshalling
via ``google.protobuf.json_format``: a ``messageId`` (required by the
dispatcher), an enum-string ``role``, and ``parts`` carrying the text.
"""
if isinstance(task, str):
text = task
Expand All @@ -43,27 +47,29 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "message/send",
"method": "SendMessage",
"params": {
"message": {
"role": "user",
"messageId": str(uuid.uuid4()),
"role": "ROLE_USER",
"parts": [{"text": text}],
},
},
}


def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]:
"""Unwrap an A2A JSONRPC response into the ``{result, delegation_chain}`` shape.
"""Unwrap an A2A 1.x JSONRPC ``SendMessageResponse`` into a flat
``{result, delegation_chain}`` dict.

Best-effort surface used by CrewAI delegation tools. If the JSONRPC
result is a ``Message`` (parts with ``text``), the text parts are
joined; otherwise the raw result is JSON-stringified.
``SendMessageResponse`` is a oneof of ``message`` or ``task``. When the
remote executor enqueues a ``Message``, the text is read from
``result.message.parts[].text`` and joined. When it produces a ``Task``,
the task is JSON-stringified into ``result``; callers needing the full
Task lifecycle should use ``a2a.client.create_client`` directly.

``delegation_chain`` is returned empty: the legacy keycardai-agents
chain reconstruction read from ``request.state.keycardai_auth_info``
which never carried the claim, so it was always single-hop. Callers
that need multi-hop tracking should parse JWT claims directly.
``delegation_chain`` is always empty here. Multi-hop chain tracking
requires parsing JWT claims directly.

Raises:
ValueError: if the response carries a JSONRPC ``error`` member.
Expand All @@ -78,18 +84,20 @@ def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]:
if result is None:
return {"result": "", "delegation_chain": []}
if isinstance(result, dict):
parts = result.get("parts")
if isinstance(parts, list):
text_parts = [
p.get("text", "")
for p in parts
if isinstance(p, dict) and "text" in p
]
if text_parts:
return {
"result": "\n".join(text_parts),
"delegation_chain": [],
}
message = result.get("message")
if isinstance(message, dict):
parts = message.get("parts")
if isinstance(parts, list):
text_parts = [
p.get("text", "")
for p in parts
if isinstance(p, dict) and "text" in p
]
if text_parts:
return {
"result": "\n".join(text_parts),
"delegation_chain": [],
}
if isinstance(result, str):
return {"result": result, "delegation_chain": []}
return {"result": json.dumps(result), "delegation_chain": []}
Expand Down Expand Up @@ -278,11 +286,10 @@ async def invoke_service(
) -> dict[str, Any]:
"""Call another agent service over A2A JSONRPC with bearer auth.

Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
and returns ``{"result": <text>, "delegation_chain": []}`` for
compatibility with the legacy invocation surface. If you need the
full A2A protocol surface (Task lifecycle, streaming, status
updates), use ``a2a.client.create_client`` directly.
Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
and returns ``{"result": <text>, "delegation_chain": []}``. For the
full A2A protocol surface (Task lifecycle, streaming, status updates),
use ``a2a.client.create_client`` directly.

Args:
service_url: Base URL of the target service
Expand Down Expand Up @@ -311,13 +318,16 @@ async def invoke_service(
token = await self.get_delegation_token(service_url, subject_token)

jsonrpc_url = f"{service_url}/a2a/jsonrpc"
envelope = _build_jsonrpc_message_send(task)
envelope = _build_jsonrpc_send_message(task)

try:
response = await self.http_client.post(
jsonrpc_url,
json=envelope,
headers={"Authorization": f"Bearer {token}"},
headers={
"Authorization": f"Bearer {token}",
constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0,
},
)
response.raise_for_status()
unwrapped = _unwrap_jsonrpc_response(response.json())
Expand Down Expand Up @@ -524,9 +534,10 @@ def invoke_service(
) -> dict[str, Any]:
"""Call another agent service over A2A JSONRPC with bearer auth.

Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
and returns ``{"result": <text>, "delegation_chain": []}`` for
compatibility with the legacy invocation surface.
Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
and returns ``{"result": <text>, "delegation_chain": []}``. For the
full A2A protocol surface (Task lifecycle, streaming, status updates),
use ``a2a.client.create_client`` directly.

Args:
service_url: Base URL of the target service
Expand All @@ -547,13 +558,16 @@ def invoke_service(
token = self.get_delegation_token(service_url, subject_token)

jsonrpc_url = f"{service_url}/a2a/jsonrpc"
envelope = _build_jsonrpc_message_send(task)
envelope = _build_jsonrpc_send_message(task)

try:
response = self.http_client.post(
jsonrpc_url,
json=envelope,
headers={"Authorization": f"Bearer {token}"},
headers={
"Authorization": f"Bearer {token}",
constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0,
},
)
response.raise_for_status()
unwrapped = _unwrap_jsonrpc_response(response.json())
Expand Down
41 changes: 33 additions & 8 deletions packages/a2a/tests/test_a2a_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,23 @@ async def test_get_delegation_token_client_credentials(a2a_client):

@pytest.mark.asyncio
async def test_invoke_service_posts_jsonrpc_envelope(a2a_client):
"""invoke_service sends a JSONRPC message/send to /a2a/jsonrpc."""
"""invoke_service sends a 1.x SendMessage JSONRPC request to /a2a/jsonrpc.

The dispatcher requires a CamelCase method name and an A2A-Version
header; the message envelope must carry messageId and an enum-string
role. Any of these wrong and the dispatcher rejects the request before
the executor runs.
"""
mock_response = Mock()
mock_response.json.return_value = {
"jsonrpc": "2.0",
"id": "1",
"result": {
"role": "agent",
"parts": [{"text": "Task completed successfully"}],
"message": {
"messageId": "resp-1",
"role": "ROLE_AGENT",
"parts": [{"text": "Task completed successfully"}],
},
},
}
mock_response.raise_for_status = Mock()
Expand All @@ -132,17 +141,21 @@ async def test_invoke_service_posts_jsonrpc_envelope(a2a_client):
token="test_token_123",
)

# The wrapper unwraps the JSONRPC result back to the legacy shape.
# The wrapper unwraps the SendMessageResponse.message.parts text.
assert result["result"] == "Task completed successfully"
assert result["delegation_chain"] == []

# Confirm the request was a JSONRPC envelope to /a2a/jsonrpc.
# Confirm the request matches the 1.x dispatcher contract.
posted_url = mock_post.call_args[0][0]
posted_body = mock_post.call_args[1]["json"]
posted_headers = mock_post.call_args[1]["headers"]
assert posted_url == "https://target.example.com/a2a/jsonrpc"
assert posted_body["jsonrpc"] == "2.0"
assert posted_body["method"] == "message/send"
assert posted_body["method"] == "SendMessage"
assert posted_body["params"]["message"]["role"] == "ROLE_USER"
assert posted_body["params"]["message"]["parts"][0]["text"] == "Test task"
assert posted_body["params"]["message"]["messageId"]
assert posted_headers["A2A-Version"] == "1.0"


@pytest.mark.asyncio
Expand All @@ -156,7 +169,13 @@ async def test_invoke_service_auto_token_exchange(a2a_client):
mock_http_response.json.return_value = {
"jsonrpc": "2.0",
"id": "1",
"result": {"role": "agent", "parts": [{"text": "Success"}]},
"result": {
"message": {
"messageId": "resp-1",
"role": "ROLE_AGENT",
"parts": [{"text": "Success"}],
}
},
}
mock_http_response.raise_for_status = Mock()

Expand All @@ -183,7 +202,13 @@ async def test_invoke_service_string_task(a2a_client):
mock_response.json.return_value = {
"jsonrpc": "2.0",
"id": "1",
"result": {"role": "agent", "parts": [{"text": "Done"}]},
"result": {
"message": {
"messageId": "resp-1",
"role": "ROLE_AGENT",
"parts": [{"text": "Done"}],
}
},
}
mock_response.raise_for_status = Mock()

Expand Down
Loading
Loading