diff --git a/packages/a2a/src/keycardai/a2a/server/delegation.py b/packages/a2a/src/keycardai/a2a/server/delegation.py index fb797c1..67b200f 100644 --- a/packages/a2a/src/keycardai/a2a/server/delegation.py +++ b/packages/a2a/src/keycardai/a2a/server/delegation.py @@ -10,6 +10,7 @@ from typing import Any import httpx +from a2a.utils import constants from keycardai.oauth import AsyncClient as AsyncOAuthClient from keycardai.oauth import Client as SyncOAuthClient @@ -22,13 +23,16 @@ logger = logging.getLogger(__name__) -def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: - """Wrap a task in an A2A JSONRPC ``message/send`` envelope. +def _build_jsonrpc_send_message(task: dict[str, Any] | str) -> dict[str, Any]: + """Wrap a task in an A2A 1.x JSONRPC ``SendMessage`` envelope. - ``task`` may be a plain string, a dict carrying a ``"task"`` string under - that key (legacy shape preserved for the keycardai-agents CrewAI - integration), or any other dict (serialized to JSON for the message + ``task`` may be a plain string, a dict carrying a ``"task"`` string at the + ``"task"`` key, or any other dict (serialized to JSON for the message text). + + The shape mirrors ``a2a.types.SendMessageRequest`` after JSON-marshalling + via ``google.protobuf.json_format``: a ``messageId`` (required by the + dispatcher), an enum-string ``role``, and ``parts`` carrying the text. """ if isinstance(task, str): text = task @@ -43,10 +47,11 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: return { "jsonrpc": "2.0", "id": str(uuid.uuid4()), - "method": "message/send", + "method": "SendMessage", "params": { "message": { - "role": "user", + "messageId": str(uuid.uuid4()), + "role": "ROLE_USER", "parts": [{"text": text}], }, }, @@ -54,16 +59,17 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]: - """Unwrap an A2A JSONRPC response into the ``{result, delegation_chain}`` shape. + """Unwrap an A2A 1.x JSONRPC ``SendMessageResponse`` into a flat + ``{result, delegation_chain}`` dict. - Best-effort surface used by CrewAI delegation tools. If the JSONRPC - result is a ``Message`` (parts with ``text``), the text parts are - joined; otherwise the raw result is JSON-stringified. + ``SendMessageResponse`` is a oneof of ``message`` or ``task``. When the + remote executor enqueues a ``Message``, the text is read from + ``result.message.parts[].text`` and joined. When it produces a ``Task``, + the task is JSON-stringified into ``result``; callers needing the full + Task lifecycle should use ``a2a.client.create_client`` directly. - ``delegation_chain`` is returned empty: the legacy keycardai-agents - chain reconstruction read from ``request.state.keycardai_auth_info`` - which never carried the claim, so it was always single-hop. Callers - that need multi-hop tracking should parse JWT claims directly. + ``delegation_chain`` is always empty here. Multi-hop chain tracking + requires parsing JWT claims directly. Raises: ValueError: if the response carries a JSONRPC ``error`` member. @@ -78,18 +84,20 @@ def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]: if result is None: return {"result": "", "delegation_chain": []} if isinstance(result, dict): - parts = result.get("parts") - if isinstance(parts, list): - text_parts = [ - p.get("text", "") - for p in parts - if isinstance(p, dict) and "text" in p - ] - if text_parts: - return { - "result": "\n".join(text_parts), - "delegation_chain": [], - } + message = result.get("message") + if isinstance(message, dict): + parts = message.get("parts") + if isinstance(parts, list): + text_parts = [ + p.get("text", "") + for p in parts + if isinstance(p, dict) and "text" in p + ] + if text_parts: + return { + "result": "\n".join(text_parts), + "delegation_chain": [], + } if isinstance(result, str): return {"result": result, "delegation_chain": []} return {"result": json.dumps(result), "delegation_chain": []} @@ -278,11 +286,10 @@ async def invoke_service( ) -> dict[str, Any]: """Call another agent service over A2A JSONRPC with bearer auth. - Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` - and returns ``{"result": , "delegation_chain": []}`` for - compatibility with the legacy invocation surface. If you need the - full A2A protocol surface (Task lifecycle, streaming, status - updates), use ``a2a.client.create_client`` directly. + Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` + and returns ``{"result": , "delegation_chain": []}``. For the + full A2A protocol surface (Task lifecycle, streaming, status updates), + use ``a2a.client.create_client`` directly. Args: service_url: Base URL of the target service @@ -311,13 +318,16 @@ async def invoke_service( token = await self.get_delegation_token(service_url, subject_token) jsonrpc_url = f"{service_url}/a2a/jsonrpc" - envelope = _build_jsonrpc_message_send(task) + envelope = _build_jsonrpc_send_message(task) try: response = await self.http_client.post( jsonrpc_url, json=envelope, - headers={"Authorization": f"Bearer {token}"}, + headers={ + "Authorization": f"Bearer {token}", + constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0, + }, ) response.raise_for_status() unwrapped = _unwrap_jsonrpc_response(response.json()) @@ -524,9 +534,10 @@ def invoke_service( ) -> dict[str, Any]: """Call another agent service over A2A JSONRPC with bearer auth. - Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` - and returns ``{"result": , "delegation_chain": []}`` for - compatibility with the legacy invocation surface. + Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` + and returns ``{"result": , "delegation_chain": []}``. For the + full A2A protocol surface (Task lifecycle, streaming, status updates), + use ``a2a.client.create_client`` directly. Args: service_url: Base URL of the target service @@ -547,13 +558,16 @@ def invoke_service( token = self.get_delegation_token(service_url, subject_token) jsonrpc_url = f"{service_url}/a2a/jsonrpc" - envelope = _build_jsonrpc_message_send(task) + envelope = _build_jsonrpc_send_message(task) try: response = self.http_client.post( jsonrpc_url, json=envelope, - headers={"Authorization": f"Bearer {token}"}, + headers={ + "Authorization": f"Bearer {token}", + constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0, + }, ) response.raise_for_status() unwrapped = _unwrap_jsonrpc_response(response.json()) diff --git a/packages/a2a/tests/test_a2a_client.py b/packages/a2a/tests/test_a2a_client.py index 34505a3..f2e6819 100644 --- a/packages/a2a/tests/test_a2a_client.py +++ b/packages/a2a/tests/test_a2a_client.py @@ -111,14 +111,23 @@ async def test_get_delegation_token_client_credentials(a2a_client): @pytest.mark.asyncio async def test_invoke_service_posts_jsonrpc_envelope(a2a_client): - """invoke_service sends a JSONRPC message/send to /a2a/jsonrpc.""" + """invoke_service sends a 1.x SendMessage JSONRPC request to /a2a/jsonrpc. + + The dispatcher requires a CamelCase method name and an A2A-Version + header; the message envelope must carry messageId and an enum-string + role. Any of these wrong and the dispatcher rejects the request before + the executor runs. + """ mock_response = Mock() mock_response.json.return_value = { "jsonrpc": "2.0", "id": "1", "result": { - "role": "agent", - "parts": [{"text": "Task completed successfully"}], + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Task completed successfully"}], + }, }, } mock_response.raise_for_status = Mock() @@ -132,17 +141,21 @@ async def test_invoke_service_posts_jsonrpc_envelope(a2a_client): token="test_token_123", ) - # The wrapper unwraps the JSONRPC result back to the legacy shape. + # The wrapper unwraps the SendMessageResponse.message.parts text. assert result["result"] == "Task completed successfully" assert result["delegation_chain"] == [] - # Confirm the request was a JSONRPC envelope to /a2a/jsonrpc. + # Confirm the request matches the 1.x dispatcher contract. posted_url = mock_post.call_args[0][0] posted_body = mock_post.call_args[1]["json"] + posted_headers = mock_post.call_args[1]["headers"] assert posted_url == "https://target.example.com/a2a/jsonrpc" assert posted_body["jsonrpc"] == "2.0" - assert posted_body["method"] == "message/send" + assert posted_body["method"] == "SendMessage" + assert posted_body["params"]["message"]["role"] == "ROLE_USER" assert posted_body["params"]["message"]["parts"][0]["text"] == "Test task" + assert posted_body["params"]["message"]["messageId"] + assert posted_headers["A2A-Version"] == "1.0" @pytest.mark.asyncio @@ -156,7 +169,13 @@ async def test_invoke_service_auto_token_exchange(a2a_client): mock_http_response.json.return_value = { "jsonrpc": "2.0", "id": "1", - "result": {"role": "agent", "parts": [{"text": "Success"}]}, + "result": { + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Success"}], + } + }, } mock_http_response.raise_for_status = Mock() @@ -183,7 +202,13 @@ async def test_invoke_service_string_task(a2a_client): mock_response.json.return_value = { "jsonrpc": "2.0", "id": "1", - "result": {"role": "agent", "parts": [{"text": "Done"}]}, + "result": { + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Done"}], + } + }, } mock_response.raise_for_status = Mock() diff --git a/packages/a2a/tests/test_jsonrpc_dispatch.py b/packages/a2a/tests/test_jsonrpc_dispatch.py new file mode 100644 index 0000000..3e9b771 --- /dev/null +++ b/packages/a2a/tests/test_jsonrpc_dispatch.py @@ -0,0 +1,157 @@ +"""Positive-path JSONRPC test for the keycardai-a2a server primitives. + +Sends an authenticated ``SendMessage`` JSONRPC request through +``create_jsonrpc_routes`` against a real ``DefaultRequestHandler`` and +asserts the executor was invoked with the bearer-derived +``access_token`` from ``KeycardServerCallContextBuilder``. Guards the +contract between Keycard primitives and the a2a-sdk dispatcher: method +name, required headers, message envelope shape, and context propagation. +""" + +import pytest +from a2a.server.agent_execution import AgentExecutor +from a2a.server.events.event_queue_v2 import EventQueue +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_jsonrpc_routes +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import Message, Part, Role +from keycardai.starlette import KeycardUser +from starlette.applications import Starlette +from starlette.authentication import AuthCredentials, AuthenticationBackend +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount +from starlette.testclient import TestClient + +from keycardai.a2a import ( + AgentServiceConfig, + KeycardServerCallContextBuilder, + build_agent_card_from_config, +) + + +class _StubAuthBackend(AuthenticationBackend): + """Always authenticates with a fixed ``KeycardUser``. + + Lets the test focus on the dispatcher contract without depending on a + reachable JWKS endpoint. The context builder still runs against a real + ``KeycardUser`` instance, so the access_token propagation path is + exercised normally. + """ + + def __init__(self, access_token: str): + self._access_token = access_token + + async def authenticate(self, conn): + user = KeycardUser( + access_token=self._access_token, + client_id="caller-svc", + zone_id="test_zone", + resource_server_url="https://test.example.com", + scopes=["test"], + ) + return AuthCredentials(["authenticated"]), user + + +class _EchoMessageExecutor(AgentExecutor): + """Enqueues a ``Message`` carrying the user's input plus the + ``access_token`` observed in ``RequestContext.call_context.state``. + + Driving the full dispatcher -> context_builder -> executor -> + event_queue chain via a real ``AgentExecutor`` forces every link to + run. If any breaks, the assertions on the response body fail. + """ + + async def execute(self, context, event_queue: EventQueue) -> None: + user_input = context.get_user_input() + call_ctx = getattr(context, "call_context", None) + access_token = call_ctx.state.get("access_token") if call_ctx else None + body = f"echoed: {user_input}; token: {access_token}" + message = Message( + message_id="resp-1", + role=Role.ROLE_AGENT, + parts=[Part(text=body)], + ) + await event_queue.enqueue_event(message) + + async def cancel(self, context, event_queue: EventQueue) -> None: + return None + + +@pytest.fixture +def service_config(): + return AgentServiceConfig( + service_name="JSONRPC Dispatch Test", + client_id="test_client", + client_secret="test_secret", + identity_url="https://test.example.com", + zone_id="test_zone", + capabilities=["test"], + ) + + +@pytest.fixture +def client(service_config): + agent_card = build_agent_card_from_config(service_config) + request_handler = DefaultRequestHandler( + agent_executor=_EchoMessageExecutor(), + task_store=InMemoryTaskStore(), + agent_card=agent_card, + ) + app = Starlette( + routes=[ + Mount( + "/a2a", + routes=create_jsonrpc_routes( + request_handler=request_handler, + rpc_url="/jsonrpc", + context_builder=KeycardServerCallContextBuilder(), + ), + middleware=[ + Middleware( + AuthenticationMiddleware, + backend=_StubAuthBackend(access_token="bearer-test-token"), + ), + ], + ), + ] + ) + return TestClient(app) + + +class TestJsonRpcDispatchPositivePath: + """Real JSONRPC ``SendMessage`` call lands at the executor and the + response carries the executor's enqueued message. + """ + + def test_send_message_drives_executor_and_returns_response(self, client): + """A successful round-trip exercises every link in the chain. + + Failures here indicate dispatcher contract drift: JSONRPC method + name, ``A2A-Version`` header requirement, message envelope shape, + or ``DefaultRequestHandler`` response shaping. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "req-1", + "role": "ROLE_USER", + "parts": [{"text": "ping"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + body = response.text + # The executor echoed the input. + assert "echoed: ping" in body + # The KeycardServerCallContextBuilder propagated the access_token + # from the auth backend's KeycardUser into ServerCallContext.state. + assert "token: bearer-test-token" in body