#!/usr/bin/env python3
"""Standalone repro: HITL approval result not persisted into the AG-UI snapshot
=> provider 400 on the NEXT run.

Setup:
    pip install "agent-framework-ag-ui>=1.0.0rc4" "agent-framework-openai>=1.8.1" uvicorn httpx
    az login  # keyless; or set AZURE_OPENAI_API_KEY
    export AZURE_OPENAI_ENDPOINT="https://<account>.services.ai.azure.com"
    export AZURE_OPENAI_MODEL="gpt-4.1"  # your chat deployment name

Run:
    python repro_hitl_400.py

Expected: STEP 3 prints RUN_ERROR=True and the server logs a 400
"... tool_call_ids did not have response messages: call_...".
"""
from __future__ import annotations
import json
import os
import threading
import time
import httpx
import uvicorn
from fastapi import FastAPI
from agent_framework import tool
from agent_framework.ag_ui import add_agent_framework_fastapi_endpoint
from agent_framework_openai import OpenAIChatCompletionClient
from azure.identity import DefaultAzureCredential
# Mutable module-level state shared by the two tools below. It is a dict so
# that apply_delta can update the "v" entry in place without a `global`.
_VALUE = {"v": 100}
@tool
async def get_value() -> str:
    """Return the current value."""
    # NOTE(review): the docstring above is left verbatim on purpose -- the
    # @tool decorator presumably exposes it as the tool description; confirm.
    current = _VALUE["v"]
    payload = {"value": current}
    return json.dumps(payload)
@tool(approval_mode="always_require")
async def apply_delta(delta: float) -> str:
    """Change the value by delta. Requires human approval."""
    # NOTE(review): the docstring above is left verbatim on purpose -- the
    # @tool decorator presumably exposes it as the tool description; confirm.
    # Coerce defensively (the argument may arrive as an int or a numeric
    # string) and round to 4 places before storing the new value.
    updated = round(_VALUE["v"] + float(delta), 4)
    _VALUE["v"] = updated
    return json.dumps({"status": "ok", "value": updated})
# Chat client for the Azure OpenAI deployment. Both settings are read with
# os.environ[...], so a missing variable raises KeyError when the module loads.
# The credential passed here is always DefaultAzureCredential().
client = OpenAIChatCompletionClient(
model=os.environ["AZURE_OPENAI_MODEL"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
credential=DefaultAzureCredential(),
)
# Agent under test: one plain tool (get_value) and one approval-gated tool
# (apply_delta). The instructions push the model to call apply_delta directly.
agent = client.as_agent(
name="repro",
instructions=(
"You manage one numeric value. When the user asks to change it, call "
"apply_delta(delta) IMMEDIATELY — do not call get_value first and do not "
"ask for confirmation in text; apply_delta is approval-gated by the system."
),
tools=[get_value, apply_delta],
)
# Register the agent on a FastAPI app at path "/".
app = FastAPI()
add_agent_framework_fastapi_endpoint(app, agent, "/")
# URL the driver posts to; host and port match the uvicorn.run(...) call in
# the __main__ guard.
BASE = "http://127.0.0.1:8099/"
def _post(body: dict) -> str:
    """POST *body* as JSON to BASE and return the whole response body as text.

    A fresh client with a 60-second timeout is opened for each call and
    closed again before returning.
    """
    with httpx.Client(timeout=60) as http:
        response = http.post(BASE, json=body)
        return response.text
def _snapshot(blob: str):
    """Extract the first MESSAGES_SNAPSHOT event from a raw response body.

    The body is searched for the literal text ``"type":"MESSAGES_SNAPSHOT"``;
    the JSON object that starts at the nearest preceding ``{`` is then decoded.

    Args:
        blob: Raw response text (the event stream returned by the endpoint).

    Returns:
        The decoded event object, or ``None`` when no snapshot marker is
        present, no opening brace precedes it, or the text starting at that
        brace is not valid JSON (for example a truncated stream).
    """
    marker = blob.find('"type":"MESSAGES_SNAPSHOT"')
    if marker < 0:
        return None
    start = blob.rfind("{", 0, marker)
    if start < 0:
        return None
    # raw_decode parses exactly one JSON value starting at `start` and ignores
    # whatever follows it, so string escapes such as \" and braces inside
    # string content are handled by the real JSON parser instead of a
    # hand-written brace counter.
    try:
        event, _end = json.JSONDecoder().raw_decode(blob, start)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    return event
def _find_confirm(snap: dict):
    """Locate the first ``confirm_changes`` tool call in a snapshot.

    Args:
        snap: Decoded snapshot event; its ``"messages"`` list is scanned in
            order, and each message's ``"toolCalls"`` list in order.

    Returns:
        ``(tool_call_id, arguments)`` for the first matching call, where
        ``arguments`` is the decoded JSON arguments object, or
        ``(None, None)`` when no such call exists.

    Raises:
        KeyError: If the matching call has no ``"id"``.
        json.JSONDecodeError: If the arguments are a non-empty string that is
            not valid JSON.
    """
    # `or []` / `or {}` also cover keys that are present but null.
    for message in snap.get("messages") or []:
        for call in message.get("toolCalls") or []:
            function = call.get("function") or {}
            if function.get("name") != "confirm_changes":
                continue
            raw_args = function.get("arguments")
            if isinstance(raw_args, str):
                # A blank arguments string is treated as "no arguments"
                # instead of letting json.loads("") raise.
                arguments = json.loads(raw_args) if raw_args.strip() else {}
            else:
                # Missing/null -> {}; an already-decoded object passes through.
                arguments = raw_args or {}
            return call["id"], arguments
    return None, None
def drive() -> None:
    """Run the three-step repro against the local server, then end the process.

    Steps:
      1. Send a user message that should make the agent call the gated tool,
         and look for the ``confirm_changes`` call in the returned snapshot.
      2. Replay that snapshot plus a tool message approving the call.
      3. Replay the post-approval snapshot plus a new user message and report
         whether the response contains ``RUN_ERROR``.

    The function never returns normally: it always terminates the whole
    process with ``os._exit`` (exit code 0 when all three steps ran, 1 when
    the run was aborted), because it runs on a background thread while the
    server occupies the main thread.
    """
    import traceback

    exit_code = 1  # stays 1 unless all three steps complete
    try:
        # Fixed startup delay so the server has time to start listening.
        time.sleep(3)

        # 1) action -> pause on confirm_changes
        b1 = _post({
            "threadId": "r", "runId": "r1",
            "messages": [
                {"id": "m", "role": "user",
                 "content": "Add 5 to the value now by calling apply_delta. Do not call get_value."}],
            "tools": [], "context": [], "state": {}})
        snap = _snapshot(b1) or {}
        cid, args = _find_confirm(snap)
        if cid is None:
            # Without a pending confirm_changes call there is nothing to
            # approve; stop here instead of failing later on snap["messages"].
            print("STEP 1: no confirm_changes call found in the response; aborting", flush=True)
            print("STEP 1 response (first 500 chars):", b1[:500], flush=True)
            return
        print(f"STEP 1: paused, confirm_changes id={cid!r}", flush=True)

        # 2) approve -> gated tool executes
        appr = {"id": "a", "role": "tool", "toolCallId": cid,
                "content": json.dumps({"accepted": True, "steps": (args or {}).get("steps")})}
        b2 = _post({"threadId": "r", "runId": "r2",
                    "messages": snap["messages"] + [appr],
                    "tools": [], "context": [], "state": {}})
        print(f"STEP 2 (approve): RUN_ERROR={'RUN_ERROR' in b2}", flush=True)

        # 3) replay the POST-APPROVAL snapshot as a fresh run (what a client does next)
        snap2 = _snapshot(b2) or snap
        # Show the defect directly: the gated tool call has only an {"accepted": true}
        # payload, no real tool result, in the replayed history.
        tool_msgs = [m for m in snap2["messages"] if m.get("role") == "tool"]
        print("STEP 3 history tool messages:",
              [str(m.get("content", ""))[:40] for m in tool_msgs], flush=True)
        b3 = _post({"threadId": "r", "runId": "r3",
                    "messages": snap2["messages"] + [
                        {"id": "u2", "role": "user", "content": "What is the value now?"}],
                    "tools": [], "context": [], "state": {}})
        failed = "RUN_ERROR" in b3
        print(f"STEP 3 (replay): RUN_ERROR={failed} <-- expected True on rc4", flush=True)
        print("VERDICT:", "REPRODUCED (snapshot missing the executed tool result)"
              if failed else "not reproduced", flush=True)
        exit_code = 0
    except Exception:
        # Print explicitly and flush: os._exit below skips normal interpreter
        # shutdown, so the thread's default exception hook would never run.
        print(traceback.format_exc(), flush=True)
    finally:
        # os._exit (not sys.exit) because this runs off the main thread and
        # must also stop the server that is blocking the main thread.
        os._exit(exit_code)
if __name__ == "__main__":
    # The driver runs on a daemon thread; the server blocks the main thread
    # until the driver terminates the process.
    driver = threading.Thread(target=drive, daemon=True)
    driver.start()
    uvicorn.run(app, host="127.0.0.1", port=8099, log_level="warning")
Summary
When a tool is registered with `approval_mode="always_require"`, the human-in-the-loop (HITL) approve → execute path works for the current run, but the executed tool result is not journaled into the AG-UI `MESSAGES_SNAPSHOT` (`flow.tool_results`). The post-approval snapshot keeps only the raw approval payload `{"accepted": true, ...}`. On the next run, when the client (e.g. CopilotKit) replays that history, the assistant message's `tool_calls` has no matching tool message, so Azure/OpenAI Chat Completions returns a 400 ("... tool_call_ids did not have response messages: call_...").

Versions
- `agent-framework-ag-ui==1.0.0rc4` (latest release)
- `agent-framework-openai==1.8.1` (latest release)
- […] `DefaultAzureCredential`).

Why this isn't already covered
[…] `main` by PR "Python: Add opt-in AG-UI thread snapshot persistence and hydration" #6471 (`_clean_resolved_approvals_from_snapshot`), merged ~3 days after rc4 was cut, as part of a snapshot-persistence feature (closing "Python: sync AG-UI conversation history from backend" #2458). So: […]

Deterministic reproduction (no UI; one file)
`repro_hitl_400.py`

Observed output (3/3 runs, gpt-4.1)
Server log on STEP 3:
Expected
After an approved HITL tool executes, the AG-UI snapshot/history should contain the tool result message paired with the assistant `tool_calls`, so that replaying the conversation yields a valid provider history.

Ask
[…] `_clean_resolved_approvals_from_snapshot` is invoked in the run path.

(Reproduced and reported while building a CopilotKit + AG-UI + Foundry HITL app; happy to test a fix.)