Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/packages/ag-ui/agent_framework_ag_ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ._endpoint import add_agent_framework_fastapi_endpoint
from ._event_converters import AGUIEventConverter
from ._http_service import AGUIHttpService
from ._state import state_update
from ._types import AgentState, AGUIChatOptions, AGUIRequest, PredictStateConfig, RunMetadata
from ._workflow import AgentFrameworkWorkflow, WorkflowFactory

Expand All @@ -34,5 +35,6 @@
"PredictStateConfig",
"RunMetadata",
"DEFAULT_TAGS",
"state_update",
"__version__",
]
84 changes: 80 additions & 4 deletions python/packages/ag-ui/agent_framework_ag_ui/_run_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import json
import logging
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any, cast

Expand All @@ -31,6 +32,7 @@
from agent_framework import Content

from ._orchestration._predictive_state import PredictiveStateHandler
from ._state import TOOL_RESULT_STATE_KEY
from ._utils import generate_event_id, make_json_safe

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -233,16 +235,66 @@ def _emit_tool_call(
return events


def _extract_tool_result_state(content: Content) -> dict[str, Any] | None:
"""Extract a deterministic AG-UI state update from a tool-result ``Content``.

Tools using :func:`agent_framework_ag_ui.state_update` carry the state
payload in ``additional_properties[TOOL_RESULT_STATE_KEY]`` on the inner
text item produced by ``parse_result``. We also check the outer
function_result content's ``additional_properties`` for robustness.

If multiple items carry state, they are merged in order so later items
override earlier ones (plain ``dict.update`` semantics).

Returns:
The merged state dict to apply, or ``None`` if no state update is
present.
"""
merged: dict[str, Any] | None = None

outer_ap = getattr(content, "additional_properties", None) or {}
outer_state = outer_ap.get(TOOL_RESULT_STATE_KEY)
if isinstance(outer_state, dict):
merged = dict(outer_state)

for item in content.items or ():
item_ap = getattr(item, "additional_properties", None) or {}
item_state = item_ap.get(TOOL_RESULT_STATE_KEY)
if isinstance(item_state, dict):
if merged is None:
merged = dict(item_state)
else:
merged.update(item_state)

return merged


def _emit_tool_result_common(
call_id: str,
raw_result: Any,
flow: FlowState,
predictive_handler: PredictiveStateHandler | None = None,
*,
state_update: Mapping[str, Any] | None = None,
) -> list[BaseEvent]:
"""Shared helper for emitting ToolCallEnd + ToolCallResult events and performing FlowState cleanup.

Both ``_emit_tool_result`` (standard function results) and ``_emit_mcp_tool_result``
(MCP server tool results) delegate to this function.

Args:
call_id: Tool call identifier.
raw_result: The stringified tool result content sent back to the LLM.
flow: Current ``FlowState``.
predictive_handler: Optional predictive state handler driven by
``predict_state_config``.
state_update: Optional deterministic state snapshot produced by a tool
returning :func:`agent_framework_ag_ui.state_update`. When present,
it is merged into ``flow.current_state`` and a ``StateSnapshotEvent``
is emitted after the ``ToolCallResult`` event. When both
``predictive_handler`` and ``state_update`` are active, predictive
updates are applied first, then the deterministic merge, and a
single coalesced ``StateSnapshotEvent`` is emitted.
"""
events: list[BaseEvent] = []

Expand Down Expand Up @@ -271,8 +323,18 @@ def _emit_tool_result_common(

if predictive_handler:
predictive_handler.apply_pending_updates()
if flow.current_state:
events.append(StateSnapshotEvent(snapshot=flow.current_state))

if state_update:
flow.current_state.update(state_update)
logger.debug(
"Emitted deterministic tool-result StateSnapshotEvent for call_id=%s (keys=%s)",
call_id,
list(state_update.keys()),
)

# Emit a single coalesced snapshot when either mechanism updated state.
if (predictive_handler or state_update) and flow.current_state:
events.append(StateSnapshotEvent(snapshot=flow.current_state))

flow.tool_call_id = None
flow.tool_call_name = None
Expand All @@ -295,7 +357,14 @@ def _emit_tool_result(
if not content.call_id:
return []
raw_result = content.result if content.result is not None else ""
return _emit_tool_result_common(content.call_id, raw_result, flow, predictive_handler)
state_update = _extract_tool_result_state(content)
return _emit_tool_result_common(
content.call_id,
raw_result,
flow,
predictive_handler,
state_update=state_update,
)


def _emit_approval_request(
Expand Down Expand Up @@ -460,7 +529,14 @@ def _emit_mcp_tool_result(
logger.warning("MCP tool result content missing call_id, skipping")
return []
raw_output = content.output if content.output is not None else ""
return _emit_tool_result_common(content.call_id, raw_output, flow, predictive_handler)
state_update = _extract_tool_result_state(content)
return _emit_tool_result_common(
content.call_id,
raw_output,
flow,
predictive_handler,
state_update=state_update,
)


def _close_reasoning_block(flow: FlowState) -> list[BaseEvent]:
Expand Down
84 changes: 84 additions & 0 deletions python/packages/ag-ui/agent_framework_ag_ui/_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright (c) Microsoft. All rights reserved.

"""Deterministic tool-driven AG-UI state updates.

Tools wired into the :mod:`agent_framework_ag_ui` endpoint can push a
deterministic state update by returning :func:`state_update`. Unlike
``predict_state_config`` — which emits ``StateDeltaEvent``s optimistically from
LLM-predicted tool call arguments — ``state_update`` runs *after* the tool
executes, so the AG-UI state always reflects the tool's actual return value.

See issue https://github.com/microsoft/agent-framework/issues/3167 for the
motivating discussion.
"""

from __future__ import annotations

from collections.abc import Mapping
from typing import Any

from agent_framework import Content

__all__ = ["TOOL_RESULT_STATE_KEY", "state_update"]


TOOL_RESULT_STATE_KEY = "__ag_ui_tool_result_state__"
"""Reserved ``Content.additional_properties`` key used to carry a tool-driven
state snapshot from a tool return value through to the AG-UI emitter."""


def state_update(
text: str = "",
*,
state: Mapping[str, Any],
) -> Content:
"""Build a tool return value that deterministically updates AG-UI shared state.

Return the result of this helper from an agent tool to push a state update
to AG-UI clients using the actual tool output, rather than LLM-predicted
tool arguments.

When the AG-UI endpoint emits the tool result, it will:

* Forward ``text`` to the LLM as the normal ``function_result`` content.
* Merge ``state`` into ``FlowState.current_state``.
* Emit a deterministic ``StateSnapshotEvent`` after the ``ToolCallResult``
event so frontends observe the updated state deterministically. If
predictive state is enabled, a predictive snapshot may be emitted first.

Example:
.. code-block:: python

from agent_framework import tool
from agent_framework_ag_ui import state_update


@tool
async def get_weather(city: str) -> Content:
data = await _fetch_weather(city)
return state_update(
text=f"Weather in {city}: {data['temp']}°C {data['conditions']}",
state={"weather": {"city": city, **data}},
)

Args:
text: Text passed back to the LLM as the ``function_result`` content.
Defaults to an empty string for tools whose only output is a state
update.
state: A mapping merged into the AG-UI shared state via JSON-compatible
``dict.update`` semantics. Nested dicts are replaced, not deep-merged.

Returns:
A ``Content`` object with ``type="text"``. The state payload rides in
``additional_properties`` under :data:`TOOL_RESULT_STATE_KEY` and is
extracted by the AG-UI emitter.

Raises:
TypeError: If ``state`` is not a ``Mapping``.
"""
if not isinstance(state, Mapping):
raise TypeError(f"state_update() 'state' must be a Mapping, got {type(state).__name__}")
return Content.from_text(
text,
additional_properties={TOOL_RESULT_STATE_KEY: dict(state)},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright (c) Microsoft. All rights reserved.

"""Deterministic tool-driven AG-UI state example.

This sample demonstrates how a tool can push a *deterministic* state update
to the AG-UI frontend based on its actual return value — in contrast to
``predict_state_config`` which fires optimistically from LLM-predicted tool
call arguments. See issue https://github.com/microsoft/agent-framework/issues/3167.

The :func:`agent_framework_ag_ui.state_update` helper wraps a text result
together with a state snapshot. When a tool returns one of these, the AG-UI
endpoint merges the snapshot into the shared state and emits a
``StateSnapshotEvent`` after the tool result.
"""

from __future__ import annotations

from typing import Any

from agent_framework import Agent, Content, SupportsChatGetResponse, tool
from agent_framework.ag_ui import AgentFrameworkAgent

from agent_framework_ag_ui import state_update

# Simulated weather database — in the issue's motivating example the tool
# would instead call a real weather API.
_WEATHER_DB: dict[str, dict[str, Any]] = {
"seattle": {"temperature": 11, "conditions": "rainy", "humidity": 75},
"san francisco": {"temperature": 14, "conditions": "foggy", "humidity": 85},
"new york city": {"temperature": 18, "conditions": "sunny", "humidity": 60},
"miami": {"temperature": 29, "conditions": "hot and humid", "humidity": 90},
"chicago": {"temperature": 9, "conditions": "windy", "humidity": 65},
}


@tool
async def get_weather(location: str) -> Content:
"""Fetch current weather for a location and push it into AG-UI shared state.

Unlike ``predict_state_config`` — which derives state optimistically from
LLM-predicted tool call arguments — this tool uses ``state_update`` to
forward the *actual* fetched weather to the frontend. The ``text`` goes
back to the LLM as the normal tool result, and the ``state`` dict is merged
into the AG-UI shared state.

Args:
location: City name to look up.

Returns:
A :class:`Content` carrying both the LLM-visible text result and a
deterministic state snapshot.
"""
key = location.lower()
data = _WEATHER_DB.get(
key,
{"temperature": 21, "conditions": "partly cloudy", "humidity": 50},
)
weather_record = {"location": location, **data}
return state_update(
text=(
f"The weather in {location} is {data['conditions']} at "
f"{data['temperature']}°C with {data['humidity']}% humidity."
),
state={"weather": weather_record},
)


def weather_state_agent(client: SupportsChatGetResponse[Any]) -> AgentFrameworkAgent:
"""Create an AG-UI agent with a deterministic tool-driven state tool."""
agent = Agent[Any](
name="weather_state_agent",
instructions=(
"You are a weather assistant. When a user asks about the weather "
"in a city, call the get_weather tool and use its output to give a "
"friendly, concise reply. The tool also updates the shared UI state "
"so the frontend can render a weather card from the `weather` key."
),
client=client,
tools=[get_weather],
)

return AgentFrameworkAgent(
agent=agent,
name="WeatherStateAgent",
description="Weather agent that deterministically updates shared state from tool results.",
state_schema={
"weather": {
"type": "object",
"description": "Last fetched weather record",
},
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ..agents.task_steps_agent import task_steps_agent_wrapped
from ..agents.ui_generator_agent import ui_generator_agent
from ..agents.weather_agent import weather_agent
from ..agents.weather_state_agent import weather_state_agent

AnthropicClient: type[Any] | None
try:
Expand Down Expand Up @@ -141,6 +142,14 @@
path="/subgraphs",
)

# Deterministic Tool-Driven State - tool returns state_update() to push snapshot
# from actual tool output (see issue #3167).
add_agent_framework_fastapi_endpoint(
app=app,
agent=weather_state_agent(client),
path="/deterministic_state",
)


def main():
"""Run the server."""
Expand Down
Loading
Loading