diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ff9b37a..0d82c46 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,6 +11,7 @@ on: - '*-keycardai-fastmcp' - '*-keycardai-agents' - '*-keycardai-a2a' + - '*-keycardai-crewai' jobs: detect-package: diff --git a/packages/a2a/src/keycardai/a2a/server/delegation.py b/packages/a2a/src/keycardai/a2a/server/delegation.py index fb797c1..a1f75e6 100644 --- a/packages/a2a/src/keycardai/a2a/server/delegation.py +++ b/packages/a2a/src/keycardai/a2a/server/delegation.py @@ -10,6 +10,7 @@ from typing import Any import httpx +from a2a.utils import constants from keycardai.oauth import AsyncClient as AsyncOAuthClient from keycardai.oauth import Client as SyncOAuthClient @@ -22,13 +23,16 @@ logger = logging.getLogger(__name__) -def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: - """Wrap a task in an A2A JSONRPC ``message/send`` envelope. +def _build_jsonrpc_send_message(task: dict[str, Any] | str) -> dict[str, Any]: + """Wrap a task in an A2A 1.x JSONRPC ``SendMessage`` envelope. ``task`` may be a plain string, a dict carrying a ``"task"`` string under - that key (legacy shape preserved for the keycardai-agents CrewAI - integration), or any other dict (serialized to JSON for the message - text). + that key (legacy shape preserved for the CrewAI integration), or any + other dict (serialized to JSON for the message text). + + The shape mirrors ``a2a.types.SendMessageRequest`` after JSON-marshalling + via ``google.protobuf.json_format``: a ``messageId`` (required by the + dispatcher), an enum-string ``role``, and ``parts`` carrying the text. """ if isinstance(task, str): text = task @@ -43,10 +47,11 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: return { "jsonrpc": "2.0", "id": str(uuid.uuid4()), - "method": "message/send", + "method": "SendMessage", "params": { "message": { - "role": "user", + "messageId": str(uuid.uuid4()), + "role": "ROLE_USER", "parts": [{"text": text}], }, }, @@ -54,16 +59,20 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]: - """Unwrap an A2A JSONRPC response into the ``{result, delegation_chain}`` shape. + """Unwrap an A2A 1.x JSONRPC ``SendMessageResponse`` into the + ``{result, delegation_chain}`` shape consumed by the CrewAI delegation + tool. - Best-effort surface used by CrewAI delegation tools. If the JSONRPC - result is a ``Message`` (parts with ``text``), the text parts are - joined; otherwise the raw result is JSON-stringified. + ``SendMessageResponse`` is a oneof of ``message`` or ``task``. If the + remote executor enqueued a ``Message`` (the common case for synchronous + crews), the text is at ``result.message.parts[].text``. If it produced a + ``Task``, we fall back to JSON-stringifying the task; callers wanting the + full Task lifecycle should reach for ``a2a.client.create_client``. - ``delegation_chain`` is returned empty: the legacy keycardai-agents - chain reconstruction read from ``request.state.keycardai_auth_info`` - which never carried the claim, so it was always single-hop. Callers - that need multi-hop tracking should parse JWT claims directly. + ``delegation_chain`` is returned empty: the legacy chain reconstruction + read from ``request.state.keycardai_auth_info``, which never carried the + claim, so it was always single-hop. Callers needing multi-hop tracking + should parse JWT claims directly. Raises: ValueError: if the response carries a JSONRPC ``error`` member. @@ -78,18 +87,20 @@ def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]: if result is None: return {"result": "", "delegation_chain": []} if isinstance(result, dict): - parts = result.get("parts") - if isinstance(parts, list): - text_parts = [ - p.get("text", "") - for p in parts - if isinstance(p, dict) and "text" in p - ] - if text_parts: - return { - "result": "\n".join(text_parts), - "delegation_chain": [], - } + message = result.get("message") + if isinstance(message, dict): + parts = message.get("parts") + if isinstance(parts, list): + text_parts = [ + p.get("text", "") + for p in parts + if isinstance(p, dict) and "text" in p + ] + if text_parts: + return { + "result": "\n".join(text_parts), + "delegation_chain": [], + } if isinstance(result, str): return {"result": result, "delegation_chain": []} return {"result": json.dumps(result), "delegation_chain": []} @@ -278,7 +289,7 @@ async def invoke_service( ) -> dict[str, Any]: """Call another agent service over A2A JSONRPC with bearer auth. - Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` + Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` and returns ``{"result": , "delegation_chain": []}`` for compatibility with the legacy invocation surface. If you need the full A2A protocol surface (Task lifecycle, streaming, status @@ -311,13 +322,16 @@ async def invoke_service( token = await self.get_delegation_token(service_url, subject_token) jsonrpc_url = f"{service_url}/a2a/jsonrpc" - envelope = _build_jsonrpc_message_send(task) + envelope = _build_jsonrpc_send_message(task) try: response = await self.http_client.post( jsonrpc_url, json=envelope, - headers={"Authorization": f"Bearer {token}"}, + headers={ + "Authorization": f"Bearer {token}", + constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0, + }, ) response.raise_for_status() unwrapped = _unwrap_jsonrpc_response(response.json()) @@ -524,7 +538,7 @@ def invoke_service( ) -> dict[str, Any]: """Call another agent service over A2A JSONRPC with bearer auth. - Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` + Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` and returns ``{"result": , "delegation_chain": []}`` for compatibility with the legacy invocation surface. @@ -547,13 +561,16 @@ def invoke_service( token = self.get_delegation_token(service_url, subject_token) jsonrpc_url = f"{service_url}/a2a/jsonrpc" - envelope = _build_jsonrpc_message_send(task) + envelope = _build_jsonrpc_send_message(task) try: response = self.http_client.post( jsonrpc_url, json=envelope, - headers={"Authorization": f"Bearer {token}"}, + headers={ + "Authorization": f"Bearer {token}", + constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0, + }, ) response.raise_for_status() unwrapped = _unwrap_jsonrpc_response(response.json()) diff --git a/packages/a2a/tests/test_a2a_client.py b/packages/a2a/tests/test_a2a_client.py index 34505a3..f2e6819 100644 --- a/packages/a2a/tests/test_a2a_client.py +++ b/packages/a2a/tests/test_a2a_client.py @@ -111,14 +111,23 @@ async def test_get_delegation_token_client_credentials(a2a_client): @pytest.mark.asyncio async def test_invoke_service_posts_jsonrpc_envelope(a2a_client): - """invoke_service sends a JSONRPC message/send to /a2a/jsonrpc.""" + """invoke_service sends a 1.x SendMessage JSONRPC request to /a2a/jsonrpc. + + The dispatcher requires a CamelCase method name and an A2A-Version + header; the message envelope must carry messageId and an enum-string + role. Any of these wrong and the dispatcher rejects the request before + the executor runs. + """ mock_response = Mock() mock_response.json.return_value = { "jsonrpc": "2.0", "id": "1", "result": { - "role": "agent", - "parts": [{"text": "Task completed successfully"}], + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Task completed successfully"}], + }, }, } mock_response.raise_for_status = Mock() @@ -132,17 +141,21 @@ async def test_invoke_service_posts_jsonrpc_envelope(a2a_client): token="test_token_123", ) - # The wrapper unwraps the JSONRPC result back to the legacy shape. + # The wrapper unwraps the SendMessageResponse.message.parts text. assert result["result"] == "Task completed successfully" assert result["delegation_chain"] == [] - # Confirm the request was a JSONRPC envelope to /a2a/jsonrpc. + # Confirm the request matches the 1.x dispatcher contract. posted_url = mock_post.call_args[0][0] posted_body = mock_post.call_args[1]["json"] + posted_headers = mock_post.call_args[1]["headers"] assert posted_url == "https://target.example.com/a2a/jsonrpc" assert posted_body["jsonrpc"] == "2.0" - assert posted_body["method"] == "message/send" + assert posted_body["method"] == "SendMessage" + assert posted_body["params"]["message"]["role"] == "ROLE_USER" assert posted_body["params"]["message"]["parts"][0]["text"] == "Test task" + assert posted_body["params"]["message"]["messageId"] + assert posted_headers["A2A-Version"] == "1.0" @pytest.mark.asyncio @@ -156,7 +169,13 @@ async def test_invoke_service_auto_token_exchange(a2a_client): mock_http_response.json.return_value = { "jsonrpc": "2.0", "id": "1", - "result": {"role": "agent", "parts": [{"text": "Success"}]}, + "result": { + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Success"}], + } + }, } mock_http_response.raise_for_status = Mock() @@ -183,7 +202,13 @@ async def test_invoke_service_string_task(a2a_client): mock_response.json.return_value = { "jsonrpc": "2.0", "id": "1", - "result": {"role": "agent", "parts": [{"text": "Done"}]}, + "result": { + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Done"}], + } + }, } mock_response.raise_for_status = Mock() diff --git a/packages/a2a/tests/test_jsonrpc_dispatch.py b/packages/a2a/tests/test_jsonrpc_dispatch.py new file mode 100644 index 0000000..352b840 --- /dev/null +++ b/packages/a2a/tests/test_jsonrpc_dispatch.py @@ -0,0 +1,169 @@ +"""Positive-path JSONRPC test for the keycardai-a2a server primitives. + +The auth-gate tests in ``test_agent_card_server.py`` only assert that the +``/a2a/jsonrpc`` mount returns 401 for anonymous requests; the body is +rejected before the dispatcher sees it. That coverage misses dispatcher- +shape drift (method-name renames, new required headers, message-envelope +field changes) because the gate fires first. This module sends a real +authenticated JSONRPC request through the dispatcher and asserts a real +response, so future a2a-sdk dispatcher changes that break our delegation +or context-builder wiring fail here rather than in customer deployments. + +It pairs with ``packages/crewai/tests/test_default_request_handler_integration.py`` +on the executor side; the two together cover the dispatcher contract and +the executor wrap. +""" + +import pytest +from a2a.server.agent_execution import AgentExecutor +from a2a.server.events.event_queue_v2 import EventQueue +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_jsonrpc_routes +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import Message, Part, Role +from keycardai.starlette import KeycardUser +from starlette.applications import Starlette +from starlette.authentication import AuthCredentials, AuthenticationBackend +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount +from starlette.testclient import TestClient + +from keycardai.a2a import ( + AgentServiceConfig, + KeycardServerCallContextBuilder, + build_agent_card_from_config, +) + + +class _StubAuthBackend(AuthenticationBackend): + """Stand-in for ``KeycardAuthBackend`` that always authenticates with a + fixed ``KeycardUser``. + + Real auth is covered by keycardai-starlette's own tests. The point of + this module is the dispatcher contract, not auth verification, so a + permissive backend that still injects a valid ``KeycardUser`` is the + right level of isolation: the context builder still runs against a + real user object. + """ + + def __init__(self, access_token: str): + self._access_token = access_token + + async def authenticate(self, conn): + user = KeycardUser( + access_token=self._access_token, + client_id="caller-svc", + zone_id="test_zone", + resource_server_url="https://test.example.com", + scopes=["test"], + ) + return AuthCredentials(["authenticated"]), user + + +class _EchoMessageExecutor(AgentExecutor): + """Minimal ``AgentExecutor`` that enqueues a ``Message`` carrying the + user's input back, plus the ``access_token`` it observed in + ``RequestContext.call_context.state``. + + Using a real ``AgentExecutor`` (rather than mocking ``DefaultRequestHandler``) + forces the full chain to run: dispatcher -> context_builder -> executor + -> event_queue -> response. If any link breaks, this test fails. + """ + + async def execute(self, context, event_queue: EventQueue) -> None: + user_input = context.get_user_input() + call_ctx = getattr(context, "call_context", None) + access_token = call_ctx.state.get("access_token") if call_ctx else None + body = f"echoed: {user_input}; token: {access_token}" + message = Message( + message_id="resp-1", + role=Role.ROLE_AGENT, + parts=[Part(text=body)], + ) + await event_queue.enqueue_event(message) + + async def cancel(self, context, event_queue: EventQueue) -> None: + return None + + +@pytest.fixture +def service_config(): + return AgentServiceConfig( + service_name="JSONRPC Dispatch Test", + client_id="test_client", + client_secret="test_secret", + identity_url="https://test.example.com", + zone_id="test_zone", + capabilities=["test"], + ) + + +@pytest.fixture +def client(service_config): + agent_card = build_agent_card_from_config(service_config) + request_handler = DefaultRequestHandler( + agent_executor=_EchoMessageExecutor(), + task_store=InMemoryTaskStore(), + agent_card=agent_card, + ) + app = Starlette( + routes=[ + Mount( + "/a2a", + routes=create_jsonrpc_routes( + request_handler=request_handler, + rpc_url="/jsonrpc", + context_builder=KeycardServerCallContextBuilder(), + ), + middleware=[ + Middleware( + AuthenticationMiddleware, + backend=_StubAuthBackend(access_token="bearer-test-token"), + ), + ], + ), + ] + ) + return TestClient(app) + + +class TestJsonRpcDispatchPositivePath: + """Real JSONRPC ``SendMessage`` call lands at the executor and the + response carries the executor's enqueued message. + """ + + def test_send_message_drives_executor_and_returns_response(self, client): + """A successful round-trip exercises every link in the chain. + + If a2a-sdk renames the JSONRPC method, drops the A2A-Version header + requirement, changes the message envelope shape, or changes how + DefaultRequestHandler shapes the response, this test fails. The + keycardai-crewai integration test exercises the same chain with a + crew on top; this one isolates the keycardai-a2a primitives so a + regression here is attributed to the right package. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "req-1", + "role": "ROLE_USER", + "parts": [{"text": "ping"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + body = response.text + # The executor echoed the input. + assert "echoed: ping" in body + # The KeycardServerCallContextBuilder propagated the access_token + # from the auth backend's KeycardUser into ServerCallContext.state. + assert "token: bearer-test-token" in body diff --git a/packages/agents/README.md b/packages/agents/README.md index ce57109..81e69ee 100644 --- a/packages/agents/README.md +++ b/packages/agents/README.md @@ -1,534 +1,14 @@ -# KeycardAI Agents +# keycardai-agents (deprecated, pending archive) -Framework-agnostic agent service SDK for A2A (Agent-to-Agent) delegation with Keycard OAuth authentication. +This package is being archived. Per the KEP "Decompose keycardai-agents", everything that lived here has moved: -## Requirements +| Old import | New import | +| --- | --- | +| `from keycardai.agents.server import AgentServer, ...` | `from keycardai.a2a import ...` (#105 / ACC-230) | +| `from keycardai.agents.client.discovery import ServiceDiscovery` | `from keycardai.a2a import ServiceDiscovery` (#105 / ACC-230) | +| `from keycardai.agents.client import AgentClient` | `from keycardai.oauth.pkce import authenticate` (#101 / ACC-229) | +| `from keycardai.agents.integrations.crewai import CrewAIExecutor, get_a2a_tools, ...` | `from keycardai.crewai import ...` (this PR / ACC-231) | -- **Python 3.10 or greater** -- Virtual environment (recommended) +`keycardai-agents` no longer ships any code. It exists at this version only to give downstream installs a clean upgrade path. The next step (ACC-232) archives the package source entirely. -## Features - -- 🔐 **Built-in OAuth**: Automatic JWKS validation, token exchange, delegation chains -- 🌐 **Dual Protocol Support**: A2A JSONRPC + custom REST endpoints (same executor powers both) -- 🔧 **Framework Agnostic**: Supports CrewAI, LangChain, custom via `AgentExecutor` protocol -- 🔄 **Service Delegation**: RFC 8693 token exchange preserves user context -- 👤 **User Auth**: PKCE OAuth flow with browser-based login - -## A2A Protocol Integration - -We use [a2a-python SDK](https://github.com/a2aproject/a2a-python) for protocol compliance while adding production-ready authentication: - -- ✅ **Full A2A JSONRPC support** - Standards-compliant `/a2a/jsonrpc` endpoint -- ✅ **Plus simpler REST endpoint** - Custom `/invoke` for easier integration -- ✅ **Production OAuth layer** - BearerAuthMiddleware, JWKS, token exchange (A2A SDK has none) -- ✅ **Delegation chain tracking** - JWT-based audit trail for service-to-service calls -- ✅ **Dual protocol support** - Same executor powers both JSONRPC and REST endpoints - -**Result**: A2A standards compliance + Keycard security + flexible APIs = Best of both worlds - -## Installation - -```bash -pip install keycardai-agents - -# With CrewAI support -pip install 'keycardai-agents[crewai]' -``` - -## Quick Start - -### CrewAI Service - -```python -import os -from crewai import Agent, Crew, Task -from keycardai.agents import AgentServiceConfig -from keycardai.agents.integrations.crewai import CrewAIExecutor -from keycardai.agents.server import serve_agent - -def create_my_crew(): - agent = Agent(role="Assistant", goal="Help users", backstory="AI helper") - task = Task(description="{task}", agent=agent, expected_output="Response") - return Crew(agents=[agent], tasks=[task]) - -config = AgentServiceConfig( - service_name="My Service", - client_id=os.getenv("CLIENT_ID"), - client_secret=os.getenv("CLIENT_SECRET"), - identity_url="http://localhost:8000", - zone_id=os.getenv("ZONE_ID"), - agent_executor=CrewAIExecutor(create_my_crew), # Framework adapter - capabilities=["assistance"], -) - -serve_agent(config) # Starts server with OAuth middleware -``` - -### Custom Executor - -```python -from keycardai.agents.server import LambdaExecutor - -def my_logic(task, inputs): - return f"Processed: {task}" - -config = AgentServiceConfig( - # ... same config as above - agent_executor=LambdaExecutor(my_logic), # Simple function wrapper -) -``` - -### Advanced: Custom Executor Class - -```python -from keycardai.agents.server import AgentExecutor - -class MyFrameworkExecutor: - """Implement AgentExecutor protocol for any framework.""" - - def execute(self, task, inputs): - # Your framework logic here - result = my_framework.run(task, inputs) - return result - - def set_token_for_delegation(self, access_token): - # Optional: handle delegation token - self.context.set_auth(access_token) - -config = AgentServiceConfig( - # ... - agent_executor=MyFrameworkExecutor(), -) -``` - -## Client Usage - -### User Authentication (PKCE) - -```python -from keycardai.agents.client import AgentClient - -async with AgentClient(config) as client: - # Automatically: OAuth discovery → Browser login → Token exchange - result = await client.invoke("https://service.com", task="Hello") -``` - -### Service-to-Service (Token Exchange) - -```python -from keycardai.agents.server import DelegationClient - -client = DelegationClient(service_config) - -# Get delegation token (RFC 8693) - preserves user context -token = await client.get_delegation_token( - "https://target.com", - subject_token="user_token" -) - -# Invoke with token -result = await client.invoke_service( - "https://target.com", - task="Process data", - token=token -) -# Result includes delegation_chain: ["service_a", "service_b"] -``` - -## Architecture - -### Server - -``` -Your Agent - ↓ -AgentExecutor.execute(task, inputs) - ↓ -AgentServer (keycardai-agents) - ├─ OAuth Middleware (BearerAuthMiddleware) - │ ├─ JWKS validation - │ ├─ Token audience check - │ └─ Delegation chain extraction - ├─ /invoke (protected, REST-like) - ├─ /a2a/jsonrpc (protected, A2A JSONRPC) - │ ├─ message/send - │ ├─ message/stream - │ └─ tasks/* (get, cancel, list) - ├─ /.well-known/agent-card.json (A2A format) - ├─ /.well-known/oauth-protected-resource - └─ /status -``` - -### Dual Protocol Support - -The SDK provides **two ways** to invoke agents: - -1. **A2A JSONRPC** (`/a2a/jsonrpc`) - Standards-compliant - - Use when: Integrating with A2A ecosystem, need standard protocol - - Methods: `message/send`, `message/stream`, `tasks/get`, etc. - - Bridge: `KeycardToA2AExecutorBridge` adapts your executor to A2A protocol - -2. **Custom REST** (`/invoke`) - Simpler API - - Use when: Direct service calls, simpler integration - - Format: `{"task": "...", "inputs": {...}}` - - Direct executor invocation - -**Both endpoints share the same underlying executor** - write once, support both protocols. - -### OAuth Flow - -``` -User → OAuth Login (PKCE) - ↓ -User Token → Service A - ↓ -Service A → Token Exchange (RFC 8693) → Service B Token - ↓ -Service A → Calls Service B with Service B Token - ↓ -Service B validates token (JWKS) -Service B updates delegation_chain -``` - -## A2A Protocol Compliance - -### Agent Card - -Services expose A2A-compliant agent cards at `/.well-known/agent-card.json`: - -```json -{ - "name": "My Service", - "url": "https://my-service.com", - "version": "1.0.0", - "protocolVersion": "0.3.0", - "skills": [ - { - "id": "assistance", - "name": "Assistance", - "description": "assistance capability", - "tags": ["assistance"] - } - ], - "capabilities": { - "streaming": false, - "multiTurn": true - }, - "additionalInterfaces": [ - { - "url": "https://my-service.com/invoke", - "transport": "http+json" - } - ], - "securitySchemes": { - "oauth2": { - "type": "oauth2", - "flows": { - "authorizationCode": { - "authorizationUrl": "https://zone.keycard.cloud/oauth/authorize", - "tokenUrl": "https://zone.keycard.cloud/oauth/token" - } - } - } - } -} -``` - -### Endpoints - -#### A2A JSONRPC Endpoint (Standards-Compliant) - -```bash -POST /a2a/jsonrpc -Authorization: Bearer -Content-Type: application/json - -{ - "jsonrpc": "2.0", - "method": "message/send", - "params": { - "message": { - "role": "user", - "parts": [{"text": "Do something"}] - } - }, - "id": 1 -} -``` - -Response: -```json -{ - "jsonrpc": "2.0", - "result": { - "task": { - "taskId": "task-123", - "state": "completed", - "result": {...} - } - }, - "id": 1 -} -``` - -**Supported methods:** -- `message/send` - Send message to agent -- `message/stream` - Stream agent responses -- `tasks/get` - Get task status -- `tasks/cancel` - Cancel running task -- `tasks/list` - List all tasks - -#### Custom REST Endpoint (Simpler API) - -```bash -POST /invoke -Authorization: Bearer - -{ - "task": "Do something", - "inputs": {"key": "value"} -} -``` - -Response: -```json -{ - "result": "Done", - "delegation_chain": ["service_a", "service_b"] -} -``` - -**Use `/invoke` for:** Direct service calls, easier integration, delegation chain tracking. - -**Use `/a2a/jsonrpc` for:** A2A ecosystem integration, standard protocol compliance, task management. - -## Framework Support - -### CrewAI - -```python -from keycardai.agents.integrations.crewai import CrewAIExecutor - -executor = CrewAIExecutor(lambda: create_my_crew()) -``` - -**Features:** -- Automatic delegation token context -- Supports CrewAI tools -- Handles `crew.kickoff()` execution - -### LangChain, AutoGen, Custom - -Implement the `AgentExecutor` protocol: - -```python -class MyExecutor: - def execute(self, task, inputs): - # Your logic - return result -``` - -## API Reference - -### AgentServiceConfig - -```python -@dataclass -class AgentServiceConfig: - service_name: str # Human-readable name - client_id: str # Keycard Application client ID - client_secret: str # Keycard Application secret - identity_url: str # Public URL - zone_id: str # Keycard zone ID - agent_executor: AgentExecutor # REQUIRED: Executor instance - - # Optional - authorization_server_url: str | None = None - port: int = 8000 - host: str = "0.0.0.0" - description: str = "" - capabilities: list[str] = [] -``` - -### AgentExecutor Protocol - -```python -class AgentExecutor(Protocol): - def execute( - self, - task: dict[str, Any] | str, - inputs: dict[str, Any] | None = None, - ) -> Any: - """Execute agent task.""" - ... - - def set_token_for_delegation(self, access_token: str) -> None: - """Optional: Set token for delegation.""" - ... -``` - -### KeycardToA2AExecutorBridge - -Bridge adapter that makes your executor work with A2A JSONRPC protocol: - -```python -from keycardai.agents.server import KeycardToA2AExecutorBridge, SimpleExecutor - -# Your executor -executor = SimpleExecutor() - -# Wrap for A2A JSONRPC support -a2a_executor = KeycardToA2AExecutorBridge(executor) - -# Now works with A2A DefaultRequestHandler -from a2a.server.request_handlers import DefaultRequestHandler -from a2a.server.tasks import InMemoryTaskStore - -handler = DefaultRequestHandler( - agent_executor=a2a_executor, - task_store=InMemoryTaskStore() -) -``` - -**What it does:** -- Converts A2A `RequestContext` → Keycard `task/inputs` format -- Calls your synchronous executor -- Publishes result as A2A Task events -- Handles delegation tokens - -**Note:** This bridge is automatically configured when using `serve_agent()` - you don't need to use it directly unless building custom A2A integrations. - -### serve_agent() - -Start an agent service (blocking): - -```python -serve_agent(config: AgentServiceConfig) -> None -``` - -### AgentClient - -User authentication with PKCE OAuth: - -```python -from keycardai.agents.client import AgentClient - -async with AgentClient(service_config) as client: - result = await client.invoke(service_url, task, inputs) - agent_card = await client.discover_service(service_url) -``` - -### DelegationClient - -Service-to-service with token exchange: - -```python -from keycardai.agents.server import DelegationClient - -client = DelegationClient(service_config) -token = await client.get_delegation_token(target_url, subject_token) -result = await client.invoke_service(url, task, token) -``` - -## Service Delegation - -### Pattern - -```python -# In Service A (orchestrator) -from keycardai.agents.server import DelegationClient - -client = DelegationClient(service_a_config) - -# Discover Service B -card = await client.discover_service("https://service-b.com") - -# Get token with user context -token = await client.get_delegation_token( - "https://service-b.com", - subject_token=user_access_token -) - -# Call Service B -result = await client.invoke_service( - "https://service-b.com", - task="Process data", - token=token -) - -# Result includes delegation chain for audit -print(result["delegation_chain"]) -# ["user_service", "service_a", "service_b"] -``` - -### Delegation Chain Tracking - -1. User authenticates → Token with empty `delegation_chain` -2. User calls Service A → Service A adds itself to chain -3. Service A calls Service B → Token exchange preserves chain -4. Service B adds itself → Full chain in response for audit - -## Production Deployment - -### Environment Variables - -```bash -# Required -export KEYCARD_ZONE_ID="your_zone_id" -export KEYCARD_CLIENT_ID="service_client_id" -export KEYCARD_CLIENT_SECRET="client_secret" -export SERVICE_URL="https://your-service.com" - -# Optional -export PORT="8000" -export HOST="0.0.0.0" -``` - -### Health Checks - -```bash -# Liveness -curl https://your-service.com/status - -# Agent card -curl https://your-service.com/.well-known/agent-card.json -``` - -### Security - -- **Token Validation**: JWKS-based JWT signature verification -- **Audience Check**: Token `aud` must match service URL -- **Issuer Validation**: Token `iss` from Keycard zone -- **Delegation Chain**: Preserved for audit trail - -## Examples - -See `examples/` directory: -- `oauth_client_usage.py` - PKCE user authentication - -## FAQ - -### Q: Why not use the A2A SDK server? -**A**: The A2A SDK has no authentication layer. We'd have to rebuild all OAuth infrastructure. - -### Q: Can I use LangChain/AutoGen? -**A**: Yes! Implement the `AgentExecutor` protocol or use `LambdaExecutor` for simple functions. - -### Q: What's the difference between AgentClient and DelegationClient? -**A**: -- `AgentClient`: User authentication with PKCE (browser-based login) -- `DelegationClient`: Service-to-service with token exchange (RFC 8693) - -### Q: Do I need CrewAI? -**A**: No! Use any framework or write custom logic. Just implement `AgentExecutor`. - -## Support - -- **GitHub**: https://github.com/keycardai/python-sdk -- **Issues**: https://github.com/keycardai/python-sdk/issues -- **Docs**: https://docs.keycard.ai - -## License - -MIT +If you are starting fresh: skip `keycardai-agents` and depend on the destination packages directly. diff --git a/packages/agents/pyproject.toml b/packages/agents/pyproject.toml index 43046f5..a4a4f01 100644 --- a/packages/agents/pyproject.toml +++ b/packages/agents/pyproject.toml @@ -1,16 +1,13 @@ [project] name = "keycardai-agents" dynamic = ["version"] -description = "Legacy CrewAI-over-A2A integration. Decomposing per the Keycard SDK packaging KEP: see keycardai-a2a (delegation), keycardai-oauth (PKCE), and forthcoming keycardai-crewai. This package will be archived." +description = "Empty legacy package, pending archive. Decomposed per the Keycard SDK packaging KEP into keycardai-a2a (delegation), keycardai-oauth (PKCE user-login), and keycardai-crewai (CrewAI integration)." readme = "README.md" requires-python = ">=3.10" license = { text = "MIT" } authors = [{ name = "Keycard", email = "support@keycard.ai" }] -dependencies = [ - "keycardai-a2a>=0.1.0", - "pydantic>=2.11.7", -] -keywords = ["agents", "ai", "crewai", "authentication", "authorization", "service", "delegation"] +dependencies = [] +keywords = ["keycard", "deprecated"] classifiers = [ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", @@ -27,18 +24,11 @@ classifiers = [ ] [project.optional-dependencies] -crewai = [ - "crewai>=0.86.0", -] test = [ "pytest>=8.4.1", - "pytest-asyncio>=1.1.0", - "pytest-cov>=6.2.1", - "pytest-timeout>=2.3.1", ] dev = [ "ruff>=0.8.6", - "mypy>=1.14.1", ] [build-system] diff --git a/packages/agents/src/keycardai/agents/__init__.py b/packages/agents/src/keycardai/agents/__init__.py index bc708d9..488daa8 100644 --- a/packages/agents/src/keycardai/agents/__init__.py +++ b/packages/agents/src/keycardai/agents/__init__.py @@ -1,24 +1,15 @@ -"""KeycardAI Agents (legacy package). +"""KeycardAI Agents (legacy package, pending archive). This package previously housed three concerns. Per the KEP "Decompose -keycardai-agents", they have moved to: +keycardai-agents", they have all moved: -- A2A delegation, agent service hosting, executor primitives, and service - discovery → ``keycardai-a2a`` (``from keycardai.a2a import ...``). +- A2A delegation, agent service primitives, and service discovery → + ``keycardai-a2a`` (``from keycardai.a2a import ...``). - OAuth 2.0 PKCE user-login flow (``AgentClient``) → ``keycardai-oauth`` (``from keycardai.oauth.pkce import authenticate``). -- The CrewAI-over-A2A integration is the only remaining piece, accessible - via ``from keycardai.agents.integrations.crewai import ...``. It will - move to a dedicated ``keycardai-crewai`` package; this package will be - archived once that ships. -""" - -# Integrations (optional) -try: - from .integrations import crewai -except ImportError: - crewai = None +- CrewAI-over-A2A integration (executor + delegation tools) → + ``keycardai-crewai`` (``from keycardai.crewai import ...``). -__all__ = [ - "crewai", -] +This package now exposes no symbols. It will be archived once downstream +references catch up (tracked in ACC-232). +""" diff --git a/packages/agents/src/keycardai/agents/integrations/__init__.py b/packages/agents/src/keycardai/agents/integrations/__init__.py deleted file mode 100644 index 039cca5..0000000 --- a/packages/agents/src/keycardai/agents/integrations/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -"""Integrations with agent frameworks. - -This package provides integrations with various agent frameworks: -- CrewAI: Tools for agent-to-agent delegation -""" - -try: - from .crewai import create_a2a_tool_for_service, get_a2a_tools, set_delegation_token - - __all__ = [ - "get_a2a_tools", - "set_delegation_token", - "create_a2a_tool_for_service", - ] -except ImportError: - # CrewAI not installed - __all__ = [] diff --git a/packages/agents/tests/integrations/__init__.py b/packages/agents/tests/integrations/__init__.py deleted file mode 100644 index 849610e..0000000 --- a/packages/agents/tests/integrations/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Integration tests for agent frameworks.""" diff --git a/packages/crewai/CHANGELOG.md b/packages/crewai/CHANGELOG.md new file mode 100644 index 0000000..e69de29 diff --git a/packages/crewai/README.md b/packages/crewai/README.md new file mode 100644 index 0000000..53563a6 --- /dev/null +++ b/packages/crewai/README.md @@ -0,0 +1,127 @@ +# keycardai-crewai + +CrewAI integration for [keycardai-a2a](../a2a). Use a CrewAI `Crew` as the agent body of a Keycard-protected A2A service, and give CrewAI agents tools that delegate work to other A2A services with the user's token exchanged via Keycard. + +> **Preview.** This package is pre-1.0. APIs may change between minor versions. + +## What's in here + +Server-side: + +- **`CrewAIExecutor`**: an `a2a-sdk` 1.x `AgentExecutor` subclass that runs a CrewAI `Crew`. Pass it directly to `a2a.server.request_handlers.DefaultRequestHandler(agent_executor=...)` — no wrapper needed. +- **`set_delegation_token(access_token)`**: stash an inbound bearer in the contextvar that synchronous CrewAI tools read at delegation time. `CrewAIExecutor` calls this for you; reach for it directly only when running a crew outside the executor. + +Client-side (CrewAI tools that delegate to other A2A services): + +- **`get_a2a_tools(service_config, delegatable_services)`**: returns a list of `crewai.tools.BaseTool` instances, one per delegatable service. Each tool calls the target service via `keycardai-a2a`'s `DelegationClientSync.invoke_service`, exchanging the contextvar token along the way. +- **`create_a2a_tool_for_service(service_config, target_service_url)`**: single-tool variant for cases where the caller already knows the target URL. + +## Installation + +```bash +pip install keycardai-crewai +``` + +This pulls in `keycardai-a2a`, `crewai`, and (transitively) `keycardai-oauth` + `keycardai-starlette`. + +## Quick start + +`CrewAIExecutor` is an `a2a-sdk` `AgentExecutor`, so it slots into `DefaultRequestHandler` the same way any other executor does. Build the Keycard-protected mount with `keycardai-a2a`'s primitives, drop `CrewAIExecutor(make_crew)` in as the executor, and you are done. + +```python +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes +from a2a.server.tasks import InMemoryTaskStore +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount + +from keycardai.a2a import ( + AgentServiceConfig, + KeycardServerCallContextBuilder, + build_agent_card_from_config, +) +from keycardai.crewai import CrewAIExecutor, get_a2a_tools +from keycardai.oauth.server.credentials import ClientSecret +from keycardai.starlette import AuthProvider, KeycardAuthBackend, keycard_on_error +from keycardai.starlette.routers.metadata import ( + well_known_authorization_server_route, + well_known_protected_resource_route, +) + +config = AgentServiceConfig( + service_name="My Crew", + client_id="...", + client_secret="...", + identity_url="https://my-crew.example.com", + zone_id="your-zone-id", + capabilities=["orchestrator"], +) + +# Build CrewAI tools that delegate to other A2A services +delegatable = [{"name": "Echo", "url": "https://echo.example.com", "description": "echoes input", "capabilities": ["echo"]}] +tools = await get_a2a_tools(config, delegatable_services=delegatable) + + +def make_crew(): + from crewai import Agent, Crew, Task + + orchestrator = Agent(role="Orchestrator", goal="Delegate to specialists", tools=tools) + task = Task(description="{task}", agent=orchestrator, expected_output="result") + return Crew(agents=[orchestrator], tasks=[task]) + + +auth_provider = AuthProvider( + zone_url=config.auth_server_url, + server_name=config.service_name, + server_url=config.identity_url, + application_credential=ClientSecret((config.client_id, config.client_secret)), +) +verifier = auth_provider.get_token_verifier() + +agent_card = build_agent_card_from_config(config) +request_handler = DefaultRequestHandler( + agent_executor=CrewAIExecutor(make_crew), + task_store=InMemoryTaskStore(), + agent_card=agent_card, +) + +# Add these routes to your existing Starlette / FastAPI app: +your_app.routes.extend(create_agent_card_routes(agent_card=agent_card)) +your_app.routes.append(well_known_protected_resource_route( + issuer=config.auth_server_url, + resource="/.well-known/oauth-protected-resource{resource_path:path}", +)) +your_app.routes.append(well_known_authorization_server_route( + issuer=config.auth_server_url, + resource="/.well-known/oauth-authorization-server{resource_path:path}", +)) +your_app.routes.append(Mount( + "/a2a", + routes=create_jsonrpc_routes( + request_handler=request_handler, + rpc_url="/jsonrpc", + context_builder=KeycardServerCallContextBuilder(), + ), + middleware=[ + Middleware( + AuthenticationMiddleware, + backend=KeycardAuthBackend(verifier, require_authentication=True), + on_error=keycard_on_error, + ), + ], +)) +``` + +`CrewAIExecutor.execute` reads the verified bearer from `context.call_context.state["access_token"]` (set by `KeycardServerCallContextBuilder`), calls `set_delegation_token` so synchronous CrewAI tools can read it, and runs `crew.kickoff()` on a worker thread via `asyncio.to_thread` to avoid blocking the event loop. + +## Relationship to other Keycard packages + +- **`keycardai-a2a`**: provides the agent service primitives this package builds on. `CrewAIExecutor` subclasses `a2a-sdk`'s `AgentExecutor` directly; the tools created by `get_a2a_tools` go through `keycardai-a2a`'s `DelegationClientSync`. +- **`keycardai-oauth`**: token exchange runs through `keycardai-oauth` under the hood, via `keycardai-a2a`'s delegation client. +- **`keycardai-starlette`**: the auth backend protecting the agent service mount lives here. +- **`keycardai-mcp`**: hosts a separate CrewAI integration for **MCP tools** (different protocol). That one stays in `keycardai-mcp` and is unrelated to this package. + +## History + +This package was extracted from the original `keycardai-agents` package (KEP: Decompose keycardai-agents). The PKCE user-login client moved to `keycardai-oauth`; the A2A delegation surface moved to `keycardai-a2a`; the `keycardai-agents` source directory is being archived. diff --git a/packages/crewai/pyproject.toml b/packages/crewai/pyproject.toml new file mode 100644 index 0000000..e73612e --- /dev/null +++ b/packages/crewai/pyproject.toml @@ -0,0 +1,100 @@ +[project] +name = "keycardai-crewai" +dynamic = ["version"] +description = "CrewAI integration for Keycard A2A: run CrewAI crews as agent services and create delegation tools that exchange tokens with downstream services." +readme = "README.md" +requires-python = ">=3.10" +license = { text = "MIT" } +authors = [{ name = "Keycard", email = "support@keycard.ai" }] +dependencies = [ + "keycardai-a2a>=0.2.0", + "pydantic>=2.11.7", + "crewai>=0.86.0", +] +keywords = ["crewai", "agents", "ai", "a2a", "agent-to-agent", "delegation", "authentication", "oauth", "token-exchange", "keycard"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Operating System :: OS Independent", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "License :: OSI Approved :: MIT License", +] + +[project.optional-dependencies] +test = [ + "pytest>=8.4.1", + "pytest-asyncio>=1.1.0", + "pytest-cov>=6.2.1", + "pytest-timeout>=2.3.1", +] +dev = [ + "ruff>=0.8.6", + "mypy>=1.14.1", +] + +[project.urls] +Homepage = "https://github.com/keycardai/python-sdk" +Repository = "https://github.com/keycardai/python-sdk" +Documentation = "https://docs.keycardai.com" +Issues = "https://github.com/keycardai/python-sdk/issues" + +[build-system] +requires = ["hatchling", "uv-dynamic-versioning"] +build-backend = "hatchling.build" + +[tool.hatch.version] +source = "uv-dynamic-versioning" + +[tool.uv-dynamic-versioning] +vcs = "git" +pattern = "(?P\\d+\\.\\d+\\.\\d+)-keycardai-crewai" +style = "pep440" + +[tool.hatch.build.targets.wheel] +packages = ["src/keycardai"] + +[tool.hatch.build.targets.sdist] +exclude = [ + "/.github", + "/tests", +] + +[tool.ruff] +line-length = 120 +target-version = "py310" + +[tool.ruff.lint] +select = ["E", "F", "I"] +ignore = ["E501"] + +[tool.mypy] +python_version = "3.10" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" + +[tool.commitizen] +name = "cz_customize" +version = "0.0.0" +tag_format = "${version}-keycardai-crewai" +ignored_tag_formats = ["${version}-*"] +update_changelog_on_bump = true +bump_message = "bump: keycardai-crewai $current_version → $new_version" +major_version_zero = true + +[tool.commitizen.customize] +changelog_pattern = "^(feat|fix|refactor|perf|test|build|ci|revert)\\(keycardai-crewai\\)(!)?:" diff --git a/packages/agents/src/keycardai/agents/integrations/crewai.py b/packages/crewai/src/keycardai/crewai/__init__.py similarity index 71% rename from packages/agents/src/keycardai/agents/integrations/crewai.py rename to packages/crewai/src/keycardai/crewai/__init__.py index 188e54a..84124d3 100644 --- a/packages/agents/src/keycardai/agents/integrations/crewai.py +++ b/packages/crewai/src/keycardai/crewai/__init__.py @@ -1,12 +1,20 @@ """CrewAI integration for A2A (agent-to-agent) delegation. -This module provides: -1. CrewAIExecutor: Adapter for running CrewAI crews in the agent service server -2. Delegation tools: CrewAI tools for calling other agent services +Two halves: + +1. ``CrewAIExecutor``: an ``a2a-sdk`` 1.x ``AgentExecutor`` subclass that runs + a CrewAI ``Crew`` in response to incoming A2A messages. Pass it directly + to ``a2a.server.request_handlers.DefaultRequestHandler`` — no wrapper + needed. +2. Delegation tools: ``crewai.tools.BaseTool`` instances that let a CrewAI + agent delegate work to other A2A services, exchanging the inbound user + token via Keycard. Usage with executor: - >>> from keycardai.a2a import AgentServiceConfig - >>> from keycardai.agents.integrations.crewai import CrewAIExecutor + >>> from a2a.server.request_handlers import DefaultRequestHandler + >>> from a2a.server.tasks import InMemoryTaskStore + >>> from keycardai.a2a import AgentServiceConfig, build_agent_card_from_config + >>> from keycardai.crewai import CrewAIExecutor >>> from crewai import Agent, Crew, Task >>> >>> def create_my_crew(): @@ -14,15 +22,17 @@ ... task = Task(description="{task}", agent=agent) ... return Crew(agents=[agent], tasks=[task]) >>> - >>> config = AgentServiceConfig( - ... service_name="My Service", + >>> config = AgentServiceConfig(service_name="My Service", ...) + >>> agent_card = build_agent_card_from_config(config) + >>> request_handler = DefaultRequestHandler( ... agent_executor=CrewAIExecutor(create_my_crew), - ... # ... other config + ... task_store=InMemoryTaskStore(), + ... agent_card=agent_card, ... ) Usage with delegation tools: >>> from keycardai.a2a import AgentServiceConfig - >>> from keycardai.agents.integrations.crewai import get_a2a_tools + >>> from keycardai.crewai import get_a2a_tools >>> from crewai import Agent, Crew >>> >>> # Create service config @@ -48,67 +58,74 @@ >>> ) """ +import asyncio import contextvars import logging from typing import Any, Callable +from a2a.server.agent_execution import AgentExecutor +from a2a.server.events.event_queue_v2 import EventQueue +from a2a.types import Message, Part, Role +from keycardai.a2a import AgentServiceConfig, DelegationClientSync, ServiceDiscovery from pydantic import BaseModel, Field -from keycardai.a2a import AgentServiceConfig, DelegationClientSync, ServiceDiscovery +from crewai import Crew +from crewai.tools import BaseTool -# Context variable to store the current user's access token for delegation +# Context variable to store the current user's access token for delegation. +# Read by ServiceDelegationTool._run; written by CrewAIExecutor.execute (or +# manually via set_delegation_token). _current_user_token: contextvars.ContextVar[str | None] = contextvars.ContextVar( "current_user_token", default=None ) -try: - from crewai import Crew - from crewai.tools import BaseTool -except ImportError: - raise ImportError( - "CrewAI is not installed. Install it with: pip install 'keycardai-agents[crewai]'" - ) from None - logger = logging.getLogger(__name__) def set_delegation_token(access_token: str) -> None: """Set the user's access token for delegation context. - This should be called before crew execution to provide the user's - token for service-to-service delegation. The token will be used - for token exchange when delegating to other services. + ``CrewAIExecutor`` calls this for you. Use it directly only when running + a crew outside the executor (e.g., from a custom AgentExecutor or a + test). Args: access_token: The user's access token from the request Example: - >>> # In your server's invoke handler - >>> access_token = request.user.access_token - >>> set_delegation_token(access_token) - >>> - >>> # Now crew tools can delegate with the user's context - >>> crew = create_my_crew() - >>> result = crew.kickoff() + >>> # In a custom AgentExecutor.execute method + >>> access_token = context.call_context.state.get("access_token") + >>> if access_token: + ... set_delegation_token(access_token) + >>> result = my_crew.kickoff(...) """ _current_user_token.set(access_token) -class CrewAIExecutor: - """Executor adapter for CrewAI crews. +class CrewAIExecutor(AgentExecutor): + """``a2a-sdk`` 1.x ``AgentExecutor`` that runs a CrewAI ``Crew``. + + Pass an instance directly to + ``a2a.server.request_handlers.DefaultRequestHandler(agent_executor=...)``; + no outer wrapper is needed. Subclasses ``a2a.server.agent_execution.AgentExecutor`` + so it satisfies the wire-up contract that ``DefaultRequestHandler`` expects. - This executor implements the AgentExecutor protocol for CrewAI crews, - allowing them to be used in the generic agent service server. + On each call to ``execute``: - The executor: - 1. Takes a crew factory callable - 2. Sets delegation token context before execution - 3. Calls crew.kickoff() with the task/inputs - 4. Returns the result as a string + 1. Reads ``context.call_context.state["access_token"]`` (populated by + ``keycardai.a2a.KeycardServerCallContextBuilder``) and sets the + delegation contextvar so synchronous CrewAI tools can pick it up. + 2. Calls the ``crew_factory`` to build a fresh ``Crew``. + 3. Runs ``crew.kickoff(inputs={"task": })`` on a worker thread + via ``asyncio.to_thread`` so the synchronous CrewAI runtime does not + starve uvicorn's event loop. ``asyncio.to_thread`` propagates the + contextvar via ``contextvars.copy_context``; do **not** swap this for a + raw ``ThreadPoolExecutor``, which would not, and would silently break + delegation. + 4. Wraps the string result in an A2A ``Message`` and enqueues it. Args: - crew_factory: Callable that returns a Crew instance - set_token_context: If True, automatically set delegation token before execution + crew_factory: Callable that returns a fresh ``Crew`` for each request. Example: >>> from crewai import Agent, Crew, Task @@ -119,68 +136,38 @@ class CrewAIExecutor: ... return Crew(agents=[agent], tasks=[task]) >>> >>> executor = CrewAIExecutor(create_my_crew) - >>> result = executor.execute("Hello world", {"name": "Alice"}) """ - def __init__(self, crew_factory: Callable[[], Crew], set_token_context: bool = True): - """Initialize CrewAI executor. - - Args: - crew_factory: Callable that returns a Crew instance - set_token_context: If True, automatically set delegation token before execution - """ + def __init__(self, crew_factory: Callable[[], Crew]): self.crew_factory = crew_factory - self.set_token_context = set_token_context - - def execute( - self, - task: dict[str, Any] | str, - inputs: dict[str, Any] | None = None, - ) -> str: - """Execute crew with the given task and inputs. - - Args: - task: Task description (string) or parameters (dict) - inputs: Optional additional inputs for the crew - - Returns: - Result from crew execution as string - - Raises: - Exception: If crew execution fails - """ - # Create crew instance - crew = self.crew_factory() - # Prepare inputs for crew - if isinstance(task, dict): - crew_inputs = task + async def execute(self, context: Any, event_queue: EventQueue) -> None: + call_ctx = getattr(context, "call_context", None) + access_token = call_ctx.state.get("access_token") if call_ctx else None + if access_token: + set_delegation_token(access_token) else: - crew_inputs = {"task": task} - - # Merge additional inputs if provided - if inputs: - crew_inputs.update(inputs) + logger.warning( + "No access_token in RequestContext.call_context.state; " + "delegation tools will run without a user token. Ensure the " + "JSONRPC mount uses keycardai.a2a.KeycardServerCallContextBuilder." + ) - # Execute crew - # Note: crew.kickoff() is synchronous in CrewAI - logger.info(f"Executing CrewAI crew with inputs: {list(crew_inputs.keys())}") - result = crew.kickoff(inputs=crew_inputs) - - # Return result as string - return str(result) + user_input = context.get_user_input() + crew = self.crew_factory() + crew_inputs = {"task": user_input} - def set_token_for_delegation(self, access_token: str) -> None: - """Set access token for delegation context. + logger.info("Executing CrewAI crew") + result = await asyncio.to_thread(crew.kickoff, inputs=crew_inputs) - This is called by the server before execution to provide - the user's token for service-to-service delegation. + message = Message( + role=Role.ROLE_AGENT, + parts=[Part(text=str(result))], + ) + await event_queue.enqueue_event(message) - Args: - access_token: User's access token - """ - if self.set_token_context: - set_delegation_token(access_token) + async def cancel(self, context: Any, event_queue: EventQueue) -> None: + return None async def get_a2a_tools( @@ -411,3 +398,11 @@ async def create_a2a_tool_for_service( # Create and return tool return _create_delegation_tool(service_info, delegation_client) + + +__all__ = [ + "CrewAIExecutor", + "create_a2a_tool_for_service", + "get_a2a_tools", + "set_delegation_token", +] diff --git a/packages/crewai/tests/__init__.py b/packages/crewai/tests/__init__.py new file mode 100644 index 0000000..5a78d33 --- /dev/null +++ b/packages/crewai/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for keycardai-crewai package.""" diff --git a/packages/agents/tests/integrations/test_crewai_a2a.py b/packages/crewai/tests/test_crewai_a2a.py similarity index 69% rename from packages/agents/tests/integrations/test_crewai_a2a.py rename to packages/crewai/tests/test_crewai_a2a.py index 7b8d6de..41688b9 100644 --- a/packages/agents/tests/integrations/test_crewai_a2a.py +++ b/packages/crewai/tests/test_crewai_a2a.py @@ -1,17 +1,22 @@ """Tests for CrewAI A2A delegation integration.""" -from unittest.mock import AsyncMock, patch +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch import pytest pytest.importorskip("crewai") +from a2a.server.agent_execution import AgentExecutor from keycardai.a2a import AgentServiceConfig -from keycardai.agents.integrations.crewai import ( +from keycardai.crewai import ( + CrewAIExecutor, _create_delegation_tool, + _current_user_token, create_a2a_tool_for_service, get_a2a_tools, + set_delegation_token, ) @@ -101,7 +106,7 @@ async def test_get_a2a_tools_discovers_services_when_none_provided( # When delegatable_services=None, it should try to discover # Currently returns empty list (discovery not implemented) with patch( - "keycardai.agents.integrations.crewai.ServiceDiscovery" + "keycardai.crewai.ServiceDiscovery" ) as mock_discovery_class: mock_discovery = AsyncMock() mock_discovery.list_delegatable_services.return_value = [] @@ -366,7 +371,7 @@ async def test_create_tool_fetches_agent_card( ): """Test create_a2a_tool_for_service fetches agent card.""" with patch( - "keycardai.agents.integrations.crewai.ServiceDiscovery" + "keycardai.crewai.ServiceDiscovery" ) as mock_discovery_class: mock_discovery = AsyncMock() mock_discovery.get_service_card.return_value = mock_agent_card @@ -390,7 +395,7 @@ async def test_create_tool_fetches_agent_card( async def test_create_tool_for_service(self, service_config, mock_agent_card): """Test tool is created correctly from agent card.""" with patch( - "keycardai.agents.integrations.crewai.ServiceDiscovery" + "keycardai.crewai.ServiceDiscovery" ) as mock_discovery_class: mock_discovery = AsyncMock() mock_discovery.get_service_card.return_value = mock_agent_card @@ -404,3 +409,176 @@ async def test_create_tool_for_service(self, service_config, mock_agent_card): # Tool name should be based on service name from agent card assert "echo" in tool.name.lower() assert "service" in tool.name.lower() + + +def _make_request_context(*, user_input: str = "hello", access_token: str | None = None): + """Build a stand-in for a2a-sdk's RequestContext. + + Only the attributes CrewAIExecutor.execute touches are populated: + ``get_user_input()`` and ``call_context.state``. + """ + call_context = SimpleNamespace( + state={"access_token": access_token} if access_token is not None else {} + ) + return SimpleNamespace( + get_user_input=lambda: user_input, + call_context=call_context, + ) + + +@pytest.fixture(autouse=True) +def _reset_delegation_token(): + """Each test starts with a clean contextvar.""" + token = _current_user_token.set(None) + yield + _current_user_token.reset(token) + + +class TestCrewAIExecutor: + """CrewAIExecutor wires a CrewAI Crew into a2a-sdk's AgentExecutor contract.""" + + def test_subclasses_a2a_agent_executor(self): + """The whole point of the wrap: instances pass DefaultRequestHandler's type check.""" + executor = CrewAIExecutor(crew_factory=lambda: MagicMock()) + + assert isinstance(executor, AgentExecutor) + + @pytest.mark.asyncio + async def test_execute_runs_crew_with_user_input(self): + """The user input from RequestContext lands in crew.kickoff(inputs={"task": ...}).""" + crew = MagicMock() + crew.kickoff.return_value = "crew result" + executor = CrewAIExecutor(crew_factory=lambda: crew) + + context = _make_request_context(user_input="analyze this") + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + crew.kickoff.assert_called_once_with(inputs={"task": "analyze this"}) + + @pytest.mark.asyncio + async def test_execute_enqueues_message_with_crew_result(self): + """The string form of the crew result becomes the agent message.""" + crew = MagicMock() + crew.kickoff.return_value = "the answer" + executor = CrewAIExecutor(crew_factory=lambda: crew) + + context = _make_request_context(user_input="ask") + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + event_queue.enqueue_event.assert_called_once() + message = event_queue.enqueue_event.call_args[0][0] + # The Message body should carry the crew result. + assert "the answer" in str(message) + + @pytest.mark.asyncio + async def test_execute_propagates_access_token_to_contextvar(self): + """The bearer in call_context.state must reach _current_user_token by the time + crew.kickoff runs, since synchronous CrewAI tools read the contextvar there. + + asyncio.to_thread inherits the calling task's context via copy_context, so the + contextvar set in execute() is visible inside the worker thread. + """ + observed = {} + + def crew_factory(): + crew = MagicMock() + + def kickoff(inputs): + observed["token"] = _current_user_token.get() + return "ok" + + crew.kickoff.side_effect = kickoff + return crew + + executor = CrewAIExecutor(crew_factory=crew_factory) + + context = _make_request_context(access_token="bearer-abc") + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + assert observed["token"] == "bearer-abc" + + @pytest.mark.asyncio + async def test_execute_warns_when_access_token_missing(self, caplog): + """No token in state ⇒ log a warning so misconfigured deployments are visible.""" + crew = MagicMock() + crew.kickoff.return_value = "ok" + executor = CrewAIExecutor(crew_factory=lambda: crew) + + context = _make_request_context(access_token=None) + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + with caplog.at_level("WARNING", logger="keycardai.crewai"): + await executor.execute(context, event_queue) + + assert any( + "access_token" in record.message + for record in caplog.records + if record.levelname == "WARNING" + ) + + @pytest.mark.asyncio + async def test_execute_does_not_block_event_loop(self): + """crew.kickoff must run on a worker thread, not on the event loop. + + The probe records the running loop's policy at kickoff time. If kickoff + ran on the event loop directly, asyncio.get_running_loop() would succeed + in the same task; in a worker thread it raises RuntimeError. + """ + observed = {} + + def crew_factory(): + crew = MagicMock() + + def kickoff(inputs): + import asyncio + + try: + asyncio.get_running_loop() + observed["on_loop"] = True + except RuntimeError: + observed["on_loop"] = False + return "ok" + + crew.kickoff.side_effect = kickoff + return crew + + executor = CrewAIExecutor(crew_factory=crew_factory) + + context = _make_request_context() + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + assert observed["on_loop"] is False + + @pytest.mark.asyncio + async def test_cancel_returns_none(self): + """Default cancel is a no-op; AgentExecutor.cancel must not raise.""" + executor = CrewAIExecutor(crew_factory=lambda: MagicMock()) + + context = _make_request_context() + event_queue = MagicMock() + + result = await executor.cancel(context, event_queue) + + assert result is None + + +class TestSetDelegationToken: + """set_delegation_token writes to the public contextvar.""" + + def test_set_delegation_token_updates_contextvar(self): + set_delegation_token("token-xyz") + + assert _current_user_token.get() == "token-xyz" diff --git a/packages/crewai/tests/test_default_request_handler_integration.py b/packages/crewai/tests/test_default_request_handler_integration.py new file mode 100644 index 0000000..2ca381f --- /dev/null +++ b/packages/crewai/tests/test_default_request_handler_integration.py @@ -0,0 +1,241 @@ +"""Wire-up smoke test for ``CrewAIExecutor`` against ``a2a-sdk``'s +``DefaultRequestHandler`` and the JSONRPC route factory. + +The ``test_crewai_a2a.py`` tests cover ``CrewAIExecutor.execute`` in isolation +with mocked context and event_queue. They prove the method does the right thing +when invoked directly, but they do NOT prove that ``DefaultRequestHandler`` +actually invokes our ``execute`` method when a real JSONRPC ``message/send`` +request comes in. That gap is the highest-value thing to close: it verifies the +wrap-fidelity claim end-to-end. + +This module instantiates the headline composition exactly the way the README +quickstart shows (``DefaultRequestHandler(agent_executor=CrewAIExecutor(...), ...)`` +mounted via ``create_jsonrpc_routes``), drives it with a real JSONRPC POST +through Starlette's ``TestClient``, and asserts the crew result comes back in +the response. A stub auth backend stands in for the real Keycard verifier (real +auth is exercised in keycardai-starlette's own tests); the only thing under +test here is the wire-up between the JSONRPC dispatcher and our executor. +""" + +from unittest.mock import MagicMock + +import pytest + +pytest.importorskip("crewai") + +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_jsonrpc_routes +from a2a.server.tasks import InMemoryTaskStore +from keycardai.a2a import ( + AgentServiceConfig, + KeycardServerCallContextBuilder, + build_agent_card_from_config, +) +from keycardai.starlette import KeycardUser +from starlette.applications import Starlette +from starlette.authentication import AuthCredentials, AuthenticationBackend +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount +from starlette.testclient import TestClient + +from keycardai.crewai import CrewAIExecutor, _current_user_token + + +class _StubAuthBackend(AuthenticationBackend): + """Stand-in for ``KeycardAuthBackend`` that always authenticates with a + fixed ``KeycardUser``. + + Real auth is covered by keycardai-starlette's own tests. Using a real + verifier here would require a reachable JWKS endpoint, which the test + environment does not provide; the cost of mocking the verifier outweighs + the value, since the verifier-to-context flow is already covered by the + ``KeycardServerCallContextBuilder`` propagation tests in keycardai-a2a. + What this test isolates is the JSONRPC -> DefaultRequestHandler -> + CrewAIExecutor.execute path. + """ + + def __init__(self, access_token: str): + self._access_token = access_token + + async def authenticate(self, conn): + user = KeycardUser( + access_token=self._access_token, + client_id="caller-svc", + zone_id="test_zone", + resource_server_url="https://test.example.com", + scopes=["test"], + ) + return AuthCredentials(["authenticated"]), user + + +@pytest.fixture +def service_config(): + return AgentServiceConfig( + service_name="Test Crew Service", + client_id="test_client", + client_secret="test_secret", + identity_url="https://test.example.com", + zone_id="test_zone", + capabilities=["test"], + ) + + +@pytest.fixture +def crew_observation(): + """Captures what the (fake) crew was driven with, for assertion.""" + return {} + + +@pytest.fixture +def fake_crew_factory(crew_observation): + def _factory(): + crew = MagicMock() + + def kickoff(inputs): + crew_observation["inputs"] = inputs + crew_observation["token_at_kickoff"] = _current_user_token.get() + return "fake crew result" + + crew.kickoff.side_effect = kickoff + return crew + + return _factory + + +@pytest.fixture +def app(service_config, fake_crew_factory): + """The composition under test: DefaultRequestHandler wraps CrewAIExecutor, + mounted via the standard a2a-sdk JSONRPC route factory, with the Keycard + context-builder propagating the verified user into ServerCallContext.state. + """ + executor = CrewAIExecutor(crew_factory=fake_crew_factory) + agent_card = build_agent_card_from_config(service_config) + request_handler = DefaultRequestHandler( + agent_executor=executor, + task_store=InMemoryTaskStore(), + agent_card=agent_card, + ) + + return Starlette( + routes=[ + Mount( + "/a2a", + routes=create_jsonrpc_routes( + request_handler=request_handler, + rpc_url="/jsonrpc", + context_builder=KeycardServerCallContextBuilder(), + ), + middleware=[ + Middleware( + AuthenticationMiddleware, + backend=_StubAuthBackend(access_token="bearer-test-token"), + ), + ], + ), + ] + ) + + +@pytest.fixture +def client(app): + return TestClient(app) + + +class TestDefaultRequestHandlerInvokesCrewAIExecutor: + """The wrap actually wraps: DefaultRequestHandler drives CrewAIExecutor when + a JSONRPC ``message/send`` request lands at ``/a2a/jsonrpc``. + """ + + def test_message_send_drives_crew_kickoff_with_user_input( + self, client, crew_observation + ): + """The user's message text reaches ``crew.kickoff(inputs={"task": ...})``. + + This is the core wire-up assertion: a JSONRPC envelope hitting the mount + causes ``DefaultRequestHandler`` to build a ``RequestContext``, call our + executor's ``execute``, which calls our crew. If the crew never sees the + text, something between the dispatcher and ``execute`` is broken. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "msg-1", + "role": "ROLE_USER", + "parts": [{"text": "hello world"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + assert crew_observation["inputs"] == {"task": "hello world"} + + def test_response_carries_crew_result(self, client): + """The string returned by the crew comes back to the JSONRPC caller. + + ``CrewAIExecutor.execute`` enqueues a ``Message`` whose text is the + ``str(crew.kickoff(...))`` result. ``DefaultRequestHandler`` shapes that + into the ``SendMessageResponse`` payload. The crew result string should + appear somewhere in the response body. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "msg-2", + "role": "ROLE_USER", + "parts": [{"text": "anything"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + assert "fake crew result" in response.text + + def test_access_token_propagates_into_crew_kickoff( + self, client, crew_observation + ): + """The bearer reaches the contextvar by the time crew.kickoff runs. + + The verified bearer flows: KeycardUser (set by the auth backend) -> + request.scope["user"] -> KeycardServerCallContextBuilder -> + ServerCallContext.state["access_token"] -> CrewAIExecutor.execute reads + it -> set_delegation_token writes it to the contextvar -> asyncio.to_thread + copies the context into the worker thread -> crew.kickoff observes it. + + If any link breaks, synchronous CrewAI tools delegate without the user + token and downstream services either reject the call or attribute it to + the wrong identity. Worth a single assertion that the chain holds. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "msg-3", + "role": "ROLE_USER", + "parts": [{"text": "doesn't matter"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + assert crew_observation["token_at_kickoff"] == "bearer-test-token" diff --git a/pyproject.toml b/pyproject.toml index 9a2d18e..5a2ca1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ keycardai-fastmcp = { workspace = true } keycardai-mcp-fastmcp = { workspace = true } keycardai-mcp = { workspace = true } keycardai-a2a = { workspace = true } +keycardai-crewai = { workspace = true } keycardai-agents = { workspace = true } [build-system] diff --git a/uv.lock b/uv.lock index 432b2b4..f44b94b 100644 --- a/uv.lock +++ b/uv.lock @@ -24,6 +24,7 @@ members = [ "keycardai", "keycardai-a2a", "keycardai-agents", + "keycardai-crewai", "keycardai-fastmcp", "keycardai-mcp", "keycardai-mcp-fastmcp", @@ -2440,15 +2441,32 @@ provides-extras = ["dev", "test"] [[package]] name = "keycardai-agents" source = { editable = "packages/agents" } + +[package.optional-dependencies] +dev = [ + { name = "ruff" }, +] +test = [ + { name = "pytest" }, +] + +[package.metadata] +requires-dist = [ + { name = "pytest", marker = "extra == 'test'", specifier = ">=8.4.1" }, + { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.6" }, +] +provides-extras = ["dev", "test"] + +[[package]] +name = "keycardai-crewai" +source = { editable = "packages/crewai" } dependencies = [ + { name = "crewai" }, { name = "keycardai-a2a" }, { name = "pydantic" }, ] [package.optional-dependencies] -crewai = [ - { name = "crewai" }, -] dev = [ { name = "mypy" }, { name = "ruff" }, @@ -2462,7 +2480,7 @@ test = [ [package.metadata] requires-dist = [ - { name = "crewai", marker = "extra == 'crewai'", specifier = ">=0.86.0" }, + { name = "crewai", specifier = ">=0.86.0" }, { name = "keycardai-a2a", editable = "packages/a2a" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.14.1" }, { name = "pydantic", specifier = ">=2.11.7" }, @@ -2472,7 +2490,7 @@ requires-dist = [ { name = "pytest-timeout", marker = "extra == 'test'", specifier = ">=2.3.1" }, { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.6" }, ] -provides-extras = ["crewai", "dev", "test"] +provides-extras = ["dev", "test"] [[package]] name = "keycardai-fastmcp"