Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,10 @@
from ._executors_tools import (
FUNCTION_TOOL_REGISTRY_KEY,
TOOL_ACTION_EXECUTORS,
TOOL_APPROVAL_STATE_KEY,
BaseToolExecutor,
InvokeFunctionToolExecutor,
ToolApprovalRequest,
ToolApprovalResponse,
ToolApprovalState,
ToolInvocationResult,
)
Comment thread
peibekwe marked this conversation as resolved.
from ._factory import WorkflowFactory
Expand Down Expand Up @@ -111,7 +109,6 @@
"HTTP_ACTION_EXECUTORS",
"MCP_ACTION_EXECUTORS",
"TOOL_ACTION_EXECUTORS",
"TOOL_APPROVAL_STATE_KEY",
"TOOL_REGISTRY_KEY",
"ActionComplete",
"ActionTrigger",
Expand Down Expand Up @@ -164,7 +161,6 @@
"SetVariableExecutor",
"ToolApprovalRequest",
"ToolApprovalResponse",
"ToolApprovalState",
"ToolInvocationResult",
"WorkflowFactory",
"WorkflowState",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@

Security notes:

- The executor never echoes header VALUES (auth tokens, API keys) into the
approval request — only header NAMES are surfaced to the caller. This
matches the security posture of :mod:`._executors_http` (which never logs
request headers either) and prevents secrets from leaking through workflow
events that are typically observable to operators / UIs.
- ``_MCPToolApprovalState`` snapshots the EVALUATED values for non-secret
fields (server URL, tool name, arguments) at approval-request time so that
subsequent state mutations cannot make the executor "approve X then call
Y". Headers are stored as the raw expression strings (not evaluated values)
so secrets are not persisted in the workflow's checkpoint state. They are
re-evaluated on resume.
- Approval requests surface header NAMES only; header values are not echoed,
matching the posture of :mod:`._executors_http`.
- :class:`MCPToolApprovalRequest` carries the values the resume handler will
use; header values are re-evaluated on resume to keep secrets out of
checkpoint state.
- Tool outputs flow back into agent conversations through ``conversationId``
and through Tool-role messages emitted to ``output.messages``. They share
the same prompt-injection risk surface as ``HttpRequestAction``: workflow
Expand Down Expand Up @@ -60,8 +54,6 @@

logger = logging.getLogger(__name__)

_MCP_APPROVAL_STATE_KEY = "_mcp_tool_approval_state"


# ---------------------------------------------------------------------------
# Request / state types
Expand All @@ -72,20 +64,16 @@
class MCPToolApprovalRequest:
"""Approval request emitted before invoking an MCP tool.

Mirrors :class:`agent_framework_declarative.ToolApprovalRequest` but for
MCP-style invocations. Only header NAMES are surfaced — header values are
intentionally omitted because they typically carry authentication
secrets.

Attributes:
request_id: Unique identifier for this approval request. Matches the
id workflow event-emitters use.
tool_name: Evaluated name of the tool to be invoked.
request_id: Identifier matching the framework's pending-request key.
tool_name: Evaluated tool name.
server_url: Evaluated MCP server URL.
server_label: Optional human-readable label for diagnostics.
arguments: Evaluated arguments to be forwarded to the tool.
header_names: Sorted list of outbound header names (no values). Empty
when no headers are configured.
server_label: Optional human-readable label.
arguments: Evaluated tool arguments.
header_names: Outbound header names (values withheld).
connection_name: Connection identifier the invocation will use.
metadata: Internal routing data pinned at approval-request time
(e.g. ``conversation_id``) for use by the resume handler.
"""

request_id: str
Expand All @@ -94,50 +82,24 @@ class MCPToolApprovalRequest:
server_label: str | None
arguments: dict[str, Any]
header_names: list[str] = field(default_factory=lambda: [])


@dataclass
class _MCPToolApprovalState:
"""Internal state saved during the approval yield for resumption.

Stores **evaluated** values for non-secret fields to prevent
"approve X / execute Y" attacks. Stores the raw expression string for
``headers`` so that secret values are NOT persisted in checkpoint state;
the expressions are re-evaluated against current state on resume.
"""

server_url: str
tool_name: str
server_label: str | None
arguments: dict[str, Any]
connection_name: str | None
headers_def: Any
auto_send: bool
conversation_id_expr: str | None
output_messages_path: str | None
output_result_path: str | None
connection_name: str | None = None
metadata: dict[str, Any] = field(default_factory=lambda: {})


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _get_messages_path(state: DeclarativeWorkflowState, conversation_id_expr: str | None) -> str | None:
"""Return the configured conversation messages path, if any.

Returns ``System.conversations.{evaluated_id}.messages`` when a
``conversation_id_expr`` is configured and evaluates to a non-empty value.
Returns ``None`` when no conversation id expression is configured or when
the expression evaluates to ``None`` or an empty string (mirrors .NET
``GetConversationId`` behaviour).
"""
if not conversation_id_expr:
def _evaluate_conversation_id(state: DeclarativeWorkflowState, conversation_id_expr: Any) -> str | None:
"""Return the evaluated ``conversationId`` string, or None when empty/unset."""
if not isinstance(conversation_id_expr, str) or not conversation_id_expr:
return None
evaluated = state.eval_if_expression(conversation_id_expr)
if evaluated is None or (isinstance(evaluated, str) and not evaluated):
if evaluated is None:
return None
return f"System.conversations.{evaluated}.messages"
text = str(evaluated)
return text or None


def _get_output_path(action_def: Mapping[str, Any], key: str) -> str | None:
Expand Down Expand Up @@ -260,27 +222,16 @@ async def handle_action(

if require_approval:
request_id = str(uuid.uuid4())
approval_state = _MCPToolApprovalState(
server_url=server_url,
tool_name=tool_name,
server_label=server_label,
arguments=arguments,
connection_name=connection_name,
headers_def=self._action_def.get("headers"),
auto_send=auto_send,
conversation_id_expr=conversation_id_expr if isinstance(conversation_id_expr, str) else None,
output_messages_path=output_messages_path,
output_result_path=output_result_path,
)
ctx.state.set(self._approval_key(), approval_state)

conversation_id = _evaluate_conversation_id(state, conversation_id_expr)
request = MCPToolApprovalRequest(
request_id=request_id,
tool_name=tool_name,
server_url=server_url,
server_label=server_label,
arguments=arguments,
header_names=sorted(headers.keys()),
connection_name=connection_name,
metadata={"conversation_id": conversation_id},
)
logger.info(
"%s: requesting approval for MCP tool '%s' on '%s'",
Expand All @@ -289,7 +240,6 @@ async def handle_action(
server_url,
)
await ctx.request_info(request, ToolApprovalResponse, request_id=request_id)
# Workflow yields here — resume in handle_approval_response.
return

# No approval required - invoke directly.
Expand All @@ -307,7 +257,7 @@ async def handle_action(
state=state,
result=result,
auto_send=auto_send,
conversation_id_expr=conversation_id_expr if isinstance(conversation_id_expr, str) else None,
conversation_id=_evaluate_conversation_id(state, conversation_id_expr),
output_messages_path=output_messages_path,
output_result_path=output_result_path,
)
Expand All @@ -322,54 +272,46 @@ async def handle_approval_response(
response: ToolApprovalResponse,
ctx: WorkflowContext[ActionComplete, str],
) -> None:
"""Resume after the workflow yielded for an approval request."""
"""Resume the invocation using the values pinned on ``original_request``."""
state = self._get_state(ctx.state)
approval_key = self._approval_key()

try:
approval_state: _MCPToolApprovalState = ctx.state.get(approval_key)
except KeyError:
logger.error("%s: approval state missing for executor '%s'", self.__class__.__name__, self.id)
await ctx.send_message(ActionComplete())
return
try:
ctx.state.delete(approval_key)
except KeyError:
logger.warning("%s: approval state already deleted for '%s'", self.__class__.__name__, self.id)
tool_name = original_request.tool_name
metadata: dict[str, Any] = getattr(original_request, "metadata", None) or {}
Comment thread
peibekwe marked this conversation as resolved.
raw_conversation_id = metadata.get("conversation_id")
conversation_id = raw_conversation_id if isinstance(raw_conversation_id, str) and raw_conversation_id else None

auto_send = self._get_auto_send(state)
Comment thread
peibekwe marked this conversation as resolved.
output_messages_path = _get_output_path(self._action_def, "messages")
output_result_path = _get_output_path(self._action_def, "result")

if not response.approved:
logger.info(
"%s: MCP tool '%s' rejected: %s",
self.__class__.__name__,
approval_state.tool_name,
tool_name,
response.reason,
)
self._assign_error(
state, approval_state.output_result_path, "MCP tool invocation was not approved by user."
)
self._assign_error(state, output_result_path, "MCP tool invocation was not approved by user.")
await ctx.send_message(ActionComplete())
return

# Approved — re-evaluate headers (not stored at approval time for security).
headers = self._evaluate_headers(state, approval_state.headers_def)

invocation = MCPToolInvocation(
server_url=approval_state.server_url,
tool_name=approval_state.tool_name,
server_label=approval_state.server_label,
arguments=approval_state.arguments,
headers=headers,
connection_name=approval_state.connection_name,
server_url=original_request.server_url,
tool_name=tool_name,
server_label=original_request.server_label,
arguments=original_request.arguments,
headers=self._evaluate_headers(state, self._action_def.get("headers")),
connection_name=getattr(original_request, "connection_name", None),
)
result = await self._invoke_with_narrow_catch(invocation)
await self._process_result(
ctx=ctx,
state=state,
result=result,
auto_send=approval_state.auto_send,
conversation_id_expr=approval_state.conversation_id_expr,
output_messages_path=approval_state.output_messages_path,
output_result_path=approval_state.output_result_path,
auto_send=auto_send,
conversation_id=conversation_id,
output_messages_path=output_messages_path,
output_result_path=output_result_path,
)
await ctx.send_message(ActionComplete())

Expand Down Expand Up @@ -528,7 +470,7 @@ async def _process_result(
state: DeclarativeWorkflowState,
result: MCPToolResult,
auto_send: bool,
conversation_id_expr: str | None,
conversation_id: str | None,
output_messages_path: str | None,
output_result_path: str | None,
) -> None:
Expand Down Expand Up @@ -557,14 +499,10 @@ async def _process_result(
if auto_send and parsed_results:
await ctx.yield_output(_format_outputs_for_send(parsed_results))

if conversation_id_expr:
messages_path = _get_messages_path(state, conversation_id_expr)
if messages_path is not None:
# Mirrors .NET: conversation gets ASSISTANT-role message with
# the same outputs (so chat history reads it as the agent's
# contribution).
assistant_message = Message(role="assistant", contents=list(result.outputs))
state.append(messages_path, assistant_message)
if conversation_id:
messages_path = f"System.conversations.{conversation_id}.messages"
assistant_message = Message(role="assistant", contents=list(result.outputs))
state.append(messages_path, assistant_message)

@staticmethod
def _assign_error(
Expand All @@ -577,9 +515,6 @@ def _assign_error(
return
state.set(output_result_path, f"Error: {error_message}")

def _approval_key(self) -> str:
return f"{_MCP_APPROVAL_STATE_KEY}_{self.id}"


def _parse_outputs(outputs: list[Content]) -> list[Any]:
"""Parse :class:`Content` outputs into Python values for ``output.result``.
Expand Down
Loading
Loading