diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index ed8bdb18..d82f0725 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -33,6 +33,7 @@ import type { SkillsListResponse, SandboxPolicy, Thread, + ThreadGoalStatus, ThreadSourceKind, TurnCompletedNotification, UserInput, @@ -329,6 +330,39 @@ export class CodexAcpClient { await this.codexClient.runCompact({threadId: sessionId}); } + async setGoal( + sessionId: string, + objective: string, + onTurnStarted?: (turnId: string) => void, + ): Promise { + return await this.codexClient.runGoalSet({ + threadId: sessionId, + objective, + status: "active", + }, onTurnStarted); + } + + async setGoalStatus(sessionId: string, status: ThreadGoalStatus): Promise { + await this.codexClient.runGoalSet({ + threadId: sessionId, + status, + }); + } + + async resumeGoal( + sessionId: string, + onTurnStarted?: (turnId: string) => void, + ): Promise { + return await this.codexClient.runGoalSet({ + threadId: sessionId, + status: "active", + }, onTurnStarted); + } + + async clearGoal(sessionId: string): Promise { + await this.codexClient.runGoalClear({threadId: sessionId}); + } + async awaitMcpServerStartup(serverNames: Array, afterVersion: number): Promise { return await this.codexClient.awaitMcpServerStartup(serverNames, afterVersion); } diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 720ff95f..92a20cfc 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -14,6 +14,7 @@ import type { ReasoningEffortOption, Thread, ThreadItem, + TurnCompletedNotification, UserInput } from "./app-server/v2"; import type {RateLimitsMap} from "./RateLimitsMap"; @@ -66,6 +67,12 @@ import packageJson from "../package.json"; import {isJetBrains2026_1Client} from "./JBUtils"; import {resolveTerminalOutputMode, type TerminalOutputMode} from "./TerminalOutputMode"; +export interface ThreadGoalSnapshot { + objective: string; + status: string; + tokenBudget: number | null; +} + export interface SessionState { sessionId: string, currentModelId: string, @@ -87,6 +94,7 @@ export interface SessionState { currentModelSupportsFast: boolean; sessionMcpServers?: Array; terminalOutputMode: TerminalOutputMode; + currentGoal?: ThreadGoalSnapshot | null; } interface ActiveAuthState { @@ -1389,6 +1397,13 @@ export class CodexAcpServer { sessionState.lastTokenUsage = null; const activePrompt = this.trackActivePrompt(params.sessionId); let pendingTurnStart: PendingTurnStart | null = null; + const ensurePendingTurnStart = (): PendingTurnStart => { + if (pendingTurnStart === null) { + pendingTurnStart = this.createPendingTurnStart(); + this.pendingTurnStarts.set(params.sessionId, pendingTurnStart); + } + return pendingTurnStart; + }; const disposePromptRequestCancellation = this.observePromptRequestCancellation(signal, sessionState, activePrompt); try { @@ -1408,6 +1423,9 @@ export class CodexAcpServer { } const commandPromise = this.availableCommands.tryHandleCommand(params.prompt, sessionState, { + onTurnStartPending: () => { + ensurePendingTurnStart(); + }, onTurnStarted: (turnId, threadId) => { const turn = {threadId, turnId}; activePrompt.currentTurn = turn; @@ -1416,6 +1434,7 @@ export class CodexAcpServer { return; } sessionState.currentTurnId = turnId; + pendingTurnStart?.resolve(turnId); }, }); void commandPromise.catch((err) => { @@ -1474,8 +1493,7 @@ export class CodexAcpServer { sessionState.fastModeEnabled, sessionState.currentModelSupportsFast, ); - pendingTurnStart = this.createPendingTurnStart(); - this.pendingTurnStarts.set(params.sessionId, pendingTurnStart); + ensurePendingTurnStart(); const sendPromptPromise = this.runWithProcessCheck( () => this.codexAcpClient.sendPrompt( params, @@ -1537,10 +1555,11 @@ export class CodexAcpServer { logger.log("Prompt completed", {sessionId: params.sessionId}); disposePromptRequestCancellation(); sessionState.currentTurnId = null; - if (pendingTurnStart !== null && this.pendingTurnStarts.get(params.sessionId) === pendingTurnStart) { + const registeredPendingTurnStart = this.pendingTurnStarts.get(params.sessionId); + if (registeredPendingTurnStart !== undefined) { this.pendingTurnStarts.delete(params.sessionId); + registeredPendingTurnStart.resolve(null); } - pendingTurnStart?.resolve(null); activePrompt.complete(); } } diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index 6a5a0b51..d1bc210c 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -26,10 +26,19 @@ import type { SkillsExtraRootsSetParams, SkillsListParams, SkillsListResponse, + ThreadGoal, + ThreadGoalUpdatedNotification, + ThreadStatus, + ThreadStatusChangedNotification, ThreadArchiveParams, ThreadArchiveResponse, ThreadCompactStartParams, ThreadCompactStartResponse, + ThreadGoalClearedNotification, + ThreadGoalClearParams, + ThreadGoalClearResponse, + ThreadGoalSetParams, + ThreadGoalSetResponse, ThreadLoadedListParams, ThreadLoadedListResponse, ThreadListParams, @@ -101,6 +110,8 @@ const McpServerElicitationRequest = new RequestType< void >('mcpServer/elicitation/request'); +const GOAL_RUNTIME_EFFECTS_GRACE_MS = 1_000; + /** * A type-safe client over the Codex App Server's JSON-RPC API. * Maps each request to its expected response and exposes clear, typed methods for supported JSON-RPC operations. @@ -115,6 +126,10 @@ export class CodexAppServerClient { private readonly pendingTurnCompletionResolvers = new Map void>>(); private readonly pendingCompactionCompletionResolvers = new Map void>>(); private readonly turnCompletionCaptures = new Map void>>(); + private readonly turnRoutingCaptures = new Map void>>(); + private readonly threadStatusCaptures = new Map void>>(); + private readonly threadGoalUpdateCaptures = new Map void>>(); + private readonly threadGoalClearedCaptures = new Map void>>(); private readonly staleTurnIds = new Map>(); constructor(connection: MessageConnection) { @@ -136,15 +151,21 @@ export class CodexAppServerClient { if (isCompactionCompletedNotification(serverNotification)) { this.recordCompactionCompleted(serverNotification); } + if (isThreadStatusChangedNotification(serverNotification)) { + this.recordThreadStatusChanged(serverNotification.params); + } + if (isThreadGoalUpdatedNotification(serverNotification)) { + this.recordThreadGoalUpdated(serverNotification.params); + } + if (isThreadGoalClearedNotification(serverNotification)) { + this.recordThreadGoalCleared(serverNotification.params); + } const routing = extractTurnRouting(serverNotification); - const staleTurnNotification = this.isStaleTurn(routing.threadId, routing.turnId); - if (staleTurnNotification) { - if (isTurnCompletedNotification(serverNotification) && routing.threadId !== null && routing.turnId !== null) { - this.clearStaleTurn(routing.threadId, routing.turnId); - } - for (const callback of this.codexEventHandlers) { - callback({ eventType: "notification", ...serverNotification }); - } + if (this.handleStaleTurnNotification(serverNotification, routing)) { + return; + } + this.recordTurnRouting(routing); + if (this.handleStaleTurnNotification(serverNotification, routing)) { return; } this.notify(serverNotification); @@ -265,6 +286,185 @@ export class CodexAppServerClient { } } + async runGoalSet( + params: ThreadGoalSetParams, + onTurnStarted?: (turnId: string) => void, + runtimeEffectsGraceMs = GOAL_RUNTIME_EFFECTS_GRACE_MS, + ): Promise { + let goalTurnId: string | null = null; + const capturedCompletions: Array = []; + let resolveGoalTurnCompleted: (event: TurnCompletedNotification) => void = () => {}; + const goalTurnCompleted = new Promise((resolve) => { + resolveGoalTurnCompleted = resolve; + }); + const releaseCompletionCapture = this.captureTurnCompletions(params.threadId, (event) => { + capturedCompletions.push(event); + if (goalTurnId === event.turn.id) { + resolveGoalTurnCompleted(event); + } + }); + let resolveGoalTurnStarted: (turnId: string) => void = () => {}; + const goalTurnStarted = new Promise((resolve) => { + resolveGoalTurnStarted = resolve; + }); + let resolveGoalUpdateHandled: () => void = () => {}; + const matchingGoalUpdateHandled = new Promise((resolve) => { + resolveGoalUpdateHandled = () => resolve(null); + }); + let goalUpdateHandled = false; + let expectedGoal: ThreadGoal | null = null; + const noGoalTurnStarted = this.createNoGoalTurnStartedPromise(runtimeEffectsGraceMs); + const capturedGoalUpdates: Array = []; + const releaseRoutingCapture = this.captureTurnRoutings(params.threadId, (turnId) => { + if (!goalUpdateHandled || goalTurnId !== null) { + return; + } + goalTurnId = turnId; + onTurnStarted?.(turnId); + resolveGoalTurnStarted(turnId); + }); + const releaseGoalUpdateCapture = this.captureThreadGoalUpdates(params.threadId, (event) => { + capturedGoalUpdates.push(event); + if (expectedGoal !== null && goalsMatch(event.goal, expectedGoal)) { + goalUpdateHandled = true; + resolveGoalUpdateHandled(); + noGoalTurnStarted.goalUpdated(); + } + }); + const releaseStatusCapture = this.captureThreadStatuses(params.threadId, (status) => { + if (!goalUpdateHandled || goalTurnId !== null) { + return; + } + noGoalTurnStarted.threadStatusChanged(status); + }); + + try { + const goalSetResponse = await this.threadGoalSet(params); + expectedGoal = goalSetResponse.goal; + if (capturedGoalUpdates.some(event => goalsMatch(event.goal, expectedGoal!))) { + goalUpdateHandled = true; + resolveGoalUpdateHandled(); + noGoalTurnStarted.goalUpdated(); + } + if (expectedGoal.status !== "active") { + await matchingGoalUpdateHandled; + return null; + } + const turnId = goalTurnId ?? await Promise.race([goalTurnStarted, noGoalTurnStarted.promise]); + noGoalTurnStarted.release(); + releaseRoutingCapture(); + releaseStatusCapture(); + releaseGoalUpdateCapture(); + if (turnId === null) { + return null; + } + const earlyCompletion = capturedCompletions.find(event => event.turn.id === turnId); + if (earlyCompletion) { + return earlyCompletion; + } + return await goalTurnCompleted; + } finally { + noGoalTurnStarted.release(); + releaseCompletionCapture(); + releaseRoutingCapture(); + releaseStatusCapture(); + releaseGoalUpdateCapture(); + } + } + + async runGoalClear(params: ThreadGoalClearParams): Promise { + let goalClearedHandled = false; + let resolveGoalClearedHandled: () => void = () => {}; + const matchingGoalClearedHandled = new Promise((resolve) => { + resolveGoalClearedHandled = () => resolve(); + }); + const releaseGoalClearedCapture = this.captureThreadGoalClears(params.threadId, () => { + goalClearedHandled = true; + resolveGoalClearedHandled(); + }); + + try { + const response = await this.threadGoalClear(params); + if (!response.cleared || goalClearedHandled) { + return; + } + await matchingGoalClearedHandled; + } finally { + releaseGoalClearedCapture(); + } + } + + private createNoGoalTurnStartedPromise( + runtimeEffectsGraceMs: number, + ): { + promise: Promise, + release: () => void, + goalUpdated: () => void, + threadStatusChanged: (status: ThreadStatus) => void, + } { + let released = false; + let resolved = false; + let goalUpdated = false; + let activeAfterGoalUpdate = false; + let timeout: ReturnType | null = null; + let resolveNoGoalTurnStarted: () => void = () => {}; + const clearTimer = () => { + if (timeout !== null) { + clearTimeout(timeout); + timeout = null; + } + }; + const resolveNoTurn = () => { + if (released || resolved) { + return; + } + resolved = true; + clearTimer(); + resolveNoGoalTurnStarted(); + }; + const scheduleNoTurnTimer = () => { + if (released || resolved || !goalUpdated || activeAfterGoalUpdate || timeout !== null) { + return; + } + timeout = setTimeout(resolveNoTurn, runtimeEffectsGraceMs); + }; + const release = () => { + if (released) { + return; + } + released = true; + clearTimer(); + }; + const promise = new Promise((resolve) => { + resolveNoGoalTurnStarted = () => { + resolve(null); + }; + }); + const handleGoalUpdated = () => { + goalUpdated = true; + scheduleNoTurnTimer(); + }; + const handleThreadStatusChanged = (status: ThreadStatus) => { + if (!goalUpdated || released || resolved) { + return; + } + if (status.type === "active") { + activeAfterGoalUpdate = true; + clearTimer(); + return; + } + if (activeAfterGoalUpdate) { + resolveNoTurn(); + } + }; + return { + promise, + release, + goalUpdated: handleGoalUpdated, + threadStatusChanged: handleThreadStatusChanged, + }; + } + async runCompact(params: ThreadCompactStartParams): Promise { const compactionCompleted = this.awaitCompactionCompleted(params.threadId); await this.threadCompactStart(params); @@ -317,6 +517,14 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "thread/compact/start", params: params }); } + async threadGoalSet(params: ThreadGoalSetParams): Promise { + return await this.sendRequest({ method: "thread/goal/set", params: params }); + } + + async threadGoalClear(params: ThreadGoalClearParams): Promise { + return await this.sendRequest({ method: "thread/goal/clear", params: params }); + } + async listMcpServerStatus(params: ListMcpServerStatusParams): Promise { return await this.sendRequest({ method: "mcpServerStatus/list", params }); } @@ -469,6 +677,65 @@ export class CodexAppServerClient { } } + private recordThreadStatusChanged(event: ThreadStatusChangedNotification): void { + const captures = this.threadStatusCaptures.get(event.threadId); + if (!captures) { + return; + } + for (const capture of captures) { + capture(event.status); + } + } + + private recordThreadGoalUpdated(event: ThreadGoalUpdatedNotification): void { + const captures = this.threadGoalUpdateCaptures.get(event.threadId); + if (!captures) { + return; + } + for (const capture of captures) { + capture(event); + } + } + + private recordThreadGoalCleared(event: ThreadGoalClearedNotification): void { + const captures = this.threadGoalClearedCaptures.get(event.threadId); + if (!captures) { + return; + } + for (const capture of captures) { + capture(); + } + } + + private recordTurnRouting(routing: { threadId: string | null, turnId: string | null }): void { + if (routing.threadId === null || routing.turnId === null) { + return; + } + const captures = this.turnRoutingCaptures.get(routing.threadId); + if (!captures) { + return; + } + for (const capture of captures) { + capture(routing.turnId); + } + } + + private handleStaleTurnNotification( + notification: ServerNotification, + routing: { threadId: string | null, turnId: string | null }, + ): boolean { + if (!this.isStaleTurn(routing.threadId, routing.turnId)) { + return false; + } + if (isTurnCompletedNotification(notification) && routing.threadId !== null && routing.turnId !== null) { + this.clearStaleTurn(routing.threadId, routing.turnId); + } + for (const callback of this.codexEventHandlers) { + callback({ eventType: "notification", ...notification }); + } + return true; + } + private isStaleTurn(threadId: string | null, turnId: string | null): boolean { if (threadId === null || turnId === null) { return false; @@ -514,6 +781,74 @@ export class CodexAppServerClient { }; } + private captureTurnRoutings(threadId: string, capture: (turnId: string) => void): () => void { + const captures = this.turnRoutingCaptures.get(threadId) ?? new Set<(turnId: string) => void>(); + captures.add(capture); + this.turnRoutingCaptures.set(threadId, captures); + let released = false; + return () => { + if (released) { + return; + } + released = true; + captures.delete(capture); + if (captures.size === 0) { + this.turnRoutingCaptures.delete(threadId); + } + }; + } + + private captureThreadStatuses(threadId: string, capture: (status: ThreadStatus) => void): () => void { + const captures = this.threadStatusCaptures.get(threadId) ?? new Set<(status: ThreadStatus) => void>(); + captures.add(capture); + this.threadStatusCaptures.set(threadId, captures); + let released = false; + return () => { + if (released) { + return; + } + released = true; + captures.delete(capture); + if (captures.size === 0) { + this.threadStatusCaptures.delete(threadId); + } + }; + } + + private captureThreadGoalUpdates(threadId: string, capture: (event: ThreadGoalUpdatedNotification) => void): () => void { + const captures = this.threadGoalUpdateCaptures.get(threadId) ?? new Set<(event: ThreadGoalUpdatedNotification) => void>(); + captures.add(capture); + this.threadGoalUpdateCaptures.set(threadId, captures); + let released = false; + return () => { + if (released) { + return; + } + released = true; + captures.delete(capture); + if (captures.size === 0) { + this.threadGoalUpdateCaptures.delete(threadId); + } + }; + } + + private captureThreadGoalClears(threadId: string, capture: () => void): () => void { + const captures = this.threadGoalClearedCaptures.get(threadId) ?? new Set<() => void>(); + captures.add(capture); + this.threadGoalClearedCaptures.set(threadId, captures); + let released = false; + return () => { + if (released) { + return; + } + released = true; + captures.delete(capture); + if (captures.size === 0) { + this.threadGoalClearedCaptures.delete(threadId); + } + }; + } + private resolveMcpServerStartupResolvers(): void { const pendingResolvers: Array = []; for (const resolver of this.mcpServerStartupResolvers) { @@ -618,6 +953,27 @@ function isTurnCompletedNotification(notification: ServerNotification): notifica return notification.method === "turn/completed"; } +function isThreadStatusChangedNotification(notification: ServerNotification): notification is { + method: "thread/status/changed"; + params: ThreadStatusChangedNotification; +} { + return notification.method === "thread/status/changed"; +} + +function isThreadGoalUpdatedNotification(notification: ServerNotification): notification is { + method: "thread/goal/updated"; + params: ThreadGoalUpdatedNotification; +} { + return notification.method === "thread/goal/updated"; +} + +function isThreadGoalClearedNotification(notification: ServerNotification): notification is { + method: "thread/goal/cleared"; + params: ThreadGoalClearedNotification; +} { + return notification.method === "thread/goal/cleared"; +} + function isCompactionCompletedNotification(notification: ServerNotification): notification is CompactionCompletedNotification { if (notification.method === "thread/compacted") { return true; @@ -625,6 +981,14 @@ function isCompactionCompletedNotification(notification: ServerNotification): no return notification.method === "item/completed" && notification.params.item.type === "contextCompaction"; } +function goalsMatch(left: ThreadGoal, right: ThreadGoal): boolean { + return left.threadId === right.threadId + && left.objective === right.objective + && left.status === right.status + && left.tokenBudget === right.tokenBudget + && left.updatedAt === right.updatedAt; +} + function extractThreadId(notification: ServerNotification): string | null { const params = notification.params as { threadId?: unknown } | undefined; if (params && typeof params.threadId === "string") { diff --git a/src/CodexCommands.ts b/src/CodexCommands.ts index 457d911e..0698002c 100644 --- a/src/CodexCommands.ts +++ b/src/CodexCommands.ts @@ -18,6 +18,7 @@ export type CommandHandleResult = | { handled: true, turnCompleted?: TurnCompletedNotification }; export type CommandHandleOptions = { + onTurnStartPending?: () => void; onTurnStarted?: (turnId: string, threadId: string) => void; }; @@ -121,6 +122,11 @@ export class CodexCommands { description: "Summarize conversation to avoid hitting the context limit.", input: null }, + { + name: "goal", + description: "Set, pause, resume, or clear a task goal.", + input: { hint: "[|clear|pause|resume]" } + }, { name: "logout", description: "Sign out of Codex. This option is available when you are logged in via ChatGPT.", @@ -164,6 +170,9 @@ export class CodexCommands { await this.runWithProcessCheck(() => this.codexAcpClient.runCompact(sessionId)); return { handled: true }; } + case "goal": { + return await this.runGoalCommand(sessionState, command.rest, options); + } case "review": { const target = this.buildReviewTarget(command.rest); const turnCompleted = await this.runReviewCommand(sessionState, target, options); @@ -260,19 +269,90 @@ export class CodexCommands { target: ReviewTarget, options: CommandHandleOptions, ): Promise { + options.onTurnStartPending?.(); return await this.runWithProcessCheck(() => this.codexAcpClient.runReview( sessionState.sessionId, target, (turnId, threadId) => { - if (options.onTurnStarted) { - options.onTurnStarted(turnId, threadId); - } else { - sessionState.currentTurnId = turnId; - } + this.handleCommandTurnStarted(sessionState, options, turnId, threadId); }, )); } + private async runGoalCommand( + sessionState: SessionState, + rest: string, + options: CommandHandleOptions, + ): Promise { + const sessionId = sessionState.sessionId; + const argument = rest.trim(); + if (argument.length === 0) { + await this.sendCommandUsageMessage("goal", "[|clear|pause|resume]", sessionId); + return { handled: true }; + } + + switch (argument.toLowerCase()) { + case "pause": + await this.runWithProcessCheck(() => this.codexAcpClient.setGoalStatus(sessionId, "paused")); + return { handled: true }; + case "resume": + options.onTurnStartPending?.(); + return this.createGoalCommandResult(await this.runWithProcessCheck(() => this.codexAcpClient.resumeGoal( + sessionId, + (turnId) => { + this.handleCommandTurnStarted(sessionState, options, turnId, sessionId); + }, + ))); + case "clear": + await this.runWithProcessCheck(() => this.codexAcpClient.clearGoal(sessionId)); + return { handled: true }; + } + + if (argument.length > 4000) { + const session = new ACPSessionConnection(this.connection, sessionId); + await session.update({ + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: 'Command "/goal" requires goal text of at most 4000 characters.' + } + }); + return { handled: true }; + } + + options.onTurnStartPending?.(); + return this.createGoalCommandResult(await this.runWithProcessCheck(() => this.codexAcpClient.setGoal( + sessionId, + argument, + (turnId) => { + this.handleCommandTurnStarted(sessionState, options, turnId, sessionId); + }, + ))); + } + + private handleCommandTurnStarted( + sessionState: SessionState, + options: CommandHandleOptions, + turnId: string, + threadId: string, + ): void { + if (options.onTurnStarted) { + options.onTurnStarted(turnId, threadId); + } else { + sessionState.currentTurnId = turnId; + } + } + + private createGoalCommandResult(turnCompleted: TurnCompletedNotification | null): CommandHandleResult { + if (turnCompleted === null) { + return { handled: true }; + } + return { + handled: true, + turnCompleted, + }; + } + private buildReviewTarget(instructions: string): ReviewTarget { if (instructions.length === 0) { return { type: "uncommittedChanges" }; diff --git a/src/CodexEventHandler.ts b/src/CodexEventHandler.ts index 513d5f51..4ed0cdbe 100644 --- a/src/CodexEventHandler.ts +++ b/src/CodexEventHandler.ts @@ -3,7 +3,7 @@ import type { FuzzyFileSearchSessionUpdatedNotification, ServerNotification } from "./app-server"; -import type {SessionState} from "./CodexAcpServer"; +import type {SessionState, ThreadGoalSnapshot} from "./CodexAcpServer"; import {type PlanEntry, RequestError} from "@agentclientprotocol/sdk"; import {ACPSessionConnection, type AcpClientConnection, type UpdateSessionEvent} from "./ACPSessionConnection"; import type { @@ -266,9 +266,15 @@ export class CodexEventHandler { }; } - private createThreadGoalUpdatedEvent(event: ThreadGoalUpdatedNotification): UpdateSessionEvent { + private createThreadGoalUpdatedEvent(event: ThreadGoalUpdatedNotification): UpdateSessionEvent | null { + const goalSnapshot = this.createThreadGoalSnapshot(event); + if (this.sameThreadGoalSnapshot(this.sessionState.currentGoal, goalSnapshot)) { + return null; + } + this.sessionState.currentGoal = goalSnapshot; + const status = this.formatThreadGoalStatus(event.goal.status); - const objective = event.goal.objective.trim(); + const objective = goalSnapshot.objective; const text = objective.includes("\n") ? `Goal updated (${status}):\n${objective}` : `Goal updated (${status}): ${objective}`; @@ -276,7 +282,7 @@ export class CodexEventHandler { sessionUpdate: "agent_message_chunk", content: { type: "text", - text, + text: `\n\n${text}\n\n`, }, }; } @@ -298,16 +304,40 @@ export class CodexEventHandler { } } - private createThreadGoalClearedEvent(_event: ThreadGoalClearedNotification): UpdateSessionEvent { + private createThreadGoalClearedEvent(_event: ThreadGoalClearedNotification): UpdateSessionEvent | null { + if (this.sessionState.currentGoal === null) { + return null; + } + this.sessionState.currentGoal = null; + return { sessionUpdate: "agent_message_chunk", content: { type: "text", - text: "Goal cleared.", + text: "\n\nGoal cleared.\n\n", }, }; } + private createThreadGoalSnapshot(event: ThreadGoalUpdatedNotification): ThreadGoalSnapshot { + return { + objective: event.goal.objective.trim(), + status: event.goal.status, + tokenBudget: event.goal.tokenBudget, + }; + } + + private sameThreadGoalSnapshot( + left: ThreadGoalSnapshot | null | undefined, + right: ThreadGoalSnapshot + ): boolean { + return left !== null + && left !== undefined + && left.objective === right.objective + && left.status === right.status + && left.tokenBudget === right.tokenBudget; + } + private createReasoningDeltaEvent( event: ReasoningSummaryTextDeltaNotification | ReasoningTextDeltaNotification ): UpdateSessionEvent { diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index 54436c6a..e645610d 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -13,7 +13,7 @@ import { import type {ServerNotification} from "../../app-server"; import type {SessionState} from "../../CodexAcpServer"; import {AgentMode} from "../../AgentMode"; -import type {Model, ReviewStartResponse, TurnCompletedNotification, TurnStartParams} from "../../app-server/v2"; +import type {Model, ReviewStartResponse, ThreadGoal, TurnCompletedNotification, TurnStartParams} from "../../app-server/v2"; import type {RateLimitsMap} from "../../RateLimitsMap"; import {ModelId} from "../../ModelId"; @@ -1462,6 +1462,1090 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(mockFixture.getAcpConnectionDump([])).toContain("Context compacted"); }); + it('handles goal slash commands through Codex app server', async () => { + const { mockFixture, turnStartSpy } = setupPromptFixture(); + const goalRunSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "runGoalSet") + .mockResolvedValue({ + threadId: "session-id", + turn: createTurn("goal-turn-id", "completed"), + }); + const goalClearSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "runGoalClear") + .mockResolvedValue(undefined); + + await mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }); + await mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal pause" }], + }); + await mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal resume" }], + }); + await mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal clear" }], + }); + + expect(goalRunSpy).toHaveBeenNthCalledWith(1, { + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, expect.any(Function)); + expect(goalRunSpy).toHaveBeenNthCalledWith(2, { + threadId: "session-id", + status: "paused", + }); + expect(goalRunSpy).toHaveBeenNthCalledWith(3, { + threadId: "session-id", + status: "active", + }, expect.any(Function)); + expect(goalClearSpy).toHaveBeenCalledWith({ threadId: "session-id" }); + expect(turnStartSpy).not.toHaveBeenCalled(); + }); + + it('waits for goal slash command turn routing', async () => { + const { mockFixture } = setupPromptFixture(); + const goal = createThreadGoal(); + vi.spyOn(mockFixture.getCodexAppServerClient(), "threadGoalSet") + .mockResolvedValue({ goal }); + const awaitTurnCompletedSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "awaitTurnCompleted"); + + let promptResolved = false; + const promptPromise = mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }).then((response) => { + promptResolved = true; + return response; + }); + + await vi.waitFor(() => { + expect(mockFixture.getCodexAppServerClient().threadGoalSet).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + await Promise.resolve(); + expect(promptResolved).toBe(false); + expect(mockFixture.getCodexAppServerClient().awaitTurnCompleted).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + mockFixture.sendServerNotification({ + method: "thread/status/changed", + params: { + threadId: "session-id", + status: { + type: "active", + activeFlags: [], + }, + }, + }); + await Promise.resolve(); + expect(promptResolved).toBe(false); + expect(mockFixture.getCodexAppServerClient().awaitTurnCompleted).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "turn/started", + params: { + threadId: "session-id", + turn: createTurn("goal-turn-id", "inProgress"), + }, + }); + await Promise.resolve(); + expect(promptResolved).toBe(false); + expect(mockFixture.getCodexAppServerClient().awaitTurnCompleted).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "I", + }, + }); + await Promise.resolve(); + expect(promptResolved).toBe(false); + + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + + await vi.waitFor(() => { + expect(promptResolved).toBe(true); + }); + await expect(promptPromise).resolves.toEqual(expect.objectContaining({ + stopReason: "end_turn", + })); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + }); + + it('does not complete no-turn goal slash command before the goal update and runtime grace are handled', async () => { + const { mockFixture } = setupPromptFixture(); + const goal = createThreadGoal({updatedAt: 1710000100}); + const threadGoalSetSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "threadGoalSet") + .mockResolvedValue({ goal }); + let promptResolved = false; + + const promptPromise = mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }).then((response) => { + promptResolved = true; + return response; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + await flushAsyncWork(); + expect(promptResolved).toBe(false); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + + await flushAsyncWork(); + expect(promptResolved).toBe(false); + + await expect(promptPromise).resolves.toEqual(expect.objectContaining({ + stopReason: "end_turn", + })); + expect(mockFixture.getAcpConnectionDump([])).toContain("Goal updated (active): Ship the migration and keep tests green"); + }); + + it('completes goal slash command when a turn routes after the goal update', async () => { + const { mockFixture } = setupPromptFixture(); + const goal = createThreadGoal({updatedAt: 1710000150}); + const threadGoalSetSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "threadGoalSet") + .mockResolvedValue({ goal }); + const awaitTurnCompletedSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "awaitTurnCompleted") + .mockResolvedValue({ + threadId: "session-id", + turn: createTurn("goal-turn-id", "completed"), + }); + let promptResolved = false; + + const promptPromise = mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }).then((response) => { + promptResolved = true; + return response; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + + await flushAsyncWork(); + expect(promptResolved).toBe(false); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "late goal output", + }, + }); + await flushAsyncWork(); + expect(promptResolved).toBe(false); + + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + + await expect(promptPromise).resolves.toEqual(expect.objectContaining({ + stopReason: "end_turn", + })); + expect(promptResolved).toBe(true); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + }); + + it('waits for goal turn completion after the goal completes before streamed output finishes', async () => { + const { mockFixture } = setupPromptFixture(); + const goal = createThreadGoal({updatedAt: 1710000160}); + const completedGoal = createThreadGoal({ + status: "complete", + updatedAt: 1710000170, + tokensUsed: 42, + timeUsedSeconds: 8, + }); + vi.spyOn(mockFixture.getCodexAppServerClient(), "threadGoalSet") + .mockResolvedValue({ goal }); + let promptResolved = false; + + const promptPromise = mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal tell me a joke" }], + }).then((response) => { + promptResolved = true; + return response; + }); + + await vi.waitFor(() => { + expect(mockFixture.getCodexAppServerClient().threadGoalSet).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "tell me a joke", + status: "active", + }); + }); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + mockFixture.sendServerNotification({ + method: "thread/status/changed", + params: { + threadId: "session-id", + status: { + type: "active", + activeFlags: [], + }, + }, + }); + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + goal: completedGoal, + }, + }); + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "Why", + }, + }); + await flushAsyncWork(); + expect(promptResolved).toBe(false); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: " did the test wait?", + }, + }); + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + + await expect(promptPromise).resolves.toEqual(expect.objectContaining({ + stopReason: "end_turn", + })); + expect(promptResolved).toBe(true); + }); + + it('does not start the no-turn grace period before the goal update is handled', async () => { + vi.useFakeTimers(); + try { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({updatedAt: 1710000200}); + const threadGoalSet = deferred<{goal: ThreadGoal}>(); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockReturnValue(threadGoalSet.promise); + const awaitTurnCompletedSpy = vi.spyOn(codexAppServerClient, "awaitTurnCompleted") + .mockResolvedValue({ + threadId: "session-id", + turn: createTurn("goal-turn-id", "completed"), + }); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, undefined, undefined).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + threadGoalSet.resolve({goal}); + await vi.advanceTimersByTimeAsync(10_000); + await Promise.resolve(); + expect(resultSettled).toBe(false); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + await Promise.resolve(); + expect(resultSettled).toBe(false); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "late goal output", + }, + }); + + await vi.advanceTimersByTimeAsync(0); + await Promise.resolve(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + await vi.advanceTimersByTimeAsync(0); + await expect(resultPromise).resolves.toMatchObject({ + threadId: "session-id", + turn: { + id: "goal-turn-id", + status: "completed", + }, + }); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it('completes goal set when a turn routes after the goal update', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({updatedAt: 1710000200}); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({goal}); + const awaitTurnCompletedSpy = vi.spyOn(codexAppServerClient, "awaitTurnCompleted") + .mockResolvedValue({ + threadId: "session-id", + turn: createTurn("goal-turn-id", "completed"), + }); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, undefined, undefined).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + await flushAsyncWork(); + expect(resultSettled).toBe(false); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "late goal output", + }, + }); + + await flushAsyncWork(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + await expect(resultPromise).resolves.toMatchObject({ + threadId: "session-id", + turn: { + id: "goal-turn-id", + status: "completed", + }, + }); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + }); + + it('keeps goal set pending while the thread is active before a turn routes', async () => { + vi.useFakeTimers(); + try { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({updatedAt: 1710000225}); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({goal}); + const awaitTurnCompletedSpy = vi.spyOn(codexAppServerClient, "awaitTurnCompleted") + .mockResolvedValue({ + threadId: "session-id", + turn: createTurn("goal-turn-id", "completed"), + }); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, undefined, undefined).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + mockFixture.sendServerNotification({ + method: "thread/status/changed", + params: { + threadId: "session-id", + status: { + type: "active", + activeFlags: [], + }, + }, + }); + + await vi.advanceTimersByTimeAsync(10_000); + await Promise.resolve(); + expect(resultSettled).toBe(false); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "late goal output", + }, + }); + + await vi.advanceTimersByTimeAsync(0); + await Promise.resolve(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + await vi.advanceTimersByTimeAsync(0); + await expect(resultPromise).resolves.toMatchObject({ + threadId: "session-id", + turn: { + id: "goal-turn-id", + status: "completed", + }, + }); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it('keeps goal set pending after turn start until turn completion routes', async () => { + vi.useFakeTimers(); + try { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({updatedAt: 1710000235}); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({goal}); + const awaitTurnCompletedSpy = vi.spyOn(codexAppServerClient, "awaitTurnCompleted") + .mockResolvedValue({ + threadId: "session-id", + turn: createTurn("goal-turn-id", "completed"), + }); + const onTurnStarted = vi.fn(); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, onTurnStarted).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + mockFixture.sendServerNotification({ + method: "turn/started", + params: { + threadId: "session-id", + turn: createTurn("goal-turn-id", "inProgress"), + }, + }); + + await vi.advanceTimersByTimeAsync(10_000); + await Promise.resolve(); + expect(onTurnStarted).toHaveBeenCalledWith("goal-turn-id"); + expect(resultSettled).toBe(false); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "late goal output", + }, + }); + + await vi.advanceTimersByTimeAsync(0); + await Promise.resolve(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + await vi.advanceTimersByTimeAsync(0); + await expect(resultPromise).resolves.toMatchObject({ + threadId: "session-id", + turn: { + id: "goal-turn-id", + status: "completed", + }, + }); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it('completes goal set when active thread returns idle without routing a turn', async () => { + vi.useFakeTimers(); + try { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({updatedAt: 1710000250}); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({goal}); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, undefined, undefined).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + mockFixture.sendServerNotification({ + method: "thread/status/changed", + params: { + threadId: "session-id", + status: { + type: "active", + activeFlags: [], + }, + }, + }); + + await vi.advanceTimersByTimeAsync(10_000); + await Promise.resolve(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification({ + method: "thread/status/changed", + params: { + threadId: "session-id", + status: { type: "idle" }, + }, + }); + + await expect(resultPromise).resolves.toBeNull(); + } finally { + vi.useRealTimers(); + } + }); + + it('waits for paused goal update before completing goal status set', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({status: "paused", updatedAt: 1710000260}); + const threadGoalSet = deferred<{goal: ThreadGoal}>(); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockReturnValue(threadGoalSet.promise); + const awaitTurnCompletedSpy = vi.spyOn(codexAppServerClient, "awaitTurnCompleted"); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + status: "paused", + }).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + status: "paused", + }); + }); + + threadGoalSet.resolve({goal}); + await flushAsyncWork(); + expect(resultSettled).toBe(false); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + + await expect(resultPromise).resolves.toBeNull(); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + }); + + it('waits for goal cleared notification before completing goal clear', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const threadGoalClear = deferred<{cleared: boolean}>(); + const threadGoalClearSpy = vi.spyOn(codexAppServerClient, "threadGoalClear") + .mockReturnValue(threadGoalClear.promise); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalClear({ + threadId: "session-id", + }).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalClearSpy).toHaveBeenCalledWith({ + threadId: "session-id", + }); + }); + + threadGoalClear.resolve({cleared: true}); + await flushAsyncWork(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification({ + method: "thread/goal/cleared", + params: { + threadId: "session-id", + }, + }); + + await expect(resultPromise).resolves.toBeUndefined(); + }); + + it('interrupts a late-started goal slash command after the ACP prompt request is cancelled', async () => { + const { mockFixture } = setupPromptFixture(); + const goalCompleted = deferred(); + let startGoalTurn: () => void = () => {}; + const goalRunSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "runGoalSet") + .mockImplementation((_params, onTurnStarted) => { + startGoalTurn = () => onTurnStarted?.("goal-turn-id"); + return goalCompleted.promise; + }); + const turnInterruptSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "turnInterrupt") + .mockImplementation(async ({threadId, turnId}) => { + goalCompleted.resolve({ + threadId, + turn: createTurn(turnId, "interrupted"), + }); + }); + const controller = new AbortController(); + + const promptPromise = mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }, controller.signal); + + await vi.waitFor(() => { + expect(goalRunSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, expect.any(Function)); + }); + + controller.abort(); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + expect(turnInterruptSpy).not.toHaveBeenCalled(); + + startGoalTurn(); + + await vi.waitFor(() => { + expect(turnInterruptSpy).toHaveBeenCalledWith({ + threadId: "session-id", + turnId: "goal-turn-id", + }); + }); + }); + + it('interrupts a goal slash command when ACP cancel arrives before the first routed turn', async () => { + const { mockFixture, sessionState } = setupPromptFixture(); + // @ts-expect-error - registering local session state for the ACP cancel path + mockFixture.getCodexAcpAgent().sessions.set("session-id", sessionState); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal(); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({ goal }); + const turnInterruptSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "turnInterrupt") + .mockImplementation(async ({threadId, turnId}) => { + mockFixture.sendServerNotification({ + method: "turn/completed", + params: { + threadId, + turn: createTurn(turnId, "interrupted"), + }, + }); + }); + let cancelResolved = false; + + const promptPromise = mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + const cancelPromise = mockFixture.getCodexAcpAgent().cancel({sessionId: "session-id"}) + .then(() => { + cancelResolved = true; + }); + await flushAsyncWork(); + expect(cancelResolved).toBe(false); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "goal output", + }, + }); + + await vi.waitFor(() => { + expect(turnInterruptSpy).toHaveBeenCalledWith({ + threadId: "session-id", + turnId: "goal-turn-id", + }); + }); + await expect(cancelPromise).resolves.toBeUndefined(); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + }); + + it('suppresses the first routed goal notification after cancellation marks the turn stale', async () => { + const { mockFixture } = setupPromptFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal(); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({ goal }); + const turnInterruptSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "turnInterrupt") + .mockResolvedValue(undefined); + const controller = new AbortController(); + + const promptPromise = mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }, controller.signal); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + + controller.abort(); + await expect(promptPromise).resolves.toMatchObject({stopReason: "cancelled"}); + mockFixture.clearAcpConnectionDump(); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "leaked goal output", + }, + }); + + await vi.waitFor(() => { + expect(turnInterruptSpy).toHaveBeenCalledWith({ + threadId: "session-id", + turnId: "goal-turn-id", + }); + }); + await flushAsyncWork(); + expect(mockFixture.getAcpConnectionDump([])).not.toContain("leaked goal output"); + }); + + it('does not hang when goal set starts no continuation turn', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({updatedAt: 1710000300}); + const threadGoalSetSpy = vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({ goal }); + const awaitTurnCompletedSpy = vi.spyOn(codexAppServerClient, "awaitTurnCompleted"); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, undefined, 0).finally(() => { + resultSettled = true; + }); + + await vi.waitFor(() => { + expect(threadGoalSetSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }); + }); + await flushAsyncWork(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + + await expect(resultPromise).resolves.toBeNull(); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + }); + + it('keeps goal set pending after elapsed startup time until a turn is routed', async () => { + vi.useFakeTimers(); + try { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const goal = createThreadGoal({updatedAt: 1710000400}); + vi.spyOn(codexAppServerClient, "threadGoalSet") + .mockResolvedValue({ goal }); + const awaitTurnCompletedSpy = vi.spyOn(codexAppServerClient, "awaitTurnCompleted") + .mockResolvedValue({ + threadId: "session-id", + turn: createTurn("goal-turn-id", "completed"), + }); + let resultSettled = false; + + const resultPromise = codexAppServerClient.runGoalSet({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, undefined, undefined).finally(() => { + resultSettled = true; + }); + + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(10_000); + await Promise.resolve(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification({ + method: "thread/goal/updated", + params: { + threadId: "session-id", + turnId: null, + goal, + }, + }); + await Promise.resolve(); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification({ + method: "item/agentMessage/delta", + params: { + threadId: "session-id", + turnId: "goal-turn-id", + itemId: "goal-message-id", + delta: "late goal output", + }, + }); + + await Promise.resolve(); + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(0); + expect(resultSettled).toBe(false); + + mockFixture.sendServerNotification(createTurnCompletedNotification("session-id", "goal-turn-id")); + await vi.advanceTimersByTimeAsync(0); + await expect(resultPromise).resolves.toMatchObject({ + threadId: "session-id", + turn: { + id: "goal-turn-id", + status: "completed", + }, + }); + expect(awaitTurnCompletedSpy).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it('completes goal slash command when app server starts no continuation turn', async () => { + const { mockFixture, turnStartSpy } = setupPromptFixture(); + const goalRunSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "runGoalSet") + .mockResolvedValue(null); + + const response = await mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal Ship the migration and keep tests green" }], + }); + + expect(response.stopReason).toBe("end_turn"); + expect(goalRunSpy).toHaveBeenCalledWith({ + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + }, expect.any(Function)); + expect(turnStartSpy).not.toHaveBeenCalled(); + }); + + it('reports missing goal slash command input', async () => { + const { mockFixture } = setupPromptFixture(); + const goalSetSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "threadGoalSet") + .mockResolvedValue({ goal: createThreadGoal() }); + const goalClearSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "threadGoalClear") + .mockResolvedValue({ cleared: true }); + + await mockFixture.getCodexAcpAgent().prompt({ + sessionId: "session-id", + prompt: [{ type: "text", text: "/goal" }], + }); + + expect(goalSetSpy).not.toHaveBeenCalled(); + expect(goalClearSpy).not.toHaveBeenCalled(); + const [event] = mockFixture.getAcpConnectionEvents([]); + expect(event).toBeDefined(); + expect(event!.args[0].update.content.text).toBe( + 'Command "/goal" requires [|clear|pause|resume].' + ); + }); it('returns cancelled promptly when non-interruptible slash command startup is cancelled', async () => { const { mockFixture } = setupPromptFixture(); const compactStartSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "threadCompactStart") @@ -1895,6 +2979,20 @@ describe('ACP server test', { timeout: 40_000 }, () => { }; } + function createThreadGoal(overrides?: Partial): ThreadGoal { + return { + threadId: "session-id", + objective: "Ship the migration and keep tests green", + status: "active", + tokenBudget: null, + tokensUsed: 0, + timeUsedSeconds: 0, + createdAt: 1710000000, + updatedAt: 1710000000, + ...overrides, + }; + } + it ('should disable reasoning.summary if key authorization is used', async () => { const { mockFixture, turnStartSpy } = setupPromptFixture({ account: { type: "apiKey" } }); diff --git a/src/__tests__/CodexACPAgent/data/available-commands-build-in.json b/src/__tests__/CodexACPAgent/data/available-commands-build-in.json index d805f6b0..26e78cbb 100644 --- a/src/__tests__/CodexACPAgent/data/available-commands-build-in.json +++ b/src/__tests__/CodexACPAgent/data/available-commands-build-in.json @@ -47,6 +47,13 @@ "description": "Summarize conversation to avoid hitting the context limit.", "input": null }, + { + "name": "goal", + "description": "Set, pause, resume, or clear a task goal.", + "input": { + "hint": "[|clear|pause|resume]" + } + }, { "name": "logout", "description": "Sign out of Codex. This option is available when you are logged in via ChatGPT.", diff --git a/src/__tests__/CodexACPAgent/data/available-commands-skills.json b/src/__tests__/CodexACPAgent/data/available-commands-skills.json index dc1b0101..59987072 100644 --- a/src/__tests__/CodexACPAgent/data/available-commands-skills.json +++ b/src/__tests__/CodexACPAgent/data/available-commands-skills.json @@ -47,6 +47,13 @@ "description": "Summarize conversation to avoid hitting the context limit.", "input": null }, + { + "name": "goal", + "description": "Set, pause, resume, or clear a task goal.", + "input": { + "hint": "[|clear|pause|resume]" + } + }, { "name": "logout", "description": "Sign out of Codex. This option is available when you are logged in via ChatGPT.", diff --git a/src/__tests__/CodexACPAgent/data/load-session-history.json b/src/__tests__/CodexACPAgent/data/load-session-history.json index c8d0b1eb..bb272cf7 100644 --- a/src/__tests__/CodexACPAgent/data/load-session-history.json +++ b/src/__tests__/CodexACPAgent/data/load-session-history.json @@ -47,6 +47,13 @@ "description": "Summarize conversation to avoid hitting the context limit.", "input": null }, + { + "name": "goal", + "description": "Set, pause, resume, or clear a task goal.", + "input": { + "hint": "[|clear|pause|resume]" + } + }, { "name": "logout", "description": "Sign out of Codex. This option is available when you are logged in via ChatGPT.", diff --git a/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json b/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json index 94a65ad8..c5d03f99 100644 --- a/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json +++ b/src/__tests__/CodexACPAgent/data/load-session-response-item-history-fallback.json @@ -47,6 +47,13 @@ "description": "Summarize conversation to avoid hitting the context limit.", "input": null }, + { + "name": "goal", + "description": "Set, pause, resume, or clear a task goal.", + "input": { + "hint": "[|clear|pause|resume]" + } + }, { "name": "logout", "description": "Sign out of Codex. This option is available when you are logged in via ChatGPT.", diff --git a/src/__tests__/CodexACPAgent/data/thread-goal-cleared.json b/src/__tests__/CodexACPAgent/data/thread-goal-cleared.json index bfd4ac7a..c6fc87f1 100644 --- a/src/__tests__/CodexACPAgent/data/thread-goal-cleared.json +++ b/src/__tests__/CodexACPAgent/data/thread-goal-cleared.json @@ -7,7 +7,7 @@ "sessionUpdate": "agent_message_chunk", "content": { "type": "text", - "text": "Goal cleared." + "text": "\n\nGoal cleared.\n\n" } } } diff --git a/src/__tests__/CodexACPAgent/data/thread-goal-updated-multiline.json b/src/__tests__/CodexACPAgent/data/thread-goal-updated-multiline.json index 79c6d2c0..b9336936 100644 --- a/src/__tests__/CodexACPAgent/data/thread-goal-updated-multiline.json +++ b/src/__tests__/CodexACPAgent/data/thread-goal-updated-multiline.json @@ -7,7 +7,7 @@ "sessionUpdate": "agent_message_chunk", "content": { "type": "text", - "text": "Goal updated (budget limited):\nFirst task\nSecond task" + "text": "\n\nGoal updated (budget limited):\nFirst task\nSecond task\n\n" } } } diff --git a/src/__tests__/CodexACPAgent/data/thread-goal-updated.json b/src/__tests__/CodexACPAgent/data/thread-goal-updated.json index 19e32e97..264c1e34 100644 --- a/src/__tests__/CodexACPAgent/data/thread-goal-updated.json +++ b/src/__tests__/CodexACPAgent/data/thread-goal-updated.json @@ -7,7 +7,7 @@ "sessionUpdate": "agent_message_chunk", "content": { "type": "text", - "text": "Goal updated (active): Ship the goal update" + "text": "\n\nGoal updated (active): Ship the goal update\n\n" } } } diff --git a/src/__tests__/CodexACPAgent/thread-goal-events.test.ts b/src/__tests__/CodexACPAgent/thread-goal-events.test.ts index d121cf85..2db982de 100644 --- a/src/__tests__/CodexACPAgent/thread-goal-events.test.ts +++ b/src/__tests__/CodexACPAgent/thread-goal-events.test.ts @@ -18,12 +18,6 @@ describe("CodexEventHandler - thread goal events", () => { vi.clearAllMocks(); }); - const sessionState: SessionState = createTestSessionState({ - sessionId, - currentModelId: "model-id[effort]", - agentMode: AgentMode.DEFAULT_AGENT_MODE, - }); - it("should send thread goal updates as agent messages", async () => { const goalUpdatedNotification: ServerNotification = { method: "thread/goal/updated", @@ -43,7 +37,7 @@ describe("CodexEventHandler - thread goal events", () => { }, }; - await setupPromptAndSendNotifications(mockFixture, sessionId, sessionState, [goalUpdatedNotification]); + await setupPromptAndSendNotifications(mockFixture, sessionId, createSessionState(), [goalUpdatedNotification]); await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot( "data/thread-goal-updated.json" @@ -69,7 +63,7 @@ describe("CodexEventHandler - thread goal events", () => { }, }; - await setupPromptAndSendNotifications(mockFixture, sessionId, sessionState, [goalUpdatedNotification]); + await setupPromptAndSendNotifications(mockFixture, sessionId, createSessionState(), [goalUpdatedNotification]); await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot( "data/thread-goal-updated-multiline.json" @@ -84,10 +78,114 @@ describe("CodexEventHandler - thread goal events", () => { }, }; - await setupPromptAndSendNotifications(mockFixture, sessionId, sessionState, [goalClearedNotification]); + await setupPromptAndSendNotifications(mockFixture, sessionId, createSessionState(), [goalClearedNotification]); await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot( "data/thread-goal-cleared.json" ); }); + + it("should suppress duplicate thread goal updates", async () => { + const goalUpdatedNotification: ServerNotification = { + method: "thread/goal/updated", + params: { + threadId: sessionId, + turnId: "turn-1", + goal: { + threadId: sessionId, + objective: "Ship the goal update", + status: "active", + tokenBudget: null, + tokensUsed: 42, + timeUsedSeconds: 12, + createdAt: 1710000000, + updatedAt: 1710000012, + }, + }, + }; + const duplicateGoalUpdatedNotification: ServerNotification = { + method: "thread/goal/updated", + params: { + ...goalUpdatedNotification.params, + goal: { + ...goalUpdatedNotification.params.goal, + tokensUsed: 84, + timeUsedSeconds: 24, + updatedAt: 1710000024, + }, + }, + }; + + await setupPromptAndSendNotifications(mockFixture, sessionId, createSessionState(), [ + goalUpdatedNotification, + duplicateGoalUpdatedNotification, + ]); + + const events = mockFixture.getAcpConnectionEvents([]); + expect(events).toHaveLength(1); + expect(events[0]!.args[0].update.content.text).toBe("\n\nGoal updated (active): Ship the goal update\n\n"); + }); + + it("should separate completed goal updates from preceding agent text", async () => { + const goalCompletedNotification: ServerNotification = { + method: "thread/goal/updated", + params: { + threadId: sessionId, + turnId: "turn-1", + goal: { + threadId: sessionId, + objective: "tell me a joke", + status: "complete", + tokenBudget: null, + tokensUsed: 42, + timeUsedSeconds: 12, + createdAt: 1710000000, + updatedAt: 1710000012, + }, + }, + }; + + await setupPromptAndSendNotifications(mockFixture, sessionId, createSessionState(), [ + { + method: "item/agentMessage/delta", + params: { + threadId: sessionId, + turnId: "turn-1", + itemId: "message-1", + delta: "Because they kept losing interest in `any`.", + }, + }, + goalCompletedNotification, + ]); + + const events = mockFixture.getAcpConnectionEvents([]); + expect(events).toHaveLength(2); + expect(events[1]!.args[0].update.content.text).toBe("\n\nGoal updated (complete): tell me a joke\n\n"); + }); + + it("should suppress duplicate thread goal cleared notifications", async () => { + const goalClearedNotification: ServerNotification = { + method: "thread/goal/cleared", + params: { + threadId: sessionId, + }, + }; + + await setupPromptAndSendNotifications(mockFixture, sessionId, createSessionState(), [ + goalClearedNotification, + goalClearedNotification, + ]); + + const events = mockFixture.getAcpConnectionEvents([]); + expect(events).toHaveLength(1); + expect(events[0]!.args[0].update.content.text).toBe("\n\nGoal cleared.\n\n"); + }); + + function createSessionState(): SessionState { + return createTestSessionState({ + sessionId, + currentModelId: "model-id[effort]", + agentMode: AgentMode.DEFAULT_AGENT_MODE, + }); + } });