Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 133 additions & 2 deletions apps/code/src/renderer/features/sessions/service/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({
enqueueMessage: vi.fn(),
removeQueuedMessage: vi.fn(),
clearMessageQueue: vi.fn(),
dequeueMessagesAsText: vi.fn(() => null),
dequeueMessagesAsText: vi.fn((): string | null => null),
dequeueMessages: vi.fn(
() =>
[] as Array<{
Expand Down Expand Up @@ -2242,6 +2242,129 @@ describe("SessionService", () => {
});
});

describe("local queue draining", () => {
async function connectLocalCodexSession() {
const service = getSessionService();
const sessions: Record<string, AgentSession> = {};
mockSessionStoreSetters.setSession.mockImplementation(
(session: AgentSession) => {
sessions[session.taskRunId] = session;
},
);
mockSessionStoreSetters.getSessions.mockImplementation(() => sessions);
mockSessionStoreSetters.getSessionByTaskId.mockImplementation(
(taskId: string) =>
Object.values(sessions).find((session) => session.taskId === taskId),
);
mockSessionStoreSetters.updateSession.mockImplementation(
(taskRunId: string, updates: Partial<AgentSession>) => {
const existing = sessions[taskRunId];
if (!existing) return;
sessions[taskRunId] = { ...existing, ...updates };
},
);
mockBuildAuthenticatedClient.mockReturnValue({
...mockAuthenticatedClient,
createTaskRun: vi.fn().mockResolvedValue({ id: "run-123" }),
appendTaskRunLog: vi.fn(),
});
mockTrpcAgent.start.mutate.mockResolvedValue({
channel: "agent-event:run-123",
configOptions: [],
});

await service.connectToTask({
task: createMockTask(),
repoPath: "/repo",
adapter: "codex",
});

const subscriptionOptions = mockTrpcAgent.onSessionEvent.subscribe.mock
.calls[0]?.[1] as { onData: (payload: unknown) => void };

return { service, sessions, onData: subscriptionOptions.onData };
}

it("drains queued messages when Codex turn_complete arrives before prompt response", async () => {
const { sessions, onData } = await connectLocalCodexSession();
mockSessionStoreSetters.dequeueMessagesAsText.mockReturnValue(
"investigate another",
);
mockTrpcAgent.prompt.mutate.mockResolvedValue({ stopReason: "end_turn" });
sessions["run-123"] = {
...sessions["run-123"],
isPromptPending: true,
currentPromptId: 42,
messageQueue: [
{ id: "q-1", content: "investigate another", queuedAt: 1700000001 },
],
};

onData({
type: "acp_message",
ts: 1700000002,
message: {
jsonrpc: "2.0",
method: "_posthog/turn_complete",
params: { sessionId: "acp-session", stopReason: "end_turn" },
},
});
expect(sessions["run-123"].currentPromptId).toBe(42);

onData({
type: "acp_message",
ts: 1700000003,
message: {
jsonrpc: "2.0",
id: 42,
result: { stopReason: "end_turn" },
},
});

await vi.waitFor(() => {
expect(
mockSessionStoreSetters.dequeueMessagesAsText,
).toHaveBeenCalledWith("task-123");
expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalledWith({
sessionId: "run-123",
prompt: [{ type: "text", text: "investigate another" }],
});
});
});

it("does not drain queued messages from a cancelled local prompt response", async () => {
const { service, sessions, onData } = await connectLocalCodexSession();
sessions["run-123"] = {
...sessions["run-123"],
isPromptPending: true,
currentPromptId: 42,
messageQueue: [
{ id: "q-1", content: "investigate another", queuedAt: 1700000001 },
],
};
mockTrpcAgent.cancelPrompt.mutate.mockResolvedValue(true);

await service.cancelPrompt("task-123");
expect(sessions["run-123"].currentPromptId).toBeNull();

onData({
type: "acp_message",
ts: 1700000003,
message: {
jsonrpc: "2.0",
id: 42,
result: { stopReason: "cancelled" },
},
});
await Promise.resolve();

expect(
mockSessionStoreSetters.dequeueMessagesAsText,
).not.toHaveBeenCalled();
expect(mockTrpcAgent.prompt.mutate).not.toHaveBeenCalled();
});
});

describe("sendPrompt", () => {
it("throws when offline", async () => {
mockGetIsOnline.mockReturnValue(false);
Expand Down Expand Up @@ -2832,13 +2955,21 @@ describe("SessionService", () => {
it("calls cancelPrompt mutation", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
createMockSession(),
createMockSession({ isPromptPending: true, currentPromptId: 42 }),
);
mockTrpcAgent.cancelPrompt.mutate.mockResolvedValue(true);

const result = await service.cancelPrompt("task-123");

expect(result).toBe(true);
expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith(
"run-123",
{
isPromptPending: false,
promptStartedAt: null,
currentPromptId: null,
},
);
expect(mockTrpcAgent.cancelPrompt.mutate).toHaveBeenCalledWith({
sessionId: "run-123",
});
Expand Down
23 changes: 12 additions & 11 deletions apps/code/src/renderer/features/sessions/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ function buildCloudDefaultConfigOptions(
];
}

function isCloudTurnCompleteEvent(event: AcpMessage): boolean {
function isTurnCompleteEvent(event: AcpMessage): boolean {
const msg = event.message;
return (
"method" in msg &&
Expand Down Expand Up @@ -1111,12 +1111,15 @@ export class SessionService {
currentPromptId: null,
});
}
if (isCloudTurnCompleteEvent(acpMsg)) {
sessionStoreSetters.updateSession(taskRunId, {
isPromptPending: false,
promptStartedAt: null,
currentPromptId: null,
});
if (isTurnCompleteEvent(acpMsg)) {
const session = sessionStoreSetters.getSessions()[taskRunId];
if (session?.isCloud) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clear cancelled local turns before their response

When a user cancels a local Codex turn that has queued follow-up messages, Codex emits _posthog/turn_complete before the matching JSON-RPC prompt response, but this new session?.isCloud guard leaves the old currentPromptId in place for local sessions. Since cancelPrompt() only clears isPromptPending, the later cancelled response still matches the stale id in handleSessionEvent and drains the queued messages, even though the turn was cancelled; previously the turn-complete event cleared the id so that response was ignored as stale.

Useful? React with 👍 / 👎.

sessionStoreSetters.updateSession(taskRunId, {
isPromptPending: false,
promptStartedAt: null,
currentPromptId: null,
});
}
}
// Lifecycle handshake from the agent — flip status to "connected"
// so the UI can release the queue-while-not-ready guard. This is
Expand Down Expand Up @@ -1151,10 +1154,7 @@ export class SessionService {
}
// Canonical "turn boundary" — flush any queued cloud messages now
// that the agent is idle and accepting the next prompt.
if (
"method" in msg &&
isNotification(msg.method, POSTHOG_NOTIFICATIONS.TURN_COMPLETE)
) {
if (isTurnCompleteEvent(acpMsg)) {
const session = sessionStoreSetters.getSessions()[taskRunId];
if (session?.isCloud) {
// Backward compat: treat turn_complete as an implicit run_started
Expand Down Expand Up @@ -1624,6 +1624,7 @@ export class SessionService {
sessionStoreSetters.updateSession(session.taskRunId, {
isPromptPending: false,
promptStartedAt: null,
currentPromptId: null,
});

if (session.isCloud) {
Expand Down