Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion packages/appkit/src/plugins/agents/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,20 @@ import type { StreamExecutionSettings } from "shared";
export const agentStreamDefaults: StreamExecutionSettings = {
default: {
cache: { enabled: false },
retry: { enabled: false },
// Conservative retry for transient serving errors (5xx / 429 / connection
// resets) — see `isRetryableError` in interceptors/retry.ts; 4xx and
// non-retryable AppKitErrors are never retried.
//
// SAFETY: this is streaming-replay-safe by construction. In
// `executeStream`, the RetryInterceptor wraps only the *creation* of the
// adapter async generator (`fn()` returns the generator object
// synchronously, without running its body). Token emission and tool
// dispatch happen during `yield*` iteration, which runs *outside* the
// interceptor chain. So a transient error thrown after the first streamed
// event surfaces during iteration and is never retried — there is no path
// by which retry can re-emit tokens or re-run a tool side-effect. Only a
// failure during generator setup (before any output) is retried.
retry: { enabled: true, attempts: 2, initialDelay: 500, maxDelay: 4_000 },
timeout: 300_000,
},
stream: {
Expand Down
110 changes: 110 additions & 0 deletions packages/appkit/src/plugins/agents/tests/defaults.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { RetryInterceptor } from "../../../plugin/interceptors/retry";
import type { InterceptorContext } from "../../../plugin/interceptors/types";
import { agentStreamDefaults } from "../defaults";

describe("agentStreamDefaults", () => {
test("enables a conservative retry default for transient errors", () => {
const retry = agentStreamDefaults.default.retry;
expect(retry).toBeDefined();
expect(retry?.enabled).toBe(true);
// Conservative: at most one extra attempt, with bounded backoff.
expect(retry?.attempts).toBe(2);
expect(retry?.initialDelay).toBeGreaterThan(0);
expect(retry?.maxDelay).toBeGreaterThanOrEqual(retry?.initialDelay ?? 0);
// Caps keep a serving outage from snowballing into long stalls.
expect(retry?.attempts ?? 0).toBeLessThanOrEqual(3);
expect(retry?.maxDelay ?? Infinity).toBeLessThanOrEqual(10_000);
});

describe("streaming-replay safety", () => {
let context: InterceptorContext;

beforeEach(() => {
context = { metadata: new Map(), userKey: "test" };
vi.useFakeTimers();
});

afterEach(() => {
vi.useRealTimers();
});

test(
"does NOT re-run a streamed turn or re-execute tools when an error " +
"is thrown mid-iteration",
async () => {
// Mirrors how AgentsPlugin._streamAgent feeds executeStream: the retry
// interceptor wraps `fn()`, where `fn` is an async generator function.
// Calling it returns the generator object synchronously WITHOUT running
// the body, so the interceptor sees an immediate success and never
// retries. Token emission + tool side-effects happen during `yield*`
// iteration, which is outside the interceptor chain.
const retry = agentStreamDefaults.default.retry;
if (!retry) throw new Error("expected a retry default");

const interceptor = new RetryInterceptor(retry);

let generatorBodyRuns = 0;
const toolSideEffect = vi.fn();

async function* turn() {
generatorBodyRuns++;
toolSideEffect("emit-token-1");
yield "token-1";
// Transient 5xx surfacing AFTER the first streamed event.
throw Object.assign(new Error("serving blip"), { statusCode: 500 });
}

// executeStream's wrappedFn: `async () => fn(signal)`.
const wrappedFn = async () => turn();

const result = await interceptor.intercept(wrappedFn, context);

// Generator body has not run yet — only constructed.
expect(generatorBodyRuns).toBe(0);
expect(toolSideEffect).not.toHaveBeenCalled();

// Now drive iteration (the part that lives outside the interceptor).
const seen: string[] = [];
await expect(
(async () => {
for await (const ev of result) seen.push(ev);
})(),
).rejects.toThrow("serving blip");

// The transient error during iteration was NOT retried: the body ran
// exactly once and the side-effect fired exactly once. No replay.
expect(generatorBodyRuns).toBe(1);
expect(toolSideEffect).toHaveBeenCalledTimes(1);
expect(seen).toEqual(["token-1"]);
},
);

test("retries a transient failure during generator SETUP (before output)", async () => {
// The one place the streaming retry default genuinely helps: a transient
// error thrown synchronously while constructing the turn, before any
// event is emitted, is safe to retry.
const retry = agentStreamDefaults.default.retry;
if (!retry) throw new Error("expected a retry default");

const interceptor = new RetryInterceptor(retry);

const setup = vi
.fn<() => Promise<AsyncGenerator<string>>>()
.mockRejectedValueOnce(
Object.assign(new Error("setup blip"), { statusCode: 503 }),
)
.mockResolvedValue(
(async function* () {
yield "ok";
})(),
);

const promise = interceptor.intercept(setup, context);
await vi.runAllTimersAsync();
await promise;

expect(setup).toHaveBeenCalledTimes(2);
});
});
});
Loading