From 5f2f7cacb3130b6a924c0f909749a7165d1564d2 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 12:30:01 +0200 Subject: [PATCH 1/2] fix(agents): stop DatabricksAdapter duplicating answer text across tool calls (#421) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Claude on Databricks Model Serving emits a full draft answer ALONGSIDE its tool calls in the same turn, because the OpenAI-compatible layer lacks Claude's native tool-use prompt. DatabricksAdapter yielded every `delta.content` as a `message_delta` the instant it arrived, so in a ReAct loop the draft surfaced on every tool-calling step — the same answer appearing 3-4x, and (because consumeAdapterStream appends message_delta content) duplicated in the aggregated output too. Surface assistant text only on the terminal turn: - streamCompletion streams text live only when no tools are offered (such a turn is always terminal). When tools are present it buffers the text into fullText instead of yielding it. - run() emits the buffered text as a single message_delta on the terminal turn (no structured or text-encoded tool calls). Tool-calling turns keep their text in message history for model context but never surface it to the user. Tradeoff: with tools enabled, the final answer arrives as one delta rather than streaming token-by-token — there's no way to know a turn is terminal until its stream ends without tool calls. Pure-chat (no tools) still streams live. Tests: 2 new cases (text+tool-call in one turn; no duplication across multiple steps). Full appkit suite (2141) passes. Signed-off-by: MarioCadenas --- packages/appkit/src/agents/databricks.ts | 25 ++++- .../src/agents/tests/databricks.test.ts | 105 ++++++++++++++++++ 2 files changed, 128 insertions(+), 2 deletions(-) diff --git a/packages/appkit/src/agents/databricks.ts b/packages/appkit/src/agents/databricks.ts index 6e2e78d60..3c494022a 100644 --- a/packages/appkit/src/agents/databricks.ts +++ b/packages/appkit/src/agents/databricks.ts @@ -413,6 +413,15 @@ export class DatabricksAdapter implements AgentAdapter { yield* this.executeToolCalls(parsed, messages, context, nameToWire); continue; } + // Terminal turn: no tool calls (structured or text-encoded), so `text` + // is the assistant's final answer. When tools were offered, + // streamCompletion buffered the text instead of streaming it live (to + // avoid duplicating it on tool-calling steps, #421) — surface it now, + // exactly once. With no tools it was already streamed live, so emitting + // again here would double it. + if (tools.length > 0 && text) { + yield { type: "message_delta", content: text }; + } break; } @@ -486,13 +495,14 @@ export class DatabricksAdapter implements AgentAdapter { { text: string; toolCalls: OpenAIToolCall[] }, unknown > { + const hasTools = tools.length > 0; const body: Record = { messages, stream: true, max_tokens: this.maxTokens, }; - if (tools.length > 0) { + if (hasTools) { body.tools = tools; } @@ -573,7 +583,18 @@ export class DatabricksAdapter implements AgentAdapter { this.maxStreamTextChars, ); fullText += content; - yield { type: "message_delta" as const, content }; + // Stream text live only when no tools are offered: such a turn is + // always terminal, so its text is the final answer. When tools are + // available the same turn may ALSO produce tool calls — Claude on + // Databricks Model Serving emits a full draft answer alongside its + // tool calls because the OpenAI-compatible layer lacks Claude's + // native tool-use prompt. Emitting that live would surface the draft + // on every ReAct step and duplicate it 3-4x (#421). Instead we buffer + // it into `fullText`; run() surfaces the text once, only on the + // terminal (no-tool) turn. + if (!hasTools) { + yield { type: "message_delta" as const, content }; + } } const toolCallsRaw = deltaUnknown.tool_calls; diff --git a/packages/appkit/src/agents/tests/databricks.test.ts b/packages/appkit/src/agents/tests/databricks.test.ts index 84f0c6717..9ffecbb92 100644 --- a/packages/appkit/src/agents/tests/databricks.test.ts +++ b/packages/appkit/src/agents/tests/databricks.test.ts @@ -252,6 +252,111 @@ describe("DatabricksAdapter", () => { expect(mockAuthenticate).toHaveBeenCalledTimes(2); }); + test("suppresses draft text emitted alongside tool calls, surfacing the final answer once (#421)", async () => { + const executeTool = vi.fn().mockResolvedValue([{ answer: 42 }]); + const draft = "The answer is 42."; + + let callCount = 0; + globalThis.fetch = vi.fn().mockImplementation(() => { + callCount++; + if (callCount === 1) { + // A single turn that contains BOTH a full draft answer AND a tool call — + // the shape Claude produces on Databricks Model Serving. The draft must + // not reach the consumer, or it duplicates on every ReAct step. + return Promise.resolve({ + ok: true, + body: createReadableStream([ + textDelta(draft), + toolCallDelta( + 0, + "call_1", + "analytics__query", + '{"query":"SELECT 1"}', + ), + sseChunk("[DONE]"), + ]), + }); + } + // Final turn: the real answer, no tool calls. + return Promise.resolve({ + ok: true, + body: createReadableStream([textDelta(draft), sseChunk("[DONE]")]), + }); + }); + + const adapter = createAdapter(); + const events: AgentEvent[] = []; + for await (const event of adapter.run( + { + messages: createTestMessages(), + tools: createTestTools(), + threadId: "t1", + }, + { executeTool }, + )) { + events.push(event); + } + + // The draft on the tool-calling turn is suppressed; the answer appears once. + const messageDeltas = events.filter((e) => e.type === "message_delta"); + expect(messageDeltas).toEqual([{ type: "message_delta", content: draft }]); + + // The tool still ran (the draft was only suppressed from user-facing output). + expect(executeTool).toHaveBeenCalledWith("analytics.query", { + query: "SELECT 1", + }); + }); + + test("does not duplicate the answer across multiple tool-calling steps (#421)", async () => { + const executeTool = vi.fn().mockResolvedValue([{ ok: true }]); + const draft = "Based on the data, revenue grew 12%."; + + let callCount = 0; + globalThis.fetch = vi.fn().mockImplementation(() => { + callCount++; + if (callCount <= 2) { + // Two tool-calling turns, each leaking the full draft answer. + return Promise.resolve({ + ok: true, + body: createReadableStream([ + textDelta(draft), + toolCallDelta( + 0, + `call_${callCount}`, + "analytics__query", + '{"query":"SELECT 1"}', + ), + sseChunk("[DONE]"), + ]), + }); + } + // Terminal turn with the final answer. + return Promise.resolve({ + ok: true, + body: createReadableStream([textDelta(draft), sseChunk("[DONE]")]), + }); + }); + + const adapter = createAdapter(); + const events: AgentEvent[] = []; + for await (const event of adapter.run( + { + messages: createTestMessages(), + tools: createTestTools(), + threadId: "t1", + }, + { executeTool }, + )) { + events.push(event); + } + + const answerDeltas = events.filter( + (e) => e.type === "message_delta" && e.content === draft, + ); + expect(answerDeltas).toHaveLength(1); + expect(executeTool).toHaveBeenCalledTimes(2); + }); + describe("Vertex/Gemini thoughtSignature pass-through", () => { // Vertex AI's OpenAI-compatible surface attaches `thoughtSignature` // on every function call emitted by Gemini 2.x/3.x models. The next From 449731a4b6369c063838c9bafbb14cca6f408aaf Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 14:06:35 +0200 Subject: [PATCH 2/2] fix(agents): flush buffered final text when maxSteps is exhausted (#421) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the text-dedup fix. When tools are offered, assistant text is buffered and surfaced only on a terminal (no-tool) turn. If the ReAct loop instead exits by exhausting maxSteps while still tool-calling, there was no terminal turn — so the last turn's buffered text was dropped and the caller received no answer at all (a regression vs. main, which streamed text live every turn). Track the last buffered text and whether a terminal turn surfaced it; after the loop, flush it as a fallback when maxSteps was exhausted mid-tool-calling. Skip when a terminal turn already emitted, when no tools were offered (already streamed live), or when the run was aborted (in-flight output is discarded). Adds a test that exhausts maxSteps with every turn carrying both a draft and a tool call, asserting the draft is surfaced exactly once. Signed-off-by: MarioCadenas --- packages/appkit/src/agents/databricks.ts | 25 +++++++++++ .../src/agents/tests/databricks.test.ts | 42 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/packages/appkit/src/agents/databricks.ts b/packages/appkit/src/agents/databricks.ts index 3c494022a..75e734c0f 100644 --- a/packages/appkit/src/agents/databricks.ts +++ b/packages/appkit/src/agents/databricks.ts @@ -398,6 +398,15 @@ export class DatabricksAdapter implements AgentAdapter { yield { type: "status", status: "running" }; + // When tools are offered, streamCompletion buffers assistant text instead of + // streaming it live (#421), and run() surfaces it only on a terminal turn. + // If the loop instead exits by exhausting maxSteps while still tool-calling, + // there is no terminal turn — so we'd drop the last turn's text entirely and + // the caller would see nothing. Track the last buffered text and whether a + // terminal turn surfaced it, then flush it after the loop as a fallback. + let lastBufferedText = ""; + let surfacedFinalText = false; + for (let step = 0; step < this.maxSteps; step++) { if (context.signal?.aborted) break; @@ -406,6 +415,7 @@ export class DatabricksAdapter implements AgentAdapter { tools, context, ); + lastBufferedText = text; if (toolCalls.length === 0) { const parsed = parseTextToolCalls(text); @@ -422,6 +432,7 @@ export class DatabricksAdapter implements AgentAdapter { if (tools.length > 0 && text) { yield { type: "message_delta", content: text }; } + surfacedFinalText = true; break; } @@ -437,6 +448,20 @@ export class DatabricksAdapter implements AgentAdapter { yield* this.executeSingleTool(tc, originalName, messages, context); } } + + // maxSteps exhausted mid-tool-calling: no terminal turn ever surfaced the + // buffered text. Flush the last turn's text so the caller isn't left with a + // silent, answerless run. Skipped when a terminal turn already emitted (or + // streamed live with no tools), or when the run was aborted (signal handling + // discards in-flight output). + if ( + !surfacedFinalText && + tools.length > 0 && + lastBufferedText && + !context.signal?.aborted + ) { + yield { type: "message_delta", content: lastBufferedText }; + } } /** Parse wire arguments, emit tool_call / tool_result, append tool messages. */ diff --git a/packages/appkit/src/agents/tests/databricks.test.ts b/packages/appkit/src/agents/tests/databricks.test.ts index 9ffecbb92..531865e5b 100644 --- a/packages/appkit/src/agents/tests/databricks.test.ts +++ b/packages/appkit/src/agents/tests/databricks.test.ts @@ -640,6 +640,48 @@ describe("DatabricksAdapter", () => { expect(globalThis.fetch).toHaveBeenCalledTimes(2); }); + test("flushes the last buffered text when maxSteps is exhausted mid-tool-calling (#421)", async () => { + // Every turn carries a draft answer AND a tool call, so the loop never + // reaches a terminal (no-tool) turn — it exits by exhausting maxSteps. + // Without the post-loop flush the buffered text would be dropped and the + // caller would receive no answer at all. + const draft = "Partial answer so far."; + globalThis.fetch = vi.fn().mockImplementation(() => + Promise.resolve({ + ok: true, + body: createReadableStream([ + textDelta(draft), + toolCallDelta( + 0, + "call_loop", + "analytics__query", + '{"query":"SELECT 1"}', + ), + sseChunk("[DONE]"), + ]), + }), + ); + + const adapter = createAdapter({ maxSteps: 2 }); + const events: AgentEvent[] = []; + + for await (const event of adapter.run( + { + messages: createTestMessages(), + tools: createTestTools(), + threadId: "t1", + }, + { executeTool: vi.fn().mockResolvedValue("ok") }, + )) { + events.push(event); + } + + expect(globalThis.fetch).toHaveBeenCalledTimes(2); + // The draft is surfaced exactly once (the post-loop flush), not per turn. + const messageDeltas = events.filter((e) => e.type === "message_delta"); + expect(messageDeltas).toEqual([{ type: "message_delta", content: draft }]); + }); + test("sends correct request to endpoint URL", async () => { globalThis.fetch = mockFetch([textDelta("Hi"), sseChunk("[DONE]")]);