diff --git a/apps/dev-playground/client/src/routes/agent.route.tsx b/apps/dev-playground/client/src/routes/agent.route.tsx
index 6762a1a38..22f860b08 100644
--- a/apps/dev-playground/client/src/routes/agent.route.tsx
+++ b/apps/dev-playground/client/src/routes/agent.route.tsx
@@ -1,5 +1,11 @@
import { getPluginClientConfig } from "@databricks/appkit-ui/js";
-import { Button } from "@databricks/appkit-ui/react";
+import type { AgentTurnItem } from "@databricks/appkit-ui/react";
+import {
+ Button,
+ Collapsible,
+ CollapsibleContent,
+ CollapsibleTrigger,
+} from "@databricks/appkit-ui/react";
import { createFileRoute } from "@tanstack/react-router";
import { useCallback, useEffect, useRef, useState } from "react";
@@ -39,7 +45,113 @@ interface SSEEvent {
interface ChatMessage {
id: number;
role: "user" | "assistant";
+ /** User text, or — for assistant turns — the terminal answer text. */
content: string;
+ /**
+ * For assistant turns: the ordered per-round trace (intermediate draft
+ * messages, tool calls, tool results, and the final answer). Rendered as a
+ * collapsible "Steps" section above the answer. Undefined for user turns.
+ */
+ items?: AgentTurnItem[];
+}
+
+/** Parse a wire JSON string, falling back to the raw value. */
+function parseMaybeJson(value: string | undefined): unknown {
+ if (value === undefined || value === "") return value;
+ try {
+ return JSON.parse(value);
+ } catch {
+ return value;
+ }
+}
+
+/** Text of the last `message` item — the terminal answer. */
+function lastMessageText(items: AgentTurnItem[]): string {
+ for (let i = items.length - 1; i >= 0; i--) {
+ const it = items[i];
+ if (it.kind === "message") return it.text;
+ }
+ return "";
+}
+
+function summarizeArgs(args: unknown): string {
+ if (args === undefined || args === "") return "";
+ const s = typeof args === "string" ? args : JSON.stringify(args);
+ return s.length > 80 ? `${s.slice(0, 80)}…` : s;
+}
+
+function summarizeOutput(output: unknown): string {
+ const s = typeof output === "string" ? output : JSON.stringify(output);
+ return s.length > 200 ? `${s.slice(0, 200)}…` : s;
+}
+
+/**
+ * Renders an assistant turn: collapsible intermediate steps (every item before
+ * the last `message`) plus the terminal answer streamed live below.
+ */
+function AssistantTurn({ items }: { items: AgentTurnItem[] }) {
+ const lastMessageIdx = (() => {
+ for (let i = items.length - 1; i >= 0; i--) {
+ if (items[i].kind === "message") return i;
+ }
+ return -1;
+ })();
+ const steps = lastMessageIdx >= 0 ? items.slice(0, lastMessageIdx) : items;
+ const answer = lastMessageIdx >= 0 ? lastMessageText(items) : "";
+
+ return (
+
+ {steps.length > 0 && (
+
+
+ Steps ({steps.length})
+
+
+ {steps.map((it) => {
+ if (it.kind === "message") {
+ return (
+
+ {it.text}
+
+ );
+ }
+ if (it.kind === "tool_call") {
+ return (
+
+ {it.name}
+
+ ({summarizeArgs(it.args)})
+
+
+ );
+ }
+ return (
+
+ {it.error ? `error: ${it.error}` : summarizeOutput(it.output)}
+
+ );
+ })}
+
+
+ )}
+
+
+ );
}
interface PendingApproval {
@@ -138,7 +250,7 @@ function useAutocomplete(enabled: boolean) {
function AgentRoute() {
const [messages, setMessages] = useState([]);
- const [events, setEvents] = useState([]);
+ const [events, setEvents] = useState([]);
const [input, setInput] = useState("");
const [isLoading, setIsLoading] = useState(false);
const [threadId, setThreadId] = useState(null);
@@ -232,9 +344,31 @@ function AgentRoute() {
if (!reader) return;
const decoder = new TextDecoder();
- let assistantContent = "";
+ // Per-round trace items for this assistant turn, keyed by wire item id.
+ const turnItems: AgentTurnItem[] = [];
+ const assistantId = ++msgIdCounter.current;
let buffer = "";
+ const flushAssistant = () => {
+ const snapshot = turnItems.map((x) => ({ ...x }));
+ setMessages((prev) => {
+ const updated = [...prev];
+ const last = updated[updated.length - 1];
+ const next: ChatMessage = {
+ id: assistantId,
+ role: "assistant",
+ content: lastMessageText(snapshot),
+ items: snapshot,
+ };
+ if (last?.role === "assistant" && last.id === assistantId) {
+ updated[updated.length - 1] = next;
+ } else {
+ updated.push(next);
+ }
+ return updated;
+ });
+ };
+
while (true) {
const { done, value } = await reader.read();
if (done) break;
@@ -273,26 +407,81 @@ function AgentRoute() {
setThreadId(event.data.threadId as string);
}
- if (event.type === "response.output_text.delta" && event.delta) {
- assistantContent += event.delta;
- setMessages((prev) => {
- const updated = [...prev];
- const last = updated[updated.length - 1];
- if (last?.role === "assistant") {
- updated[updated.length - 1] = {
- ...last,
- content: assistantContent,
- };
- } else {
- updated.push({
- id: ++msgIdCounter.current,
- role: "assistant",
- content: assistantContent,
- });
+ // Build the ordered per-round item list. The translator emits
+ // items in output_index order, so appending on `added` keeps it
+ // ordered. Each ReAct round's draft is its own message item, so the
+ // duplicated drafts surface as collapsible steps, not the answer.
+ let changed = false;
+ const it = event.item;
+ if (event.type === "response.output_item.added") {
+ if (it?.type === "message" && it.id) {
+ turnItems.push({
+ kind: "message",
+ id: it.id,
+ text: "",
+ status: "in_progress",
+ });
+ changed = true;
+ } else if (it?.type === "function_call") {
+ turnItems.push({
+ kind: "tool_call",
+ id: it.id ?? it.call_id ?? `fc_${turnItems.length}`,
+ callId: it.call_id ?? "",
+ name: it.name ?? "",
+ args: parseMaybeJson(it.arguments),
+ status: "in_progress",
+ });
+ changed = true;
+ } else if (it?.type === "function_call_output") {
+ turnItems.push({
+ kind: "tool_result",
+ id: it.id ?? `fc_output_${turnItems.length}`,
+ callId: it.call_id ?? "",
+ output: parseMaybeJson(it.output),
+ });
+ changed = true;
+ }
+ } else if (event.type === "response.output_item.done") {
+ if (it?.type === "function_call") {
+ for (let i = turnItems.length - 1; i >= 0; i--) {
+ const t = turnItems[i];
+ if (
+ t.kind === "tool_call" &&
+ (t.callId === it.call_id || t.id === it.id)
+ ) {
+ t.status = "completed";
+ if (it.arguments !== undefined) {
+ t.args = parseMaybeJson(it.arguments);
+ }
+ changed = true;
+ break;
+ }
}
- return updated;
- });
+ } else if (it?.type === "message" && it.id) {
+ for (let i = turnItems.length - 1; i >= 0; i--) {
+ const t = turnItems[i];
+ if (t.kind === "message" && t.id === it.id) {
+ t.status = "completed";
+ changed = true;
+ break;
+ }
+ }
+ }
+ } else if (
+ event.type === "response.output_text.delta" &&
+ event.delta
+ ) {
+ for (let i = turnItems.length - 1; i >= 0; i--) {
+ const t = turnItems[i];
+ if (t.kind === "message" && t.id === event.item_id) {
+ t.text += event.delta;
+ changed = true;
+ break;
+ }
+ }
}
+
+ if (changed) flushAssistant();
} catch {
// skip malformed events
}
@@ -368,15 +557,21 @@ function AgentRoute() {
key={msg.id}
className={`flex ${msg.role === "user" ? "justify-end" : "justify-start"}`}
>
-
+ {msg.role === "assistant" && msg.items ? (
+
+ ) : (
+
+ )}
))}
diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts
index c066d35f8..1be41d381 100644
--- a/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts
+++ b/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts
@@ -51,6 +51,7 @@ describe("useAgentChat", () => {
const { result } = renderHook(() => useAgentChat({ agent: "helper" }));
expect(result.current.content).toBe("");
+ expect(result.current.items).toEqual([]);
expect(result.current.events).toEqual([]);
expect(result.current.threadId).toBeNull();
expect(result.current.isStreaming).toBe(false);
@@ -100,20 +101,155 @@ describe("useAgentChat", () => {
await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined());
await act(async () => {
+ await emit(
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 0,
+ item: { type: "message", id: "msg_1" },
+ }),
+ );
await emit(
JSON.stringify({
type: "response.output_text.delta",
+ item_id: "msg_1",
delta: "Hello, ",
}),
);
await emit(
- JSON.stringify({ type: "response.output_text.delta", delta: "world" }),
+ JSON.stringify({
+ type: "response.output_text.delta",
+ item_id: "msg_1",
+ delta: "world",
+ }),
);
});
expect(result.current.content).toBe("Hello, world");
});
+ test("builds an ordered items list and content = the LAST message", async () => {
+ const { result } = renderHook(() => useAgentChat({ agent: "helper" }));
+
+ act(() => {
+ void result.current.send("hi");
+ });
+ await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined());
+
+ await act(async () => {
+ // Round 1: a draft message item (the model's duplicate draft answer).
+ await emit(
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 0,
+ item: { type: "message", id: "msg_1" },
+ }),
+ );
+ await emit(
+ JSON.stringify({
+ type: "response.output_text.delta",
+ item_id: "msg_1",
+ delta: "draft answer",
+ }),
+ );
+ // A tool call + result between the two rounds.
+ await emit(
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 1,
+ item: {
+ type: "function_call",
+ id: "fc_1",
+ call_id: "call_1",
+ name: "get_weather",
+ arguments: '{"city":"SF"}',
+ },
+ }),
+ );
+ await emit(
+ JSON.stringify({
+ type: "response.output_item.done",
+ output_index: 1,
+ item: {
+ type: "function_call",
+ id: "fc_1",
+ call_id: "call_1",
+ name: "get_weather",
+ arguments: '{"city":"SF"}',
+ },
+ }),
+ );
+ await emit(
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 2,
+ item: {
+ type: "function_call_output",
+ id: "fco_1",
+ call_id: "call_1",
+ output: '{"temp":72}',
+ },
+ }),
+ );
+ // Terminal round: the real answer, streamed live in two deltas.
+ await emit(
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 3,
+ item: { type: "message", id: "msg_2" },
+ }),
+ );
+ await emit(
+ JSON.stringify({
+ type: "response.output_text.delta",
+ item_id: "msg_2",
+ delta: "It's ",
+ }),
+ );
+ });
+
+ // Deltas stream into the right item live, and content tracks the LAST
+ // message — never concatenating the round-1 draft.
+ expect(result.current.content).toBe("It's ");
+ expect(result.current.items.map((it) => it.kind)).toEqual([
+ "message",
+ "tool_call",
+ "tool_result",
+ "message",
+ ]);
+
+ await act(async () => {
+ await emit(
+ JSON.stringify({
+ type: "response.output_text.delta",
+ item_id: "msg_2",
+ delta: "72 in SF.",
+ }),
+ );
+ });
+
+ expect(result.current.content).toBe("It's 72 in SF.");
+
+ const items = result.current.items;
+ const draft = items[0];
+ const toolCall = items[1];
+ const toolResult = items[2];
+ const answer = items[3];
+ expect(draft).toMatchObject({ kind: "message", text: "draft answer" });
+ expect(toolCall).toMatchObject({
+ kind: "tool_call",
+ name: "get_weather",
+ callId: "call_1",
+ status: "completed",
+ args: { city: "SF" },
+ });
+ expect(toolResult).toMatchObject({
+ kind: "tool_result",
+ callId: "call_1",
+ output: { temp: 72 },
+ });
+ expect(answer).toMatchObject({ kind: "message", text: "It's 72 in SF." });
+ });
+
test("captures threadId from appkit.metadata and reuses it on next send()", async () => {
const { result } = renderHook(() => useAgentChat({ agent: "helper" }));
@@ -207,7 +343,18 @@ describe("useAgentChat", () => {
await act(async () => {
await emit(
- JSON.stringify({ type: "response.output_text.delta", delta: "x" }),
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 0,
+ item: { type: "message", id: "msg_1" },
+ }),
+ );
+ await emit(
+ JSON.stringify({
+ type: "response.output_text.delta",
+ item_id: "msg_1",
+ delta: "x",
+ }),
);
});
@@ -231,12 +378,25 @@ describe("useAgentChat", () => {
await emit("[DONE]");
await emit("");
await emit(
- JSON.stringify({ type: "response.output_text.delta", delta: "ok" }),
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 0,
+ item: { type: "message", id: "msg_1" },
+ }),
+ );
+ await emit(
+ JSON.stringify({
+ type: "response.output_text.delta",
+ item_id: "msg_1",
+ delta: "ok",
+ }),
);
});
expect(result.current.content).toBe("ok");
- expect(onEvent).toHaveBeenCalledTimes(1);
+ // Two well-formed events parsed (the message item + its delta); the three
+ // malformed/empty payloads were skipped before reaching onEvent.
+ expect(onEvent).toHaveBeenCalledTimes(2);
});
test("isStreaming toggles around the connectSSE lifecycle", async () => {
@@ -269,7 +429,18 @@ describe("useAgentChat", () => {
JSON.stringify({ type: "appkit.metadata", data: { threadId: "t-1" } }),
);
await emit(
- JSON.stringify({ type: "response.output_text.delta", delta: "x" }),
+ JSON.stringify({
+ type: "response.output_item.added",
+ output_index: 0,
+ item: { type: "message", id: "msg_1" },
+ }),
+ );
+ await emit(
+ JSON.stringify({
+ type: "response.output_text.delta",
+ item_id: "msg_1",
+ delta: "x",
+ }),
);
});
diff --git a/packages/appkit-ui/src/react/hooks/index.ts b/packages/appkit-ui/src/react/hooks/index.ts
index 63b639761..303265445 100644
--- a/packages/appkit-ui/src/react/hooks/index.ts
+++ b/packages/appkit-ui/src/react/hooks/index.ts
@@ -24,6 +24,7 @@ export type {
} from "./types";
export {
type AgentChatEvent,
+ type AgentTurnItem,
type UseAgentChatOptions,
type UseAgentChatResult,
useAgentChat,
diff --git a/packages/appkit-ui/src/react/hooks/use-agent-chat.ts b/packages/appkit-ui/src/react/hooks/use-agent-chat.ts
index 684284a60..e26d356a6 100644
--- a/packages/appkit-ui/src/react/hooks/use-agent-chat.ts
+++ b/packages/appkit-ui/src/react/hooks/use-agent-chat.ts
@@ -1,6 +1,45 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { connectSSE } from "@/js";
+/**
+ * Parse a wire string as JSON, returning the raw value (or `undefined`) when
+ * it isn't valid JSON. Tool `arguments`/`output` arrive as serialized strings
+ * on the Responses API; we surface the parsed value when possible so item
+ * consumers don't each re-parse.
+ */
+function parseMaybeJson(value: string | undefined): unknown {
+ if (value === undefined || value === "") return value;
+ try {
+ return JSON.parse(value);
+ } catch {
+ return value;
+ }
+}
+
+/** Find the last element matching `pred` (no `Array.prototype.findLast` dep). */
+function findLast(
+ arr: T[],
+ pred: (value: T) => value is S,
+): S | undefined {
+ for (let i = arr.length - 1; i >= 0; i--) {
+ if (pred(arr[i])) return arr[i] as S;
+ }
+ return undefined;
+}
+
+/**
+ * Text of the last `message` item — the terminal answer of the turn. Streams
+ * live as that message's deltas arrive; earlier (superseded draft) messages
+ * are ignored.
+ */
+function lastMessageText(list: AgentTurnItem[]): string {
+ for (let i = list.length - 1; i >= 0; i--) {
+ const it = list[i];
+ if (it.kind === "message") return it.text;
+ }
+ return "";
+}
+
/**
* One Responses-API-shaped event yielded by the agents plugin SSE stream.
*
@@ -39,6 +78,41 @@ export interface AgentChatEvent {
annotations?: Record;
}
+/**
+ * One ordered item in an agent turn, derived from the Responses-API output
+ * items the agents plugin streams. Each ReAct round's assistant text is a
+ * distinct `message` item; tool calls and their results sit between rounds.
+ *
+ * The list is ordered by the wire `output_index`, so the LAST `message` item
+ * is the terminal answer and everything before it is intermediate "thinking"
+ * (draft messages the model emitted alongside its tool calls, plus the tool
+ * calls/results themselves).
+ *
+ * @see {@link UseAgentChatResult.items}
+ */
+export type AgentTurnItem =
+ | {
+ kind: "message";
+ id: string;
+ text: string;
+ status: "in_progress" | "completed";
+ }
+ | {
+ kind: "tool_call";
+ id: string;
+ callId: string;
+ name: string;
+ args: unknown;
+ status: "in_progress" | "completed";
+ }
+ | {
+ kind: "tool_result";
+ id: string;
+ callId: string;
+ output: unknown;
+ error?: string;
+ };
+
export interface UseAgentChatOptions {
/**
* Agent name registered with the `agents()` plugin (e.g. `"assistant"`,
@@ -62,8 +136,24 @@ export interface UseAgentChatOptions {
}
export interface UseAgentChatResult {
- /** Accumulated assistant text from `response.output_text.delta` events. */
+ /**
+ * Text of the FINAL assistant message item — the terminal answer of the
+ * turn, updated live as its deltas stream in. With OpenAI-compatible Claude
+ * the model emits a full draft answer alongside its tool calls on every
+ * ReAct round; `content` surfaces only the last round's message, so it never
+ * shows those duplicated drafts. For the full per-round structure (drafts,
+ * tool calls, tool results) read {@link items} instead.
+ */
content: string;
+ /**
+ * Ordered list of turn items derived from the Responses-API output items,
+ * keyed by wire `output_index`. Render everything before the last `message`
+ * item as collapsible intermediate steps ("thinking" + tool calls/results)
+ * and the last `message` item as the prominent answer. Messages stream in
+ * live; tool calls flip to `completed` on their `done` event. Reset at the
+ * start of each {@link send} and by {@link reset}.
+ */
+ items: AgentTurnItem[];
/**
* Every parsed event, in order. Provided for components that need to
* render historical tool calls or replay state after a remount —
@@ -104,25 +194,35 @@ export interface UseAgentChatResult {
* The hook is intentionally lower-level than a full chat component —
* it owns one stream at a time, not a multi-turn message history. The
* caller composes its own messages array (typically a `useState`) and
- * appends to it via the `onEvent` callback for tool calls and via the
- * `content` field for assistant text.
+ * appends to it from the structured {@link UseAgentChatResult.items} list
+ * (drafts, tool calls, tool results, and the terminal answer) or, for the
+ * common case, just reads {@link UseAgentChatResult.content} (the final
+ * answer text, streamed live and de-duplicated across ReAct rounds).
*
* @example
* ```tsx
* function Chat({ agent }: { agent: string }) {
- * const [messages, setMessages] = useState([]);
- * const { content, threadId, isStreaming, send, reset } = useAgentChat({
- * agent,
- * onEvent(ev) {
- * if (ev.type === "response.output_item.added" && ev.item?.type === "function_call") {
- * setMessages((m) => [...m, { role: "tool", name: ev.item?.name, args: ev.item?.arguments }]);
- * }
- * },
- * });
- * // `content` reflects the latest assistant turn; reset() between conversations.
- * // ...
+ * const { content, items, threadId, isStreaming, send, reset } = useAgentChat({ agent });
+ * const steps = items.slice(0, -1); // everything before the terminal answer
+ * return (
+ * <>
+ * {steps.length > 0 && (
+ *
+ * Steps
+ * {steps.map((it) =>
+ * it.kind === "message" ? {it.text}
:
+ * it.kind === "tool_call" ? {it.name} :
+ * {String(it.output)},
+ * )}
+ *
+ * )}
+ * {content}
+ * >
+ * );
* }
* ```
+ *
+ * `content` is the terminal answer (streams live, de-duplicated across rounds).
*/
export function useAgentChat({
agent,
@@ -130,6 +230,7 @@ export function useAgentChat({
onEvent,
}: UseAgentChatOptions): UseAgentChatResult {
const [content, setContent] = useState("");
+ const [items, setItems] = useState([]);
const [events, setEvents] = useState([]);
const [threadId, setThreadId] = useState(null);
const [isStreaming, setIsStreaming] = useState(false);
@@ -139,7 +240,9 @@ export function useAgentChat({
// `onEvent`: `send` is a stable callback that reads the latest
// threadId/onEvent without re-mounting connectSSE on every render.
const threadIdRef = useRef(null);
- const contentRef = useRef("");
+ // Working copy of the ordered turn items, keyed by wire `item_id`. Mutated
+ // as events arrive, then mirrored into `items`/`content` state.
+ const itemsRef = useRef([]);
const onEventRef = useRef(onEvent);
onEventRef.current = onEvent;
const abortControllerRef = useRef(null);
@@ -148,8 +251,9 @@ export function useAgentChat({
abortControllerRef.current?.abort();
abortControllerRef.current = null;
threadIdRef.current = null;
- contentRef.current = "";
+ itemsRef.current = [];
setContent("");
+ setItems([]);
setEvents([]);
setThreadId(null);
setIsStreaming(false);
@@ -163,8 +267,9 @@ export function useAgentChat({
const controller = new AbortController();
abortControllerRef.current = controller;
- contentRef.current = "";
+ itemsRef.current = [];
setContent("");
+ setItems([]);
setEvents([]);
setError(null);
setIsStreaming(true);
@@ -217,12 +322,94 @@ export function useAgentChat({
threadIdRef.current = tid;
setThreadId(tid);
}
+ return;
+ }
+
+ // Reduce the Responses-API wire events into the ordered item list.
+ // The translator emits items in `output_index` order, so simply
+ // appending on `added` keeps the list ordered.
+ const list = itemsRef.current;
+ let changed = false;
+
+ if (event.type === "response.output_item.added") {
+ const it = event.item;
+ if (it?.type === "message" && it.id) {
+ list.push({
+ kind: "message",
+ id: it.id,
+ text: "",
+ status: "in_progress",
+ });
+ changed = true;
+ } else if (it?.type === "function_call") {
+ list.push({
+ kind: "tool_call",
+ id: it.id ?? it.call_id ?? `fc_${list.length}`,
+ callId: it.call_id ?? "",
+ name: it.name ?? "",
+ args: parseMaybeJson(it.arguments),
+ status: "in_progress",
+ });
+ changed = true;
+ } else if (it?.type === "function_call_output") {
+ list.push({
+ kind: "tool_result",
+ id: it.id ?? `fc_output_${list.length}`,
+ callId: it.call_id ?? "",
+ output: parseMaybeJson(it.output),
+ });
+ changed = true;
+ }
+ } else if (event.type === "response.output_item.done") {
+ const it = event.item;
+ if (it?.type === "function_call") {
+ const callId = it.call_id ?? "";
+ const target = findLast(
+ list,
+ (x): x is Extract =>
+ x.kind === "tool_call" &&
+ (x.callId === callId || x.id === it.id),
+ );
+ if (target) {
+ target.status = "completed";
+ if (it.arguments !== undefined) {
+ target.args = parseMaybeJson(it.arguments);
+ }
+ if (it.name) target.name = it.name;
+ changed = true;
+ }
+ } else if (it?.type === "message" && it.id) {
+ const target = findLast(
+ list,
+ (x): x is Extract =>
+ x.kind === "message" && x.id === it.id,
+ );
+ if (target) {
+ target.status = "completed";
+ changed = true;
+ }
+ }
} else if (
event.type === "response.output_text.delta" &&
typeof event.delta === "string"
) {
- contentRef.current += event.delta;
- setContent(contentRef.current);
+ const target = findLast(
+ list,
+ (x): x is Extract =>
+ x.kind === "message" && x.id === event.item_id,
+ );
+ if (target) {
+ target.text += event.delta;
+ changed = true;
+ }
+ }
+
+ if (changed) {
+ // Clone so React sees a new reference; item objects are mutated
+ // in place in the ref, so shallow-copy them too.
+ const snapshot = list.map((x) => ({ ...x }));
+ setItems(snapshot);
+ setContent(lastMessageText(snapshot));
}
},
onError: (err) => {
@@ -253,6 +440,7 @@ export function useAgentChat({
return {
content,
+ items,
events,
threadId,
isStreaming,
diff --git a/packages/appkit/src/core/agent/consume-adapter-stream.ts b/packages/appkit/src/core/agent/consume-adapter-stream.ts
index c4f3d07ed..d7885cebf 100644
--- a/packages/appkit/src/core/agent/consume-adapter-stream.ts
+++ b/packages/appkit/src/core/agent/consume-adapter-stream.ts
@@ -19,16 +19,30 @@ interface ConsumeAdapterStreamOptions {
/**
* Consume an adapter's event stream and aggregate the assistant's final text.
*
- * Accumulation rule (shared across all agent-execution paths in AppKit):
+ * Per-round rule (shared across all agent-execution paths in AppKit):
*
- * - `message_delta` events append their `content` to the running text.
- * - A `message` event *replaces* the running text with its `content`.
+ * - `message_delta` events append their `content` to the message currently
+ * being built.
+ * - A `message` event *replaces* the message currently being built with its
+ * `content` (LangChain's `on_chain_end` path emits a single final message).
+ * - A `tool_call` / `tool_result` event closes the current message: any text
+ * accumulated so far is a *draft* that the model emitted alongside its tool
+ * calls, so it is set aside as "the last closed message" and the buffer is
+ * reset for the next ReAct round.
*
- * The two branches coexist because different adapters emit different shapes:
- * streaming adapters (Databricks, Vercel AI) emit deltas chunk-by-chunk,
- * while `LangChain`'s `on_chain_end` path emits a single final `message`.
- * Without the replace branch, LangChain conversations silently dropped the
- * assistant turn from thread history.
+ * The return value is the text of the message currently open at end-of-stream
+ * (the terminal answer in a normal ReAct loop), falling back to the last
+ * closed message if the stream ended mid-tool-calling (e.g. `maxSteps`
+ * exhausted right after a tool call).
+ *
+ * Why per-round and not flat concatenation: with OpenAI-compatible Claude on
+ * Databricks Model Serving, the model emits a full draft answer ALONGSIDE its
+ * tool calls on every round, so a naive `text += content` accumulation
+ * surfaces that draft 3-4× in the final text. Only the final round's message
+ * is the real answer; each tool call marks a round boundary. The
+ * `AgentEventTranslator` already item-bounds these rounds on the wire — this
+ * loop mirrors that boundary so server-side consumers (thread history, the
+ * non-streaming JSON `fullContent`, `runAgent`, and sub-agents) dedupe too.
*
* Kept pure (no I/O, no mutable external state beyond the caller's `onEvent`
* side effect) so each execution path — HTTP streaming, sub-agents, and the
@@ -38,15 +52,24 @@ export async function consumeAdapterStream(
stream: AsyncIterable,
opts: ConsumeAdapterStreamOptions = {},
): Promise {
- let text = "";
+ let current = ""; // text of the message currently being built
+ let lastClosed = ""; // most recent fully-closed message
for await (const event of stream) {
if (opts.signal?.aborted) break;
if (event.type === "message_delta") {
- text += event.content;
+ current += event.content;
} else if (event.type === "message") {
- text = event.content;
+ // LangChain single-final replace — preserve.
+ current = event.content;
+ } else if (event.type === "tool_call" || event.type === "tool_result") {
+ // A draft followed by a tool call is superseded by the next round.
+ if (current) {
+ lastClosed = current;
+ current = "";
+ }
}
opts.onEvent?.(event);
}
- return text;
+ // Terminal answer, or the last draft if maxSteps exhausted mid-tool-calling.
+ return current || lastClosed;
}
diff --git a/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts b/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts
index 98863a62a..740be33c2 100644
--- a/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts
+++ b/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts
@@ -31,6 +31,64 @@ describe("consumeAdapterStream", () => {
expect(text).toBe("final answer");
});
+ test("multi-round ReAct: returns the terminal message, not the drafts", async () => {
+ // OpenAI-compatible Claude emits a full draft answer alongside its tool
+ // call on every round. Only the final round's message is the answer.
+ const text = await consumeAdapterStream(
+ streamOf([
+ // Round 1: draft + tool call
+ { type: "message_delta", content: "The answer is 42." },
+ { type: "tool_call", callId: "c1", name: "lookup", args: {} },
+ { type: "tool_result", callId: "c1", result: "ok" },
+ // Round 2: another draft + tool call
+ { type: "message_delta", content: "The answer is 42." },
+ { type: "tool_call", callId: "c2", name: "verify", args: {} },
+ { type: "tool_result", callId: "c2", result: "ok" },
+ // Terminal round: the real answer, streamed live
+ { type: "message_delta", content: "The answer " },
+ { type: "message_delta", content: "is 42." },
+ ]),
+ );
+ expect(text).toBe("The answer is 42.");
+ });
+
+ test("maxSteps exhausted mid-tool-calling: returns the last draft", async () => {
+ // Stream ends right after a tool_call following a draft — no terminal
+ // message was produced, so fall back to the last closed message.
+ const text = await consumeAdapterStream(
+ streamOf([
+ { type: "message_delta", content: "Working on it…" },
+ { type: "tool_call", callId: "c1", name: "lookup", args: {} },
+ ]),
+ );
+ expect(text).toBe("Working on it…");
+ });
+
+ test("LangChain single `message` after tool calls still replaces", async () => {
+ const text = await consumeAdapterStream(
+ streamOf([
+ { type: "message_delta", content: "draft" },
+ { type: "tool_call", callId: "c1", name: "lookup", args: {} },
+ { type: "tool_result", callId: "c1", result: "ok" },
+ { type: "message", content: "final answer" },
+ ]),
+ );
+ expect(text).toBe("final answer");
+ });
+
+ test("mixed: deltas then a tool_call then a fresh terminal delta", async () => {
+ const text = await consumeAdapterStream(
+ streamOf([
+ { type: "message_delta", content: "thinking " },
+ { type: "message_delta", content: "out loud" },
+ { type: "tool_call", callId: "c1", name: "lookup", args: {} },
+ { type: "tool_result", callId: "c1", result: "ok" },
+ { type: "message_delta", content: "done" },
+ ]),
+ );
+ expect(text).toBe("done");
+ });
+
test("invokes onEvent once per event, in order, with the raw event", async () => {
const seen: AgentEvent[] = [];
await consumeAdapterStream(