diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index c8a62e56e..590d84a86 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -659,8 +659,9 @@ describe("CloudTaskService", () => { expect(mockStreamFetch.mock.calls.length).toBe(6); // 2 bootstrap calls + 1 post-bootstrap status verification + 6 - // handleStreamCompletion calls (one per stream error) - expect(mockNetFetch).toHaveBeenCalledTimes(9); + // handleStreamCompletion calls (one per stream error) + 1 final + // status refresh from failWatcher + expect(mockNetFetch).toHaveBeenCalledTimes(10); expect(updates).toContainEqual({ taskId: "task-1", runId: "run-1", @@ -672,6 +673,144 @@ describe("CloudTaskService", () => { }); }); + it("emits a terminal status update before the error when the run finished while reconnecting", async () => { + vi.useFakeTimers(); + + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + const inProgressRun = () => + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: null, + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }); + + const completedRun = () => + createJsonResponse({ + id: "run-1", + status: "completed", + stage: null, + output: { pr_url: "https://example.com/pr/1" }, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:01:00Z", + completed_at: "2026-01-01T00:01:00Z", + }); + + // Bootstrap + verifyPostBootstrapStatus + each handleStreamCompletion call + // still reports in_progress; only the final failWatcher refresh sees the + // completed terminal state. 
+ let netCallCount = 0; + mockNetFetch.mockImplementation(() => { + netCallCount++; + if (netCallCount === 2) { + // bootstrap: fetchSessionLogs + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + if (netCallCount === 10) { + // refreshStatusThenEmitError: run is now terminal + return Promise.resolve(completedRun()); + } + return Promise.resolve(inProgressRun()); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\nevent: error\ndata: {"error":"boom"}\n\n', + ), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + await vi.advanceTimersByTimeAsync(70_000); + await waitFor( + () => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + 10_000, + ); + + const statusIdx = updates.findIndex( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "status" && + (u as { status?: string }).status === "completed", + ); + const errorIdx = updates.findIndex( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ); + + expect(statusIdx).toBeGreaterThanOrEqual(0); + expect(errorIdx).toBeGreaterThanOrEqual(0); + expect(statusIdx).toBeLessThan(errorIdx); + expect(updates[statusIdx]).toMatchObject({ + taskId: "task-1", + runId: "run-1", + kind: "status", + status: "completed", + output: { pr_url: "https://example.com/pr/1" }, + }); + }); + + it("skips the status refresh when the failure is an auth/access error", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch.mockResolvedValueOnce( + createJsonResponse({ detail: "Forbidden" }, 403), + ); + + service.watch({ + taskId: "task-1", + runId: 
"run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "error", + ), + ); + + // Only the initial bootstrap fetchTaskRun — no extra silent refresh. + expect(mockNetFetch).toHaveBeenCalledTimes(1); + expect( + updates.some( + (u) => + typeof u === "object" && + u !== null && + (u as { kind?: string }).kind === "status", + ), + ).toBe(false); + }); + const guardedFetchStatusExpectations = [ [ 401, diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 1fc19b730..fbe18457b 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -946,6 +946,41 @@ export class CloudTaskService extends TypedEventEmitter { watcher.sseAbortController?.abort(); watcher.sseAbortController = null; + void this.refreshStatusThenEmitError(watcher, error); + } + + /** + * Refresh the run state one final time before emitting the watcher's error + * event. The stream typically fails *after* the run has already reached a + * terminal status (e.g. reconnect attempts exhausted, auth flicker), so a + * single REST poll often surfaces the real outcome. Clients can then move + * the task out of the "in_progress" UI even though the watcher is dead. + * + * Auth/access failures (401/403/404) are skipped because the same call + * would just fail again. The poll uses `fetchTaskRunSilent` rather than + * `fetchTaskRun`, which calls `failWatcher` again whenever + * `shouldFailWatcherForFetchStatus` accepts the response status. 
+ */ + private async refreshStatusThenEmitError( + watcher: WatcherState, + error: CloudTaskConnectionError, + ): Promise { + if (!isAuthFailureError(error)) { + const { run } = await this.fetchTaskRunSilent(watcher); + if (run && this.applyTaskRunState(watcher, run)) { + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "status", + status: watcher.lastStatus ?? undefined, + stage: watcher.lastStage, + output: watcher.lastOutput, + errorMessage: watcher.lastErrorMessage, + branch: watcher.lastBranch, + }); + } + } + this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId, @@ -1219,6 +1254,26 @@ export class CloudTaskService extends TypedEventEmitter { private async fetchTaskRun( watcher: WatcherState, ): Promise { + const { run, httpStatus } = await this.fetchTaskRunSilent(watcher); + if (run === null && httpStatus !== null) { + if (shouldFailWatcherForFetchStatus(httpStatus)) { + this.failWatcher( + watcherKey(watcher.taskId, watcher.runId), + createStreamStatusError(httpStatus).details, + ); + } + } + return run; + } + + /** + * Same wire call as `fetchTaskRun` but never escalates a non-OK response + * into a watcher failure. Used from `failWatcher` so the final status poll + * can't re-enter watcher teardown. 
+ */ + private async fetchTaskRunSilent( + watcher: WatcherState, + ): Promise<{ run: TaskRunResponse | null; httpStatus: number | null }> { const url = `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/`; try { @@ -1236,23 +1291,28 @@ export class CloudTaskService extends TypedEventEmitter { taskId: watcher.taskId, runId: watcher.runId, }); - if (shouldFailWatcherForFetchStatus(authedResponse.status)) { - this.failWatcher( - watcherKey(watcher.taskId, watcher.runId), - createStreamStatusError(authedResponse.status).details, - ); - } - return null; + return { run: null, httpStatus: authedResponse.status }; } - return (await authedResponse.json()) as TaskRunResponse; + return { + run: (await authedResponse.json()) as TaskRunResponse, + httpStatus: authedResponse.status, + }; } catch (error) { log.warn("Cloud task status fetch error", { taskId: watcher.taskId, runId: watcher.runId, error, }); - return null; + return { run: null, httpStatus: null }; } } } + +function isAuthFailureError(error: CloudTaskConnectionError): boolean { + return ( + error.title === "Cloud authentication expired" || + error.title === "Cloud access denied" || + error.title === "Cloud run not found" + ); +}