Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
- '*-keycardai-fastmcp'
- '*-keycardai-agents'
- '*-keycardai-a2a'
- '*-keycardai-crewai'

jobs:
detect-package:
Expand Down
83 changes: 50 additions & 33 deletions packages/a2a/src/keycardai/a2a/server/delegation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Any

import httpx
from a2a.utils import constants

from keycardai.oauth import AsyncClient as AsyncOAuthClient
from keycardai.oauth import Client as SyncOAuthClient
Expand All @@ -22,13 +23,16 @@
logger = logging.getLogger(__name__)


def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]:
"""Wrap a task in an A2A JSONRPC ``message/send`` envelope.
def _build_jsonrpc_send_message(task: dict[str, Any] | str) -> dict[str, Any]:
"""Wrap a task in an A2A 1.x JSONRPC ``SendMessage`` envelope.

``task`` may be a plain string, a dict carrying a ``"task"`` string under
that key (legacy shape preserved for the keycardai-agents CrewAI
integration), or any other dict (serialized to JSON for the message
text).
that key (legacy shape preserved for the CrewAI integration), or any
other dict (serialized to JSON for the message text).

The shape mirrors ``a2a.types.SendMessageRequest`` after JSON-marshalling
via ``google.protobuf.json_format``: a ``messageId`` (required by the
dispatcher), an enum-string ``role``, and ``parts`` carrying the text.
"""
if isinstance(task, str):
text = task
Expand All @@ -43,27 +47,32 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "message/send",
"method": "SendMessage",
"params": {
"message": {
"role": "user",
"messageId": str(uuid.uuid4()),
"role": "ROLE_USER",
"parts": [{"text": text}],
},
},
}


def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]:
"""Unwrap an A2A JSONRPC response into the ``{result, delegation_chain}`` shape.
"""Unwrap an A2A 1.x JSONRPC ``SendMessageResponse`` into the
``{result, delegation_chain}`` shape consumed by the CrewAI delegation
tool.

Best-effort surface used by CrewAI delegation tools. If the JSONRPC
result is a ``Message`` (parts with ``text``), the text parts are
joined; otherwise the raw result is JSON-stringified.
``SendMessageResponse`` is a oneof of ``message`` or ``task``. If the
remote executor enqueued a ``Message`` (the common case for synchronous
crews), the text is at ``result.message.parts[].text``. If it produced a
``Task``, we fall back to JSON-stringifying the task; callers wanting the
full Task lifecycle should reach for ``a2a.client.create_client``.

``delegation_chain`` is returned empty: the legacy keycardai-agents
chain reconstruction read from ``request.state.keycardai_auth_info``
which never carried the claim, so it was always single-hop. Callers
that need multi-hop tracking should parse JWT claims directly.
``delegation_chain`` is returned empty: the legacy chain reconstruction
read from ``request.state.keycardai_auth_info``, which never carried the
claim, so it was always single-hop. Callers needing multi-hop tracking
should parse JWT claims directly.

Raises:
ValueError: if the response carries a JSONRPC ``error`` member.
Expand All @@ -78,18 +87,20 @@ def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]:
if result is None:
return {"result": "", "delegation_chain": []}
if isinstance(result, dict):
parts = result.get("parts")
if isinstance(parts, list):
text_parts = [
p.get("text", "")
for p in parts
if isinstance(p, dict) and "text" in p
]
if text_parts:
return {
"result": "\n".join(text_parts),
"delegation_chain": [],
}
message = result.get("message")
if isinstance(message, dict):
parts = message.get("parts")
if isinstance(parts, list):
text_parts = [
p.get("text", "")
for p in parts
if isinstance(p, dict) and "text" in p
]
if text_parts:
return {
"result": "\n".join(text_parts),
"delegation_chain": [],
}
if isinstance(result, str):
return {"result": result, "delegation_chain": []}
return {"result": json.dumps(result), "delegation_chain": []}
Expand Down Expand Up @@ -278,7 +289,7 @@ async def invoke_service(
) -> dict[str, Any]:
"""Call another agent service over A2A JSONRPC with bearer auth.

Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
and returns ``{"result": <text>, "delegation_chain": []}`` for
compatibility with the legacy invocation surface. If you need the
full A2A protocol surface (Task lifecycle, streaming, status
Expand Down Expand Up @@ -311,13 +322,16 @@ async def invoke_service(
token = await self.get_delegation_token(service_url, subject_token)

jsonrpc_url = f"{service_url}/a2a/jsonrpc"
envelope = _build_jsonrpc_message_send(task)
envelope = _build_jsonrpc_send_message(task)

try:
response = await self.http_client.post(
jsonrpc_url,
json=envelope,
headers={"Authorization": f"Bearer {token}"},
headers={
"Authorization": f"Bearer {token}",
constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0,
},
)
response.raise_for_status()
unwrapped = _unwrap_jsonrpc_response(response.json())
Expand Down Expand Up @@ -524,7 +538,7 @@ def invoke_service(
) -> dict[str, Any]:
"""Call another agent service over A2A JSONRPC with bearer auth.

Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc``
and returns ``{"result": <text>, "delegation_chain": []}`` for
compatibility with the legacy invocation surface.

Expand All @@ -547,13 +561,16 @@ def invoke_service(
token = self.get_delegation_token(service_url, subject_token)

jsonrpc_url = f"{service_url}/a2a/jsonrpc"
envelope = _build_jsonrpc_message_send(task)
envelope = _build_jsonrpc_send_message(task)

try:
response = self.http_client.post(
jsonrpc_url,
json=envelope,
headers={"Authorization": f"Bearer {token}"},
headers={
"Authorization": f"Bearer {token}",
constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0,
},
)
response.raise_for_status()
unwrapped = _unwrap_jsonrpc_response(response.json())
Expand Down
41 changes: 33 additions & 8 deletions packages/a2a/tests/test_a2a_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,23 @@ async def test_get_delegation_token_client_credentials(a2a_client):

@pytest.mark.asyncio
async def test_invoke_service_posts_jsonrpc_envelope(a2a_client):
"""invoke_service sends a JSONRPC message/send to /a2a/jsonrpc."""
"""invoke_service sends a 1.x SendMessage JSONRPC request to /a2a/jsonrpc.

The dispatcher requires a CamelCase method name and an A2A-Version
header; the message envelope must carry messageId and an enum-string
role. Any of these wrong and the dispatcher rejects the request before
the executor runs.
"""
mock_response = Mock()
mock_response.json.return_value = {
"jsonrpc": "2.0",
"id": "1",
"result": {
"role": "agent",
"parts": [{"text": "Task completed successfully"}],
"message": {
"messageId": "resp-1",
"role": "ROLE_AGENT",
"parts": [{"text": "Task completed successfully"}],
},
},
}
mock_response.raise_for_status = Mock()
Expand All @@ -132,17 +141,21 @@ async def test_invoke_service_posts_jsonrpc_envelope(a2a_client):
token="test_token_123",
)

# The wrapper unwraps the JSONRPC result back to the legacy shape.
# The wrapper unwraps the SendMessageResponse.message.parts text.
assert result["result"] == "Task completed successfully"
assert result["delegation_chain"] == []

# Confirm the request was a JSONRPC envelope to /a2a/jsonrpc.
# Confirm the request matches the 1.x dispatcher contract.
posted_url = mock_post.call_args[0][0]
posted_body = mock_post.call_args[1]["json"]
posted_headers = mock_post.call_args[1]["headers"]
assert posted_url == "https://target.example.com/a2a/jsonrpc"
assert posted_body["jsonrpc"] == "2.0"
assert posted_body["method"] == "message/send"
assert posted_body["method"] == "SendMessage"
assert posted_body["params"]["message"]["role"] == "ROLE_USER"
assert posted_body["params"]["message"]["parts"][0]["text"] == "Test task"
assert posted_body["params"]["message"]["messageId"]
assert posted_headers["A2A-Version"] == "1.0"


@pytest.mark.asyncio
Expand All @@ -156,7 +169,13 @@ async def test_invoke_service_auto_token_exchange(a2a_client):
mock_http_response.json.return_value = {
"jsonrpc": "2.0",
"id": "1",
"result": {"role": "agent", "parts": [{"text": "Success"}]},
"result": {
"message": {
"messageId": "resp-1",
"role": "ROLE_AGENT",
"parts": [{"text": "Success"}],
}
},
}
mock_http_response.raise_for_status = Mock()

Expand All @@ -183,7 +202,13 @@ async def test_invoke_service_string_task(a2a_client):
mock_response.json.return_value = {
"jsonrpc": "2.0",
"id": "1",
"result": {"role": "agent", "parts": [{"text": "Done"}]},
"result": {
"message": {
"messageId": "resp-1",
"role": "ROLE_AGENT",
"parts": [{"text": "Done"}],
}
},
}
mock_response.raise_for_status = Mock()

Expand Down
Loading
Loading