diff --git a/packages/appkit/src/agents/databricks.ts b/packages/appkit/src/agents/databricks.ts index 6e2e78d60..75e734c0f 100644 --- a/packages/appkit/src/agents/databricks.ts +++ b/packages/appkit/src/agents/databricks.ts @@ -398,6 +398,15 @@ export class DatabricksAdapter implements AgentAdapter { yield { type: "status", status: "running" }; + // When tools are offered, streamCompletion buffers assistant text instead of + // streaming it live (#421), and run() surfaces it only on a terminal turn. + // If the loop instead exits by exhausting maxSteps while still tool-calling, + // there is no terminal turn — so we'd drop the last turn's text entirely and + // the caller would see nothing. Track the last buffered text and whether a + // terminal turn surfaced it, then flush it after the loop as a fallback. + let lastBufferedText = ""; + let surfacedFinalText = false; + for (let step = 0; step < this.maxSteps; step++) { if (context.signal?.aborted) break; @@ -406,6 +415,7 @@ export class DatabricksAdapter implements AgentAdapter { tools, context, ); + lastBufferedText = text; if (toolCalls.length === 0) { const parsed = parseTextToolCalls(text); @@ -413,6 +423,16 @@ export class DatabricksAdapter implements AgentAdapter { yield* this.executeToolCalls(parsed, messages, context, nameToWire); continue; } + // Terminal turn: no tool calls (structured or text-encoded), so `text` + // is the assistant's final answer. When tools were offered, + // streamCompletion buffered the text instead of streaming it live (to + // avoid duplicating it on tool-calling steps, #421) — surface it now, + // exactly once. With no tools it was already streamed live, so emitting + // again here would double it. + if (tools.length > 0 && text) { + yield { type: "message_delta", content: text }; + } + surfacedFinalText = true; break; } @@ -428,6 +448,20 @@ export class DatabricksAdapter implements AgentAdapter { yield* this.executeSingleTool(tc, originalName, messages, context); } } + + // maxSteps exhausted mid-tool-calling: no terminal turn ever surfaced the + // buffered text. Flush the last turn's text so the caller isn't left with a + // silent, answerless run. Skipped when a terminal turn already emitted (or + // streamed live with no tools), or when the run was aborted (signal handling + // discards in-flight output). + if ( + !surfacedFinalText && + tools.length > 0 && + lastBufferedText && + !context.signal?.aborted + ) { + yield { type: "message_delta", content: lastBufferedText }; + } } /** Parse wire arguments, emit tool_call / tool_result, append tool messages. */ @@ -486,13 +520,14 @@ export class DatabricksAdapter implements AgentAdapter { { text: string; toolCalls: OpenAIToolCall[] }, unknown > { + const hasTools = tools.length > 0; const body: Record = { messages, stream: true, max_tokens: this.maxTokens, }; - if (tools.length > 0) { + if (hasTools) { body.tools = tools; } @@ -573,7 +608,18 @@ export class DatabricksAdapter implements AgentAdapter { this.maxStreamTextChars, ); fullText += content; - yield { type: "message_delta" as const, content }; + // Stream text live only when no tools are offered: such a turn is + // always terminal, so its text is the final answer. When tools are + // available the same turn may ALSO produce tool calls — Claude on + // Databricks Model Serving emits a full draft answer alongside its + // tool calls because the OpenAI-compatible layer lacks Claude's + // native tool-use prompt. Emitting that live would surface the draft + // on every ReAct step and duplicate it 3-4x (#421). Instead we buffer + // it into `fullText`; run() surfaces the text once, only on the + // terminal (no-tool) turn. + if (!hasTools) { + yield { type: "message_delta" as const, content }; + } } const toolCallsRaw = deltaUnknown.tool_calls; diff --git a/packages/appkit/src/agents/tests/databricks.test.ts b/packages/appkit/src/agents/tests/databricks.test.ts index 84f0c6717..531865e5b 100644 --- a/packages/appkit/src/agents/tests/databricks.test.ts +++ b/packages/appkit/src/agents/tests/databricks.test.ts @@ -252,6 +252,111 @@ describe("DatabricksAdapter", () => { expect(mockAuthenticate).toHaveBeenCalledTimes(2); }); + test("suppresses draft text emitted alongside tool calls, surfacing the final answer once (#421)", async () => { + const executeTool = vi.fn().mockResolvedValue([{ answer: 42 }]); + const draft = "The answer is 42."; + + let callCount = 0; + globalThis.fetch = vi.fn().mockImplementation(() => { + callCount++; + if (callCount === 1) { + // A single turn that contains BOTH a full draft answer AND a tool call — + // the shape Claude produces on Databricks Model Serving. The draft must + // not reach the consumer, or it duplicates on every ReAct step. + return Promise.resolve({ + ok: true, + body: createReadableStream([ + textDelta(draft), + toolCallDelta( + 0, + "call_1", + "analytics__query", + '{"query":"SELECT 1"}', + ), + sseChunk("[DONE]"), + ]), + }); + } + // Final turn: the real answer, no tool calls. + return Promise.resolve({ + ok: true, + body: createReadableStream([textDelta(draft), sseChunk("[DONE]")]), + }); + }); + + const adapter = createAdapter(); + const events: AgentEvent[] = []; + for await (const event of adapter.run( + { + messages: createTestMessages(), + tools: createTestTools(), + threadId: "t1", + }, + { executeTool }, + )) { + events.push(event); + } + + // The draft on the tool-calling turn is suppressed; the answer appears once. + const messageDeltas = events.filter((e) => e.type === "message_delta"); + expect(messageDeltas).toEqual([{ type: "message_delta", content: draft }]); + + // The tool still ran (the draft was only suppressed from user-facing output). + expect(executeTool).toHaveBeenCalledWith("analytics.query", { + query: "SELECT 1", + }); + }); + + test("does not duplicate the answer across multiple tool-calling steps (#421)", async () => { + const executeTool = vi.fn().mockResolvedValue([{ ok: true }]); + const draft = "Based on the data, revenue grew 12%."; + + let callCount = 0; + globalThis.fetch = vi.fn().mockImplementation(() => { + callCount++; + if (callCount <= 2) { + // Two tool-calling turns, each leaking the full draft answer. + return Promise.resolve({ + ok: true, + body: createReadableStream([ + textDelta(draft), + toolCallDelta( + 0, + `call_${callCount}`, + "analytics__query", + '{"query":"SELECT 1"}', + ), + sseChunk("[DONE]"), + ]), + }); + } + // Terminal turn with the final answer. + return Promise.resolve({ + ok: true, + body: createReadableStream([textDelta(draft), sseChunk("[DONE]")]), + }); + }); + + const adapter = createAdapter(); + const events: AgentEvent[] = []; + for await (const event of adapter.run( + { + messages: createTestMessages(), + tools: createTestTools(), + threadId: "t1", + }, + { executeTool }, + )) { + events.push(event); + } + + const answerDeltas = events.filter( + (e) => e.type === "message_delta" && e.content === draft, + ); + expect(answerDeltas).toHaveLength(1); + expect(executeTool).toHaveBeenCalledTimes(2); + }); + describe("Vertex/Gemini thoughtSignature pass-through", () => { // Vertex AI's OpenAI-compatible surface attaches `thoughtSignature` // on every function call emitted by Gemini 2.x/3.x models. The next @@ -535,6 +640,48 @@ describe("DatabricksAdapter", () => { expect(globalThis.fetch).toHaveBeenCalledTimes(2); }); + test("flushes the last buffered text when maxSteps is exhausted mid-tool-calling (#421)", async () => { + // Every turn carries a draft answer AND a tool call, so the loop never + // reaches a terminal (no-tool) turn — it exits by exhausting maxSteps. + // Without the post-loop flush the buffered text would be dropped and the + // caller would receive no answer at all. + const draft = "Partial answer so far."; + globalThis.fetch = vi.fn().mockImplementation(() => + Promise.resolve({ + ok: true, + body: createReadableStream([ + textDelta(draft), + toolCallDelta( + 0, + "call_loop", + "analytics__query", + '{"query":"SELECT 1"}', + ), + sseChunk("[DONE]"), + ]), + }), + ); + + const adapter = createAdapter({ maxSteps: 2 }); + const events: AgentEvent[] = []; + + for await (const event of adapter.run( + { + messages: createTestMessages(), + tools: createTestTools(), + threadId: "t1", + }, + { executeTool: vi.fn().mockResolvedValue("ok") }, + )) { + events.push(event); + } + + expect(globalThis.fetch).toHaveBeenCalledTimes(2); + // The draft is surfaced exactly once (the post-loop flush), not per turn. + const messageDeltas = events.filter((e) => e.type === "message_delta"); + expect(messageDeltas).toEqual([{ type: "message_delta", content: draft }]); + }); + test("sends correct request to endpoint URL", async () => { globalThis.fetch = mockFetch([textDelta("Hi"), sseChunk("[DONE]")]);