From 18e1e15ea83103c4524307a6cf59c1239fea8097 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Tue, 19 May 2026 16:00:14 +0100 Subject: [PATCH] fix(agent): handle upstream provider failures --- .../adapters/claude/conversion/sdk-to-acp.ts | 27 +----- .../src/adapters/codex/codex-agent.test.ts | 83 +++++++++++++++++++ .../agent/src/adapters/codex/codex-agent.ts | 16 ++++ .../src/adapters/error-classification.ts | 30 +++++++ packages/agent/src/server/agent-server.ts | 24 ++++-- .../agent/src/server/question-relay.test.ts | 72 ++++++++++++++-- 6 files changed, 214 insertions(+), 38 deletions(-) create mode 100644 packages/agent/src/adapters/error-classification.ts diff --git a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts index 73a2ef71c..43defc3c2 100644 --- a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts +++ b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts @@ -22,6 +22,7 @@ import { image, text } from "../../../utils/acp-content"; import { unreachable } from "../../../utils/common"; import type { Logger } from "../../../utils/logger"; import { tryParsePartialJson } from "../../../utils/partial-json"; +import { classifyAgentError } from "../../error-classification"; import { type EnrichedReadCache, registerHookCallback } from "../hooks"; import type { Session, @@ -696,32 +697,6 @@ export type ResultMessageHandlerResult = { }; }; -export type AgentErrorClassification = - | "upstream_stream_terminated" - | "upstream_connection_error" - | "agent_error"; - -/** - * Classify an error string surfaced by the Claude CLI via `is_error: true` - * result messages. Transient upstream-stream terminations (e.g. the fetch body - * from the LLM gateway is torn down mid-stream) are retriable; most other - * errors are not. - */ -export function classifyAgentError( - result: string | undefined, -): AgentErrorClassification { - if (!result) return "agent_error"; - const text = result.trim(); - // Anthropic SDK surfaces an undici fetch abort as "API Error: terminated". - if (/API Error:\s*terminated\b/i.test(text)) { - return "upstream_stream_terminated"; - } - if (/API Error:\s*Connection error\b/i.test(text)) { - return "upstream_connection_error"; - } - return "agent_error"; -} - export function handleResultMessage( message: SDKResultMessage, ): ResultMessageHandlerResult { diff --git a/packages/agent/src/adapters/codex/codex-agent.test.ts b/packages/agent/src/adapters/codex/codex-agent.test.ts index bc8ad6eef..382fe6131 100644 --- a/packages/agent/src/adapters/codex/codex-agent.test.ts +++ b/packages/agent/src/adapters/codex/codex-agent.test.ts @@ -306,6 +306,89 @@ describe("CodexAcpAgent", () => { ).resolves.toEqual({ stopReason: "end_turn" }); }); + it.each([ + ["API Error: 429 rate_limit_error", "upstream_provider_failure"], + ["API Error: 503 internal_error", "upstream_provider_failure"], + ["API Error: 529 overloaded_error", "upstream_provider_failure"], + ["ordinary failure", undefined], + ] as const)( + "handles prompt failure %p", + async (message, expectedClassification) => { + const { agent } = createAgent(); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + await agent.newSession({ + cwd: process.cwd(), + } as never); + + const promptError = new Error(message); + mockCodexConnection.prompt.mockRejectedValueOnce(promptError); + + let thrown: unknown; + try { + await agent.prompt({ + sessionId: "session-1", + prompt: [{ type: "text", text: "A" }], + } as never); + } catch (error) { + thrown = error; + } + + if (!expectedClassification) { + expect(thrown).toBe(promptError); + return; + } + + expect(thrown).toMatchObject({ + data: { + classification: expectedClassification, + result: message, + }, + }); + }, + ); + + it("does not let a classified failing prompt block subsequent prompts", async () => { + const { agent } = createAgent(); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + await agent.newSession({ + cwd: process.cwd(), + } as never); + + mockCodexConnection.prompt.mockRejectedValueOnce( + new Error("API Error: 529 overloaded_error"), + ); + mockCodexConnection.prompt.mockResolvedValueOnce({ + stopReason: "end_turn", + }); + + await expect( + agent.prompt({ + sessionId: "session-1", + prompt: [{ type: "text", text: "A" }], + } as never), + ).rejects.toMatchObject({ + data: { + classification: "upstream_provider_failure", + result: "API Error: 529 overloaded_error", + }, + }); + + await expect( + agent.prompt({ + sessionId: "session-1", + prompt: [{ type: "text", text: "B" }], + } as never), + ).resolves.toEqual({ stopReason: "end_turn" }); + }); + describe("structured output injection", () => { const schema = { type: "object", diff --git a/packages/agent/src/adapters/codex/codex-agent.ts b/packages/agent/src/adapters/codex/codex-agent.ts index 7a39ad014..4313d29c0 100644 --- a/packages/agent/src/adapters/codex/codex-agent.ts +++ b/packages/agent/src/adapters/codex/codex-agent.ts @@ -62,6 +62,7 @@ import { nodeWritableToWebWritable, } from "../../utils/streams"; import { BaseAcpAgent, type BaseSession } from "../base-acp-agent"; +import { classifyAgentError } from "../error-classification"; import { createCodexClient } from "./codex-client"; import { normalizeCodexConfigOptions } from "./models"; import { @@ -138,6 +139,19 @@ function prependPrContext(params: PromptRequest): PromptRequest { }; } +function classifyPromptError(error: unknown): unknown { + const message = error instanceof Error ? error.message : String(error ?? ""); + const classification = classifyAgentError(message); + if (classification === "agent_error") { + return error; + } + + return RequestError.internalError( + { classification, result: message }, + message, + ); +} + const CODEX_NATIVE_MODE: Record = { auto: "auto", default: "auto", @@ -577,6 +591,8 @@ export class CodexAcpAgent extends BaseAcpAgent { let response: PromptResponse; try { response = await this.codexConnection.prompt(prependPrContext(params)); + } catch (error) { + throw classifyPromptError(error); } finally { this.session.promptRunning = false; } diff --git a/packages/agent/src/adapters/error-classification.ts b/packages/agent/src/adapters/error-classification.ts new file mode 100644 index 000000000..cf51ab762 --- /dev/null +++ b/packages/agent/src/adapters/error-classification.ts @@ -0,0 +1,30 @@ +export type AgentErrorClassification = + | "upstream_stream_terminated" + | "upstream_connection_error" + | "upstream_provider_failure" + | "agent_error"; + +const UPSTREAM_PROVIDER_ERROR_STATUS_PATTERN = /API Error:\s*(?:429|5\d\d)\b/i; + +/** + * Classify error strings surfaced by agent adapters. Transient upstream + * failures are retriable when they match exact stream/connection patterns or + * retryable provider HTTP statuses; most other errors are not. + */ +export function classifyAgentError( + result: string | undefined, +): AgentErrorClassification { + if (!result) return "agent_error"; + const text = result.trim(); + // Anthropic SDK surfaces an undici fetch abort as "API Error: terminated". + if (/API Error:\s*terminated\b/i.test(text)) { + return "upstream_stream_terminated"; + } + if (/API Error:\s*Connection error\b/i.test(text)) { + return "upstream_connection_error"; + } + if (UPSTREAM_PROVIDER_ERROR_STATUS_PATTERN.test(text)) { + return "upstream_provider_failure"; + } + return "agent_error"; +} diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 12ea84c47..5a7b856e1 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -24,7 +24,7 @@ import { import { type AgentErrorClassification, classifyAgentError, -} from "../adapters/claude/conversion/sdk-to-acp"; +} from "../adapters/error-classification"; import type { PermissionMode } from "../execution-mode"; import { DEFAULT_CODEX_MODEL } from "../gateway-models"; import { HandoffCheckpointTracker } from "../handoff-checkpoint"; @@ -65,9 +65,20 @@ import type { AgentServerConfig } from "./types"; const agentErrorClassificationSchema = z.enum([ "upstream_stream_terminated", "upstream_connection_error", + "upstream_provider_failure", "agent_error", ]) satisfies z.ZodType; +export const UPSTREAM_PROVIDER_FAILURE_MESSAGE = + "The upstream AI provider failed to process the request. Please retry the task in a few minutes."; + +const upstreamProviderFailureClassifications = + new Set([ + "upstream_stream_terminated", + "upstream_connection_error", + "upstream_provider_failure", + ]); + const errorWithClassificationSchema = z.object({ data: z.object({ classification: agentErrorClassificationSchema }), }); @@ -1051,12 +1062,11 @@ export class AgentServer { error: unknown, ): Promise { const { classification, message } = this.extractErrorClassification(error); - const errorMessage = - classification === "upstream_stream_terminated" - ? "Upstream LLM stream terminated" - : classification === "upstream_connection_error" - ? "Upstream LLM connection error" - : message || "Agent error"; + const errorMessage = upstreamProviderFailureClassifications.has( + classification, + ) + ? UPSTREAM_PROVIDER_FAILURE_MESSAGE + : message || "Agent error"; this.logger.error(`send_${phase}_task_message_failed`, { classification, message, diff --git a/packages/agent/src/server/question-relay.test.ts b/packages/agent/src/server/question-relay.test.ts index daf597ec1..63692ce56 100644 --- a/packages/agent/src/server/question-relay.test.ts +++ b/packages/agent/src/server/question-relay.test.ts @@ -1,11 +1,11 @@ import { type SetupServerApi, setupServer } from "msw/node"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { classifyAgentError } from "../adapters/claude/conversion/sdk-to-acp"; +import { classifyAgentError } from "../adapters/error-classification"; import type { PostHogAPIClient } from "../posthog-api"; import { createTestRepo, type TestRepo } from "../test/fixtures/api"; import { createPostHogHandlers } from "../test/mocks/msw-handlers"; import type { Task, TaskRun } from "../types"; -import { AgentServer } from "./agent-server"; +import { AgentServer, UPSTREAM_PROVIDER_FAILURE_MESSAGE } from "./agent-server"; interface TestableAgentServer { posthogAPI: PostHogAPIClient; @@ -76,10 +76,28 @@ function createTransientConnectionError(): Error & { return error; } +function createUpstreamProviderFailureError(): Error & { + data: { classification: string; result: string }; +} { + const result = + 'API Error: 529 {"error":{"message":"{\\"type\\":\\"error\\",\\"error\\":{\\"type\\":\\"overloaded_error\\",\\"message\\":\\"Overloaded\\"}}","type":"api_error"}}'; + const error = new Error(result) as Error & { + data: { classification: string; result: string }; + }; + error.data = { + classification: "upstream_provider_failure", + result, + }; + return error; +} + describe("Question relay", () => { it.each([ ["API Error: terminated", "upstream_stream_terminated"], ["API Error: Connection error", "upstream_connection_error"], + ["API Error: 429 rate_limit_error", "upstream_provider_failure"], + ["API Error: 529 overloaded_error", "upstream_provider_failure"], + ["API Error: 503 internal_error", "upstream_provider_failure"], ["something else", "agent_error"], [undefined, "agent_error"], ])("classifies %p as %s", (message, expected) => { @@ -590,12 +608,56 @@ describe("Question relay", () => { "test-run-id", { status: "failed", - error_message: "Upstream LLM stream terminated", + error_message: UPSTREAM_PROVIDER_FAILURE_MESSAGE, + }, + ); + }); + + it("surfaces upstream provider failures with a retryable message", async () => { + vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({ + id: "test-task-id", + title: "t", + description: "original task description", + } as unknown as Task); + vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue({ + id: "test-run-id", + task: "test-task-id", + state: {}, + } as unknown as TaskRun); + + const promptSpy = vi + .fn() + .mockRejectedValueOnce(createUpstreamProviderFailureError()); + const updateTaskRunSpy = vi + .spyOn(server.posthogAPI, "updateTaskRun") + .mockResolvedValue({} as TaskRun); + server.session = { + payload: TEST_PAYLOAD, + acpSessionId: "acp-session", + clientConnection: { prompt: promptSpy }, + logWriter: { + flushAll: vi.fn().mockResolvedValue(undefined), + getFullAgentResponse: vi.fn().mockReturnValue(null), + resetTurnMessages: vi.fn(), + flush: vi.fn().mockResolvedValue(undefined), + isRegistered: vi.fn().mockReturnValue(true), + }, + }; + + await server.sendInitialTaskMessage(TEST_PAYLOAD); + + expect(promptSpy).toHaveBeenCalledTimes(1); + expect(updateTaskRunSpy).toHaveBeenCalledWith( + "test-task-id", + "test-run-id", + { + status: "failed", + error_message: UPSTREAM_PROVIDER_FAILURE_MESSAGE, }, ); }); - it("surfaces upstream connection errors with the connection-specific message", async () => { + it("surfaces upstream connection errors with the shared provider failure message", async () => { vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({ id: "test-task-id", title: "t", @@ -634,7 +696,7 @@ describe("Question relay", () => { "test-run-id", { status: "failed", - error_message: "Upstream LLM connection error", + error_message: UPSTREAM_PROVIDER_FAILURE_MESSAGE, }, ); });