From 55c5e3eb85957b1cc66e277a11d44408742191b9 Mon Sep 17 00:00:00 2001 From: Larry-Osakwe Date: Wed, 29 Apr 2026 14:23:14 -0700 Subject: [PATCH 1/5] feat(keycardai-crewai)!: new package split from keycardai-agents (ACC-231) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the KEP "Decompose keycardai-agents", the CrewAI-over-A2A integration moves out of keycardai-agents into a new keycardai-crewai package, sister to keycardai-a2a (#105 / ACC-230). Public surface in keycardai.crewai: - CrewAIExecutor — adapter that runs a CrewAI Crew factory and returns the result as a string. Intended to be called from inside an a2a-sdk AgentExecutor.execute method. - set_delegation_token(access_token) — stash the inbound bearer in a contextvar so synchronous CrewAI tool calls can pick it up at delegation time. - get_a2a_tools(service_config, delegatable_services) — return a list of crewai.tools.BaseTool instances, one per delegatable service. Each tool calls the target service via keycardai-a2a is DelegationClientSync.invoke_service. - create_a2a_tool_for_service(service_config, target_service_url) — single-tool variant for callers that already know the target URL. The body is the same code that lived at keycardai.agents.integrations.crewai with three string-level changes: - ImportError message updated to "pip install keycardai-crewai". - Docstring usage examples updated to "from keycardai.crewai import ...". - Module path in test mock targets ("patch(...ServiceDiscovery)") updated to "keycardai.crewai.ServiceDiscovery". Wrap rules (mirrored from keycardai-a2a): 1. Use CrewAI native Crew and BaseTool types directly. No parallel protocol. 2. Use keycardai-a2a native primitives (DelegationClientSync, ServiceDiscovery, AgentServiceConfig, KeycardServerCallContextBuilder) directly. 3. The bearer-token contextvar bridges from ServerCallContext.state["access_token"] (set by keycardai-a2a is KeycardServerCallContextBuilder) into the synchronous CrewAI tool execution path. 4. No new HTTP endpoints, no new wire formats. Customers compose CrewAIExecutor with keycardai-a2a primitives in their own Starlette or FastAPI app. The CrewAI-over-MCP-tools integration in keycardai-mcp is a separate concern (different protocol, different package) and stays put. Followups: - Empty out keycardai-agents (next commit on this branch). - ACC-232 archives the keycardai-agents source directory. - Refactor CrewAIExecutor to subclass a2a.server.agent_execution.AgentExecutor (the 1.x async base) so it can be passed straight to DefaultRequestHandler without an outer wrapper. - Update the keycardai-crewai row in the ACC-216 feature parity matrix. --- .github/workflows/release.yml | 1 + packages/crewai/CHANGELOG.md | 0 packages/crewai/README.md | 88 ++++ packages/crewai/pyproject.toml | 101 +++++ .../crewai/src/keycardai/crewai/__init__.py | 422 ++++++++++++++++++ packages/crewai/tests/__init__.py | 1 + packages/crewai/tests/test_crewai_a2a.py | 406 +++++++++++++++++ pyproject.toml | 1 + uv.lock | 36 ++ 9 files changed, 1056 insertions(+) create mode 100644 packages/crewai/CHANGELOG.md create mode 100644 packages/crewai/README.md create mode 100644 packages/crewai/pyproject.toml create mode 100644 packages/crewai/src/keycardai/crewai/__init__.py create mode 100644 packages/crewai/tests/__init__.py create mode 100644 packages/crewai/tests/test_crewai_a2a.py diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ff9b37a..0d82c46 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,6 +11,7 @@ on: - '*-keycardai-fastmcp' - '*-keycardai-agents' - '*-keycardai-a2a' + - '*-keycardai-crewai' jobs: detect-package: diff --git a/packages/crewai/CHANGELOG.md b/packages/crewai/CHANGELOG.md new file mode 100644 index 0000000..e69de29 diff --git a/packages/crewai/README.md b/packages/crewai/README.md new file mode 100644 index 0000000..75fdd38 --- /dev/null +++ b/packages/crewai/README.md @@ -0,0 +1,88 @@ +# keycardai-crewai + +CrewAI integration for [keycardai-a2a](../a2a). Use a CrewAI `Crew` as the agent body of a Keycard-protected A2A service, and give CrewAI agents tools that delegate work to other A2A services with the user's token exchanged via Keycard. + +> **Preview.** This package is pre-1.0. APIs may change between minor versions. + +## What's in here + +Server-side: + +- **`CrewAIExecutor`**: adapter that wraps a CrewAI `Crew` factory so it can be wired into `a2a-sdk`'s `DefaultRequestHandler` as the agent body. +- **`set_delegation_token(access_token)`**: stash the inbound bearer in a contextvar so synchronous CrewAI tool calls can pick it up at delegation time. + +Client-side (CrewAI tools that delegate to other A2A services): + +- **`get_a2a_tools(service_config, delegatable_services)`**: returns a list of `crewai.tools.BaseTool` instances, one per delegatable service. Each tool calls the target service via `keycardai-a2a`'s `DelegationClientSync.invoke_service`, exchanging the contextvar token along the way. +- **`create_a2a_tool_for_service(service_config, target_service_url)`**: single-tool variant for cases where the caller already knows the target URL. + +## Installation + +```bash +pip install keycardai-crewai +``` + +This pulls in `keycardai-a2a`, `crewai`, and (transitively) `keycardai-oauth` + `keycardai-starlette`. + +## Quick start + +In your A2A executor's `execute` method, call `set_delegation_token` with the verified bearer (read from `context.call_context.state["access_token"]`, populated by `keycardai.a2a.KeycardServerCallContextBuilder`) and then run your crew. CrewAI tools created via `get_a2a_tools` will pick the token up from the contextvar at invocation time. + +```python +from a2a.server.agent_execution import AgentExecutor +from a2a.types import Message, MessageRole, Part + +from keycardai.a2a import AgentServiceConfig +from keycardai.crewai import CrewAIExecutor, get_a2a_tools, set_delegation_token + +config = AgentServiceConfig( + service_name="My Crew", + client_id="...", + client_secret="...", + identity_url="https://my-crew.example.com", + zone_id="your-zone-id", + capabilities=["orchestrator"], +) + +# Build CrewAI tools that delegate to other A2A services +delegatable = [{"name": "Echo", "url": "https://echo.example.com", "description": "echoes input", "capabilities": ["echo"]}] +tools = await get_a2a_tools(config, delegatable_services=delegatable) + + +def make_crew(): + from crewai import Agent, Crew, Task + + orchestrator = Agent(role="Orchestrator", goal="Delegate to specialists", tools=tools) + task = Task(description="{task}", agent=orchestrator, expected_output="result") + return Crew(agents=[orchestrator], tasks=[task]) + + +class CrewExecutor(AgentExecutor): + async def execute(self, context, event_queue): + access_token = context.call_context.state.get("access_token") + if access_token: + set_delegation_token(access_token) + + crew_runner = CrewAIExecutor(make_crew) + text = context.get_user_input() + result = crew_runner.execute(text) + + message = Message(role=MessageRole.MESSAGE_ROLE_AGENT, parts=[Part(text=result)]) + await event_queue.enqueue_event(message) + + async def cancel(self, context, event_queue): + return None +``` + +Compose this `CrewExecutor` with the `keycardai-a2a` primitives in your Starlette/FastAPI app — see [`packages/a2a/examples/keycard_protected_server`](../a2a/examples/keycard_protected_server) for the auth wiring. + +## Relationship to other Keycard packages + +- **`keycardai-a2a`**: provides the agent service primitives this package builds on. The `CrewAIExecutor` is fed into a2a-sdk's `DefaultRequestHandler`; the tools created by `get_a2a_tools` go through `keycardai-a2a`'s `DelegationClientSync`. +- **`keycardai-oauth`**: token exchange runs through `keycardai-oauth` under the hood, via `keycardai-a2a`'s delegation client. +- **`keycardai-starlette`**: the auth backend protecting the agent service mount lives here. +- **`keycardai-mcp`**: hosts a separate CrewAI integration for **MCP tools** (different protocol). That one stays in `keycardai-mcp` and is unrelated to this package. + +## History + +This package was extracted from the original `keycardai-agents` package (KEP: Decompose keycardai-agents). The PKCE user-login client moved to `keycardai-oauth`; the A2A delegation surface moved to `keycardai-a2a`; the `keycardai-agents` source directory is being archived. diff --git a/packages/crewai/pyproject.toml b/packages/crewai/pyproject.toml new file mode 100644 index 0000000..0ca32f8 --- /dev/null +++ b/packages/crewai/pyproject.toml @@ -0,0 +1,101 @@ +[project] +name = "keycardai-crewai" +dynamic = ["version"] +description = "CrewAI integration for Keycard A2A: run CrewAI crews as agent services and create delegation tools that exchange tokens with downstream services." +readme = "README.md" +requires-python = ">=3.10" +license = { text = "MIT" } +authors = [{ name = "Keycard", email = "support@keycard.ai" }] +dependencies = [ + "keycardai-a2a>=0.2.0", + "pydantic>=2.11.7", + "crewai>=0.86.0", +] +keywords = ["crewai", "agents", "ai", "a2a", "agent-to-agent", "delegation", "authentication", "oauth", "token-exchange", "keycard"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Operating System :: OS Independent", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Security", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "License :: OSI Approved :: MIT License", +] + +[project.optional-dependencies] +test = [ + "pytest>=8.4.1", + "pytest-asyncio>=1.1.0", + "pytest-cov>=6.2.1", + "pytest-timeout>=2.3.1", +] +dev = [ + "ruff>=0.8.6", + "mypy>=1.14.1", +] + +[project.urls] +Homepage = "https://github.com/keycardai/python-sdk" +Repository = "https://github.com/keycardai/python-sdk" +Documentation = "https://docs.keycardai.com" +Issues = "https://github.com/keycardai/python-sdk/issues" + +[build-system] +requires = ["hatchling", "uv-dynamic-versioning"] +build-backend = "hatchling.build" + +[tool.hatch.version] +source = "uv-dynamic-versioning" + +[tool.uv-dynamic-versioning] +vcs = "git" +pattern = "(?P\\d+\\.\\d+\\.\\d+)-keycardai-crewai" +style = "pep440" + +[tool.hatch.build.targets.wheel] +packages = ["src/keycardai"] + +[tool.hatch.build.targets.sdist] +exclude = [ + "/.github", + "/tests", +] + +[tool.ruff] +line-length = 120 +target-version = "py310" + +[tool.ruff.lint] +select = ["E", "F", "I"] +ignore = ["E501"] + +[tool.mypy] +python_version = "3.10" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" + +[tool.commitizen] +name = "cz_customize" +version = "0.0.0" +tag_format = "${version}-keycardai-crewai" +ignored_tag_formats = ["${version}-*"] +update_changelog_on_bump = true +bump_message = "bump: keycardai-crewai $current_version → $new_version" +major_version_zero = true + +[tool.commitizen.customize] +changelog_pattern = "^(feat|fix|refactor|perf|test|build|ci|revert)\\(keycardai-crewai\\)(!)?:" diff --git a/packages/crewai/src/keycardai/crewai/__init__.py b/packages/crewai/src/keycardai/crewai/__init__.py new file mode 100644 index 0000000..dc9189b --- /dev/null +++ b/packages/crewai/src/keycardai/crewai/__init__.py @@ -0,0 +1,422 @@ +"""CrewAI integration for A2A (agent-to-agent) delegation. + +This module provides: +1. CrewAIExecutor: Adapter for running CrewAI crews in the agent service server +2. Delegation tools: CrewAI tools for calling other agent services + +Usage with executor: + >>> from keycardai.a2a import AgentServiceConfig + >>> from keycardai.crewai import CrewAIExecutor + >>> from crewai import Agent, Crew, Task + >>> + >>> def create_my_crew(): + ... agent = Agent(role="Assistant", goal="Help users") + ... task = Task(description="{task}", agent=agent) + ... return Crew(agents=[agent], tasks=[task]) + >>> + >>> config = AgentServiceConfig( + ... service_name="My Service", + ... # ... other config + ... ) + >>> executor = CrewAIExecutor(create_my_crew) + +Usage with delegation tools: + >>> from keycardai.a2a import AgentServiceConfig + >>> from keycardai.crewai import get_a2a_tools + >>> from crewai import Agent, Crew + >>> + >>> # Create service config + >>> config = AgentServiceConfig(...) + >>> + >>> # Define services we can delegate to + >>> delegatable_services = [ + >>> { + >>> "name": "echo_service", + >>> "url": "http://localhost:8002", + >>> "description": "Echo service that repeats messages", + >>> } + >>> ] + >>> + >>> # Get A2A delegation tools + >>> a2a_tools = await get_a2a_tools(config, delegatable_services) + >>> + >>> # Use tools in crew + >>> agent = Agent( + >>> role="Orchestrator", + >>> tools=a2a_tools, + >>> allow_delegation=True + >>> ) +""" + +import contextvars +import logging +from typing import Any, Callable + +from keycardai.a2a import AgentServiceConfig, DelegationClientSync, ServiceDiscovery +from pydantic import BaseModel, Field + +# Context variable to store the current user's access token for delegation +_current_user_token: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "current_user_token", default=None +) + +try: + from crewai import Crew + from crewai.tools import BaseTool +except ImportError: + raise ImportError( + "CrewAI is not installed. Install it with: pip install keycardai-crewai" + ) from None + +logger = logging.getLogger(__name__) + + +def set_delegation_token(access_token: str) -> None: + """Set the user's access token for delegation context. + + This should be called before crew execution to provide the user's + token for service-to-service delegation. The token will be used + for token exchange when delegating to other services. + + Args: + access_token: The user's access token from the request + + Example: + >>> # In your AgentExecutor.execute method + >>> access_token = context.call_context.state.get("access_token") + >>> set_delegation_token(access_token) + >>> + >>> # Now crew tools can delegate with the user's context + >>> crew = create_my_crew() + >>> result = crew.kickoff() + """ + _current_user_token.set(access_token) + + +class CrewAIExecutor: + """Executor adapter for CrewAI crews. + + Runs a CrewAI ``Crew`` factory and returns the result as a string. + Intended to be called from inside an ``a2a-sdk`` ``AgentExecutor.execute`` + method (which is async); the crew itself runs synchronously via + ``crew.kickoff()``. + + The executor: + 1. Takes a crew factory callable + 2. Sets delegation token context before execution (when ``set_token_context=True``) + 3. Calls crew.kickoff() with the task/inputs + 4. Returns the result as a string + + Args: + crew_factory: Callable that returns a Crew instance + set_token_context: If True, automatically set delegation token before execution + + Example: + >>> from crewai import Agent, Crew, Task + >>> + >>> def create_my_crew(): + ... agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI") + ... task = Task(description="{task}", agent=agent, expected_output="A response") + ... return Crew(agents=[agent], tasks=[task]) + >>> + >>> executor = CrewAIExecutor(create_my_crew) + >>> result = executor.execute("Hello world", {"name": "Alice"}) + """ + + def __init__(self, crew_factory: Callable[[], Crew], set_token_context: bool = True): + """Initialize CrewAI executor. + + Args: + crew_factory: Callable that returns a Crew instance + set_token_context: If True, automatically set delegation token before execution + """ + self.crew_factory = crew_factory + self.set_token_context = set_token_context + + def execute( + self, + task: dict[str, Any] | str, + inputs: dict[str, Any] | None = None, + ) -> str: + """Execute crew with the given task and inputs. + + Args: + task: Task description (string) or parameters (dict) + inputs: Optional additional inputs for the crew + + Returns: + Result from crew execution as string + + Raises: + Exception: If crew execution fails + """ + # Create crew instance + crew = self.crew_factory() + + # Prepare inputs for crew + if isinstance(task, dict): + crew_inputs = task + else: + crew_inputs = {"task": task} + + # Merge additional inputs if provided + if inputs: + crew_inputs.update(inputs) + + # Execute crew + # Note: crew.kickoff() is synchronous in CrewAI + logger.info(f"Executing CrewAI crew with inputs: {list(crew_inputs.keys())}") + result = crew.kickoff(inputs=crew_inputs) + + # Return result as string + return str(result) + + def set_token_for_delegation(self, access_token: str) -> None: + """Set access token for delegation context. + + This is called by the server before execution to provide + the user's token for service-to-service delegation. + + Args: + access_token: User's access token + """ + if self.set_token_context: + set_delegation_token(access_token) + + +async def get_a2a_tools( + service_config: AgentServiceConfig, + delegatable_services: list[dict[str, Any]] | None = None, +) -> list[BaseTool]: + """Get A2A delegation tools for CrewAI agents. + + Creates CrewAI tools that allow agents to delegate tasks to other + agent services. Tools are automatically generated based on: + 1. Keycard dependencies (services this service can call) + 2. Agent card capabilities (what each service can do) + + Args: + service_config: Configuration of the calling service + delegatable_services: Optional list of services to create tools for. + If not provided, queries Keycard for dependencies. + Each service dict should have: name, url, description, capabilities + + Returns: + List of CrewAI BaseTool objects for delegation + + Example: + >>> config = AgentServiceConfig(...) + >>> tools = await get_a2a_tools(config) + >>> # Returns tools like: + >>> # - delegate_to_slack_poster + >>> # - delegate_to_deployment_service + >>> agent = Agent(role="Orchestrator", tools=tools) + """ + # Discover delegatable services if not provided + if delegatable_services is None: + discovery = ServiceDiscovery(service_config) + try: + delegatable_services = await discovery.list_delegatable_services() + finally: + await discovery.close() + + if not delegatable_services: + logger.info("No delegatable services found - no A2A tools created") + return [] + + # Create delegation client for delegation (synchronous to avoid event loop issues) + delegation_client = DelegationClientSync(service_config) + + # Create tools for each service + tools = [] + for service_info in delegatable_services: + tool = _create_delegation_tool(service_info, delegation_client) + tools.append(tool) + + logger.info(f"Created {len(tools)} A2A delegation tools") + return tools + + +def _create_delegation_tool( + service_info: dict[str, Any], + delegation_client: DelegationClientSync, +) -> BaseTool: + """Create a CrewAI tool for delegating to a specific service. + + Args: + service_info: Service metadata (name, url, description, capabilities) + delegation_client: Delegation client for service invocation + + Returns: + CrewAI BaseTool for delegation + """ + service_name = service_info["name"] + service_url = service_info["url"] + service_description = service_info.get("description", "") + capabilities = service_info.get("capabilities", []) + + # Generate tool name (e.g., "PR Analysis Service" -> "delegate_to_pr_analysis_service") + tool_name = f"delegate_to_{service_name.lower().replace(' ', '_').replace('-', '_')}" + + # Generate tool description + capabilities_str = ", ".join(capabilities) if capabilities else "various tasks" + tool_description = f"""Delegate a task to {service_name}. + +{service_description} + +This service can handle: {capabilities_str} + +Use this tool when you need {service_name} to perform a task that is within its capabilities. +The service will process the task and return results.""" + + # Define the tool class + class ServiceDelegationTool(BaseTool): + """Tool for delegating to another agent service.""" + + name: str = tool_name + description: str = tool_description + + def __init__( + self, + delegation_client: DelegationClientSync, + service_url: str, + service_name: str, + **kwargs, + ): + super().__init__(**kwargs) + self._delegation_client = delegation_client + self._service_url = service_url + self._service_name = service_name + + def _run(self, task_description: str, task_inputs: dict[str, Any] | None = None) -> str: + """Delegate task to remote service. + + Args: + task_description: Description of the task to delegate + task_inputs: Optional additional inputs for the task + + Returns: + Result from the delegated service + """ + try: + # Prepare task + task = { + "task": task_description, + } + if task_inputs: + task["inputs"] = task_inputs + + # Get user token from context for delegation + user_token = _current_user_token.get() + if not user_token: + logger.warning( + "No user token available for delegation - " + "ensure set_delegation_token() is called before crew execution" + ) + + # Call remote service with user token for delegation + logger.info( + f"Delegating task to {self._service_name}: {task_description[:100]}" + ) + + result = self._delegation_client.invoke_service( + self._service_url, + task, + subject_token=user_token, + ) + + # Format result for agent + result_str = result.get("result", "") + delegation_chain = result.get("delegation_chain", []) + + # Include delegation chain in response for transparency + response = f"Result from {self._service_name}:\n\n{result_str}" + + if delegation_chain: + response += f"\n\n(Delegation chain: {' → '.join(delegation_chain)})" + + return response + + except Exception as e: + logger.error( + f"Delegation to {self._service_name} failed: {e}", + exc_info=True, + ) + return f"Error delegating to {self._service_name}: {str(e)}" + + # Create args schema + class DelegationInput(BaseModel): + """Input for service delegation tool.""" + + task_description: str = Field( + description=f"Description of the task to delegate to {service_name}" + ) + task_inputs: dict[str, Any] | None = Field( + default=None, + description="Optional additional inputs/parameters for the task", + ) + + ServiceDelegationTool.args_schema = DelegationInput + + # Instantiate and return tool + tool = ServiceDelegationTool( + delegation_client=delegation_client, + service_url=service_url, + service_name=service_name, + ) + + return tool + + +# For manual service list specification (useful for testing) +async def create_a2a_tool_for_service( + service_config: AgentServiceConfig, + target_service_url: str, +) -> BaseTool: + """Create a single A2A delegation tool for a specific service. + + Useful for testing or when you want to manually specify delegation targets. + + Args: + service_config: Configuration of the calling service + target_service_url: URL of the target service + + Returns: + CrewAI BaseTool for delegation + + Example: + >>> config = AgentServiceConfig(...) + >>> tool = await create_a2a_tool_for_service( + ... config, + ... "https://slack-poster.example.com" + ... ) + >>> agent = Agent(role="Orchestrator", tools=[tool]) + """ + # Discover the service + discovery = ServiceDiscovery(service_config) + try: + card = await discovery.get_service_card(target_service_url) + finally: + await discovery.close() + + # Create service info dict + service_info = { + "name": card["name"], + "url": target_service_url, + "description": card.get("description", ""), + "capabilities": card.get("capabilities", []), + } + + # Create delegation client (synchronous to avoid event loop issues) + delegation_client = DelegationClientSync(service_config) + + # Create and return tool + return _create_delegation_tool(service_info, delegation_client) + + +__all__ = [ + "CrewAIExecutor", + "create_a2a_tool_for_service", + "get_a2a_tools", + "set_delegation_token", +] diff --git a/packages/crewai/tests/__init__.py b/packages/crewai/tests/__init__.py new file mode 100644 index 0000000..5a78d33 --- /dev/null +++ b/packages/crewai/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for keycardai-crewai package.""" diff --git a/packages/crewai/tests/test_crewai_a2a.py b/packages/crewai/tests/test_crewai_a2a.py new file mode 100644 index 0000000..a872f4d --- /dev/null +++ b/packages/crewai/tests/test_crewai_a2a.py @@ -0,0 +1,406 @@ +"""Tests for CrewAI A2A delegation integration.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +pytest.importorskip("crewai") + +from keycardai.a2a import AgentServiceConfig + +from keycardai.crewai import ( + _create_delegation_tool, + create_a2a_tool_for_service, + get_a2a_tools, +) + + +@pytest.fixture +def service_config(): + """Create test service configuration.""" + return AgentServiceConfig( + service_name="Test Service", + client_id="test_client", + client_secret="test_secret", + identity_url="https://test.example.com", + zone_id="test_zone_123", + ) + + +@pytest.fixture +def mock_delegatable_services(): + """Mock list of delegatable services.""" + return [ + { + "name": "PR Analysis Service", + "url": "https://pr-analyzer.example.com", + "description": "Analyzes GitHub pull requests for code quality", + "capabilities": ["pr_analysis", "code_review", "security_scan"], + }, + { + "name": "Slack Notification Service", + "url": "https://slack-notifier.example.com", + "description": "Posts notifications to Slack channels", + "capabilities": ["slack_post", "notification"], + }, + ] + + +@pytest.fixture +def mock_agent_card(): + """Mock agent card for service discovery (a2a-sdk 1.x JSON shape).""" + return { + "name": "Echo Service", + "description": "Simple echo service for testing", + "version": "1.0.0", + "supportedInterfaces": [ + { + "url": "https://echo.example.com/a2a/jsonrpc", + "protocolBinding": "jsonrpc", + "protocolVersion": "1.0", + } + ], + "capabilities": {"streaming": False}, + "skills": [ + {"id": "echo", "name": "Echo"}, + {"id": "testing", "name": "Testing"}, + ], + } + + +class TestGetA2ATools: + """Test A2A tool generation.""" + + @pytest.mark.asyncio + async def test_get_a2a_tools_with_no_services(self, service_config): + """Test get_a2a_tools returns empty list when no services provided.""" + tools = await get_a2a_tools(service_config, delegatable_services=[]) + + assert tools == [] + assert isinstance(tools, list) + + @pytest.mark.asyncio + async def test_get_a2a_tools_with_provided_services( + self, service_config, mock_delegatable_services + ): + """Test get_a2a_tools creates tools for provided services.""" + tools = await get_a2a_tools( + service_config, delegatable_services=mock_delegatable_services + ) + + assert len(tools) == 2 + assert all(hasattr(tool, "name") for tool in tools) + assert all(hasattr(tool, "description") for tool in tools) + assert all(hasattr(tool, "_run") for tool in tools) + + @pytest.mark.asyncio + async def test_get_a2a_tools_discovers_services_when_none_provided( + self, service_config + ): + """Test get_a2a_tools discovers services from Keycard when not provided.""" + # When delegatable_services=None, it should try to discover + # Currently returns empty list (discovery not implemented) + with patch( + "keycardai.crewai.ServiceDiscovery" + ) as mock_discovery_class: + mock_discovery = AsyncMock() + mock_discovery.list_delegatable_services.return_value = [] + mock_discovery.close = AsyncMock() + mock_discovery_class.return_value = mock_discovery + + tools = await get_a2a_tools(service_config, delegatable_services=None) + + assert isinstance(tools, list) + + @pytest.mark.asyncio + async def test_get_a2a_tools_creates_correct_tool_count( + self, service_config, mock_delegatable_services + ): + """Test one tool is created per service.""" + tools = await get_a2a_tools( + service_config, delegatable_services=mock_delegatable_services + ) + + assert len(tools) == len(mock_delegatable_services) + + +class TestCreateDelegationTool: + """Test delegation tool creation.""" + + def test_tool_name_generation(self, service_config): + """Test tool name is generated correctly from service name.""" + service_info = { + "name": "PR Analysis Service", + "url": "https://pr-analyzer.example.com", + "description": "Test service", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + assert tool.name == "delegate_to_pr_analysis_service" + + def test_tool_name_handles_special_characters(self, service_config): + """Test tool name generation handles special characters.""" + service_info = { + "name": "Slack-Notification Service", + "url": "https://slack.example.com", + "description": "Test service", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + # Hyphens should be converted to underscores + assert tool.name == "delegate_to_slack_notification_service" + + def test_tool_description_includes_capabilities(self, service_config): + """Test tool description includes service capabilities.""" + service_info = { + "name": "Test Service", + "url": "https://test.example.com", + "description": "A test service", + "capabilities": ["capability1", "capability2", "capability3"], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + # Check capabilities are in description + assert "capability1" in tool.description + assert "capability2" in tool.description + assert "capability3" in tool.description + + def test_tool_has_correct_args_schema(self, service_config): + """Test tool has proper args schema for CrewAI.""" + service_info = { + "name": "Test Service", + "url": "https://test.example.com", + "description": "Test", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + # Tool should have args_schema attribute + assert hasattr(tool, "args_schema") + # Args schema should have task_description field + assert "task_description" in tool.args_schema.model_fields + + +class TestDelegationToolExecution: + """Test tool execution behavior.""" + + def test_tool_run_with_task_string(self, service_config): + """Test tool execution with simple task string.""" + service_info = { + "name": "Echo Service", + "url": "https://echo.example.com", + "description": "Test", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + # Mock invoke_service to avoid actual network call + with patch.object(a2a_client, "invoke_service") as mock_invoke: + mock_invoke.return_value = { + "result": "Echo response", + "delegation_chain": ["service1", "echo_service"], + } + + result = tool._run(task_description="Test task") + + assert "Echo response" in result + mock_invoke.assert_called_once() + + def test_tool_run_with_task_and_inputs(self, service_config): + """Test tool execution with task and additional inputs.""" + service_info = { + "name": "PR Analyzer", + "url": "https://pr-analyzer.example.com", + "description": "Test", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + with patch.object(a2a_client, "invoke_service") as mock_invoke: + mock_invoke.return_value = { + "result": "PR analysis complete", + "delegation_chain": [], + } + + tool._run( + task_description="Analyze PR", task_inputs={"pr_number": 123} + ) + + # Check invoke_service was called with correct task structure + call_args = mock_invoke.call_args + task = call_args[0][1] # Second positional argument + assert task["task"] == "Analyze PR" + assert task["inputs"] == {"pr_number": 123} + + def test_tool_run_calls_a2a_client(self, service_config): + """Test tool delegates to A2A client correctly.""" + service_info = { + "name": "Test Service", + "url": "https://test.example.com", + "description": "Test", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + with patch.object(a2a_client, "invoke_service") as mock_invoke: + mock_invoke.return_value = {"result": "success", "delegation_chain": []} + + tool._run(task_description="Test") + + # Verify invoke_service was called with service URL + mock_invoke.assert_called_once() + assert mock_invoke.call_args[0][0] == "https://test.example.com" + + def test_tool_run_formats_result_correctly(self, service_config): + """Test tool formats result with delegation chain.""" + service_info = { + "name": "Test Service", + "url": "https://test.example.com", + "description": "Test", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + with patch.object(a2a_client, "invoke_service") as mock_invoke: + mock_invoke.return_value = { + "result": "Task complete", + "delegation_chain": ["service_a", "service_b"], + } + + result = tool._run(task_description="Test") + + # Result should include delegation chain + assert "Test Service" in result + assert "Task complete" in result + assert "service_a" in result + assert "service_b" in result + + def test_tool_run_includes_delegation_chain(self, service_config): + """Test tool includes delegation chain in response.""" + service_info = { + "name": "Test Service", + "url": "https://test.example.com", + "description": "Test", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + with patch.object(a2a_client, "invoke_service") as mock_invoke: + mock_invoke.return_value = { + "result": "Done", + "delegation_chain": ["chain_element_1", "chain_element_2"], + } + + result = tool._run(task_description="Test") + + assert "Delegation chain" in result or "delegation" in result.lower() + + def test_tool_run_handles_exceptions(self, service_config): + """Test tool handles exceptions gracefully.""" + service_info = { + "name": "Test Service", + "url": "https://test.example.com", + "description": "Test", + "capabilities": [], + } + + from keycardai.a2a.server.delegation import DelegationClientSync + + a2a_client = DelegationClientSync(service_config) + tool = _create_delegation_tool(service_info, a2a_client) + + with patch.object(a2a_client, "invoke_service") as mock_invoke: + mock_invoke.side_effect = RuntimeError("Network error") + + result = tool._run(task_description="Test") + + # Should return error message, not raise exception + assert "Error" in result or "error" in result + assert isinstance(result, str) + + +class TestCreateA2AToolForService: + """Test single service tool creation.""" + + @pytest.mark.asyncio + async def test_create_tool_fetches_agent_card( + self, service_config, mock_agent_card + ): + """Test create_a2a_tool_for_service fetches agent card.""" + with patch( + "keycardai.crewai.ServiceDiscovery" + ) as mock_discovery_class: + mock_discovery = AsyncMock() + mock_discovery.get_service_card.return_value = mock_agent_card + mock_discovery.close = AsyncMock() + mock_discovery_class.return_value = mock_discovery + + tool = await create_a2a_tool_for_service( + service_config, "https://echo.example.com" + ) + + # Should have fetched agent card + mock_discovery.get_service_card.assert_called_once_with( + "https://echo.example.com" + ) + + # Tool should be created with agent card info + assert hasattr(tool, "name") + assert hasattr(tool, "_run") + + @pytest.mark.asyncio + async def test_create_tool_for_service(self, service_config, mock_agent_card): + """Test tool is created correctly from agent card.""" + with patch( + "keycardai.crewai.ServiceDiscovery" + ) as mock_discovery_class: + mock_discovery = AsyncMock() + mock_discovery.get_service_card.return_value = mock_agent_card + mock_discovery.close = AsyncMock() + mock_discovery_class.return_value = mock_discovery + + tool = await create_a2a_tool_for_service( + service_config, "https://echo.example.com" + ) + + # Tool name should be based on service name from agent card + assert "echo" in tool.name.lower() + assert "service" in tool.name.lower() diff --git a/pyproject.toml b/pyproject.toml index 9a2d18e..5a2ca1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ keycardai-fastmcp = { workspace = true } keycardai-mcp-fastmcp = { workspace = true } keycardai-mcp = { workspace = true } keycardai-a2a = { workspace = true } +keycardai-crewai = { workspace = true } keycardai-agents = { workspace = true } [build-system] diff --git a/uv.lock b/uv.lock index 432b2b4..26bee4d 100644 --- a/uv.lock +++ b/uv.lock @@ -24,6 +24,7 @@ members = [ "keycardai", "keycardai-a2a", "keycardai-agents", + "keycardai-crewai", "keycardai-fastmcp", "keycardai-mcp", "keycardai-mcp-fastmcp", @@ -2474,6 +2475,41 @@ requires-dist = [ ] provides-extras = ["crewai", "dev", "test"] +[[package]] +name = "keycardai-crewai" +source = { editable = "packages/crewai" } +dependencies = [ + { name = "crewai" }, + { name = "keycardai-a2a" }, + { name = "pydantic" }, +] + +[package.optional-dependencies] +dev = [ + { name = "mypy" }, + { name = "ruff" }, +] +test = [ + { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "pytest-cov" }, + { name = "pytest-timeout" }, +] + +[package.metadata] +requires-dist = [ + { name = "crewai", specifier = ">=0.86.0" }, + { name = "keycardai-a2a", editable = "packages/a2a" }, + { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.14.1" }, + { name = "pydantic", specifier = ">=2.11.7" }, + { name = "pytest", marker = "extra == 'test'", specifier = ">=8.4.1" }, + { name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=1.1.0" }, + { name = "pytest-cov", marker = "extra == 'test'", specifier = ">=6.2.1" }, + { name = "pytest-timeout", marker = "extra == 'test'", specifier = ">=2.3.1" }, + { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.6" }, +] +provides-extras = ["dev", "test"] + [[package]] name = "keycardai-fastmcp" source = { editable = "packages/fastmcp" } From 707793c1c6bba1bf812c40027cc1272c8579ba04 Mon Sep 17 00:00:00 2001 From: Larry-Osakwe Date: Wed, 29 Apr 2026 14:27:36 -0700 Subject: [PATCH 2/5] refactor(keycardai-agents)!: empty out package, decomposition complete (ACC-231) Per the KEP "Decompose keycardai-agents", the final piece of code in this package - the CrewAI-over-A2A integration at keycardai.agents.integrations.crewai - moved to keycardai-crewai in the preceding commit. This commit empties out the package. Deleted: - packages/agents/src/keycardai/agents/integrations/ (whole directory) - packages/agents/tests/integrations/ (whole directory) Reduced packages/agents/pyproject.toml: - Dropped [project.optional-dependencies].crewai (the crewai>=0.86.0 extra) - Dropped keycardai-a2a from core deps (no consumer left in this package) - Dropped pydantic from core deps (same) - Trimmed test/dev extras to the minimum needed for an empty package Reduced packages/agents/src/keycardai/agents/__init__.py to a deprecation-only docstring with no re-exports. Replaced packages/agents/README.md with an upgrade-path table pointing the three concerns at their new homes: - A2A delegation -> keycardai-a2a (#105 / ACC-230) - PKCE user-login -> keycardai-oauth (#101 / ACC-229) - CrewAI-over-A2A -> keycardai-crewai (this PR / ACC-231) Hard cut, no transitional bridge or re-export shim. The Explore inventory confirms zero runtime imports of keycardai.agents.integrations.crewai outside packages/agents/ itself, matching the precedent set by #105. ACC-232 will archive the packages/agents/ source directory in a follow-up PR. BREAKING: - from keycardai.agents.integrations.crewai import ... is gone; use from keycardai.crewai import ... . - The keycardai-agents[crewai] extra no longer exists; install keycardai-crewai directly. --- packages/agents/README.md | 540 +----------------- packages/agents/pyproject.toml | 16 +- .../agents/src/keycardai/agents/__init__.py | 27 +- .../keycardai/agents/integrations/__init__.py | 17 - .../keycardai/agents/integrations/crewai.py | 413 -------------- .../agents/tests/integrations/__init__.py | 1 - .../tests/integrations/test_crewai_a2a.py | 406 ------------- uv.lock | 20 +- 8 files changed, 23 insertions(+), 1417 deletions(-) delete mode 100644 packages/agents/src/keycardai/agents/integrations/__init__.py delete mode 100644 packages/agents/src/keycardai/agents/integrations/crewai.py delete mode 100644 packages/agents/tests/integrations/__init__.py delete mode 100644 packages/agents/tests/integrations/test_crewai_a2a.py diff --git a/packages/agents/README.md b/packages/agents/README.md index ce57109..81e69ee 100644 --- a/packages/agents/README.md +++ b/packages/agents/README.md @@ -1,534 +1,14 @@ -# KeycardAI Agents +# keycardai-agents (deprecated, pending archive) -Framework-agnostic agent service SDK for A2A (Agent-to-Agent) delegation with Keycard OAuth authentication. +This package is being archived. Per the KEP "Decompose keycardai-agents", everything that lived here has moved: -## Requirements +| Old import | New import | +| --- | --- | +| `from keycardai.agents.server import AgentServer, ...` | `from keycardai.a2a import ...` (#105 / ACC-230) | +| `from keycardai.agents.client.discovery import ServiceDiscovery` | `from keycardai.a2a import ServiceDiscovery` (#105 / ACC-230) | +| `from keycardai.agents.client import AgentClient` | `from keycardai.oauth.pkce import authenticate` (#101 / ACC-229) | +| `from keycardai.agents.integrations.crewai import CrewAIExecutor, get_a2a_tools, ...` | `from keycardai.crewai import ...` (this PR / ACC-231) | -- **Python 3.10 or greater** -- Virtual environment (recommended) +`keycardai-agents` no longer ships any code. It exists at this version only to give downstream installs a clean upgrade path. The next step (ACC-232) archives the package source entirely. -## Features - -- 🔐 **Built-in OAuth**: Automatic JWKS validation, token exchange, delegation chains -- 🌐 **Dual Protocol Support**: A2A JSONRPC + custom REST endpoints (same executor powers both) -- 🔧 **Framework Agnostic**: Supports CrewAI, LangChain, custom via `AgentExecutor` protocol -- 🔄 **Service Delegation**: RFC 8693 token exchange preserves user context -- 👤 **User Auth**: PKCE OAuth flow with browser-based login - -## A2A Protocol Integration - -We use [a2a-python SDK](https://github.com/a2aproject/a2a-python) for protocol compliance while adding production-ready authentication: - -- ✅ **Full A2A JSONRPC support** - Standards-compliant `/a2a/jsonrpc` endpoint -- ✅ **Plus simpler REST endpoint** - Custom `/invoke` for easier integration -- ✅ **Production OAuth layer** - BearerAuthMiddleware, JWKS, token exchange (A2A SDK has none) -- ✅ **Delegation chain tracking** - JWT-based audit trail for service-to-service calls -- ✅ **Dual protocol support** - Same executor powers both JSONRPC and REST endpoints - -**Result**: A2A standards compliance + Keycard security + flexible APIs = Best of both worlds - -## Installation - -```bash -pip install keycardai-agents - -# With CrewAI support -pip install 'keycardai-agents[crewai]' -``` - -## Quick Start - -### CrewAI Service - -```python -import os -from crewai import Agent, Crew, Task -from keycardai.agents import AgentServiceConfig -from keycardai.agents.integrations.crewai import CrewAIExecutor -from keycardai.agents.server import serve_agent - -def create_my_crew(): - agent = Agent(role="Assistant", goal="Help users", backstory="AI helper") - task = Task(description="{task}", agent=agent, expected_output="Response") - return Crew(agents=[agent], tasks=[task]) - -config = AgentServiceConfig( - service_name="My Service", - client_id=os.getenv("CLIENT_ID"), - client_secret=os.getenv("CLIENT_SECRET"), - identity_url="http://localhost:8000", - zone_id=os.getenv("ZONE_ID"), - agent_executor=CrewAIExecutor(create_my_crew), # Framework adapter - capabilities=["assistance"], -) - -serve_agent(config) # Starts server with OAuth middleware -``` - -### Custom Executor - -```python -from keycardai.agents.server import LambdaExecutor - -def my_logic(task, inputs): - return f"Processed: {task}" - -config = AgentServiceConfig( - # ... same config as above - agent_executor=LambdaExecutor(my_logic), # Simple function wrapper -) -``` - -### Advanced: Custom Executor Class - -```python -from keycardai.agents.server import AgentExecutor - -class MyFrameworkExecutor: - """Implement AgentExecutor protocol for any framework.""" - - def execute(self, task, inputs): - # Your framework logic here - result = my_framework.run(task, inputs) - return result - - def set_token_for_delegation(self, access_token): - # Optional: handle delegation token - self.context.set_auth(access_token) - -config = AgentServiceConfig( - # ... - agent_executor=MyFrameworkExecutor(), -) -``` - -## Client Usage - -### User Authentication (PKCE) - -```python -from keycardai.agents.client import AgentClient - -async with AgentClient(config) as client: - # Automatically: OAuth discovery → Browser login → Token exchange - result = await client.invoke("https://service.com", task="Hello") -``` - -### Service-to-Service (Token Exchange) - -```python -from keycardai.agents.server import DelegationClient - -client = DelegationClient(service_config) - -# Get delegation token (RFC 8693) - preserves user context -token = await client.get_delegation_token( - "https://target.com", - subject_token="user_token" -) - -# Invoke with token -result = await client.invoke_service( - "https://target.com", - task="Process data", - token=token -) -# Result includes delegation_chain: ["service_a", "service_b"] -``` - -## Architecture - -### Server - -``` -Your Agent - ↓ -AgentExecutor.execute(task, inputs) - ↓ -AgentServer (keycardai-agents) - ├─ OAuth Middleware (BearerAuthMiddleware) - │ ├─ JWKS validation - │ ├─ Token audience check - │ └─ Delegation chain extraction - ├─ /invoke (protected, REST-like) - ├─ /a2a/jsonrpc (protected, A2A JSONRPC) - │ ├─ message/send - │ ├─ message/stream - │ └─ tasks/* (get, cancel, list) - ├─ /.well-known/agent-card.json (A2A format) - ├─ /.well-known/oauth-protected-resource - └─ /status -``` - -### Dual Protocol Support - -The SDK provides **two ways** to invoke agents: - -1. **A2A JSONRPC** (`/a2a/jsonrpc`) - Standards-compliant - - Use when: Integrating with A2A ecosystem, need standard protocol - - Methods: `message/send`, `message/stream`, `tasks/get`, etc. - - Bridge: `KeycardToA2AExecutorBridge` adapts your executor to A2A protocol - -2. **Custom REST** (`/invoke`) - Simpler API - - Use when: Direct service calls, simpler integration - - Format: `{"task": "...", "inputs": {...}}` - - Direct executor invocation - -**Both endpoints share the same underlying executor** - write once, support both protocols. - -### OAuth Flow - -``` -User → OAuth Login (PKCE) - ↓ -User Token → Service A - ↓ -Service A → Token Exchange (RFC 8693) → Service B Token - ↓ -Service A → Calls Service B with Service B Token - ↓ -Service B validates token (JWKS) -Service B updates delegation_chain -``` - -## A2A Protocol Compliance - -### Agent Card - -Services expose A2A-compliant agent cards at `/.well-known/agent-card.json`: - -```json -{ - "name": "My Service", - "url": "https://my-service.com", - "version": "1.0.0", - "protocolVersion": "0.3.0", - "skills": [ - { - "id": "assistance", - "name": "Assistance", - "description": "assistance capability", - "tags": ["assistance"] - } - ], - "capabilities": { - "streaming": false, - "multiTurn": true - }, - "additionalInterfaces": [ - { - "url": "https://my-service.com/invoke", - "transport": "http+json" - } - ], - "securitySchemes": { - "oauth2": { - "type": "oauth2", - "flows": { - "authorizationCode": { - "authorizationUrl": "https://zone.keycard.cloud/oauth/authorize", - "tokenUrl": "https://zone.keycard.cloud/oauth/token" - } - } - } - } -} -``` - -### Endpoints - -#### A2A JSONRPC Endpoint (Standards-Compliant) - -```bash -POST /a2a/jsonrpc -Authorization: Bearer -Content-Type: application/json - -{ - "jsonrpc": "2.0", - "method": "message/send", - "params": { - "message": { - "role": "user", - "parts": [{"text": "Do something"}] - } - }, - "id": 1 -} -``` - -Response: -```json -{ - "jsonrpc": "2.0", - "result": { - "task": { - "taskId": "task-123", - "state": "completed", - "result": {...} - } - }, - "id": 1 -} -``` - -**Supported methods:** -- `message/send` - Send message to agent -- `message/stream` - Stream agent responses -- `tasks/get` - Get task status -- `tasks/cancel` - Cancel running task -- `tasks/list` - List all tasks - -#### Custom REST Endpoint (Simpler API) - -```bash -POST /invoke -Authorization: Bearer - -{ - "task": "Do something", - "inputs": {"key": "value"} -} -``` - -Response: -```json -{ - "result": "Done", - "delegation_chain": ["service_a", "service_b"] -} -``` - -**Use `/invoke` for:** Direct service calls, easier integration, delegation chain tracking. - -**Use `/a2a/jsonrpc` for:** A2A ecosystem integration, standard protocol compliance, task management. - -## Framework Support - -### CrewAI - -```python -from keycardai.agents.integrations.crewai import CrewAIExecutor - -executor = CrewAIExecutor(lambda: create_my_crew()) -``` - -**Features:** -- Automatic delegation token context -- Supports CrewAI tools -- Handles `crew.kickoff()` execution - -### LangChain, AutoGen, Custom - -Implement the `AgentExecutor` protocol: - -```python -class MyExecutor: - def execute(self, task, inputs): - # Your logic - return result -``` - -## API Reference - -### AgentServiceConfig - -```python -@dataclass -class AgentServiceConfig: - service_name: str # Human-readable name - client_id: str # Keycard Application client ID - client_secret: str # Keycard Application secret - identity_url: str # Public URL - zone_id: str # Keycard zone ID - agent_executor: AgentExecutor # REQUIRED: Executor instance - - # Optional - authorization_server_url: str | None = None - port: int = 8000 - host: str = "0.0.0.0" - description: str = "" - capabilities: list[str] = [] -``` - -### AgentExecutor Protocol - -```python -class AgentExecutor(Protocol): - def execute( - self, - task: dict[str, Any] | str, - inputs: dict[str, Any] | None = None, - ) -> Any: - """Execute agent task.""" - ... - - def set_token_for_delegation(self, access_token: str) -> None: - """Optional: Set token for delegation.""" - ... -``` - -### KeycardToA2AExecutorBridge - -Bridge adapter that makes your executor work with A2A JSONRPC protocol: - -```python -from keycardai.agents.server import KeycardToA2AExecutorBridge, SimpleExecutor - -# Your executor -executor = SimpleExecutor() - -# Wrap for A2A JSONRPC support -a2a_executor = KeycardToA2AExecutorBridge(executor) - -# Now works with A2A DefaultRequestHandler -from a2a.server.request_handlers import DefaultRequestHandler -from a2a.server.tasks import InMemoryTaskStore - -handler = DefaultRequestHandler( - agent_executor=a2a_executor, - task_store=InMemoryTaskStore() -) -``` - -**What it does:** -- Converts A2A `RequestContext` → Keycard `task/inputs` format -- Calls your synchronous executor -- Publishes result as A2A Task events -- Handles delegation tokens - -**Note:** This bridge is automatically configured when using `serve_agent()` - you don't need to use it directly unless building custom A2A integrations. - -### serve_agent() - -Start an agent service (blocking): - -```python -serve_agent(config: AgentServiceConfig) -> None -``` - -### AgentClient - -User authentication with PKCE OAuth: - -```python -from keycardai.agents.client import AgentClient - -async with AgentClient(service_config) as client: - result = await client.invoke(service_url, task, inputs) - agent_card = await client.discover_service(service_url) -``` - -### DelegationClient - -Service-to-service with token exchange: - -```python -from keycardai.agents.server import DelegationClient - -client = DelegationClient(service_config) -token = await client.get_delegation_token(target_url, subject_token) -result = await client.invoke_service(url, task, token) -``` - -## Service Delegation - -### Pattern - -```python -# In Service A (orchestrator) -from keycardai.agents.server import DelegationClient - -client = DelegationClient(service_a_config) - -# Discover Service B -card = await client.discover_service("https://service-b.com") - -# Get token with user context -token = await client.get_delegation_token( - "https://service-b.com", - subject_token=user_access_token -) - -# Call Service B -result = await client.invoke_service( - "https://service-b.com", - task="Process data", - token=token -) - -# Result includes delegation chain for audit -print(result["delegation_chain"]) -# ["user_service", "service_a", "service_b"] -``` - -### Delegation Chain Tracking - -1. User authenticates → Token with empty `delegation_chain` -2. User calls Service A → Service A adds itself to chain -3. Service A calls Service B → Token exchange preserves chain -4. Service B adds itself → Full chain in response for audit - -## Production Deployment - -### Environment Variables - -```bash -# Required -export KEYCARD_ZONE_ID="your_zone_id" -export KEYCARD_CLIENT_ID="service_client_id" -export KEYCARD_CLIENT_SECRET="client_secret" -export SERVICE_URL="https://your-service.com" - -# Optional -export PORT="8000" -export HOST="0.0.0.0" -``` - -### Health Checks - -```bash -# Liveness -curl https://your-service.com/status - -# Agent card -curl https://your-service.com/.well-known/agent-card.json -``` - -### Security - -- **Token Validation**: JWKS-based JWT signature verification -- **Audience Check**: Token `aud` must match service URL -- **Issuer Validation**: Token `iss` from Keycard zone -- **Delegation Chain**: Preserved for audit trail - -## Examples - -See `examples/` directory: -- `oauth_client_usage.py` - PKCE user authentication - -## FAQ - -### Q: Why not use the A2A SDK server? -**A**: The A2A SDK has no authentication layer. We'd have to rebuild all OAuth infrastructure. - -### Q: Can I use LangChain/AutoGen? -**A**: Yes! Implement the `AgentExecutor` protocol or use `LambdaExecutor` for simple functions. - -### Q: What's the difference between AgentClient and DelegationClient? -**A**: -- `AgentClient`: User authentication with PKCE (browser-based login) -- `DelegationClient`: Service-to-service with token exchange (RFC 8693) - -### Q: Do I need CrewAI? -**A**: No! Use any framework or write custom logic. Just implement `AgentExecutor`. - -## Support - -- **GitHub**: https://github.com/keycardai/python-sdk -- **Issues**: https://github.com/keycardai/python-sdk/issues -- **Docs**: https://docs.keycard.ai - -## License - -MIT +If you are starting fresh: skip `keycardai-agents` and depend on the destination packages directly. diff --git a/packages/agents/pyproject.toml b/packages/agents/pyproject.toml index 43046f5..a4a4f01 100644 --- a/packages/agents/pyproject.toml +++ b/packages/agents/pyproject.toml @@ -1,16 +1,13 @@ [project] name = "keycardai-agents" dynamic = ["version"] -description = "Legacy CrewAI-over-A2A integration. Decomposing per the Keycard SDK packaging KEP: see keycardai-a2a (delegation), keycardai-oauth (PKCE), and forthcoming keycardai-crewai. This package will be archived." +description = "Empty legacy package, pending archive. Decomposed per the Keycard SDK packaging KEP into keycardai-a2a (delegation), keycardai-oauth (PKCE user-login), and keycardai-crewai (CrewAI integration)." readme = "README.md" requires-python = ">=3.10" license = { text = "MIT" } authors = [{ name = "Keycard", email = "support@keycard.ai" }] -dependencies = [ - "keycardai-a2a>=0.1.0", - "pydantic>=2.11.7", -] -keywords = ["agents", "ai", "crewai", "authentication", "authorization", "service", "delegation"] +dependencies = [] +keywords = ["keycard", "deprecated"] classifiers = [ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", @@ -27,18 +24,11 @@ classifiers = [ ] [project.optional-dependencies] -crewai = [ - "crewai>=0.86.0", -] test = [ "pytest>=8.4.1", - "pytest-asyncio>=1.1.0", - "pytest-cov>=6.2.1", - "pytest-timeout>=2.3.1", ] dev = [ "ruff>=0.8.6", - "mypy>=1.14.1", ] [build-system] diff --git a/packages/agents/src/keycardai/agents/__init__.py b/packages/agents/src/keycardai/agents/__init__.py index bc708d9..488daa8 100644 --- a/packages/agents/src/keycardai/agents/__init__.py +++ b/packages/agents/src/keycardai/agents/__init__.py @@ -1,24 +1,15 @@ -"""KeycardAI Agents (legacy package). +"""KeycardAI Agents (legacy package, pending archive). This package previously housed three concerns. Per the KEP "Decompose -keycardai-agents", they have moved to: +keycardai-agents", they have all moved: -- A2A delegation, agent service hosting, executor primitives, and service - discovery → ``keycardai-a2a`` (``from keycardai.a2a import ...``). +- A2A delegation, agent service primitives, and service discovery → + ``keycardai-a2a`` (``from keycardai.a2a import ...``). - OAuth 2.0 PKCE user-login flow (``AgentClient``) → ``keycardai-oauth`` (``from keycardai.oauth.pkce import authenticate``). -- The CrewAI-over-A2A integration is the only remaining piece, accessible - via ``from keycardai.agents.integrations.crewai import ...``. It will - move to a dedicated ``keycardai-crewai`` package; this package will be - archived once that ships. -""" - -# Integrations (optional) -try: - from .integrations import crewai -except ImportError: - crewai = None +- CrewAI-over-A2A integration (executor + delegation tools) → + ``keycardai-crewai`` (``from keycardai.crewai import ...``). -__all__ = [ - "crewai", -] +This package now exposes no symbols. It will be archived once downstream +references catch up (tracked in ACC-232). +""" diff --git a/packages/agents/src/keycardai/agents/integrations/__init__.py b/packages/agents/src/keycardai/agents/integrations/__init__.py deleted file mode 100644 index 039cca5..0000000 --- a/packages/agents/src/keycardai/agents/integrations/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -"""Integrations with agent frameworks. - -This package provides integrations with various agent frameworks: -- CrewAI: Tools for agent-to-agent delegation -""" - -try: - from .crewai import create_a2a_tool_for_service, get_a2a_tools, set_delegation_token - - __all__ = [ - "get_a2a_tools", - "set_delegation_token", - "create_a2a_tool_for_service", - ] -except ImportError: - # CrewAI not installed - __all__ = [] diff --git a/packages/agents/src/keycardai/agents/integrations/crewai.py b/packages/agents/src/keycardai/agents/integrations/crewai.py deleted file mode 100644 index 188e54a..0000000 --- a/packages/agents/src/keycardai/agents/integrations/crewai.py +++ /dev/null @@ -1,413 +0,0 @@ -"""CrewAI integration for A2A (agent-to-agent) delegation. - -This module provides: -1. CrewAIExecutor: Adapter for running CrewAI crews in the agent service server -2. Delegation tools: CrewAI tools for calling other agent services - -Usage with executor: - >>> from keycardai.a2a import AgentServiceConfig - >>> from keycardai.agents.integrations.crewai import CrewAIExecutor - >>> from crewai import Agent, Crew, Task - >>> - >>> def create_my_crew(): - ... agent = Agent(role="Assistant", goal="Help users") - ... task = Task(description="{task}", agent=agent) - ... return Crew(agents=[agent], tasks=[task]) - >>> - >>> config = AgentServiceConfig( - ... service_name="My Service", - ... agent_executor=CrewAIExecutor(create_my_crew), - ... # ... other config - ... ) - -Usage with delegation tools: - >>> from keycardai.a2a import AgentServiceConfig - >>> from keycardai.agents.integrations.crewai import get_a2a_tools - >>> from crewai import Agent, Crew - >>> - >>> # Create service config - >>> config = AgentServiceConfig(...) - >>> - >>> # Define services we can delegate to - >>> delegatable_services = [ - >>> { - >>> "name": "echo_service", - >>> "url": "http://localhost:8002", - >>> "description": "Echo service that repeats messages", - >>> } - >>> ] - >>> - >>> # Get A2A delegation tools - >>> a2a_tools = await get_a2a_tools(config, delegatable_services) - >>> - >>> # Use tools in crew - >>> agent = Agent( - >>> role="Orchestrator", - >>> tools=a2a_tools, - >>> allow_delegation=True - >>> ) -""" - -import contextvars -import logging -from typing import Any, Callable - -from pydantic import BaseModel, Field - -from keycardai.a2a import AgentServiceConfig, DelegationClientSync, ServiceDiscovery - -# Context variable to store the current user's access token for delegation -_current_user_token: contextvars.ContextVar[str | None] = contextvars.ContextVar( - "current_user_token", default=None -) - -try: - from crewai import Crew - from crewai.tools import BaseTool -except ImportError: - raise ImportError( - "CrewAI is not installed. Install it with: pip install 'keycardai-agents[crewai]'" - ) from None - -logger = logging.getLogger(__name__) - - -def set_delegation_token(access_token: str) -> None: - """Set the user's access token for delegation context. - - This should be called before crew execution to provide the user's - token for service-to-service delegation. The token will be used - for token exchange when delegating to other services. - - Args: - access_token: The user's access token from the request - - Example: - >>> # In your server's invoke handler - >>> access_token = request.user.access_token - >>> set_delegation_token(access_token) - >>> - >>> # Now crew tools can delegate with the user's context - >>> crew = create_my_crew() - >>> result = crew.kickoff() - """ - _current_user_token.set(access_token) - - -class CrewAIExecutor: - """Executor adapter for CrewAI crews. - - This executor implements the AgentExecutor protocol for CrewAI crews, - allowing them to be used in the generic agent service server. - - The executor: - 1. Takes a crew factory callable - 2. Sets delegation token context before execution - 3. Calls crew.kickoff() with the task/inputs - 4. Returns the result as a string - - Args: - crew_factory: Callable that returns a Crew instance - set_token_context: If True, automatically set delegation token before execution - - Example: - >>> from crewai import Agent, Crew, Task - >>> - >>> def create_my_crew(): - ... agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI") - ... task = Task(description="{task}", agent=agent, expected_output="A response") - ... return Crew(agents=[agent], tasks=[task]) - >>> - >>> executor = CrewAIExecutor(create_my_crew) - >>> result = executor.execute("Hello world", {"name": "Alice"}) - """ - - def __init__(self, crew_factory: Callable[[], Crew], set_token_context: bool = True): - """Initialize CrewAI executor. - - Args: - crew_factory: Callable that returns a Crew instance - set_token_context: If True, automatically set delegation token before execution - """ - self.crew_factory = crew_factory - self.set_token_context = set_token_context - - def execute( - self, - task: dict[str, Any] | str, - inputs: dict[str, Any] | None = None, - ) -> str: - """Execute crew with the given task and inputs. - - Args: - task: Task description (string) or parameters (dict) - inputs: Optional additional inputs for the crew - - Returns: - Result from crew execution as string - - Raises: - Exception: If crew execution fails - """ - # Create crew instance - crew = self.crew_factory() - - # Prepare inputs for crew - if isinstance(task, dict): - crew_inputs = task - else: - crew_inputs = {"task": task} - - # Merge additional inputs if provided - if inputs: - crew_inputs.update(inputs) - - # Execute crew - # Note: crew.kickoff() is synchronous in CrewAI - logger.info(f"Executing CrewAI crew with inputs: {list(crew_inputs.keys())}") - result = crew.kickoff(inputs=crew_inputs) - - # Return result as string - return str(result) - - def set_token_for_delegation(self, access_token: str) -> None: - """Set access token for delegation context. - - This is called by the server before execution to provide - the user's token for service-to-service delegation. - - Args: - access_token: User's access token - """ - if self.set_token_context: - set_delegation_token(access_token) - - -async def get_a2a_tools( - service_config: AgentServiceConfig, - delegatable_services: list[dict[str, Any]] | None = None, -) -> list[BaseTool]: - """Get A2A delegation tools for CrewAI agents. - - Creates CrewAI tools that allow agents to delegate tasks to other - agent services. Tools are automatically generated based on: - 1. Keycard dependencies (services this service can call) - 2. Agent card capabilities (what each service can do) - - Args: - service_config: Configuration of the calling service - delegatable_services: Optional list of services to create tools for. - If not provided, queries Keycard for dependencies. - Each service dict should have: name, url, description, capabilities - - Returns: - List of CrewAI BaseTool objects for delegation - - Example: - >>> config = AgentServiceConfig(...) - >>> tools = await get_a2a_tools(config) - >>> # Returns tools like: - >>> # - delegate_to_slack_poster - >>> # - delegate_to_deployment_service - >>> agent = Agent(role="Orchestrator", tools=tools) - """ - # Discover delegatable services if not provided - if delegatable_services is None: - discovery = ServiceDiscovery(service_config) - try: - delegatable_services = await discovery.list_delegatable_services() - finally: - await discovery.close() - - if not delegatable_services: - logger.info("No delegatable services found - no A2A tools created") - return [] - - # Create delegation client for delegation (synchronous to avoid event loop issues) - delegation_client = DelegationClientSync(service_config) - - # Create tools for each service - tools = [] - for service_info in delegatable_services: - tool = _create_delegation_tool(service_info, delegation_client) - tools.append(tool) - - logger.info(f"Created {len(tools)} A2A delegation tools") - return tools - - -def _create_delegation_tool( - service_info: dict[str, Any], - delegation_client: DelegationClientSync, -) -> BaseTool: - """Create a CrewAI tool for delegating to a specific service. - - Args: - service_info: Service metadata (name, url, description, capabilities) - delegation_client: Delegation client for service invocation - - Returns: - CrewAI BaseTool for delegation - """ - service_name = service_info["name"] - service_url = service_info["url"] - service_description = service_info.get("description", "") - capabilities = service_info.get("capabilities", []) - - # Generate tool name (e.g., "PR Analysis Service" -> "delegate_to_pr_analysis_service") - tool_name = f"delegate_to_{service_name.lower().replace(' ', '_').replace('-', '_')}" - - # Generate tool description - capabilities_str = ", ".join(capabilities) if capabilities else "various tasks" - tool_description = f"""Delegate a task to {service_name}. - -{service_description} - -This service can handle: {capabilities_str} - -Use this tool when you need {service_name} to perform a task that is within its capabilities. -The service will process the task and return results.""" - - # Define the tool class - class ServiceDelegationTool(BaseTool): - """Tool for delegating to another agent service.""" - - name: str = tool_name - description: str = tool_description - - def __init__( - self, - delegation_client: DelegationClientSync, - service_url: str, - service_name: str, - **kwargs, - ): - super().__init__(**kwargs) - self._delegation_client = delegation_client - self._service_url = service_url - self._service_name = service_name - - def _run(self, task_description: str, task_inputs: dict[str, Any] | None = None) -> str: - """Delegate task to remote service. - - Args: - task_description: Description of the task to delegate - task_inputs: Optional additional inputs for the task - - Returns: - Result from the delegated service - """ - try: - # Prepare task - task = { - "task": task_description, - } - if task_inputs: - task["inputs"] = task_inputs - - # Get user token from context for delegation - user_token = _current_user_token.get() - if not user_token: - logger.warning( - "No user token available for delegation - " - "ensure set_delegation_token() is called before crew execution" - ) - - # Call remote service with user token for delegation - logger.info( - f"Delegating task to {self._service_name}: {task_description[:100]}" - ) - - result = self._delegation_client.invoke_service( - self._service_url, - task, - subject_token=user_token, - ) - - # Format result for agent - result_str = result.get("result", "") - delegation_chain = result.get("delegation_chain", []) - - # Include delegation chain in response for transparency - response = f"Result from {self._service_name}:\n\n{result_str}" - - if delegation_chain: - response += f"\n\n(Delegation chain: {' → '.join(delegation_chain)})" - - return response - - except Exception as e: - logger.error( - f"Delegation to {self._service_name} failed: {e}", - exc_info=True, - ) - return f"Error delegating to {self._service_name}: {str(e)}" - - # Create args schema - class DelegationInput(BaseModel): - """Input for service delegation tool.""" - - task_description: str = Field( - description=f"Description of the task to delegate to {service_name}" - ) - task_inputs: dict[str, Any] | None = Field( - default=None, - description="Optional additional inputs/parameters for the task", - ) - - ServiceDelegationTool.args_schema = DelegationInput - - # Instantiate and return tool - tool = ServiceDelegationTool( - delegation_client=delegation_client, - service_url=service_url, - service_name=service_name, - ) - - return tool - - -# For manual service list specification (useful for testing) -async def create_a2a_tool_for_service( - service_config: AgentServiceConfig, - target_service_url: str, -) -> BaseTool: - """Create a single A2A delegation tool for a specific service. - - Useful for testing or when you want to manually specify delegation targets. - - Args: - service_config: Configuration of the calling service - target_service_url: URL of the target service - - Returns: - CrewAI BaseTool for delegation - - Example: - >>> config = AgentServiceConfig(...) - >>> tool = await create_a2a_tool_for_service( - ... config, - ... "https://slack-poster.example.com" - ... ) - >>> agent = Agent(role="Orchestrator", tools=[tool]) - """ - # Discover the service - discovery = ServiceDiscovery(service_config) - try: - card = await discovery.get_service_card(target_service_url) - finally: - await discovery.close() - - # Create service info dict - service_info = { - "name": card["name"], - "url": target_service_url, - "description": card.get("description", ""), - "capabilities": card.get("capabilities", []), - } - - # Create delegation client (synchronous to avoid event loop issues) - delegation_client = DelegationClientSync(service_config) - - # Create and return tool - return _create_delegation_tool(service_info, delegation_client) diff --git a/packages/agents/tests/integrations/__init__.py b/packages/agents/tests/integrations/__init__.py deleted file mode 100644 index 849610e..0000000 --- a/packages/agents/tests/integrations/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Integration tests for agent frameworks.""" diff --git a/packages/agents/tests/integrations/test_crewai_a2a.py b/packages/agents/tests/integrations/test_crewai_a2a.py deleted file mode 100644 index 7b8d6de..0000000 --- a/packages/agents/tests/integrations/test_crewai_a2a.py +++ /dev/null @@ -1,406 +0,0 @@ -"""Tests for CrewAI A2A delegation integration.""" - -from unittest.mock import AsyncMock, patch - -import pytest - -pytest.importorskip("crewai") - -from keycardai.a2a import AgentServiceConfig - -from keycardai.agents.integrations.crewai import ( - _create_delegation_tool, - create_a2a_tool_for_service, - get_a2a_tools, -) - - -@pytest.fixture -def service_config(): - """Create test service configuration.""" - return AgentServiceConfig( - service_name="Test Service", - client_id="test_client", - client_secret="test_secret", - identity_url="https://test.example.com", - zone_id="test_zone_123", - ) - - -@pytest.fixture -def mock_delegatable_services(): - """Mock list of delegatable services.""" - return [ - { - "name": "PR Analysis Service", - "url": "https://pr-analyzer.example.com", - "description": "Analyzes GitHub pull requests for code quality", - "capabilities": ["pr_analysis", "code_review", "security_scan"], - }, - { - "name": "Slack Notification Service", - "url": "https://slack-notifier.example.com", - "description": "Posts notifications to Slack channels", - "capabilities": ["slack_post", "notification"], - }, - ] - - -@pytest.fixture -def mock_agent_card(): - """Mock agent card for service discovery (a2a-sdk 1.x JSON shape).""" - return { - "name": "Echo Service", - "description": "Simple echo service for testing", - "version": "1.0.0", - "supportedInterfaces": [ - { - "url": "https://echo.example.com/a2a/jsonrpc", - "protocolBinding": "jsonrpc", - "protocolVersion": "1.0", - } - ], - "capabilities": {"streaming": False}, - "skills": [ - {"id": "echo", "name": "Echo"}, - {"id": "testing", "name": "Testing"}, - ], - } - - -class TestGetA2ATools: - """Test A2A tool generation.""" - - @pytest.mark.asyncio - async def test_get_a2a_tools_with_no_services(self, service_config): - """Test get_a2a_tools returns empty list when no services provided.""" - tools = await get_a2a_tools(service_config, delegatable_services=[]) - - assert tools == [] - assert isinstance(tools, list) - - @pytest.mark.asyncio - async def test_get_a2a_tools_with_provided_services( - self, service_config, mock_delegatable_services - ): - """Test get_a2a_tools creates tools for provided services.""" - tools = await get_a2a_tools( - service_config, delegatable_services=mock_delegatable_services - ) - - assert len(tools) == 2 - assert all(hasattr(tool, "name") for tool in tools) - assert all(hasattr(tool, "description") for tool in tools) - assert all(hasattr(tool, "_run") for tool in tools) - - @pytest.mark.asyncio - async def test_get_a2a_tools_discovers_services_when_none_provided( - self, service_config - ): - """Test get_a2a_tools discovers services from Keycard when not provided.""" - # When delegatable_services=None, it should try to discover - # Currently returns empty list (discovery not implemented) - with patch( - "keycardai.agents.integrations.crewai.ServiceDiscovery" - ) as mock_discovery_class: - mock_discovery = AsyncMock() - mock_discovery.list_delegatable_services.return_value = [] - mock_discovery.close = AsyncMock() - mock_discovery_class.return_value = mock_discovery - - tools = await get_a2a_tools(service_config, delegatable_services=None) - - assert isinstance(tools, list) - - @pytest.mark.asyncio - async def test_get_a2a_tools_creates_correct_tool_count( - self, service_config, mock_delegatable_services - ): - """Test one tool is created per service.""" - tools = await get_a2a_tools( - service_config, delegatable_services=mock_delegatable_services - ) - - assert len(tools) == len(mock_delegatable_services) - - -class TestCreateDelegationTool: - """Test delegation tool creation.""" - - def test_tool_name_generation(self, service_config): - """Test tool name is generated correctly from service name.""" - service_info = { - "name": "PR Analysis Service", - "url": "https://pr-analyzer.example.com", - "description": "Test service", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - assert tool.name == "delegate_to_pr_analysis_service" - - def test_tool_name_handles_special_characters(self, service_config): - """Test tool name generation handles special characters.""" - service_info = { - "name": "Slack-Notification Service", - "url": "https://slack.example.com", - "description": "Test service", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - # Hyphens should be converted to underscores - assert tool.name == "delegate_to_slack_notification_service" - - def test_tool_description_includes_capabilities(self, service_config): - """Test tool description includes service capabilities.""" - service_info = { - "name": "Test Service", - "url": "https://test.example.com", - "description": "A test service", - "capabilities": ["capability1", "capability2", "capability3"], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - # Check capabilities are in description - assert "capability1" in tool.description - assert "capability2" in tool.description - assert "capability3" in tool.description - - def test_tool_has_correct_args_schema(self, service_config): - """Test tool has proper args schema for CrewAI.""" - service_info = { - "name": "Test Service", - "url": "https://test.example.com", - "description": "Test", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - # Tool should have args_schema attribute - assert hasattr(tool, "args_schema") - # Args schema should have task_description field - assert "task_description" in tool.args_schema.model_fields - - -class TestDelegationToolExecution: - """Test tool execution behavior.""" - - def test_tool_run_with_task_string(self, service_config): - """Test tool execution with simple task string.""" - service_info = { - "name": "Echo Service", - "url": "https://echo.example.com", - "description": "Test", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - # Mock invoke_service to avoid actual network call - with patch.object(a2a_client, "invoke_service") as mock_invoke: - mock_invoke.return_value = { - "result": "Echo response", - "delegation_chain": ["service1", "echo_service"], - } - - result = tool._run(task_description="Test task") - - assert "Echo response" in result - mock_invoke.assert_called_once() - - def test_tool_run_with_task_and_inputs(self, service_config): - """Test tool execution with task and additional inputs.""" - service_info = { - "name": "PR Analyzer", - "url": "https://pr-analyzer.example.com", - "description": "Test", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - with patch.object(a2a_client, "invoke_service") as mock_invoke: - mock_invoke.return_value = { - "result": "PR analysis complete", - "delegation_chain": [], - } - - tool._run( - task_description="Analyze PR", task_inputs={"pr_number": 123} - ) - - # Check invoke_service was called with correct task structure - call_args = mock_invoke.call_args - task = call_args[0][1] # Second positional argument - assert task["task"] == "Analyze PR" - assert task["inputs"] == {"pr_number": 123} - - def test_tool_run_calls_a2a_client(self, service_config): - """Test tool delegates to A2A client correctly.""" - service_info = { - "name": "Test Service", - "url": "https://test.example.com", - "description": "Test", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - with patch.object(a2a_client, "invoke_service") as mock_invoke: - mock_invoke.return_value = {"result": "success", "delegation_chain": []} - - tool._run(task_description="Test") - - # Verify invoke_service was called with service URL - mock_invoke.assert_called_once() - assert mock_invoke.call_args[0][0] == "https://test.example.com" - - def test_tool_run_formats_result_correctly(self, service_config): - """Test tool formats result with delegation chain.""" - service_info = { - "name": "Test Service", - "url": "https://test.example.com", - "description": "Test", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - with patch.object(a2a_client, "invoke_service") as mock_invoke: - mock_invoke.return_value = { - "result": "Task complete", - "delegation_chain": ["service_a", "service_b"], - } - - result = tool._run(task_description="Test") - - # Result should include delegation chain - assert "Test Service" in result - assert "Task complete" in result - assert "service_a" in result - assert "service_b" in result - - def test_tool_run_includes_delegation_chain(self, service_config): - """Test tool includes delegation chain in response.""" - service_info = { - "name": "Test Service", - "url": "https://test.example.com", - "description": "Test", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - with patch.object(a2a_client, "invoke_service") as mock_invoke: - mock_invoke.return_value = { - "result": "Done", - "delegation_chain": ["chain_element_1", "chain_element_2"], - } - - result = tool._run(task_description="Test") - - assert "Delegation chain" in result or "delegation" in result.lower() - - def test_tool_run_handles_exceptions(self, service_config): - """Test tool handles exceptions gracefully.""" - service_info = { - "name": "Test Service", - "url": "https://test.example.com", - "description": "Test", - "capabilities": [], - } - - from keycardai.a2a.server.delegation import DelegationClientSync - - a2a_client = DelegationClientSync(service_config) - tool = _create_delegation_tool(service_info, a2a_client) - - with patch.object(a2a_client, "invoke_service") as mock_invoke: - mock_invoke.side_effect = RuntimeError("Network error") - - result = tool._run(task_description="Test") - - # Should return error message, not raise exception - assert "Error" in result or "error" in result - assert isinstance(result, str) - - -class TestCreateA2AToolForService: - """Test single service tool creation.""" - - @pytest.mark.asyncio - async def test_create_tool_fetches_agent_card( - self, service_config, mock_agent_card - ): - """Test create_a2a_tool_for_service fetches agent card.""" - with patch( - "keycardai.agents.integrations.crewai.ServiceDiscovery" - ) as mock_discovery_class: - mock_discovery = AsyncMock() - mock_discovery.get_service_card.return_value = mock_agent_card - mock_discovery.close = AsyncMock() - mock_discovery_class.return_value = mock_discovery - - tool = await create_a2a_tool_for_service( - service_config, "https://echo.example.com" - ) - - # Should have fetched agent card - mock_discovery.get_service_card.assert_called_once_with( - "https://echo.example.com" - ) - - # Tool should be created with agent card info - assert hasattr(tool, "name") - assert hasattr(tool, "_run") - - @pytest.mark.asyncio - async def test_create_tool_for_service(self, service_config, mock_agent_card): - """Test tool is created correctly from agent card.""" - with patch( - "keycardai.agents.integrations.crewai.ServiceDiscovery" - ) as mock_discovery_class: - mock_discovery = AsyncMock() - mock_discovery.get_service_card.return_value = mock_agent_card - mock_discovery.close = AsyncMock() - mock_discovery_class.return_value = mock_discovery - - tool = await create_a2a_tool_for_service( - service_config, "https://echo.example.com" - ) - - # Tool name should be based on service name from agent card - assert "echo" in tool.name.lower() - assert "service" in tool.name.lower() diff --git a/uv.lock b/uv.lock index 26bee4d..f44b94b 100644 --- a/uv.lock +++ b/uv.lock @@ -2441,39 +2441,21 @@ provides-extras = ["dev", "test"] [[package]] name = "keycardai-agents" source = { editable = "packages/agents" } -dependencies = [ - { name = "keycardai-a2a" }, - { name = "pydantic" }, -] [package.optional-dependencies] -crewai = [ - { name = "crewai" }, -] dev = [ - { name = "mypy" }, { name = "ruff" }, ] test = [ { name = "pytest" }, - { name = "pytest-asyncio" }, - { name = "pytest-cov" }, - { name = "pytest-timeout" }, ] [package.metadata] requires-dist = [ - { name = "crewai", marker = "extra == 'crewai'", specifier = ">=0.86.0" }, - { name = "keycardai-a2a", editable = "packages/a2a" }, - { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.14.1" }, - { name = "pydantic", specifier = ">=2.11.7" }, { name = "pytest", marker = "extra == 'test'", specifier = ">=8.4.1" }, - { name = "pytest-asyncio", marker = "extra == 'test'", specifier = ">=1.1.0" }, - { name = "pytest-cov", marker = "extra == 'test'", specifier = ">=6.2.1" }, - { name = "pytest-timeout", marker = "extra == 'test'", specifier = ">=2.3.1" }, { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.6" }, ] -provides-extras = ["crewai", "dev", "test"] +provides-extras = ["dev", "test"] [[package]] name = "keycardai-crewai" From db62070b0bae99fdf638b958963561d60e59865c Mon Sep 17 00:00:00 2001 From: Larry-Osakwe Date: Wed, 29 Apr 2026 18:44:30 -0700 Subject: [PATCH 3/5] refactor(keycardai-crewai)!: subclass a2a-sdk AgentExecutor, drop parallel surface (ACC-231) Addresses fresh-eyes review on PR #106. The previous commit moved CrewAIExecutor verbatim, which carried the 0.x sync execute(task, inputs) signature and forced the README quickstart to hand-write the exact parallel-protocol bridge that #105 was reshaped to remove. Ships as the 0.1.0 PyPI surface, the README example becomes the canonical pattern users copy, so the gap had to close before publish. CrewAIExecutor now subclasses a2a.server.agent_execution.AgentExecutor with async execute(self, context, event_queue) and slots straight into DefaultRequestHandler(agent_executor=...). The README quickstart drops the hand-written async wrapper. execute() does four things in order: 1. Read context.call_context.state["access_token"] (set by keycardai.a2a.KeycardServerCallContextBuilder), call set_delegation_token so synchronous CrewAI tools see it; warn if absent so misconfigured deployments are visible in logs. 2. Build a fresh Crew via the factory. 3. Run crew.kickoff() on a worker thread via asyncio.to_thread. crew.kickoff is synchronous and CPU/IO-bound for seconds at a time; calling it directly inside an async def starves uvicorn. The contextvar set in step 1 propagates into the worker via contextvars.copy_context (which asyncio.to_thread does for us), so ServiceDelegationTool._run can still read the token. The class docstring spells this out so a future reader does not swap to_thread for a raw ThreadPoolExecutor. 4. Wrap the string result in an A2A Message (Role.ROLE_AGENT) and enqueue it. Drops the dead set_token_context flag and set_token_for_delegation method on CrewAIExecutor: both existed only for the Keycard AgentExecutor protocol that #105 deleted. Nothing in the new world calls them. Drops the dead try/except around `from crewai import Crew`: crewai is a hard dependency in pyproject.toml, not an extra, so the except branch is unreachable. Trims the Topic :: Security classifier from pyproject.toml: copy-paste from keycardai-a2a, but a CrewAI integration is not security software. Adds 8 new tests (TestCrewAIExecutor + TestSetDelegationToken): - subclass check (isinstance(executor, AgentExecutor)) - execute runs crew.kickoff with the user input as task - execute enqueues a Message with the crew result - execute propagates access_token from RequestContext into the contextvar, observed from inside crew.kickoff via asyncio.to_thread context inheritance - execute warns when access_token is absent - execute does not block the event loop (probe inside kickoff confirms asyncio.get_running_loop() raises in the worker thread) - cancel returns None - set_delegation_token writes to the public contextvar Tests: crewai 24 passed (was 16; +8) a2a 44 passed agents 0 collected (empty package, expected) ruff clean --- packages/crewai/README.md | 85 ++++++--- packages/crewai/pyproject.toml | 1 - .../crewai/src/keycardai/crewai/__init__.py | 170 ++++++++--------- packages/crewai/tests/test_crewai_a2a.py | 180 +++++++++++++++++- 4 files changed, 319 insertions(+), 117 deletions(-) diff --git a/packages/crewai/README.md b/packages/crewai/README.md index 75fdd38..53563a6 100644 --- a/packages/crewai/README.md +++ b/packages/crewai/README.md @@ -8,8 +8,8 @@ CrewAI integration for [keycardai-a2a](../a2a). Use a CrewAI `Crew` as the agent Server-side: -- **`CrewAIExecutor`**: adapter that wraps a CrewAI `Crew` factory so it can be wired into `a2a-sdk`'s `DefaultRequestHandler` as the agent body. -- **`set_delegation_token(access_token)`**: stash the inbound bearer in a contextvar so synchronous CrewAI tool calls can pick it up at delegation time. +- **`CrewAIExecutor`**: an `a2a-sdk` 1.x `AgentExecutor` subclass that runs a CrewAI `Crew`. Pass it directly to `a2a.server.request_handlers.DefaultRequestHandler(agent_executor=...)` — no wrapper needed. +- **`set_delegation_token(access_token)`**: stash an inbound bearer in the contextvar that synchronous CrewAI tools read at delegation time. `CrewAIExecutor` calls this for you; reach for it directly only when running a crew outside the executor. Client-side (CrewAI tools that delegate to other A2A services): @@ -26,14 +26,28 @@ This pulls in `keycardai-a2a`, `crewai`, and (transitively) `keycardai-oauth` + ## Quick start -In your A2A executor's `execute` method, call `set_delegation_token` with the verified bearer (read from `context.call_context.state["access_token"]`, populated by `keycardai.a2a.KeycardServerCallContextBuilder`) and then run your crew. CrewAI tools created via `get_a2a_tools` will pick the token up from the contextvar at invocation time. +`CrewAIExecutor` is an `a2a-sdk` `AgentExecutor`, so it slots into `DefaultRequestHandler` the same way any other executor does. Build the Keycard-protected mount with `keycardai-a2a`'s primitives, drop `CrewAIExecutor(make_crew)` in as the executor, and you are done. ```python -from a2a.server.agent_execution import AgentExecutor -from a2a.types import Message, MessageRole, Part - -from keycardai.a2a import AgentServiceConfig -from keycardai.crewai import CrewAIExecutor, get_a2a_tools, set_delegation_token +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes +from a2a.server.tasks import InMemoryTaskStore +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount + +from keycardai.a2a import ( + AgentServiceConfig, + KeycardServerCallContextBuilder, + build_agent_card_from_config, +) +from keycardai.crewai import CrewAIExecutor, get_a2a_tools +from keycardai.oauth.server.credentials import ClientSecret +from keycardai.starlette import AuthProvider, KeycardAuthBackend, keycard_on_error +from keycardai.starlette.routers.metadata import ( + well_known_authorization_server_route, + well_known_protected_resource_route, +) config = AgentServiceConfig( service_name="My Crew", @@ -57,28 +71,53 @@ def make_crew(): return Crew(agents=[orchestrator], tasks=[task]) -class CrewExecutor(AgentExecutor): - async def execute(self, context, event_queue): - access_token = context.call_context.state.get("access_token") - if access_token: - set_delegation_token(access_token) - - crew_runner = CrewAIExecutor(make_crew) - text = context.get_user_input() - result = crew_runner.execute(text) +auth_provider = AuthProvider( + zone_url=config.auth_server_url, + server_name=config.service_name, + server_url=config.identity_url, + application_credential=ClientSecret((config.client_id, config.client_secret)), +) +verifier = auth_provider.get_token_verifier() - message = Message(role=MessageRole.MESSAGE_ROLE_AGENT, parts=[Part(text=result)]) - await event_queue.enqueue_event(message) +agent_card = build_agent_card_from_config(config) +request_handler = DefaultRequestHandler( + agent_executor=CrewAIExecutor(make_crew), + task_store=InMemoryTaskStore(), + agent_card=agent_card, +) - async def cancel(self, context, event_queue): - return None +# Add these routes to your existing Starlette / FastAPI app: +your_app.routes.extend(create_agent_card_routes(agent_card=agent_card)) +your_app.routes.append(well_known_protected_resource_route( + issuer=config.auth_server_url, + resource="/.well-known/oauth-protected-resource{resource_path:path}", +)) +your_app.routes.append(well_known_authorization_server_route( + issuer=config.auth_server_url, + resource="/.well-known/oauth-authorization-server{resource_path:path}", +)) +your_app.routes.append(Mount( + "/a2a", + routes=create_jsonrpc_routes( + request_handler=request_handler, + rpc_url="/jsonrpc", + context_builder=KeycardServerCallContextBuilder(), + ), + middleware=[ + Middleware( + AuthenticationMiddleware, + backend=KeycardAuthBackend(verifier, require_authentication=True), + on_error=keycard_on_error, + ), + ], +)) ``` -Compose this `CrewExecutor` with the `keycardai-a2a` primitives in your Starlette/FastAPI app — see [`packages/a2a/examples/keycard_protected_server`](../a2a/examples/keycard_protected_server) for the auth wiring. +`CrewAIExecutor.execute` reads the verified bearer from `context.call_context.state["access_token"]` (set by `KeycardServerCallContextBuilder`), calls `set_delegation_token` so synchronous CrewAI tools can read it, and runs `crew.kickoff()` on a worker thread via `asyncio.to_thread` to avoid blocking the event loop. ## Relationship to other Keycard packages -- **`keycardai-a2a`**: provides the agent service primitives this package builds on. The `CrewAIExecutor` is fed into a2a-sdk's `DefaultRequestHandler`; the tools created by `get_a2a_tools` go through `keycardai-a2a`'s `DelegationClientSync`. +- **`keycardai-a2a`**: provides the agent service primitives this package builds on. `CrewAIExecutor` subclasses `a2a-sdk`'s `AgentExecutor` directly; the tools created by `get_a2a_tools` go through `keycardai-a2a`'s `DelegationClientSync`. - **`keycardai-oauth`**: token exchange runs through `keycardai-oauth` under the hood, via `keycardai-a2a`'s delegation client. - **`keycardai-starlette`**: the auth backend protecting the agent service mount lives here. - **`keycardai-mcp`**: hosts a separate CrewAI integration for **MCP tools** (different protocol). That one stays in `keycardai-mcp` and is unrelated to this package. diff --git a/packages/crewai/pyproject.toml b/packages/crewai/pyproject.toml index 0ca32f8..e73612e 100644 --- a/packages/crewai/pyproject.toml +++ b/packages/crewai/pyproject.toml @@ -22,7 +22,6 @@ classifiers = [ "Programming Language :: Python :: 3.13", "Operating System :: OS Independent", "Topic :: Software Development :: Libraries :: Python Modules", - "Topic :: Security", "Topic :: Scientific/Engineering :: Artificial Intelligence", "License :: OSI Approved :: MIT License", ] diff --git a/packages/crewai/src/keycardai/crewai/__init__.py b/packages/crewai/src/keycardai/crewai/__init__.py index dc9189b..84124d3 100644 --- a/packages/crewai/src/keycardai/crewai/__init__.py +++ b/packages/crewai/src/keycardai/crewai/__init__.py @@ -1,11 +1,19 @@ """CrewAI integration for A2A (agent-to-agent) delegation. -This module provides: -1. CrewAIExecutor: Adapter for running CrewAI crews in the agent service server -2. Delegation tools: CrewAI tools for calling other agent services +Two halves: + +1. ``CrewAIExecutor``: an ``a2a-sdk`` 1.x ``AgentExecutor`` subclass that runs + a CrewAI ``Crew`` in response to incoming A2A messages. Pass it directly + to ``a2a.server.request_handlers.DefaultRequestHandler`` — no wrapper + needed. +2. Delegation tools: ``crewai.tools.BaseTool`` instances that let a CrewAI + agent delegate work to other A2A services, exchanging the inbound user + token via Keycard. Usage with executor: - >>> from keycardai.a2a import AgentServiceConfig + >>> from a2a.server.request_handlers import DefaultRequestHandler + >>> from a2a.server.tasks import InMemoryTaskStore + >>> from keycardai.a2a import AgentServiceConfig, build_agent_card_from_config >>> from keycardai.crewai import CrewAIExecutor >>> from crewai import Agent, Crew, Task >>> @@ -14,11 +22,13 @@ ... task = Task(description="{task}", agent=agent) ... return Crew(agents=[agent], tasks=[task]) >>> - >>> config = AgentServiceConfig( - ... service_name="My Service", - ... # ... other config + >>> config = AgentServiceConfig(service_name="My Service", ...) + >>> agent_card = build_agent_card_from_config(config) + >>> request_handler = DefaultRequestHandler( + ... agent_executor=CrewAIExecutor(create_my_crew), + ... task_store=InMemoryTaskStore(), + ... agent_card=agent_card, ... ) - >>> executor = CrewAIExecutor(create_my_crew) Usage with delegation tools: >>> from keycardai.a2a import AgentServiceConfig @@ -48,68 +58,74 @@ >>> ) """ +import asyncio import contextvars import logging from typing import Any, Callable +from a2a.server.agent_execution import AgentExecutor +from a2a.server.events.event_queue_v2 import EventQueue +from a2a.types import Message, Part, Role from keycardai.a2a import AgentServiceConfig, DelegationClientSync, ServiceDiscovery from pydantic import BaseModel, Field -# Context variable to store the current user's access token for delegation +from crewai import Crew +from crewai.tools import BaseTool + +# Context variable to store the current user's access token for delegation. +# Read by ServiceDelegationTool._run; written by CrewAIExecutor.execute (or +# manually via set_delegation_token). _current_user_token: contextvars.ContextVar[str | None] = contextvars.ContextVar( "current_user_token", default=None ) -try: - from crewai import Crew - from crewai.tools import BaseTool -except ImportError: - raise ImportError( - "CrewAI is not installed. Install it with: pip install keycardai-crewai" - ) from None - logger = logging.getLogger(__name__) def set_delegation_token(access_token: str) -> None: """Set the user's access token for delegation context. - This should be called before crew execution to provide the user's - token for service-to-service delegation. The token will be used - for token exchange when delegating to other services. + ``CrewAIExecutor`` calls this for you. Use it directly only when running + a crew outside the executor (e.g., from a custom AgentExecutor or a + test). Args: access_token: The user's access token from the request Example: - >>> # In your AgentExecutor.execute method + >>> # In a custom AgentExecutor.execute method >>> access_token = context.call_context.state.get("access_token") - >>> set_delegation_token(access_token) - >>> - >>> # Now crew tools can delegate with the user's context - >>> crew = create_my_crew() - >>> result = crew.kickoff() + >>> if access_token: + ... set_delegation_token(access_token) + >>> result = my_crew.kickoff(...) """ _current_user_token.set(access_token) -class CrewAIExecutor: - """Executor adapter for CrewAI crews. +class CrewAIExecutor(AgentExecutor): + """``a2a-sdk`` 1.x ``AgentExecutor`` that runs a CrewAI ``Crew``. + + Pass an instance directly to + ``a2a.server.request_handlers.DefaultRequestHandler(agent_executor=...)``; + no outer wrapper is needed. Subclasses ``a2a.server.agent_execution.AgentExecutor`` + so it satisfies the wire-up contract that ``DefaultRequestHandler`` expects. - Runs a CrewAI ``Crew`` factory and returns the result as a string. - Intended to be called from inside an ``a2a-sdk`` ``AgentExecutor.execute`` - method (which is async); the crew itself runs synchronously via - ``crew.kickoff()``. + On each call to ``execute``: - The executor: - 1. Takes a crew factory callable - 2. Sets delegation token context before execution (when ``set_token_context=True``) - 3. Calls crew.kickoff() with the task/inputs - 4. Returns the result as a string + 1. Reads ``context.call_context.state["access_token"]`` (populated by + ``keycardai.a2a.KeycardServerCallContextBuilder``) and sets the + delegation contextvar so synchronous CrewAI tools can pick it up. + 2. Calls the ``crew_factory`` to build a fresh ``Crew``. + 3. Runs ``crew.kickoff(inputs={"task": })`` on a worker thread + via ``asyncio.to_thread`` so the synchronous CrewAI runtime does not + starve uvicorn's event loop. ``asyncio.to_thread`` propagates the + contextvar via ``contextvars.copy_context``; do **not** swap this for a + raw ``ThreadPoolExecutor``, which would not, and would silently break + delegation. + 4. Wraps the string result in an A2A ``Message`` and enqueues it. Args: - crew_factory: Callable that returns a Crew instance - set_token_context: If True, automatically set delegation token before execution + crew_factory: Callable that returns a fresh ``Crew`` for each request. Example: >>> from crewai import Agent, Crew, Task @@ -120,68 +136,38 @@ class CrewAIExecutor: ... return Crew(agents=[agent], tasks=[task]) >>> >>> executor = CrewAIExecutor(create_my_crew) - >>> result = executor.execute("Hello world", {"name": "Alice"}) """ - def __init__(self, crew_factory: Callable[[], Crew], set_token_context: bool = True): - """Initialize CrewAI executor. - - Args: - crew_factory: Callable that returns a Crew instance - set_token_context: If True, automatically set delegation token before execution - """ + def __init__(self, crew_factory: Callable[[], Crew]): self.crew_factory = crew_factory - self.set_token_context = set_token_context - - def execute( - self, - task: dict[str, Any] | str, - inputs: dict[str, Any] | None = None, - ) -> str: - """Execute crew with the given task and inputs. - - Args: - task: Task description (string) or parameters (dict) - inputs: Optional additional inputs for the crew - - Returns: - Result from crew execution as string - - Raises: - Exception: If crew execution fails - """ - # Create crew instance - crew = self.crew_factory() - # Prepare inputs for crew - if isinstance(task, dict): - crew_inputs = task + async def execute(self, context: Any, event_queue: EventQueue) -> None: + call_ctx = getattr(context, "call_context", None) + access_token = call_ctx.state.get("access_token") if call_ctx else None + if access_token: + set_delegation_token(access_token) else: - crew_inputs = {"task": task} - - # Merge additional inputs if provided - if inputs: - crew_inputs.update(inputs) - - # Execute crew - # Note: crew.kickoff() is synchronous in CrewAI - logger.info(f"Executing CrewAI crew with inputs: {list(crew_inputs.keys())}") - result = crew.kickoff(inputs=crew_inputs) + logger.warning( + "No access_token in RequestContext.call_context.state; " + "delegation tools will run without a user token. Ensure the " + "JSONRPC mount uses keycardai.a2a.KeycardServerCallContextBuilder." + ) - # Return result as string - return str(result) + user_input = context.get_user_input() + crew = self.crew_factory() + crew_inputs = {"task": user_input} - def set_token_for_delegation(self, access_token: str) -> None: - """Set access token for delegation context. + logger.info("Executing CrewAI crew") + result = await asyncio.to_thread(crew.kickoff, inputs=crew_inputs) - This is called by the server before execution to provide - the user's token for service-to-service delegation. + message = Message( + role=Role.ROLE_AGENT, + parts=[Part(text=str(result))], + ) + await event_queue.enqueue_event(message) - Args: - access_token: User's access token - """ - if self.set_token_context: - set_delegation_token(access_token) + async def cancel(self, context: Any, event_queue: EventQueue) -> None: + return None async def get_a2a_tools( diff --git a/packages/crewai/tests/test_crewai_a2a.py b/packages/crewai/tests/test_crewai_a2a.py index a872f4d..41688b9 100644 --- a/packages/crewai/tests/test_crewai_a2a.py +++ b/packages/crewai/tests/test_crewai_a2a.py @@ -1,17 +1,22 @@ """Tests for CrewAI A2A delegation integration.""" -from unittest.mock import AsyncMock, patch +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch import pytest pytest.importorskip("crewai") +from a2a.server.agent_execution import AgentExecutor from keycardai.a2a import AgentServiceConfig from keycardai.crewai import ( + CrewAIExecutor, _create_delegation_tool, + _current_user_token, create_a2a_tool_for_service, get_a2a_tools, + set_delegation_token, ) @@ -404,3 +409,176 @@ async def test_create_tool_for_service(self, service_config, mock_agent_card): # Tool name should be based on service name from agent card assert "echo" in tool.name.lower() assert "service" in tool.name.lower() + + +def _make_request_context(*, user_input: str = "hello", access_token: str | None = None): + """Build a stand-in for a2a-sdk's RequestContext. + + Only the attributes CrewAIExecutor.execute touches are populated: + ``get_user_input()`` and ``call_context.state``. + """ + call_context = SimpleNamespace( + state={"access_token": access_token} if access_token is not None else {} + ) + return SimpleNamespace( + get_user_input=lambda: user_input, + call_context=call_context, + ) + + +@pytest.fixture(autouse=True) +def _reset_delegation_token(): + """Each test starts with a clean contextvar.""" + token = _current_user_token.set(None) + yield + _current_user_token.reset(token) + + +class TestCrewAIExecutor: + """CrewAIExecutor wires a CrewAI Crew into a2a-sdk's AgentExecutor contract.""" + + def test_subclasses_a2a_agent_executor(self): + """The whole point of the wrap: instances pass DefaultRequestHandler's type check.""" + executor = CrewAIExecutor(crew_factory=lambda: MagicMock()) + + assert isinstance(executor, AgentExecutor) + + @pytest.mark.asyncio + async def test_execute_runs_crew_with_user_input(self): + """The user input from RequestContext lands in crew.kickoff(inputs={"task": ...}).""" + crew = MagicMock() + crew.kickoff.return_value = "crew result" + executor = CrewAIExecutor(crew_factory=lambda: crew) + + context = _make_request_context(user_input="analyze this") + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + crew.kickoff.assert_called_once_with(inputs={"task": "analyze this"}) + + @pytest.mark.asyncio + async def test_execute_enqueues_message_with_crew_result(self): + """The string form of the crew result becomes the agent message.""" + crew = MagicMock() + crew.kickoff.return_value = "the answer" + executor = CrewAIExecutor(crew_factory=lambda: crew) + + context = _make_request_context(user_input="ask") + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + event_queue.enqueue_event.assert_called_once() + message = event_queue.enqueue_event.call_args[0][0] + # The Message body should carry the crew result. + assert "the answer" in str(message) + + @pytest.mark.asyncio + async def test_execute_propagates_access_token_to_contextvar(self): + """The bearer in call_context.state must reach _current_user_token by the time + crew.kickoff runs, since synchronous CrewAI tools read the contextvar there. + + asyncio.to_thread inherits the calling task's context via copy_context, so the + contextvar set in execute() is visible inside the worker thread. + """ + observed = {} + + def crew_factory(): + crew = MagicMock() + + def kickoff(inputs): + observed["token"] = _current_user_token.get() + return "ok" + + crew.kickoff.side_effect = kickoff + return crew + + executor = CrewAIExecutor(crew_factory=crew_factory) + + context = _make_request_context(access_token="bearer-abc") + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + assert observed["token"] == "bearer-abc" + + @pytest.mark.asyncio + async def test_execute_warns_when_access_token_missing(self, caplog): + """No token in state ⇒ log a warning so misconfigured deployments are visible.""" + crew = MagicMock() + crew.kickoff.return_value = "ok" + executor = CrewAIExecutor(crew_factory=lambda: crew) + + context = _make_request_context(access_token=None) + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + with caplog.at_level("WARNING", logger="keycardai.crewai"): + await executor.execute(context, event_queue) + + assert any( + "access_token" in record.message + for record in caplog.records + if record.levelname == "WARNING" + ) + + @pytest.mark.asyncio + async def test_execute_does_not_block_event_loop(self): + """crew.kickoff must run on a worker thread, not on the event loop. + + The probe records the running loop's policy at kickoff time. If kickoff + ran on the event loop directly, asyncio.get_running_loop() would succeed + in the same task; in a worker thread it raises RuntimeError. + """ + observed = {} + + def crew_factory(): + crew = MagicMock() + + def kickoff(inputs): + import asyncio + + try: + asyncio.get_running_loop() + observed["on_loop"] = True + except RuntimeError: + observed["on_loop"] = False + return "ok" + + crew.kickoff.side_effect = kickoff + return crew + + executor = CrewAIExecutor(crew_factory=crew_factory) + + context = _make_request_context() + event_queue = MagicMock() + event_queue.enqueue_event = AsyncMock() + + await executor.execute(context, event_queue) + + assert observed["on_loop"] is False + + @pytest.mark.asyncio + async def test_cancel_returns_none(self): + """Default cancel is a no-op; AgentExecutor.cancel must not raise.""" + executor = CrewAIExecutor(crew_factory=lambda: MagicMock()) + + context = _make_request_context() + event_queue = MagicMock() + + result = await executor.cancel(context, event_queue) + + assert result is None + + +class TestSetDelegationToken: + """set_delegation_token writes to the public contextvar.""" + + def test_set_delegation_token_updates_contextvar(self): + set_delegation_token("token-xyz") + + assert _current_user_token.get() == "token-xyz" From 475cd56cb090209dc8c36b543aa19e7f208720af Mon Sep 17 00:00:00 2001 From: Larry-Osakwe Date: Wed, 29 Apr 2026 18:59:48 -0700 Subject: [PATCH 4/5] test(keycardai-crewai): integration test driving CrewAIExecutor through DefaultRequestHandler over JSONRPC (ACC-231) The unit tests in test_crewai_a2a.py prove CrewAIExecutor.execute does the right thing when called directly with mocked context and event_queue. They do not prove that DefaultRequestHandler actually invokes execute when a real JSONRPC SendMessage request lands on the mount. That gap is the highest-value thing to close before this hits PyPI; the README quickstart shape (DefaultRequestHandler(agent_executor=CrewAIExecutor(...), ...)) is what users will copy, so a regression here is invisible until somebody actually deploys the package. This module instantiates the headline composition the way the README shows it, drives it with a real JSONRPC POST through Starlette is TestClient, and asserts the crew result comes back. Three assertions: 1. The user message text reaches crew.kickoff(inputs={"task": ...}). If the dispatcher does not invoke our execute method, this fails. 2. The crew result string appears in the JSONRPC response body. Verifies the Message we enqueue is shaped correctly for SendMessageResponse. 3. The KeycardUser access_token reaches the contextvar by the time crew.kickoff runs. End-to-end check of the full chain: KeycardUser -> request.scope["user"] -> KeycardServerCallContextBuilder -> ServerCallContext.state["access_token"] -> CrewAIExecutor.execute -> set_delegation_token -> asyncio.to_thread context inheritance -> crew.kickoff observes it. Auth uses a stub backend that always returns a fixed KeycardUser. Real auth is exercised in keycardai-starlette is own tests; a real verifier here would need a reachable JWKS endpoint, and the verifier-to-context flow is already covered by the propagation tests in keycardai-a2a. The isolation lets this test focus on the wire-up between the JSONRPC dispatcher and our executor. Two non-obvious bits learned while wiring this up, both worth knowing: - a2a-sdk 1.x JSONRPC method names are CamelCase ("SendMessage"), not the 0.x slash form ("message/send"). The keycardai-a2a tests at packages/a2a/tests/test_agent_card_server.py use "message/send" but only assert 401 from the auth gate, so the dispatcher never sees the body and the wrong method name was never caught there. - The dispatcher requires an A2A-Version: 1.0 header on the request. Without it the server defaults to 0.3 and validate_version returns a JSONRPC error before the executor is ever called. Both worth following up on the keycardai-a2a side (the 401 tests should also cover at least one positive path so this kind of dispatcher-shape drift gets caught), but out of scope for this PR. Tests: crewai 27 passed (24 unit + 3 integration) a2a 44 passed agents 0 collected (empty package, expected) oauth/starlette/mcp/mcp-fastmcp/fastmcp green ruff clean --- ...est_default_request_handler_integration.py | 241 ++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 packages/crewai/tests/test_default_request_handler_integration.py diff --git a/packages/crewai/tests/test_default_request_handler_integration.py b/packages/crewai/tests/test_default_request_handler_integration.py new file mode 100644 index 0000000..2ca381f --- /dev/null +++ b/packages/crewai/tests/test_default_request_handler_integration.py @@ -0,0 +1,241 @@ +"""Wire-up smoke test for ``CrewAIExecutor`` against ``a2a-sdk``'s +``DefaultRequestHandler`` and the JSONRPC route factory. + +The ``test_crewai_a2a.py`` tests cover ``CrewAIExecutor.execute`` in isolation +with mocked context and event_queue. They prove the method does the right thing +when invoked directly, but they do NOT prove that ``DefaultRequestHandler`` +actually invokes our ``execute`` method when a real JSONRPC ``message/send`` +request comes in. That gap is the highest-value thing to close: it verifies the +wrap-fidelity claim end-to-end. + +This module instantiates the headline composition exactly the way the README +quickstart shows (``DefaultRequestHandler(agent_executor=CrewAIExecutor(...), ...)`` +mounted via ``create_jsonrpc_routes``), drives it with a real JSONRPC POST +through Starlette's ``TestClient``, and asserts the crew result comes back in +the response. A stub auth backend stands in for the real Keycard verifier (real +auth is exercised in keycardai-starlette's own tests); the only thing under +test here is the wire-up between the JSONRPC dispatcher and our executor. +""" + +from unittest.mock import MagicMock + +import pytest + +pytest.importorskip("crewai") + +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_jsonrpc_routes +from a2a.server.tasks import InMemoryTaskStore +from keycardai.a2a import ( + AgentServiceConfig, + KeycardServerCallContextBuilder, + build_agent_card_from_config, +) +from keycardai.starlette import KeycardUser +from starlette.applications import Starlette +from starlette.authentication import AuthCredentials, AuthenticationBackend +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount +from starlette.testclient import TestClient + +from keycardai.crewai import CrewAIExecutor, _current_user_token + + +class _StubAuthBackend(AuthenticationBackend): + """Stand-in for ``KeycardAuthBackend`` that always authenticates with a + fixed ``KeycardUser``. + + Real auth is covered by keycardai-starlette's own tests. Using a real + verifier here would require a reachable JWKS endpoint, which the test + environment does not provide; the cost of mocking the verifier outweighs + the value, since the verifier-to-context flow is already covered by the + ``KeycardServerCallContextBuilder`` propagation tests in keycardai-a2a. + What this test isolates is the JSONRPC -> DefaultRequestHandler -> + CrewAIExecutor.execute path. + """ + + def __init__(self, access_token: str): + self._access_token = access_token + + async def authenticate(self, conn): + user = KeycardUser( + access_token=self._access_token, + client_id="caller-svc", + zone_id="test_zone", + resource_server_url="https://test.example.com", + scopes=["test"], + ) + return AuthCredentials(["authenticated"]), user + + +@pytest.fixture +def service_config(): + return AgentServiceConfig( + service_name="Test Crew Service", + client_id="test_client", + client_secret="test_secret", + identity_url="https://test.example.com", + zone_id="test_zone", + capabilities=["test"], + ) + + +@pytest.fixture +def crew_observation(): + """Captures what the (fake) crew was driven with, for assertion.""" + return {} + + +@pytest.fixture +def fake_crew_factory(crew_observation): + def _factory(): + crew = MagicMock() + + def kickoff(inputs): + crew_observation["inputs"] = inputs + crew_observation["token_at_kickoff"] = _current_user_token.get() + return "fake crew result" + + crew.kickoff.side_effect = kickoff + return crew + + return _factory + + +@pytest.fixture +def app(service_config, fake_crew_factory): + """The composition under test: DefaultRequestHandler wraps CrewAIExecutor, + mounted via the standard a2a-sdk JSONRPC route factory, with the Keycard + context-builder propagating the verified user into ServerCallContext.state. + """ + executor = CrewAIExecutor(crew_factory=fake_crew_factory) + agent_card = build_agent_card_from_config(service_config) + request_handler = DefaultRequestHandler( + agent_executor=executor, + task_store=InMemoryTaskStore(), + agent_card=agent_card, + ) + + return Starlette( + routes=[ + Mount( + "/a2a", + routes=create_jsonrpc_routes( + request_handler=request_handler, + rpc_url="/jsonrpc", + context_builder=KeycardServerCallContextBuilder(), + ), + middleware=[ + Middleware( + AuthenticationMiddleware, + backend=_StubAuthBackend(access_token="bearer-test-token"), + ), + ], + ), + ] + ) + + +@pytest.fixture +def client(app): + return TestClient(app) + + +class TestDefaultRequestHandlerInvokesCrewAIExecutor: + """The wrap actually wraps: DefaultRequestHandler drives CrewAIExecutor when + a JSONRPC ``message/send`` request lands at ``/a2a/jsonrpc``. + """ + + def test_message_send_drives_crew_kickoff_with_user_input( + self, client, crew_observation + ): + """The user's message text reaches ``crew.kickoff(inputs={"task": ...})``. + + This is the core wire-up assertion: a JSONRPC envelope hitting the mount + causes ``DefaultRequestHandler`` to build a ``RequestContext``, call our + executor's ``execute``, which calls our crew. If the crew never sees the + text, something between the dispatcher and ``execute`` is broken. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "msg-1", + "role": "ROLE_USER", + "parts": [{"text": "hello world"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + assert crew_observation["inputs"] == {"task": "hello world"} + + def test_response_carries_crew_result(self, client): + """The string returned by the crew comes back to the JSONRPC caller. + + ``CrewAIExecutor.execute`` enqueues a ``Message`` whose text is the + ``str(crew.kickoff(...))`` result. ``DefaultRequestHandler`` shapes that + into the ``SendMessageResponse`` payload. The crew result string should + appear somewhere in the response body. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "msg-2", + "role": "ROLE_USER", + "parts": [{"text": "anything"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + assert "fake crew result" in response.text + + def test_access_token_propagates_into_crew_kickoff( + self, client, crew_observation + ): + """The bearer reaches the contextvar by the time crew.kickoff runs. + + The verified bearer flows: KeycardUser (set by the auth backend) -> + request.scope["user"] -> KeycardServerCallContextBuilder -> + ServerCallContext.state["access_token"] -> CrewAIExecutor.execute reads + it -> set_delegation_token writes it to the contextvar -> asyncio.to_thread + copies the context into the worker thread -> crew.kickoff observes it. + + If any link breaks, synchronous CrewAI tools delegate without the user + token and downstream services either reject the call or attribute it to + the wrong identity. Worth a single assertion that the chain holds. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "msg-3", + "role": "ROLE_USER", + "parts": [{"text": "doesn't matter"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + assert crew_observation["token_at_kickoff"] == "bearer-test-token" From 6b4a571ba4cd1ffb3692a757ecd0c56f60eedd33 Mon Sep 17 00:00:00 2001 From: Larry-Osakwe Date: Wed, 29 Apr 2026 19:11:59 -0700 Subject: [PATCH 5/5] fix(keycardai-a2a)!: align DelegationClient with a2a-sdk 1.x JSONRPC dispatcher (ACC-231) DelegationClient.invoke_service was hardcoded to the 0.x JSONRPC method "message/send", emitted a 0.x message envelope (lowercase role, no messageId), omitted the A2A-Version header, and unwrapped a 0.x response shape (result.parts). Against any real 1.x server the dispatcher rejected the call before the executor ran, so the keycardai-crewai delegation tools (the entire client-side path that runs through DelegationClientSync.invoke_service) were dead-on-arrival. The bug survived because the existing test_a2a_client.py tests mock the http client and the request body never sees a real dispatcher; they asserted "method == message/send" against the stubbed wire shape, which was internally consistent and externally wrong. Caught by the keycardai-crewai integration test added in commit 4: that test sends a real JSONRPC POST through Starlette TestClient against the real a2a-sdk DefaultRequestHandler, so dispatcher contract drift fails fast. Changes: - _build_jsonrpc_message_send -> _build_jsonrpc_send_message: method name is now "SendMessage" (the 1.x CamelCase form), the message envelope carries a messageId (required by the dispatcher) and the canonical enum-string role "ROLE_USER". - Both async and sync invoke_service set the A2A-Version: 1.0 header. The header constant comes from a2a.utils.constants so a future a2a-sdk rename follows our code automatically. - _unwrap_jsonrpc_response unwraps result.message.parts[].text, the shape SendMessageResponse takes when the executor enqueues a Message. Tasks fall back to JSON-stringified for now; full Task lifecycle consumers should reach for a2a.client.create_client directly. Tests: test_a2a_client.py mocks updated to the 1.x response shape and now assert method == "SendMessage", role == "ROLE_USER", messageId presence, and A2A-Version header on the outbound POST. Added test_jsonrpc_dispatch.py: a positive-path TestClient test that drives a real a2a-sdk DefaultRequestHandler with a stub AgentExecutor enqueuing a Message, then asserts the executor saw the user input and the access_token from KeycardServerCallContextBuilder. The auth-gate tests in test_agent_card_server.py only cover 401, so dispatcher contract drift had no local guard. This test fills the gap on the keycardai-a2a side; the keycardai-crewai integration test added in commit 4 covers the same chain with a CrewAIExecutor on top. a2a 45 passed (was 44; +1) crewai 27 passed agents 0 collected (empty package, expected) oauth/starlette/mcp/mcp-fastmcp/fastmcp green ruff clean BREAKING: Wire shape changes; any caller depending on the old "message/send" method name or the lowercase "user" role on the outbound envelope needs to update. No production callers exist (closed alpha; the keycardai-crewai delegation path is the only consumer in this PR). --- .../src/keycardai/a2a/server/delegation.py | 83 +++++---- packages/a2a/tests/test_a2a_client.py | 41 ++++- packages/a2a/tests/test_jsonrpc_dispatch.py | 169 ++++++++++++++++++ 3 files changed, 252 insertions(+), 41 deletions(-) create mode 100644 packages/a2a/tests/test_jsonrpc_dispatch.py diff --git a/packages/a2a/src/keycardai/a2a/server/delegation.py b/packages/a2a/src/keycardai/a2a/server/delegation.py index fb797c1..a1f75e6 100644 --- a/packages/a2a/src/keycardai/a2a/server/delegation.py +++ b/packages/a2a/src/keycardai/a2a/server/delegation.py @@ -10,6 +10,7 @@ from typing import Any import httpx +from a2a.utils import constants from keycardai.oauth import AsyncClient as AsyncOAuthClient from keycardai.oauth import Client as SyncOAuthClient @@ -22,13 +23,16 @@ logger = logging.getLogger(__name__) -def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: - """Wrap a task in an A2A JSONRPC ``message/send`` envelope. +def _build_jsonrpc_send_message(task: dict[str, Any] | str) -> dict[str, Any]: + """Wrap a task in an A2A 1.x JSONRPC ``SendMessage`` envelope. ``task`` may be a plain string, a dict carrying a ``"task"`` string under - that key (legacy shape preserved for the keycardai-agents CrewAI - integration), or any other dict (serialized to JSON for the message - text). + that key (legacy shape preserved for the CrewAI integration), or any + other dict (serialized to JSON for the message text). + + The shape mirrors ``a2a.types.SendMessageRequest`` after JSON-marshalling + via ``google.protobuf.json_format``: a ``messageId`` (required by the + dispatcher), an enum-string ``role``, and ``parts`` carrying the text. """ if isinstance(task, str): text = task @@ -43,10 +47,11 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: return { "jsonrpc": "2.0", "id": str(uuid.uuid4()), - "method": "message/send", + "method": "SendMessage", "params": { "message": { - "role": "user", + "messageId": str(uuid.uuid4()), + "role": "ROLE_USER", "parts": [{"text": text}], }, }, @@ -54,16 +59,20 @@ def _build_jsonrpc_message_send(task: dict[str, Any] | str) -> dict[str, Any]: def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]: - """Unwrap an A2A JSONRPC response into the ``{result, delegation_chain}`` shape. + """Unwrap an A2A 1.x JSONRPC ``SendMessageResponse`` into the + ``{result, delegation_chain}`` shape consumed by the CrewAI delegation + tool. - Best-effort surface used by CrewAI delegation tools. If the JSONRPC - result is a ``Message`` (parts with ``text``), the text parts are - joined; otherwise the raw result is JSON-stringified. + ``SendMessageResponse`` is a oneof of ``message`` or ``task``. If the + remote executor enqueued a ``Message`` (the common case for synchronous + crews), the text is at ``result.message.parts[].text``. If it produced a + ``Task``, we fall back to JSON-stringifying the task; callers wanting the + full Task lifecycle should reach for ``a2a.client.create_client``. - ``delegation_chain`` is returned empty: the legacy keycardai-agents - chain reconstruction read from ``request.state.keycardai_auth_info`` - which never carried the claim, so it was always single-hop. Callers - that need multi-hop tracking should parse JWT claims directly. + ``delegation_chain`` is returned empty: the legacy chain reconstruction + read from ``request.state.keycardai_auth_info``, which never carried the + claim, so it was always single-hop. Callers needing multi-hop tracking + should parse JWT claims directly. Raises: ValueError: if the response carries a JSONRPC ``error`` member. @@ -78,18 +87,20 @@ def _unwrap_jsonrpc_response(response_body: dict[str, Any]) -> dict[str, Any]: if result is None: return {"result": "", "delegation_chain": []} if isinstance(result, dict): - parts = result.get("parts") - if isinstance(parts, list): - text_parts = [ - p.get("text", "") - for p in parts - if isinstance(p, dict) and "text" in p - ] - if text_parts: - return { - "result": "\n".join(text_parts), - "delegation_chain": [], - } + message = result.get("message") + if isinstance(message, dict): + parts = message.get("parts") + if isinstance(parts, list): + text_parts = [ + p.get("text", "") + for p in parts + if isinstance(p, dict) and "text" in p + ] + if text_parts: + return { + "result": "\n".join(text_parts), + "delegation_chain": [], + } if isinstance(result, str): return {"result": result, "delegation_chain": []} return {"result": json.dumps(result), "delegation_chain": []} @@ -278,7 +289,7 @@ async def invoke_service( ) -> dict[str, Any]: """Call another agent service over A2A JSONRPC with bearer auth. - Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` + Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` and returns ``{"result": , "delegation_chain": []}`` for compatibility with the legacy invocation surface. If you need the full A2A protocol surface (Task lifecycle, streaming, status @@ -311,13 +322,16 @@ async def invoke_service( token = await self.get_delegation_token(service_url, subject_token) jsonrpc_url = f"{service_url}/a2a/jsonrpc" - envelope = _build_jsonrpc_message_send(task) + envelope = _build_jsonrpc_send_message(task) try: response = await self.http_client.post( jsonrpc_url, json=envelope, - headers={"Authorization": f"Bearer {token}"}, + headers={ + "Authorization": f"Bearer {token}", + constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0, + }, ) response.raise_for_status() unwrapped = _unwrap_jsonrpc_response(response.json()) @@ -524,7 +538,7 @@ def invoke_service( ) -> dict[str, Any]: """Call another agent service over A2A JSONRPC with bearer auth. - Sends a ``message/send`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` + Sends a ``SendMessage`` JSONRPC request to ``${service_url}/a2a/jsonrpc`` and returns ``{"result": , "delegation_chain": []}`` for compatibility with the legacy invocation surface. @@ -547,13 +561,16 @@ def invoke_service( token = self.get_delegation_token(service_url, subject_token) jsonrpc_url = f"{service_url}/a2a/jsonrpc" - envelope = _build_jsonrpc_message_send(task) + envelope = _build_jsonrpc_send_message(task) try: response = self.http_client.post( jsonrpc_url, json=envelope, - headers={"Authorization": f"Bearer {token}"}, + headers={ + "Authorization": f"Bearer {token}", + constants.VERSION_HEADER: constants.PROTOCOL_VERSION_1_0, + }, ) response.raise_for_status() unwrapped = _unwrap_jsonrpc_response(response.json()) diff --git a/packages/a2a/tests/test_a2a_client.py b/packages/a2a/tests/test_a2a_client.py index 34505a3..f2e6819 100644 --- a/packages/a2a/tests/test_a2a_client.py +++ b/packages/a2a/tests/test_a2a_client.py @@ -111,14 +111,23 @@ async def test_get_delegation_token_client_credentials(a2a_client): @pytest.mark.asyncio async def test_invoke_service_posts_jsonrpc_envelope(a2a_client): - """invoke_service sends a JSONRPC message/send to /a2a/jsonrpc.""" + """invoke_service sends a 1.x SendMessage JSONRPC request to /a2a/jsonrpc. + + The dispatcher requires a CamelCase method name and an A2A-Version + header; the message envelope must carry messageId and an enum-string + role. Any of these wrong and the dispatcher rejects the request before + the executor runs. + """ mock_response = Mock() mock_response.json.return_value = { "jsonrpc": "2.0", "id": "1", "result": { - "role": "agent", - "parts": [{"text": "Task completed successfully"}], + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Task completed successfully"}], + }, }, } mock_response.raise_for_status = Mock() @@ -132,17 +141,21 @@ async def test_invoke_service_posts_jsonrpc_envelope(a2a_client): token="test_token_123", ) - # The wrapper unwraps the JSONRPC result back to the legacy shape. + # The wrapper unwraps the SendMessageResponse.message.parts text. assert result["result"] == "Task completed successfully" assert result["delegation_chain"] == [] - # Confirm the request was a JSONRPC envelope to /a2a/jsonrpc. + # Confirm the request matches the 1.x dispatcher contract. posted_url = mock_post.call_args[0][0] posted_body = mock_post.call_args[1]["json"] + posted_headers = mock_post.call_args[1]["headers"] assert posted_url == "https://target.example.com/a2a/jsonrpc" assert posted_body["jsonrpc"] == "2.0" - assert posted_body["method"] == "message/send" + assert posted_body["method"] == "SendMessage" + assert posted_body["params"]["message"]["role"] == "ROLE_USER" assert posted_body["params"]["message"]["parts"][0]["text"] == "Test task" + assert posted_body["params"]["message"]["messageId"] + assert posted_headers["A2A-Version"] == "1.0" @pytest.mark.asyncio @@ -156,7 +169,13 @@ async def test_invoke_service_auto_token_exchange(a2a_client): mock_http_response.json.return_value = { "jsonrpc": "2.0", "id": "1", - "result": {"role": "agent", "parts": [{"text": "Success"}]}, + "result": { + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Success"}], + } + }, } mock_http_response.raise_for_status = Mock() @@ -183,7 +202,13 @@ async def test_invoke_service_string_task(a2a_client): mock_response.json.return_value = { "jsonrpc": "2.0", "id": "1", - "result": {"role": "agent", "parts": [{"text": "Done"}]}, + "result": { + "message": { + "messageId": "resp-1", + "role": "ROLE_AGENT", + "parts": [{"text": "Done"}], + } + }, } mock_response.raise_for_status = Mock() diff --git a/packages/a2a/tests/test_jsonrpc_dispatch.py b/packages/a2a/tests/test_jsonrpc_dispatch.py new file mode 100644 index 0000000..352b840 --- /dev/null +++ b/packages/a2a/tests/test_jsonrpc_dispatch.py @@ -0,0 +1,169 @@ +"""Positive-path JSONRPC test for the keycardai-a2a server primitives. + +The auth-gate tests in ``test_agent_card_server.py`` only assert that the +``/a2a/jsonrpc`` mount returns 401 for anonymous requests; the body is +rejected before the dispatcher sees it. That coverage misses dispatcher- +shape drift (method-name renames, new required headers, message-envelope +field changes) because the gate fires first. This module sends a real +authenticated JSONRPC request through the dispatcher and asserts a real +response, so future a2a-sdk dispatcher changes that break our delegation +or context-builder wiring fail here rather than in customer deployments. + +It pairs with ``packages/crewai/tests/test_default_request_handler_integration.py`` +on the executor side; the two together cover the dispatcher contract and +the executor wrap. +""" + +import pytest +from a2a.server.agent_execution import AgentExecutor +from a2a.server.events.event_queue_v2 import EventQueue +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.routes import create_jsonrpc_routes +from a2a.server.tasks import InMemoryTaskStore +from a2a.types import Message, Part, Role +from keycardai.starlette import KeycardUser +from starlette.applications import Starlette +from starlette.authentication import AuthCredentials, AuthenticationBackend +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount +from starlette.testclient import TestClient + +from keycardai.a2a import ( + AgentServiceConfig, + KeycardServerCallContextBuilder, + build_agent_card_from_config, +) + + +class _StubAuthBackend(AuthenticationBackend): + """Stand-in for ``KeycardAuthBackend`` that always authenticates with a + fixed ``KeycardUser``. + + Real auth is covered by keycardai-starlette's own tests. The point of + this module is the dispatcher contract, not auth verification, so a + permissive backend that still injects a valid ``KeycardUser`` is the + right level of isolation: the context builder still runs against a + real user object. + """ + + def __init__(self, access_token: str): + self._access_token = access_token + + async def authenticate(self, conn): + user = KeycardUser( + access_token=self._access_token, + client_id="caller-svc", + zone_id="test_zone", + resource_server_url="https://test.example.com", + scopes=["test"], + ) + return AuthCredentials(["authenticated"]), user + + +class _EchoMessageExecutor(AgentExecutor): + """Minimal ``AgentExecutor`` that enqueues a ``Message`` carrying the + user's input back, plus the ``access_token`` it observed in + ``RequestContext.call_context.state``. + + Using a real ``AgentExecutor`` (rather than mocking ``DefaultRequestHandler``) + forces the full chain to run: dispatcher -> context_builder -> executor + -> event_queue -> response. If any link breaks, this test fails. + """ + + async def execute(self, context, event_queue: EventQueue) -> None: + user_input = context.get_user_input() + call_ctx = getattr(context, "call_context", None) + access_token = call_ctx.state.get("access_token") if call_ctx else None + body = f"echoed: {user_input}; token: {access_token}" + message = Message( + message_id="resp-1", + role=Role.ROLE_AGENT, + parts=[Part(text=body)], + ) + await event_queue.enqueue_event(message) + + async def cancel(self, context, event_queue: EventQueue) -> None: + return None + + +@pytest.fixture +def service_config(): + return AgentServiceConfig( + service_name="JSONRPC Dispatch Test", + client_id="test_client", + client_secret="test_secret", + identity_url="https://test.example.com", + zone_id="test_zone", + capabilities=["test"], + ) + + +@pytest.fixture +def client(service_config): + agent_card = build_agent_card_from_config(service_config) + request_handler = DefaultRequestHandler( + agent_executor=_EchoMessageExecutor(), + task_store=InMemoryTaskStore(), + agent_card=agent_card, + ) + app = Starlette( + routes=[ + Mount( + "/a2a", + routes=create_jsonrpc_routes( + request_handler=request_handler, + rpc_url="/jsonrpc", + context_builder=KeycardServerCallContextBuilder(), + ), + middleware=[ + Middleware( + AuthenticationMiddleware, + backend=_StubAuthBackend(access_token="bearer-test-token"), + ), + ], + ), + ] + ) + return TestClient(app) + + +class TestJsonRpcDispatchPositivePath: + """Real JSONRPC ``SendMessage`` call lands at the executor and the + response carries the executor's enqueued message. + """ + + def test_send_message_drives_executor_and_returns_response(self, client): + """A successful round-trip exercises every link in the chain. + + If a2a-sdk renames the JSONRPC method, drops the A2A-Version header + requirement, changes the message envelope shape, or changes how + DefaultRequestHandler shapes the response, this test fails. The + keycardai-crewai integration test exercises the same chain with a + crew on top; this one isolates the keycardai-a2a primitives so a + regression here is attributed to the right package. + """ + response = client.post( + "/a2a/jsonrpc", + json={ + "jsonrpc": "2.0", + "id": "1", + "method": "SendMessage", + "params": { + "message": { + "messageId": "req-1", + "role": "ROLE_USER", + "parts": [{"text": "ping"}], + } + }, + }, + headers={"A2A-Version": "1.0"}, + ) + + assert response.status_code == 200, response.text + body = response.text + # The executor echoed the input. + assert "echoed: ping" in body + # The KeycardServerCallContextBuilder propagated the access_token + # from the auth backend's KeycardUser into ServerCallContext.state. + assert "token: bearer-test-token" in body