From a05668a8187c8f309fd7bf104bc43039c06ca577 Mon Sep 17 00:00:00 2001 From: Frederico Luz Date: Mon, 18 May 2026 14:45:22 +0100 Subject: [PATCH] Fix local Codex queued messages getting stuck --- .../features/sessions/service/service.test.ts | 135 +++++++++++++++++- .../features/sessions/service/service.ts | 23 +-- 2 files changed, 145 insertions(+), 13 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 5ffe3ebf2..986b4619a 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -73,7 +73,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ enqueueMessage: vi.fn(), removeQueuedMessage: vi.fn(), clearMessageQueue: vi.fn(), - dequeueMessagesAsText: vi.fn(() => null), + dequeueMessagesAsText: vi.fn((): string | null => null), dequeueMessages: vi.fn( () => [] as Array<{ @@ -2242,6 +2242,129 @@ describe("SessionService", () => { }); }); + describe("local queue draining", () => { + async function connectLocalCodexSession() { + const service = getSessionService(); + const sessions: Record = {}; + mockSessionStoreSetters.setSession.mockImplementation( + (session: AgentSession) => { + sessions[session.taskRunId] = session; + }, + ); + mockSessionStoreSetters.getSessions.mockImplementation(() => sessions); + mockSessionStoreSetters.getSessionByTaskId.mockImplementation( + (taskId: string) => + Object.values(sessions).find((session) => session.taskId === taskId), + ); + mockSessionStoreSetters.updateSession.mockImplementation( + (taskRunId: string, updates: Partial) => { + const existing = sessions[taskRunId]; + if (!existing) return; + sessions[taskRunId] = { ...existing, ...updates }; + }, + ); + mockBuildAuthenticatedClient.mockReturnValue({ + ...mockAuthenticatedClient, + createTaskRun: vi.fn().mockResolvedValue({ id: "run-123" }), + appendTaskRunLog: vi.fn(), + }); + mockTrpcAgent.start.mutate.mockResolvedValue({ + channel: "agent-event:run-123", + configOptions: [], + }); + + await service.connectToTask({ + task: createMockTask(), + repoPath: "/repo", + adapter: "codex", + }); + + const subscriptionOptions = mockTrpcAgent.onSessionEvent.subscribe.mock + .calls[0]?.[1] as { onData: (payload: unknown) => void }; + + return { service, sessions, onData: subscriptionOptions.onData }; + } + + it("drains queued messages when Codex turn_complete arrives before prompt response", async () => { + const { sessions, onData } = await connectLocalCodexSession(); + mockSessionStoreSetters.dequeueMessagesAsText.mockReturnValue( + "investigate another", + ); + mockTrpcAgent.prompt.mutate.mockResolvedValue({ stopReason: "end_turn" }); + sessions["run-123"] = { + ...sessions["run-123"], + isPromptPending: true, + currentPromptId: 42, + messageQueue: [ + { id: "q-1", content: "investigate another", queuedAt: 1700000001 }, + ], + }; + + onData({ + type: "acp_message", + ts: 1700000002, + message: { + jsonrpc: "2.0", + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }); + expect(sessions["run-123"].currentPromptId).toBe(42); + + onData({ + type: "acp_message", + ts: 1700000003, + message: { + jsonrpc: "2.0", + id: 42, + result: { stopReason: "end_turn" }, + }, + }); + + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.dequeueMessagesAsText, + ).toHaveBeenCalledWith("task-123"); + expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalledWith({ + sessionId: "run-123", + prompt: [{ type: "text", text: "investigate another" }], + }); + }); + }); + + it("does not drain queued messages from a cancelled local prompt response", async () => { + const { service, sessions, onData } = await connectLocalCodexSession(); + sessions["run-123"] = { + ...sessions["run-123"], + isPromptPending: true, + currentPromptId: 42, + messageQueue: [ + { id: "q-1", content: "investigate another", queuedAt: 1700000001 }, + ], + }; + mockTrpcAgent.cancelPrompt.mutate.mockResolvedValue(true); + + await service.cancelPrompt("task-123"); + expect(sessions["run-123"].currentPromptId).toBeNull(); + + onData({ + type: "acp_message", + ts: 1700000003, + message: { + jsonrpc: "2.0", + id: 42, + result: { stopReason: "cancelled" }, + }, + }); + await Promise.resolve(); + + expect( + mockSessionStoreSetters.dequeueMessagesAsText, + ).not.toHaveBeenCalled(); + expect(mockTrpcAgent.prompt.mutate).not.toHaveBeenCalled(); + }); + }); + describe("sendPrompt", () => { it("throws when offline", async () => { mockGetIsOnline.mockReturnValue(false); @@ -2832,13 +2955,21 @@ describe("SessionService", () => { it("calls cancelPrompt mutation", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( - createMockSession(), + createMockSession({ isPromptPending: true, currentPromptId: 42 }), ); mockTrpcAgent.cancelPrompt.mutate.mockResolvedValue(true); const result = await service.cancelPrompt("task-123"); expect(result).toBe(true); + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + { + isPromptPending: false, + promptStartedAt: null, + currentPromptId: null, + }, + ); expect(mockTrpcAgent.cancelPrompt.mutate).toHaveBeenCalledWith({ sessionId: "run-123", }); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index bcfda3422..5addc859a 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -180,7 +180,7 @@ function buildCloudDefaultConfigOptions( ]; } -function isCloudTurnCompleteEvent(event: AcpMessage): boolean { +function isTurnCompleteEvent(event: AcpMessage): boolean { const msg = event.message; return ( "method" in msg && @@ -1111,12 +1111,15 @@ export class SessionService { currentPromptId: null, }); } - if (isCloudTurnCompleteEvent(acpMsg)) { - sessionStoreSetters.updateSession(taskRunId, { - isPromptPending: false, - promptStartedAt: null, - currentPromptId: null, - }); + if (isTurnCompleteEvent(acpMsg)) { + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (session?.isCloud) { + sessionStoreSetters.updateSession(taskRunId, { + isPromptPending: false, + promptStartedAt: null, + currentPromptId: null, + }); + } } // Lifecycle handshake from the agent — flip status to "connected" // so the UI can release the queue-while-not-ready guard. This is @@ -1151,10 +1154,7 @@ export class SessionService { } // Canonical "turn boundary" — flush any queued cloud messages now // that the agent is idle and accepting the next prompt. - if ( - "method" in msg && - isNotification(msg.method, POSTHOG_NOTIFICATIONS.TURN_COMPLETE) - ) { + if (isTurnCompleteEvent(acpMsg)) { const session = sessionStoreSetters.getSessions()[taskRunId]; if (session?.isCloud) { // Backward compat: treat turn_complete as an implicit run_started @@ -1624,6 +1624,7 @@ export class SessionService { sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: false, promptStartedAt: null, + currentPromptId: null, }); if (session.isCloud) {