From f2f3043d3a98eaefa609417a8071536325bbb70b Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Fri, 12 Jun 2026 18:17:19 +0200 Subject: [PATCH] fix(agents): dedupe ReAct draft text per-round while preserving live streaming (#421) With OpenAI-compatible Claude on Databricks Model Serving, the model emits a full draft answer ALONGSIDE its tool calls on every ReAct round, so the answer appeared 3-4x in the output. An earlier approach buffered text and flushed it only on the terminal turn, but that made the final answer arrive all-at-once instead of streaming. Root cause: the duplication was never in the adapter or wire protocol. The AgentEventTranslator already closes the current message item and opens a new one when a tool_call/tool_result arrives, so each round's text is a distinct Responses-API output item. Two CONSUMERS flattened those items by concatenating deltas across item boundaries. Three-layer fix that respects the existing item boundaries: - The Databricks adapter streams every round's text live again (this already matched main; no change needed). - consumeAdapterStream keeps only the terminal round: a draft followed by a tool_call is set aside as superseded, and the open message at end-of-stream (or the last draft if maxSteps exhausted mid-tool-calling) is returned. This centrally dedupes thread history, the non-streaming JSON fullContent, runAgent, and sub-agents. - use-agent-chat reduces the wire events into an ordered, per-output-item AgentTurnItem list while streaming live; `content` now tracks only the LAST message item (the terminal answer), so flat-API consumers dedupe too. New public hook API: the AgentTurnItem discriminated union (message / tool_call / tool_result) plus `items` on UseAgentChatResult. The dev-playground agent route uses it to render a collapsible "Steps" disclosure (intermediate drafts + tool-call/result chips) above the prominent, live-streaming answer. Supersedes the buffer-until-terminal approach in branch fix/421-adapter-text-dup. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- .../client/src/routes/agent.route.tsx | 255 +++++++++++++++--- .../hooks/__tests__/use-agent-chat.test.ts | 181 ++++++++++++- packages/appkit-ui/src/react/hooks/index.ts | 1 + .../src/react/hooks/use-agent-chat.ts | 226 ++++++++++++++-- .../src/core/agent/consume-adapter-stream.ts | 47 +++- .../tests/consume-adapter-stream.test.ts | 58 ++++ 6 files changed, 702 insertions(+), 66 deletions(-) diff --git a/apps/dev-playground/client/src/routes/agent.route.tsx b/apps/dev-playground/client/src/routes/agent.route.tsx index 6762a1a38..22f860b08 100644 --- a/apps/dev-playground/client/src/routes/agent.route.tsx +++ b/apps/dev-playground/client/src/routes/agent.route.tsx @@ -1,5 +1,11 @@ import { getPluginClientConfig } from "@databricks/appkit-ui/js"; -import { Button } from "@databricks/appkit-ui/react"; +import type { AgentTurnItem } from "@databricks/appkit-ui/react"; +import { + Button, + Collapsible, + CollapsibleContent, + CollapsibleTrigger, +} from "@databricks/appkit-ui/react"; import { createFileRoute } from "@tanstack/react-router"; import { useCallback, useEffect, useRef, useState } from "react"; @@ -39,7 +45,113 @@ interface SSEEvent { interface ChatMessage { id: number; role: "user" | "assistant"; + /** User text, or — for assistant turns — the terminal answer text. */ content: string; + /** + * For assistant turns: the ordered per-round trace (intermediate draft + * messages, tool calls, tool results, and the final answer). Rendered as a + * collapsible "Steps" section above the answer. Undefined for user turns. + */ + items?: AgentTurnItem[]; +} + +/** Parse a wire JSON string, falling back to the raw value. */ +function parseMaybeJson(value: string | undefined): unknown { + if (value === undefined || value === "") return value; + try { + return JSON.parse(value); + } catch { + return value; + } +} + +/** Text of the last `message` item — the terminal answer. */ +function lastMessageText(items: AgentTurnItem[]): string { + for (let i = items.length - 1; i >= 0; i--) { + const it = items[i]; + if (it.kind === "message") return it.text; + } + return ""; +} + +function summarizeArgs(args: unknown): string { + if (args === undefined || args === "") return ""; + const s = typeof args === "string" ? args : JSON.stringify(args); + return s.length > 80 ? `${s.slice(0, 80)}…` : s; +} + +function summarizeOutput(output: unknown): string { + const s = typeof output === "string" ? output : JSON.stringify(output); + return s.length > 200 ? `${s.slice(0, 200)}…` : s; +} + +/** + * Renders an assistant turn: collapsible intermediate steps (every item before + * the last `message`) plus the terminal answer streamed live below. + */ +function AssistantTurn({ items }: { items: AgentTurnItem[] }) { + const lastMessageIdx = (() => { + for (let i = items.length - 1; i >= 0; i--) { + if (items[i].kind === "message") return i; + } + return -1; + })(); + const steps = lastMessageIdx >= 0 ? items.slice(0, lastMessageIdx) : items; + const answer = lastMessageIdx >= 0 ? lastMessageText(items) : ""; + + return ( +
+ {steps.length > 0 && ( + + + Steps ({steps.length}) + + + {steps.map((it) => { + if (it.kind === "message") { + return ( +

+ {it.text} +

+ ); + } + if (it.kind === "tool_call") { + return ( +
+ {it.name} + + ({summarizeArgs(it.args)}) + +
+ ); + } + return ( +
+ {it.error ? `error: ${it.error}` : summarizeOutput(it.output)} +
+ ); + })} +
+
+ )} +
+

{answer}

+
+
+ ); } interface PendingApproval { @@ -138,7 +250,7 @@ function useAutocomplete(enabled: boolean) { function AgentRoute() { const [messages, setMessages] = useState([]); - const [events, setEvents] = useState([]); + const [events, setEvents] = useState([]); const [input, setInput] = useState(""); const [isLoading, setIsLoading] = useState(false); const [threadId, setThreadId] = useState(null); @@ -232,9 +344,31 @@ function AgentRoute() { if (!reader) return; const decoder = new TextDecoder(); - let assistantContent = ""; + // Per-round trace items for this assistant turn, keyed by wire item id. + const turnItems: AgentTurnItem[] = []; + const assistantId = ++msgIdCounter.current; let buffer = ""; + const flushAssistant = () => { + const snapshot = turnItems.map((x) => ({ ...x })); + setMessages((prev) => { + const updated = [...prev]; + const last = updated[updated.length - 1]; + const next: ChatMessage = { + id: assistantId, + role: "assistant", + content: lastMessageText(snapshot), + items: snapshot, + }; + if (last?.role === "assistant" && last.id === assistantId) { + updated[updated.length - 1] = next; + } else { + updated.push(next); + } + return updated; + }); + }; + while (true) { const { done, value } = await reader.read(); if (done) break; @@ -273,26 +407,81 @@ function AgentRoute() { setThreadId(event.data.threadId as string); } - if (event.type === "response.output_text.delta" && event.delta) { - assistantContent += event.delta; - setMessages((prev) => { - const updated = [...prev]; - const last = updated[updated.length - 1]; - if (last?.role === "assistant") { - updated[updated.length - 1] = { - ...last, - content: assistantContent, - }; - } else { - updated.push({ - id: ++msgIdCounter.current, - role: "assistant", - content: assistantContent, - }); + // Build the ordered per-round item list. The translator emits + // items in output_index order, so appending on `added` keeps it + // ordered. Each ReAct round's draft is its own message item, so the + // duplicated drafts surface as collapsible steps, not the answer. + let changed = false; + const it = event.item; + if (event.type === "response.output_item.added") { + if (it?.type === "message" && it.id) { + turnItems.push({ + kind: "message", + id: it.id, + text: "", + status: "in_progress", + }); + changed = true; + } else if (it?.type === "function_call") { + turnItems.push({ + kind: "tool_call", + id: it.id ?? it.call_id ?? `fc_${turnItems.length}`, + callId: it.call_id ?? "", + name: it.name ?? "", + args: parseMaybeJson(it.arguments), + status: "in_progress", + }); + changed = true; + } else if (it?.type === "function_call_output") { + turnItems.push({ + kind: "tool_result", + id: it.id ?? `fc_output_${turnItems.length}`, + callId: it.call_id ?? "", + output: parseMaybeJson(it.output), + }); + changed = true; + } + } else if (event.type === "response.output_item.done") { + if (it?.type === "function_call") { + for (let i = turnItems.length - 1; i >= 0; i--) { + const t = turnItems[i]; + if ( + t.kind === "tool_call" && + (t.callId === it.call_id || t.id === it.id) + ) { + t.status = "completed"; + if (it.arguments !== undefined) { + t.args = parseMaybeJson(it.arguments); + } + changed = true; + break; + } } - return updated; - }); + } else if (it?.type === "message" && it.id) { + for (let i = turnItems.length - 1; i >= 0; i--) { + const t = turnItems[i]; + if (t.kind === "message" && t.id === it.id) { + t.status = "completed"; + changed = true; + break; + } + } + } + } else if ( + event.type === "response.output_text.delta" && + event.delta + ) { + for (let i = turnItems.length - 1; i >= 0; i--) { + const t = turnItems[i]; + if (t.kind === "message" && t.id === event.item_id) { + t.text += event.delta; + changed = true; + break; + } + } } + + if (changed) flushAssistant(); } catch { // skip malformed events } @@ -368,15 +557,21 @@ function AgentRoute() { key={msg.id} className={`flex ${msg.role === "user" ? "justify-end" : "justify-start"}`} > -
-

{msg.content}

-
+ {msg.role === "assistant" && msg.items ? ( + + ) : ( +
+

+ {msg.content} +

+
+ )} ))} diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts index c066d35f8..1be41d381 100644 --- a/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-agent-chat.test.ts @@ -51,6 +51,7 @@ describe("useAgentChat", () => { const { result } = renderHook(() => useAgentChat({ agent: "helper" })); expect(result.current.content).toBe(""); + expect(result.current.items).toEqual([]); expect(result.current.events).toEqual([]); expect(result.current.threadId).toBeNull(); expect(result.current.isStreaming).toBe(false); @@ -100,20 +101,155 @@ describe("useAgentChat", () => { await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined()); await act(async () => { + await emit( + JSON.stringify({ + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "msg_1" }, + }), + ); await emit( JSON.stringify({ type: "response.output_text.delta", + item_id: "msg_1", delta: "Hello, ", }), ); await emit( - JSON.stringify({ type: "response.output_text.delta", delta: "world" }), + JSON.stringify({ + type: "response.output_text.delta", + item_id: "msg_1", + delta: "world", + }), ); }); expect(result.current.content).toBe("Hello, world"); }); + test("builds an ordered items list and content = the LAST message", async () => { + const { result } = renderHook(() => useAgentChat({ agent: "helper" })); + + act(() => { + void result.current.send("hi"); + }); + await waitFor(() => expect(capturedCallbacks.onMessage).toBeDefined()); + + await act(async () => { + // Round 1: a draft message item (the model's duplicate draft answer). + await emit( + JSON.stringify({ + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "msg_1" }, + }), + ); + await emit( + JSON.stringify({ + type: "response.output_text.delta", + item_id: "msg_1", + delta: "draft answer", + }), + ); + // A tool call + result between the two rounds. + await emit( + JSON.stringify({ + type: "response.output_item.added", + output_index: 1, + item: { + type: "function_call", + id: "fc_1", + call_id: "call_1", + name: "get_weather", + arguments: '{"city":"SF"}', + }, + }), + ); + await emit( + JSON.stringify({ + type: "response.output_item.done", + output_index: 1, + item: { + type: "function_call", + id: "fc_1", + call_id: "call_1", + name: "get_weather", + arguments: '{"city":"SF"}', + }, + }), + ); + await emit( + JSON.stringify({ + type: "response.output_item.added", + output_index: 2, + item: { + type: "function_call_output", + id: "fco_1", + call_id: "call_1", + output: '{"temp":72}', + }, + }), + ); + // Terminal round: the real answer, streamed live in two deltas. + await emit( + JSON.stringify({ + type: "response.output_item.added", + output_index: 3, + item: { type: "message", id: "msg_2" }, + }), + ); + await emit( + JSON.stringify({ + type: "response.output_text.delta", + item_id: "msg_2", + delta: "It's ", + }), + ); + }); + + // Deltas stream into the right item live, and content tracks the LAST + // message — never concatenating the round-1 draft. + expect(result.current.content).toBe("It's "); + expect(result.current.items.map((it) => it.kind)).toEqual([ + "message", + "tool_call", + "tool_result", + "message", + ]); + + await act(async () => { + await emit( + JSON.stringify({ + type: "response.output_text.delta", + item_id: "msg_2", + delta: "72 in SF.", + }), + ); + }); + + expect(result.current.content).toBe("It's 72 in SF."); + + const items = result.current.items; + const draft = items[0]; + const toolCall = items[1]; + const toolResult = items[2]; + const answer = items[3]; + expect(draft).toMatchObject({ kind: "message", text: "draft answer" }); + expect(toolCall).toMatchObject({ + kind: "tool_call", + name: "get_weather", + callId: "call_1", + status: "completed", + args: { city: "SF" }, + }); + expect(toolResult).toMatchObject({ + kind: "tool_result", + callId: "call_1", + output: { temp: 72 }, + }); + expect(answer).toMatchObject({ kind: "message", text: "It's 72 in SF." }); + }); + test("captures threadId from appkit.metadata and reuses it on next send()", async () => { const { result } = renderHook(() => useAgentChat({ agent: "helper" })); @@ -207,7 +343,18 @@ describe("useAgentChat", () => { await act(async () => { await emit( - JSON.stringify({ type: "response.output_text.delta", delta: "x" }), + JSON.stringify({ + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "msg_1" }, + }), + ); + await emit( + JSON.stringify({ + type: "response.output_text.delta", + item_id: "msg_1", + delta: "x", + }), ); }); @@ -231,12 +378,25 @@ describe("useAgentChat", () => { await emit("[DONE]"); await emit(""); await emit( - JSON.stringify({ type: "response.output_text.delta", delta: "ok" }), + JSON.stringify({ + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "msg_1" }, + }), + ); + await emit( + JSON.stringify({ + type: "response.output_text.delta", + item_id: "msg_1", + delta: "ok", + }), ); }); expect(result.current.content).toBe("ok"); - expect(onEvent).toHaveBeenCalledTimes(1); + // Two well-formed events parsed (the message item + its delta); the three + // malformed/empty payloads were skipped before reaching onEvent. + expect(onEvent).toHaveBeenCalledTimes(2); }); test("isStreaming toggles around the connectSSE lifecycle", async () => { @@ -269,7 +429,18 @@ describe("useAgentChat", () => { JSON.stringify({ type: "appkit.metadata", data: { threadId: "t-1" } }), ); await emit( - JSON.stringify({ type: "response.output_text.delta", delta: "x" }), + JSON.stringify({ + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "msg_1" }, + }), + ); + await emit( + JSON.stringify({ + type: "response.output_text.delta", + item_id: "msg_1", + delta: "x", + }), ); }); diff --git a/packages/appkit-ui/src/react/hooks/index.ts b/packages/appkit-ui/src/react/hooks/index.ts index 63b639761..303265445 100644 --- a/packages/appkit-ui/src/react/hooks/index.ts +++ b/packages/appkit-ui/src/react/hooks/index.ts @@ -24,6 +24,7 @@ export type { } from "./types"; export { type AgentChatEvent, + type AgentTurnItem, type UseAgentChatOptions, type UseAgentChatResult, useAgentChat, diff --git a/packages/appkit-ui/src/react/hooks/use-agent-chat.ts b/packages/appkit-ui/src/react/hooks/use-agent-chat.ts index 684284a60..e26d356a6 100644 --- a/packages/appkit-ui/src/react/hooks/use-agent-chat.ts +++ b/packages/appkit-ui/src/react/hooks/use-agent-chat.ts @@ -1,6 +1,45 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { connectSSE } from "@/js"; +/** + * Parse a wire string as JSON, returning the raw value (or `undefined`) when + * it isn't valid JSON. Tool `arguments`/`output` arrive as serialized strings + * on the Responses API; we surface the parsed value when possible so item + * consumers don't each re-parse. + */ +function parseMaybeJson(value: string | undefined): unknown { + if (value === undefined || value === "") return value; + try { + return JSON.parse(value); + } catch { + return value; + } +} + +/** Find the last element matching `pred` (no `Array.prototype.findLast` dep). */ +function findLast( + arr: T[], + pred: (value: T) => value is S, +): S | undefined { + for (let i = arr.length - 1; i >= 0; i--) { + if (pred(arr[i])) return arr[i] as S; + } + return undefined; +} + +/** + * Text of the last `message` item — the terminal answer of the turn. Streams + * live as that message's deltas arrive; earlier (superseded draft) messages + * are ignored. + */ +function lastMessageText(list: AgentTurnItem[]): string { + for (let i = list.length - 1; i >= 0; i--) { + const it = list[i]; + if (it.kind === "message") return it.text; + } + return ""; +} + /** * One Responses-API-shaped event yielded by the agents plugin SSE stream. * @@ -39,6 +78,41 @@ export interface AgentChatEvent { annotations?: Record; } +/** + * One ordered item in an agent turn, derived from the Responses-API output + * items the agents plugin streams. Each ReAct round's assistant text is a + * distinct `message` item; tool calls and their results sit between rounds. + * + * The list is ordered by the wire `output_index`, so the LAST `message` item + * is the terminal answer and everything before it is intermediate "thinking" + * (draft messages the model emitted alongside its tool calls, plus the tool + * calls/results themselves). + * + * @see {@link UseAgentChatResult.items} + */ +export type AgentTurnItem = + | { + kind: "message"; + id: string; + text: string; + status: "in_progress" | "completed"; + } + | { + kind: "tool_call"; + id: string; + callId: string; + name: string; + args: unknown; + status: "in_progress" | "completed"; + } + | { + kind: "tool_result"; + id: string; + callId: string; + output: unknown; + error?: string; + }; + export interface UseAgentChatOptions { /** * Agent name registered with the `agents()` plugin (e.g. `"assistant"`, @@ -62,8 +136,24 @@ export interface UseAgentChatOptions { } export interface UseAgentChatResult { - /** Accumulated assistant text from `response.output_text.delta` events. */ + /** + * Text of the FINAL assistant message item — the terminal answer of the + * turn, updated live as its deltas stream in. With OpenAI-compatible Claude + * the model emits a full draft answer alongside its tool calls on every + * ReAct round; `content` surfaces only the last round's message, so it never + * shows those duplicated drafts. For the full per-round structure (drafts, + * tool calls, tool results) read {@link items} instead. + */ content: string; + /** + * Ordered list of turn items derived from the Responses-API output items, + * keyed by wire `output_index`. Render everything before the last `message` + * item as collapsible intermediate steps ("thinking" + tool calls/results) + * and the last `message` item as the prominent answer. Messages stream in + * live; tool calls flip to `completed` on their `done` event. Reset at the + * start of each {@link send} and by {@link reset}. + */ + items: AgentTurnItem[]; /** * Every parsed event, in order. Provided for components that need to * render historical tool calls or replay state after a remount — @@ -104,25 +194,35 @@ export interface UseAgentChatResult { * The hook is intentionally lower-level than a full chat component — * it owns one stream at a time, not a multi-turn message history. The * caller composes its own messages array (typically a `useState`) and - * appends to it via the `onEvent` callback for tool calls and via the - * `content` field for assistant text. + * appends to it from the structured {@link UseAgentChatResult.items} list + * (drafts, tool calls, tool results, and the terminal answer) or, for the + * common case, just reads {@link UseAgentChatResult.content} (the final + * answer text, streamed live and de-duplicated across ReAct rounds). * * @example * ```tsx * function Chat({ agent }: { agent: string }) { - * const [messages, setMessages] = useState([]); - * const { content, threadId, isStreaming, send, reset } = useAgentChat({ - * agent, - * onEvent(ev) { - * if (ev.type === "response.output_item.added" && ev.item?.type === "function_call") { - * setMessages((m) => [...m, { role: "tool", name: ev.item?.name, args: ev.item?.arguments }]); - * } - * }, - * }); - * // `content` reflects the latest assistant turn; reset() between conversations. - * // ... + * const { content, items, threadId, isStreaming, send, reset } = useAgentChat({ agent }); + * const steps = items.slice(0, -1); // everything before the terminal answer + * return ( + * <> + * {steps.length > 0 && ( + *
+ * Steps + * {steps.map((it) => + * it.kind === "message" ?

{it.text}

: + * it.kind === "tool_call" ? {it.name} : + * {String(it.output)}, + * )} + *
+ * )} + *
{content}
+ * + * ); * } * ``` + * + * `content` is the terminal answer (streams live, de-duplicated across rounds). */ export function useAgentChat({ agent, @@ -130,6 +230,7 @@ export function useAgentChat({ onEvent, }: UseAgentChatOptions): UseAgentChatResult { const [content, setContent] = useState(""); + const [items, setItems] = useState([]); const [events, setEvents] = useState([]); const [threadId, setThreadId] = useState(null); const [isStreaming, setIsStreaming] = useState(false); @@ -139,7 +240,9 @@ export function useAgentChat({ // `onEvent`: `send` is a stable callback that reads the latest // threadId/onEvent without re-mounting connectSSE on every render. const threadIdRef = useRef(null); - const contentRef = useRef(""); + // Working copy of the ordered turn items, keyed by wire `item_id`. Mutated + // as events arrive, then mirrored into `items`/`content` state. + const itemsRef = useRef([]); const onEventRef = useRef(onEvent); onEventRef.current = onEvent; const abortControllerRef = useRef(null); @@ -148,8 +251,9 @@ export function useAgentChat({ abortControllerRef.current?.abort(); abortControllerRef.current = null; threadIdRef.current = null; - contentRef.current = ""; + itemsRef.current = []; setContent(""); + setItems([]); setEvents([]); setThreadId(null); setIsStreaming(false); @@ -163,8 +267,9 @@ export function useAgentChat({ const controller = new AbortController(); abortControllerRef.current = controller; - contentRef.current = ""; + itemsRef.current = []; setContent(""); + setItems([]); setEvents([]); setError(null); setIsStreaming(true); @@ -217,12 +322,94 @@ export function useAgentChat({ threadIdRef.current = tid; setThreadId(tid); } + return; + } + + // Reduce the Responses-API wire events into the ordered item list. + // The translator emits items in `output_index` order, so simply + // appending on `added` keeps the list ordered. + const list = itemsRef.current; + let changed = false; + + if (event.type === "response.output_item.added") { + const it = event.item; + if (it?.type === "message" && it.id) { + list.push({ + kind: "message", + id: it.id, + text: "", + status: "in_progress", + }); + changed = true; + } else if (it?.type === "function_call") { + list.push({ + kind: "tool_call", + id: it.id ?? it.call_id ?? `fc_${list.length}`, + callId: it.call_id ?? "", + name: it.name ?? "", + args: parseMaybeJson(it.arguments), + status: "in_progress", + }); + changed = true; + } else if (it?.type === "function_call_output") { + list.push({ + kind: "tool_result", + id: it.id ?? `fc_output_${list.length}`, + callId: it.call_id ?? "", + output: parseMaybeJson(it.output), + }); + changed = true; + } + } else if (event.type === "response.output_item.done") { + const it = event.item; + if (it?.type === "function_call") { + const callId = it.call_id ?? ""; + const target = findLast( + list, + (x): x is Extract => + x.kind === "tool_call" && + (x.callId === callId || x.id === it.id), + ); + if (target) { + target.status = "completed"; + if (it.arguments !== undefined) { + target.args = parseMaybeJson(it.arguments); + } + if (it.name) target.name = it.name; + changed = true; + } + } else if (it?.type === "message" && it.id) { + const target = findLast( + list, + (x): x is Extract => + x.kind === "message" && x.id === it.id, + ); + if (target) { + target.status = "completed"; + changed = true; + } + } } else if ( event.type === "response.output_text.delta" && typeof event.delta === "string" ) { - contentRef.current += event.delta; - setContent(contentRef.current); + const target = findLast( + list, + (x): x is Extract => + x.kind === "message" && x.id === event.item_id, + ); + if (target) { + target.text += event.delta; + changed = true; + } + } + + if (changed) { + // Clone so React sees a new reference; item objects are mutated + // in place in the ref, so shallow-copy them too. + const snapshot = list.map((x) => ({ ...x })); + setItems(snapshot); + setContent(lastMessageText(snapshot)); } }, onError: (err) => { @@ -253,6 +440,7 @@ export function useAgentChat({ return { content, + items, events, threadId, isStreaming, diff --git a/packages/appkit/src/core/agent/consume-adapter-stream.ts b/packages/appkit/src/core/agent/consume-adapter-stream.ts index c4f3d07ed..d7885cebf 100644 --- a/packages/appkit/src/core/agent/consume-adapter-stream.ts +++ b/packages/appkit/src/core/agent/consume-adapter-stream.ts @@ -19,16 +19,30 @@ interface ConsumeAdapterStreamOptions { /** * Consume an adapter's event stream and aggregate the assistant's final text. * - * Accumulation rule (shared across all agent-execution paths in AppKit): + * Per-round rule (shared across all agent-execution paths in AppKit): * - * - `message_delta` events append their `content` to the running text. - * - A `message` event *replaces* the running text with its `content`. + * - `message_delta` events append their `content` to the message currently + * being built. + * - A `message` event *replaces* the message currently being built with its + * `content` (LangChain's `on_chain_end` path emits a single final message). + * - A `tool_call` / `tool_result` event closes the current message: any text + * accumulated so far is a *draft* that the model emitted alongside its tool + * calls, so it is set aside as "the last closed message" and the buffer is + * reset for the next ReAct round. * - * The two branches coexist because different adapters emit different shapes: - * streaming adapters (Databricks, Vercel AI) emit deltas chunk-by-chunk, - * while `LangChain`'s `on_chain_end` path emits a single final `message`. - * Without the replace branch, LangChain conversations silently dropped the - * assistant turn from thread history. + * The return value is the text of the message currently open at end-of-stream + * (the terminal answer in a normal ReAct loop), falling back to the last + * closed message if the stream ended mid-tool-calling (e.g. `maxSteps` + * exhausted right after a tool call). + * + * Why per-round and not flat concatenation: with OpenAI-compatible Claude on + * Databricks Model Serving, the model emits a full draft answer ALONGSIDE its + * tool calls on every round, so a naive `text += content` accumulation + * surfaces that draft 3-4× in the final text. Only the final round's message + * is the real answer; each tool call marks a round boundary. The + * `AgentEventTranslator` already item-bounds these rounds on the wire — this + * loop mirrors that boundary so server-side consumers (thread history, the + * non-streaming JSON `fullContent`, `runAgent`, and sub-agents) dedupe too. * * Kept pure (no I/O, no mutable external state beyond the caller's `onEvent` * side effect) so each execution path — HTTP streaming, sub-agents, and the @@ -38,15 +52,24 @@ export async function consumeAdapterStream( stream: AsyncIterable, opts: ConsumeAdapterStreamOptions = {}, ): Promise { - let text = ""; + let current = ""; // text of the message currently being built + let lastClosed = ""; // most recent fully-closed message for await (const event of stream) { if (opts.signal?.aborted) break; if (event.type === "message_delta") { - text += event.content; + current += event.content; } else if (event.type === "message") { - text = event.content; + // LangChain single-final replace — preserve. + current = event.content; + } else if (event.type === "tool_call" || event.type === "tool_result") { + // A draft followed by a tool call is superseded by the next round. + if (current) { + lastClosed = current; + current = ""; + } } opts.onEvent?.(event); } - return text; + // Terminal answer, or the last draft if maxSteps exhausted mid-tool-calling. + return current || lastClosed; } diff --git a/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts b/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts index 98863a62a..740be33c2 100644 --- a/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts +++ b/packages/appkit/src/core/agent/tests/consume-adapter-stream.test.ts @@ -31,6 +31,64 @@ describe("consumeAdapterStream", () => { expect(text).toBe("final answer"); }); + test("multi-round ReAct: returns the terminal message, not the drafts", async () => { + // OpenAI-compatible Claude emits a full draft answer alongside its tool + // call on every round. Only the final round's message is the answer. + const text = await consumeAdapterStream( + streamOf([ + // Round 1: draft + tool call + { type: "message_delta", content: "The answer is 42." }, + { type: "tool_call", callId: "c1", name: "lookup", args: {} }, + { type: "tool_result", callId: "c1", result: "ok" }, + // Round 2: another draft + tool call + { type: "message_delta", content: "The answer is 42." }, + { type: "tool_call", callId: "c2", name: "verify", args: {} }, + { type: "tool_result", callId: "c2", result: "ok" }, + // Terminal round: the real answer, streamed live + { type: "message_delta", content: "The answer " }, + { type: "message_delta", content: "is 42." }, + ]), + ); + expect(text).toBe("The answer is 42."); + }); + + test("maxSteps exhausted mid-tool-calling: returns the last draft", async () => { + // Stream ends right after a tool_call following a draft — no terminal + // message was produced, so fall back to the last closed message. + const text = await consumeAdapterStream( + streamOf([ + { type: "message_delta", content: "Working on it…" }, + { type: "tool_call", callId: "c1", name: "lookup", args: {} }, + ]), + ); + expect(text).toBe("Working on it…"); + }); + + test("LangChain single `message` after tool calls still replaces", async () => { + const text = await consumeAdapterStream( + streamOf([ + { type: "message_delta", content: "draft" }, + { type: "tool_call", callId: "c1", name: "lookup", args: {} }, + { type: "tool_result", callId: "c1", result: "ok" }, + { type: "message", content: "final answer" }, + ]), + ); + expect(text).toBe("final answer"); + }); + + test("mixed: deltas then a tool_call then a fresh terminal delta", async () => { + const text = await consumeAdapterStream( + streamOf([ + { type: "message_delta", content: "thinking " }, + { type: "message_delta", content: "out loud" }, + { type: "tool_call", callId: "c1", name: "lookup", args: {} }, + { type: "tool_result", callId: "c1", result: "ok" }, + { type: "message_delta", content: "done" }, + ]), + ); + expect(text).toBe("done"); + }); + test("invokes onEvent once per event, in order, with the raw event", async () => { const seen: AgentEvent[] = []; await consumeAdapterStream(