Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 1 addition & 26 deletions packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { image, text } from "../../../utils/acp-content";
import { unreachable } from "../../../utils/common";
import type { Logger } from "../../../utils/logger";
import { tryParsePartialJson } from "../../../utils/partial-json";
import { classifyAgentError } from "../../error-classification";
import { type EnrichedReadCache, registerHookCallback } from "../hooks";
import type {
Session,
Expand Down Expand Up @@ -696,32 +697,6 @@ export type ResultMessageHandlerResult = {
};
};

export type AgentErrorClassification =
  | "upstream_stream_terminated"
  | "upstream_connection_error"
  | "agent_error";

/**
 * Map the error text of an `is_error: true` Claude CLI result message to a
 * coarse classification.
 *
 * Two transient upstream conditions are recognised (case-insensitively,
 * anywhere in the text); they are considered retriable. Anything else,
 * including a missing or empty string, is reported as a plain `agent_error`.
 *
 * @param result - Raw error string from the result message, if any.
 * @returns The first matching classification, or `"agent_error"`.
 */
export function classifyAgentError(
  result: string | undefined,
): AgentErrorClassification {
  // Evaluated in order; the first pattern that matches wins.
  const rules: ReadonlyArray<readonly [RegExp, AgentErrorClassification]> = [
    // Anthropic SDK surfaces an undici fetch abort as "API Error: terminated".
    [/API Error:\s*terminated\b/i, "upstream_stream_terminated"],
    [/API Error:\s*Connection error\b/i, "upstream_connection_error"],
  ];

  const candidate = (result ?? "").trim();
  const hit = rules.find(([pattern]) => pattern.test(candidate));
  return hit ? hit[1] : "agent_error";
}

export function handleResultMessage(
message: SDKResultMessage,
): ResultMessageHandlerResult {
Expand Down
83 changes: 83 additions & 0 deletions packages/agent/src/adapters/codex/codex-agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,89 @@ describe("CodexAcpAgent", () => {
).resolves.toEqual({ stopReason: "end_turn" });
});

// Table-driven check of how `agent.prompt` rethrows a rejection coming from the
// mocked Codex connection. Each row is [rejection message, expected
// classification]; `undefined` means the original error must pass through.
it.each([
["API Error: 429 rate_limit_error", "upstream_provider_failure"],
["API Error: 503 internal_error", "upstream_provider_failure"],
["API Error: 529 overloaded_error", "upstream_provider_failure"],
["ordinary failure", undefined],
] as const)(
"handles prompt failure %p",
async (message, expectedClassification) => {
// Arrange: open a session against the mocked connection.
const { agent } = createAgent();
mockCodexConnection.newSession.mockResolvedValue({
sessionId: "session-1",
modes: { currentModeId: "auto", availableModes: [] },
configOptions: [],
} satisfies Partial<NewSessionResponse>);
await agent.newSession({
cwd: process.cwd(),
} as never);

// The next prompt call on the connection rejects with this exact object.
const promptError = new Error(message);
mockCodexConnection.prompt.mockRejectedValueOnce(promptError);

// Capture whatever the agent throws so both identity and shape can be checked.
let thrown: unknown;
try {
await agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "A" }],
} as never);
} catch (error) {
thrown = error;
}

// Unclassified failures must be rethrown as the very same error instance.
if (!expectedClassification) {
expect(thrown).toBe(promptError);
return;
}

// Classified failures must expose the classification and the original
// message under `data`.
expect(thrown).toMatchObject({
data: {
classification: expectedClassification,
result: message,
},
});
},
);

// Regression check: after a prompt fails with a classified upstream error, a
// second prompt on the same session must still be accepted and resolve.
it("does not let a classified failing prompt block subsequent prompts", async () => {
const { agent } = createAgent();
mockCodexConnection.newSession.mockResolvedValue({
sessionId: "session-1",
modes: { currentModeId: "auto", availableModes: [] },
configOptions: [],
} satisfies Partial<NewSessionResponse>);
await agent.newSession({
cwd: process.cwd(),
} as never);

// Order matters: the first connection.prompt call rejects, the second resolves.
mockCodexConnection.prompt.mockRejectedValueOnce(
new Error("API Error: 529 overloaded_error"),
);
mockCodexConnection.prompt.mockResolvedValueOnce({
stopReason: "end_turn",
});

// First prompt: rejected with the classification attached under `data`.
await expect(
agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "A" }],
} as never),
).rejects.toMatchObject({
data: {
classification: "upstream_provider_failure",
result: "API Error: 529 overloaded_error",
},
});

// Second prompt: must go through normally.
await expect(
agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "B" }],
} as never),
).resolves.toEqual({ stopReason: "end_turn" });
});

describe("structured output injection", () => {
const schema = {
type: "object",
Expand Down
16 changes: 16 additions & 0 deletions packages/agent/src/adapters/codex/codex-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import {
nodeWritableToWebWritable,
} from "../../utils/streams";
import { BaseAcpAgent, type BaseSession } from "../base-acp-agent";
import { classifyAgentError } from "../error-classification";
import { createCodexClient } from "./codex-client";
import { normalizeCodexConfigOptions } from "./models";
import {
Expand Down Expand Up @@ -138,6 +139,19 @@ function prependPrContext(params: PromptRequest): PromptRequest {
};
}

/**
 * Pull a human-readable message out of an arbitrary rejection value.
 *
 * Handles `Error` instances, bare strings, and plain objects carrying a string
 * `message` property (for example a JSON-RPC style `{ code, message, data }`
 * error object). Anything else is stringified; `null`/`undefined` become "".
 */
function extractPromptErrorMessage(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "string") return error;
  if (
    typeof error === "object" &&
    error !== null &&
    "message" in error &&
    typeof error.message === "string"
  ) {
    // Without this branch String(error) would yield "[object Object]" and the
    // failure could never be classified.
    return error.message;
  }
  return String(error ?? "");
}

/**
 * Decide what to rethrow when the Codex connection rejects a prompt.
 *
 * The rejection's message is run through `classifyAgentError`. If it is an
 * ordinary `agent_error`, the original value is returned untouched so callers
 * see exactly what was thrown. Otherwise the failure is wrapped in
 * `RequestError.internalError`, with `{ classification, result }` passed as the
 * error data and the message passed along as the second argument.
 *
 * @param error - Whatever `codexConnection.prompt` rejected with.
 * @returns The value the caller should throw.
 */
function classifyPromptError(error: unknown): unknown {
  const message = extractPromptErrorMessage(error);
  const classification = classifyAgentError(message);
  if (classification === "agent_error") {
    return error;
  }

  return RequestError.internalError(
    { classification, result: message },
    message,
  );
}

const CODEX_NATIVE_MODE: Record<CodeExecutionMode, CodexNativeMode> = {
auto: "auto",
default: "auto",
Expand Down Expand Up @@ -577,6 +591,8 @@ export class CodexAcpAgent extends BaseAcpAgent {
let response: PromptResponse;
try {
response = await this.codexConnection.prompt(prependPrContext(params));
} catch (error) {
throw classifyPromptError(error);
} finally {
this.session.promptRunning = false;
}
Expand Down
30 changes: 30 additions & 0 deletions packages/agent/src/adapters/error-classification.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
export type AgentErrorClassification =
  | "upstream_stream_terminated"
  | "upstream_connection_error"
  | "upstream_provider_failure"
  | "agent_error";

// Matches "API Error:" followed by HTTP status 429 or any three-digit 5xx code.
const UPSTREAM_PROVIDER_ERROR_STATUS_PATTERN = /API Error:\s*(?:429|5\d\d)\b/i;

/**
 * Classify an error string surfaced by an agent adapter.
 *
 * Transient upstream failures are recognised by exact stream/connection
 * phrases or by a retryable provider HTTP status after `API Error:`; these are
 * treated as retriable. Every other input, including a missing, empty or
 * whitespace-only string, is classified as `agent_error`.
 *
 * @param result - Raw error text, if any.
 * @returns The classification of the first check that matches.
 */
export function classifyAgentError(
  result: string | undefined,
): AgentErrorClassification {
  const trimmed = result?.trim();
  if (!trimmed) return "agent_error";

  // Checked top to bottom; the first matching pattern decides the outcome.
  const orderedChecks: ReadonlyArray<
    readonly [RegExp, AgentErrorClassification]
  > = [
    // Anthropic SDK surfaces an undici fetch abort as "API Error: terminated".
    [/API Error:\s*terminated\b/i, "upstream_stream_terminated"],
    [/API Error:\s*Connection error\b/i, "upstream_connection_error"],
    [UPSTREAM_PROVIDER_ERROR_STATUS_PATTERN, "upstream_provider_failure"],
  ];

  for (const [pattern, classification] of orderedChecks) {
    if (pattern.test(trimmed)) return classification;
  }
  return "agent_error";
}
24 changes: 17 additions & 7 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
import {
type AgentErrorClassification,
classifyAgentError,
} from "../adapters/claude/conversion/sdk-to-acp";
} from "../adapters/error-classification";
import type { PermissionMode } from "../execution-mode";
import { DEFAULT_CODEX_MODEL } from "../gateway-models";
import { HandoffCheckpointTracker } from "../handoff-checkpoint";
Expand Down Expand Up @@ -65,9 +65,20 @@ import type { AgentServerConfig } from "./types";
// Runtime (zod) counterpart of the AgentErrorClassification union. The
// `satisfies` clause makes the compiler check this enum against that type.
const agentErrorClassificationSchema = z.enum([
"upstream_stream_terminated",
"upstream_connection_error",
"upstream_provider_failure",
"agent_error",
]) satisfies z.ZodType<AgentErrorClassification>;

/**
 * Error message used for every classification listed in
 * `upstreamProviderFailureClassifications`. Exported so tests can assert on
 * the exact text.
 */
export const UPSTREAM_PROVIDER_FAILURE_MESSAGE =
"The upstream AI provider failed to process the request. Please retry the task in a few minutes.";

// Classifications that are reported with UPSTREAM_PROVIDER_FAILURE_MESSAGE
// rather than with the raw error message.
const upstreamProviderFailureClassifications =
new Set<AgentErrorClassification>([
"upstream_stream_terminated",
"upstream_connection_error",
"upstream_provider_failure",
]);

// Shape of an error value that carries a classification at
// `data.classification`.
const errorWithClassificationSchema = z.object({
data: z.object({ classification: agentErrorClassificationSchema }),
});
Expand Down Expand Up @@ -1051,12 +1062,11 @@ export class AgentServer {
error: unknown,
): Promise<void> {
const { classification, message } = this.extractErrorClassification(error);
const errorMessage =
classification === "upstream_stream_terminated"
? "Upstream LLM stream terminated"
: classification === "upstream_connection_error"
? "Upstream LLM connection error"
: message || "Agent error";
const errorMessage = upstreamProviderFailureClassifications.has(
classification,
)
? UPSTREAM_PROVIDER_FAILURE_MESSAGE
: message || "Agent error";
this.logger.error(`send_${phase}_task_message_failed`, {
classification,
message,
Expand Down
72 changes: 67 additions & 5 deletions packages/agent/src/server/question-relay.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { type SetupServerApi, setupServer } from "msw/node";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { classifyAgentError } from "../adapters/claude/conversion/sdk-to-acp";
import { classifyAgentError } from "../adapters/error-classification";
import type { PostHogAPIClient } from "../posthog-api";
import { createTestRepo, type TestRepo } from "../test/fixtures/api";
import { createPostHogHandlers } from "../test/mocks/msw-handlers";
import type { Task, TaskRun } from "../types";
import { AgentServer } from "./agent-server";
import { AgentServer, UPSTREAM_PROVIDER_FAILURE_MESSAGE } from "./agent-server";

interface TestableAgentServer {
posthogAPI: PostHogAPIClient;
Expand Down Expand Up @@ -76,10 +76,28 @@ function createTransientConnectionError(): Error & {
return error;
}

/**
 * Test fixture: an Error already tagged as an `upstream_provider_failure`.
 *
 * The error message and `data.result` both hold the same "API Error: 529"
 * text, whose JSON body wraps an `overloaded_error` payload.
 */
function createUpstreamProviderFailureError(): Error & {
  data: { classification: string; result: string };
} {
  const overloadedPayload =
    'API Error: 529 {"error":{"message":"{\\"type\\":\\"error\\",\\"error\\":{\\"type\\":\\"overloaded_error\\",\\"message\\":\\"Overloaded\\"}}","type":"api_error"}}';

  // Object.assign attaches `data` as an own property and yields the
  // intersection type directly, so no cast is required.
  return Object.assign(new Error(overloadedPayload), {
    data: {
      classification: "upstream_provider_failure",
      result: overloadedPayload,
    },
  });
}

describe("Question relay", () => {
it.each([
["API Error: terminated", "upstream_stream_terminated"],
["API Error: Connection error", "upstream_connection_error"],
["API Error: 429 rate_limit_error", "upstream_provider_failure"],
["API Error: 529 overloaded_error", "upstream_provider_failure"],
["API Error: 503 internal_error", "upstream_provider_failure"],
["something else", "agent_error"],
[undefined, "agent_error"],
])("classifies %p as %s", (message, expected) => {
Comment thread
tatoalo marked this conversation as resolved.
Expand Down Expand Up @@ -590,12 +608,56 @@ describe("Question relay", () => {
"test-run-id",
{
status: "failed",
error_message: "Upstream LLM stream terminated",
error_message: UPSTREAM_PROVIDER_FAILURE_MESSAGE,
},
);
});

// When the initial prompt rejects with an `upstream_provider_failure` error,
// the task run must be marked failed with the shared provider-failure message.
it("surfaces upstream provider failures with a retryable message", async () => {
// Stub the task / task-run lookups.
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
id: "test-task-id",
title: "t",
description: "original task description",
} as unknown as Task);
vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue({
id: "test-run-id",
task: "test-task-id",
state: {},
} as unknown as TaskRun);

// The first (and only expected) prompt call rejects with the classified error.
const promptSpy = vi
.fn()
.mockRejectedValueOnce(createUpstreamProviderFailureError());
const updateTaskRunSpy = vi
.spyOn(server.posthogAPI, "updateTaskRun")
.mockResolvedValue({} as TaskRun);
// Minimal session stub: a client connection plus a no-op log writer.
server.session = {
payload: TEST_PAYLOAD,
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
isRegistered: vi.fn().mockReturnValue(true),
},
};

await server.sendInitialTaskMessage(TEST_PAYLOAD);

// Exactly one prompt attempt is expected for this failure.
expect(promptSpy).toHaveBeenCalledTimes(1);
// The run is recorded as failed with the generic provider message, not the
// raw 529 payload.
expect(updateTaskRunSpy).toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
{
status: "failed",
error_message: UPSTREAM_PROVIDER_FAILURE_MESSAGE,
},
);
});

it("surfaces upstream connection errors with the connection-specific message", async () => {
it("surfaces upstream connection errors with the shared provider failure message", async () => {
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
id: "test-task-id",
title: "t",
Expand Down Expand Up @@ -634,7 +696,7 @@ describe("Question relay", () => {
"test-run-id",
{
status: "failed",
error_message: "Upstream LLM connection error",
error_message: UPSTREAM_PROVIDER_FAILURE_MESSAGE,
},
);
});
Expand Down
Loading