From f2b692e6dc3cd83b23798b18c48326c4331f7930 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Sat, 13 Jun 2026 21:25:35 +0200 Subject: [PATCH] feat(appkit): enable conservative agent retries by default for transient errors Agent stream defaults previously set retry { enabled: false }. Enable a conservative default (attempts: 2, 500ms..4s backoff) so a transient serving error (5xx / 429 / connection reset) doesn't fail the whole turn. Non-retryable errors (4xx, AppKitError isRetryable=false) are still never retried. Streaming safety: in executeStream the RetryInterceptor wraps only the synchronous creation of the adapter async generator; token emission and tool dispatch run during yield* iteration, outside the interceptor chain. A transient error thrown after the first streamed event therefore cannot be retried, so retries can never re-emit tokens or re-run a tool side-effect. Only a failure during generator setup (before any output) is retried. Added defaults.test.ts asserting the conservative values and proving no mid-stream replay. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- .../appkit/src/plugins/agents/defaults.ts | 15 ++- .../src/plugins/agents/tests/defaults.test.ts | 110 ++++++++++++++++++ 2 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 packages/appkit/src/plugins/agents/tests/defaults.test.ts diff --git a/packages/appkit/src/plugins/agents/defaults.ts b/packages/appkit/src/plugins/agents/defaults.ts index 4da11bef4..a0dbef17e 100644 --- a/packages/appkit/src/plugins/agents/defaults.ts +++ b/packages/appkit/src/plugins/agents/defaults.ts @@ -3,7 +3,20 @@ import type { StreamExecutionSettings } from "shared"; export const agentStreamDefaults: StreamExecutionSettings = { default: { cache: { enabled: false }, - retry: { enabled: false }, + // Conservative retry for transient serving errors (5xx / 429 / connection + // resets) — see `isRetryableError` in interceptors/retry.ts; 4xx and + // non-retryable AppKitErrors are never retried. + // + // SAFETY: this is streaming-replay-safe by construction. In + // `executeStream`, the RetryInterceptor wraps only the *creation* of the + // adapter async generator (`fn()` returns the generator object + // synchronously, without running its body). Token emission and tool + // dispatch happen during `yield*` iteration, which runs *outside* the + // interceptor chain. So a transient error thrown after the first streamed + // event surfaces during iteration and is never retried — there is no path + // by which retry can re-emit tokens or re-run a tool side-effect. Only a + // failure during generator setup (before any output) is retried. + retry: { enabled: true, attempts: 2, initialDelay: 500, maxDelay: 4_000 }, timeout: 300_000, }, stream: { diff --git a/packages/appkit/src/plugins/agents/tests/defaults.test.ts b/packages/appkit/src/plugins/agents/tests/defaults.test.ts new file mode 100644 index 000000000..bddf69e5d --- /dev/null +++ b/packages/appkit/src/plugins/agents/tests/defaults.test.ts @@ -0,0 +1,110 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { RetryInterceptor } from "../../../plugin/interceptors/retry"; +import type { InterceptorContext } from "../../../plugin/interceptors/types"; +import { agentStreamDefaults } from "../defaults"; + +describe("agentStreamDefaults", () => { + test("enables a conservative retry default for transient errors", () => { + const retry = agentStreamDefaults.default.retry; + expect(retry).toBeDefined(); + expect(retry?.enabled).toBe(true); + // Conservative: at most one extra attempt, with bounded backoff. + expect(retry?.attempts).toBe(2); + expect(retry?.initialDelay).toBeGreaterThan(0); + expect(retry?.maxDelay).toBeGreaterThanOrEqual(retry?.initialDelay ?? 0); + // Caps keep a serving outage from snowballing into long stalls. + expect(retry?.attempts ?? 0).toBeLessThanOrEqual(3); + expect(retry?.maxDelay ?? Infinity).toBeLessThanOrEqual(10_000); + }); + + describe("streaming-replay safety", () => { + let context: InterceptorContext; + + beforeEach(() => { + context = { metadata: new Map(), userKey: "test" }; + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + test( + "does NOT re-run a streamed turn or re-execute tools when an error " + + "is thrown mid-iteration", + async () => { + // Mirrors how AgentsPlugin._streamAgent feeds executeStream: the retry + // interceptor wraps `fn()`, where `fn` is an async generator function. + // Calling it returns the generator object synchronously WITHOUT running + // the body, so the interceptor sees an immediate success and never + // retries. Token emission + tool side-effects happen during `yield*` + // iteration, which is outside the interceptor chain. + const retry = agentStreamDefaults.default.retry; + if (!retry) throw new Error("expected a retry default"); + + const interceptor = new RetryInterceptor(retry); + + let generatorBodyRuns = 0; + const toolSideEffect = vi.fn(); + + async function* turn() { + generatorBodyRuns++; + toolSideEffect("emit-token-1"); + yield "token-1"; + // Transient 5xx surfacing AFTER the first streamed event. + throw Object.assign(new Error("serving blip"), { statusCode: 500 }); + } + + // executeStream's wrappedFn: `async () => fn(signal)`. + const wrappedFn = async () => turn(); + + const result = await interceptor.intercept(wrappedFn, context); + + // Generator body has not run yet — only constructed. + expect(generatorBodyRuns).toBe(0); + expect(toolSideEffect).not.toHaveBeenCalled(); + + // Now drive iteration (the part that lives outside the interceptor). + const seen: string[] = []; + await expect( + (async () => { + for await (const ev of result) seen.push(ev); + })(), + ).rejects.toThrow("serving blip"); + + // The transient error during iteration was NOT retried: the body ran + // exactly once and the side-effect fired exactly once. No replay. + expect(generatorBodyRuns).toBe(1); + expect(toolSideEffect).toHaveBeenCalledTimes(1); + expect(seen).toEqual(["token-1"]); + }, + ); + + test("retries a transient failure during generator SETUP (before output)", async () => { + // The one place the streaming retry default genuinely helps: a transient + // error thrown synchronously while constructing the turn, before any + // event is emitted, is safe to retry. + const retry = agentStreamDefaults.default.retry; + if (!retry) throw new Error("expected a retry default"); + + const interceptor = new RetryInterceptor(retry); + + const setup = vi + .fn<() => Promise>>() + .mockRejectedValueOnce( + Object.assign(new Error("setup blip"), { statusCode: 503 }), + ) + .mockResolvedValue( + (async function* () { + yield "ok"; + })(), + ); + + const promise = interceptor.intercept(setup, context); + await vi.runAllTimersAsync(); + await promise; + + expect(setup).toHaveBeenCalledTimes(2); + }); + }); +});