Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions packages/appkit/src/agents/databricks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,15 @@ export class DatabricksAdapter implements AgentAdapter {

yield { type: "status", status: "running" };

// When tools are offered, streamCompletion buffers assistant text instead of
// streaming it live (#421), and run() surfaces it only on a terminal turn.
// If the loop instead exits by exhausting maxSteps while still tool-calling,
// there is no terminal turn — so we'd drop the last turn's text entirely and
// the caller would see nothing. Track the last buffered text and whether a
// terminal turn surfaced it, then flush it after the loop as a fallback.
let lastBufferedText = "";
let surfacedFinalText = false;

for (let step = 0; step < this.maxSteps; step++) {
if (context.signal?.aborted) break;

Expand All @@ -406,13 +415,24 @@ export class DatabricksAdapter implements AgentAdapter {
tools,
context,
);
lastBufferedText = text;

if (toolCalls.length === 0) {
const parsed = parseTextToolCalls(text);
if (parsed.length > 0) {
yield* this.executeToolCalls(parsed, messages, context, nameToWire);
continue;
}
// Terminal turn: no tool calls (structured or text-encoded), so `text`
// is the assistant's final answer. When tools were offered,
// streamCompletion buffered the text instead of streaming it live (to
// avoid duplicating it on tool-calling steps, #421) — surface it now,
// exactly once. With no tools it was already streamed live, so emitting
// again here would double it.
if (tools.length > 0 && text) {
yield { type: "message_delta", content: text };
}
surfacedFinalText = true;
break;
}

Expand All @@ -428,6 +448,20 @@ export class DatabricksAdapter implements AgentAdapter {
yield* this.executeSingleTool(tc, originalName, messages, context);
}
}

// maxSteps exhausted mid-tool-calling: no terminal turn ever surfaced the
// buffered text. Flush the last turn's text so the caller isn't left with a
// silent, answerless run. The flush is skipped when a terminal turn already
// surfaced the text (or the text was streamed live because no tools were
// offered), and when the run was aborted (signal handling discards in-flight
// output).
if (
!surfacedFinalText &&
tools.length > 0 &&
lastBufferedText &&
!context.signal?.aborted
) {
yield { type: "message_delta", content: lastBufferedText };
}
}

/** Parse wire arguments, emit tool_call / tool_result, append tool messages. */
Expand Down Expand Up @@ -486,13 +520,14 @@ export class DatabricksAdapter implements AgentAdapter {
{ text: string; toolCalls: OpenAIToolCall[] },
unknown
> {
const hasTools = tools.length > 0;
const body: Record<string, unknown> = {
messages,
stream: true,
max_tokens: this.maxTokens,
};

if (tools.length > 0) {
if (hasTools) {
body.tools = tools;
}

Expand Down Expand Up @@ -573,7 +608,18 @@ export class DatabricksAdapter implements AgentAdapter {
this.maxStreamTextChars,
);
fullText += content;
yield { type: "message_delta" as const, content };
// Stream text live only when no tools are offered: such a turn is
// always terminal, so its text is the final answer. When tools are
// available the same turn may ALSO produce tool calls — Claude on
// Databricks Model Serving emits a full draft answer alongside its
// tool calls because the OpenAI-compatible layer lacks Claude's
// native tool-use prompt. Emitting that live would surface the draft
// on every ReAct step and duplicate it 3-4x (#421). Instead we buffer
// it into `fullText`; run() surfaces the text once, only on the
// terminal (no-tool) turn.
if (!hasTools) {
yield { type: "message_delta" as const, content };
}
}

const toolCallsRaw = deltaUnknown.tool_calls;
Expand Down
147 changes: 147 additions & 0 deletions packages/appkit/src/agents/tests/databricks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,111 @@ describe("DatabricksAdapter", () => {
expect(mockAuthenticate).toHaveBeenCalledTimes(2);
});

// Regression test for #421: text streamed on a turn that ALSO carries a tool
// call must not reach the consumer; only the terminal (no-tool) turn's text is
// surfaced, and exactly once.
test("suppresses draft text emitted alongside tool calls, surfacing the final answer once (#421)", async () => {
  const executeTool = vi.fn().mockResolvedValue([{ answer: 42 }]);
  // The draft and the final answer are deliberately DIFFERENT strings. If they
  // were identical, the assertion below could not tell "draft suppressed, final
  // answer surfaced" apart from "draft leaked, final answer dropped".
  const draft = "The answer is 42.";
  const finalAnswer = "The query confirms the answer is 42.";

  let callCount = 0;
  globalThis.fetch = vi.fn().mockImplementation(() => {
    callCount++;
    if (callCount === 1) {
      // A single turn that contains BOTH a full draft answer AND a tool call —
      // the shape Claude produces on Databricks Model Serving. The draft must
      // not reach the consumer, or it duplicates on every ReAct step.
      return Promise.resolve({
        ok: true,
        body: createReadableStream([
          textDelta(draft),
          toolCallDelta(
            0,
            "call_1",
            "analytics__query",
            '{"query":"SELECT 1"}',
          ),
          sseChunk("[DONE]"),
        ]),
      });
    }
    // Final turn: the real answer, no tool calls.
    return Promise.resolve({
      ok: true,
      body: createReadableStream([textDelta(finalAnswer), sseChunk("[DONE]")]),
    });
  });

  const adapter = createAdapter();
  const events: AgentEvent[] = [];
  for await (const event of adapter.run(
    {
      messages: createTestMessages(),
      tools: createTestTools(),
      threadId: "t1",
    },
    { executeTool },
  )) {
    events.push(event);
  }

  // The draft on the tool-calling turn is suppressed; only the terminal turn's
  // answer appears, and it appears exactly once.
  const messageDeltas = events.filter((e) => e.type === "message_delta");
  expect(messageDeltas).toEqual([
    { type: "message_delta", content: finalAnswer },
  ]);

  // The tool still ran (the draft was only suppressed from user-facing output).
  expect(executeTool).toHaveBeenCalledWith("analytics.query", {
    query: "SELECT 1",
  });
});

// Regression test for #421: when several consecutive turns each repeat the
// full answer next to a tool call, the consumer must still see it only once.
test("does not duplicate the answer across multiple tool-calling steps (#421)", async () => {
  const answer = "Based on the data, revenue grew 12%.";
  const toolRunner = vi.fn().mockResolvedValue([{ ok: true }]);

  // Requests 1 and 2 are tool-calling turns that each leak the full answer;
  // every later request is a terminal turn carrying the answer alone.
  let requestsSeen = 0;
  globalThis.fetch = vi.fn().mockImplementation(() => {
    requestsSeen += 1;
    const isToolCallingTurn = requestsSeen <= 2;
    const chunks = isToolCallingTurn
      ? [
          textDelta(answer),
          toolCallDelta(
            0,
            `call_${requestsSeen}`,
            "analytics__query",
            '{"query":"SELECT 1"}',
          ),
          sseChunk("[DONE]"),
        ]
      : [textDelta(answer), sseChunk("[DONE]")];
    // A fresh stream per request: a ReadableStream can only be consumed once.
    return Promise.resolve({ ok: true, body: createReadableStream(chunks) });
  });

  const collected: AgentEvent[] = [];
  const stream = createAdapter().run(
    {
      messages: createTestMessages(),
      tools: createTestTools(),
      threadId: "t1",
    },
    { executeTool: toolRunner },
  );
  for await (const emitted of stream) {
    collected.push(emitted);
  }

  // Exactly one message_delta carries the answer, despite three turns
  // producing it.
  const occurrences = collected.filter(
    (ev) => ev.type === "message_delta" && ev.content === answer,
  );
  expect(occurrences).toHaveLength(1);
  // Both tool-calling turns still executed their tool.
  expect(toolRunner).toHaveBeenCalledTimes(2);
});

describe("Vertex/Gemini thoughtSignature pass-through", () => {
// Vertex AI's OpenAI-compatible surface attaches `thoughtSignature`
// on every function call emitted by Gemini 2.x/3.x models. The next
Expand Down Expand Up @@ -535,6 +640,48 @@ describe("DatabricksAdapter", () => {
expect(globalThis.fetch).toHaveBeenCalledTimes(2);
});

// Regression test for #421: a run that never reaches a terminal (no-tool)
// turn must still hand the caller the last buffered text.
test("flushes the last buffered text when maxSteps is exhausted mid-tool-calling (#421)", async () => {
  const partial = "Partial answer so far.";

  // Every response pairs the draft text with a tool call, so the loop can
  // only end by running out of steps. Without a post-loop flush the buffered
  // text would be lost and the caller would get no answer.
  const draftPlusToolCall = () =>
    Promise.resolve({
      ok: true,
      body: createReadableStream([
        textDelta(partial),
        toolCallDelta(
          0,
          "call_loop",
          "analytics__query",
          '{"query":"SELECT 1"}',
        ),
        sseChunk("[DONE]"),
      ]),
    });
  globalThis.fetch = vi.fn().mockImplementation(draftPlusToolCall);

  const stepLimited = createAdapter({ maxSteps: 2 });
  const seen: AgentEvent[] = [];

  for await (const ev of stepLimited.run(
    {
      messages: createTestMessages(),
      tools: createTestTools(),
      threadId: "t1",
    },
    { executeTool: vi.fn().mockResolvedValue("ok") },
  )) {
    seen.push(ev);
  }

  // One model request per allowed step.
  expect(globalThis.fetch).toHaveBeenCalledTimes(2);
  // The text shows up once, from the post-loop flush, not once per turn.
  const textEvents = seen.filter((ev) => ev.type === "message_delta");
  expect(textEvents).toEqual([{ type: "message_delta", content: partial }]);
});

test("sends correct request to endpoint URL", async () => {
globalThis.fetch = mockFetch([textDelta("Hi"), sseChunk("[DONE]")]);

Expand Down
Loading