diff --git a/apps/app/src/components/thread/timeline/ThreadTimelinePanelContent.test.tsx b/apps/app/src/components/thread/timeline/ThreadTimelinePanelContent.test.tsx new file mode 100644 index 000000000..bd8c635bb --- /dev/null +++ b/apps/app/src/components/thread/timeline/ThreadTimelinePanelContent.test.tsx @@ -0,0 +1,130 @@ +import { renderToStaticMarkup } from "react-dom/server"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ThreadTimelineSurfaceProps } from "./ThreadTimelineSurface.js"; +import { ThreadTimelinePanelContent } from "./ThreadTimelinePanelContent.js"; +import type { UseThreadTimelineControllerResult } from "./useThreadTimelineController.js"; + +const mocks = vi.hoisted(() => ({ + displayStatus: "idle", + surfaceProps: [] as ThreadTimelineSurfaceProps[], + threadStatus: "idle", + timeline: undefined as unknown as UseThreadTimelineControllerResult, +})); + +vi.mock("@/hooks/queries/thread-queries", () => ({ + useThread: () => ({ + data: { + runtime: { displayStatus: mocks.displayStatus }, + status: mocks.threadStatus, + }, + error: null, + }), +})); + +vi.mock("./useThreadTimelineController.js", () => ({ + useThreadTimelineController: () => mocks.timeline, +})); + +vi.mock("./ThreadTimelineSurface.js", () => ({ + ThreadTimelineSurface: (props: ThreadTimelineSurfaceProps) => { + mocks.surfaceProps.push(props); + return ( +
+ {props.showOngoingIndicator ? ( +
{props.ongoingIndicatorLabel ?? "Working"}
+ ) : null} +
+ ); + }, +})); + +function makeTimeline( + overrides: Partial = {}, +): UseThreadTimelineControllerResult { + return { + activeThinking: null, + activeWorkflow: null, + activeBackgroundCommands: [], + contextWindowUsage: undefined, + goal: null, + hasOlderTimelineRows: false, + isLoadingOlderTimelineRows: false, + loadOlderTimelineRows: vi.fn(), + pendingTodos: null, + timelineError: null, + timelineLoading: false, + timelineRows: [], + ...overrides, + } as UseThreadTimelineControllerResult; +} + +function lastSurfaceProps(): ThreadTimelineSurfaceProps { + const props = mocks.surfaceProps.at(-1); + if (!props) { + throw new Error("ThreadTimelineSurface did not render"); + } + return props; +} + +beforeEach(() => { + mocks.displayStatus = "idle"; + mocks.surfaceProps = []; + mocks.threadStatus = "idle"; + mocks.timeline = makeTimeline(); +}); + +afterEach(() => { + vi.clearAllMocks(); +}); + +describe("ThreadTimelinePanelContent background task indicator", () => { + it("shows a background-only working indicator for an idle thread with an active workflow", () => { + mocks.timeline = makeTimeline({ + activeWorkflow: {} as UseThreadTimelineControllerResult["activeWorkflow"], + }); + + const markup = renderToStaticMarkup( + , + ); + + expect(markup).toContain("Background work running"); + expect(lastSurfaceProps()).toMatchObject({ + ongoingIndicatorLabel: "Background work running", + showOngoingIndicator: true, + }); + }); + + it("shows a background-only working indicator for an idle thread with active background commands", () => { + mocks.timeline = makeTimeline({ + activeBackgroundCommands: [ + {}, + ] as UseThreadTimelineControllerResult["activeBackgroundCommands"], + }); + + const markup = renderToStaticMarkup( + , + ); + + expect(markup).toContain("Background work running"); + expect(lastSurfaceProps()).toMatchObject({ + ongoingIndicatorLabel: "Background work running", + showOngoingIndicator: true, + }); + }); + + it("does not show the background-task indicator for stopping threads", () => { + mocks.threadStatus = "stopping"; + mocks.timeline = makeTimeline({ + activeWorkflow: {} as UseThreadTimelineControllerResult["activeWorkflow"], + }); + + const markup = renderToStaticMarkup( + , + ); + + expect(markup).not.toContain("Background work running"); + expect(lastSurfaceProps()).toMatchObject({ + showOngoingIndicator: false, + }); + }); +}); diff --git a/apps/app/src/components/thread/timeline/ThreadTimelinePanelContent.tsx b/apps/app/src/components/thread/timeline/ThreadTimelinePanelContent.tsx index fd41c5c21..c65b85cd1 100644 --- a/apps/app/src/components/thread/timeline/ThreadTimelinePanelContent.tsx +++ b/apps/app/src/components/thread/timeline/ThreadTimelinePanelContent.tsx @@ -79,18 +79,25 @@ export function ThreadTimelinePanelContent({ const displayStatus = threadQuery.data?.runtime.displayStatus ?? "idle"; const isProvisioningDisplayStatus = displayStatus === "provisioning" || displayStatus === "starting"; + const isRuntimeOngoing = isRunningThreadRuntimeDisplayStatus(displayStatus); + const hasActiveBackgroundTask = + resolvedTimeline.activeWorkflow !== null || + resolvedTimeline.activeBackgroundCommands.length > 0; const ongoingIndicatorLabel = displayStatus === "host-reconnecting" ? "Waiting for reconnection" : isProvisioningDisplayStatus ? provisioningLabel + : hasActiveBackgroundTask && !isRuntimeOngoing + ? "Background work running" : undefined; const showOngoingIndicator = threadQuery.data?.status !== "stopping" && (isProvisioningDisplayStatus || (!resolvedTimeline.timelineLoading && (isTurnSubmitting || - isRunningThreadRuntimeDisplayStatus(displayStatus)))); + isRuntimeOngoing || + hasActiveBackgroundTask))); const timelineRows = resolvedTimeline.timelineRows; const isChildThreadMissing = threadQuery.error instanceof HttpError && threadQuery.error.status === 404; diff --git a/apps/app/src/views/thread-detail/ThreadDetailView.tsx b/apps/app/src/views/thread-detail/ThreadDetailView.tsx index 4945f266d..4e3cef2f7 100644 --- a/apps/app/src/views/thread-detail/ThreadDetailView.tsx +++ b/apps/app/src/views/thread-detail/ThreadDetailView.tsx @@ -693,6 +693,8 @@ export function ThreadDetailView(props: ThreadDetailViewProps) { } = useThreadTimelineController({ threadId: threadId ?? "", }); + const hasActiveBackgroundTask = + activeWorkflow !== null || activeBackgroundCommands.length > 0; const sendMessage = useSendThreadMessage(); const requestEnvironmentAction = useRequestEnvironmentAction(); const [pullRequestMergeMethod, setPullRequestMergeMethod] = useAtom( @@ -1838,6 +1840,9 @@ export function ThreadDetailView(props: ThreadDetailViewProps) { ); } + const isRuntimeOngoing = isRunningThreadRuntimeDisplayStatus( + thread.runtime.displayStatus, + ); const hasAssignableParent = parentSelectorOptions.some( (option) => option.value !== "none", ); @@ -2176,11 +2181,13 @@ export function ThreadDetailView(props: ThreadDetailViewProps) { // own inline shimmer row, so the bottom indicator would just // duplicate it. !hasPendingInteraction && - isRunningThreadRuntimeDisplayStatus(thread.runtime.displayStatus) && + (isRuntimeOngoing || hasActiveBackgroundTask) && !isThreadTimelinePending, ongoingIndicatorLabel: thread.runtime.displayStatus === "host-reconnecting" ? "Waiting for reconnection" + : hasActiveBackgroundTask && !isRuntimeOngoing + ? "Background work running" : undefined, timelineRows, isStopping: thread.status === "stopping", diff --git a/apps/server/src/services/threads/timeline.ts b/apps/server/src/services/threads/timeline.ts index b6d6becc1..54aee94fe 100644 --- a/apps/server/src/services/threads/timeline.ts +++ b/apps/server/src/services/threads/timeline.ts @@ -21,6 +21,7 @@ import { listStoredClientTurnRequestIdsInRange, listStoredEventRowsInRange, listLatestBackgroundTaskStateRowsByItemIds, + listOpenBackgroundTaskStateRowsForThread, listStoredTimelineWindowEventRows, listStoredTurnInputAcceptedRowsByClientRequestIds, listStoredTurnStartedRowsByTurnIdsUpToSequence, @@ -540,6 +541,39 @@ function ensureTimelineWindowBackgroundTaskStateRows( return mergeStoredEventRowsById([...args.rows, ...stateRows]); } +function ensureLatestTimelineOpenBackgroundTaskStateRows( + db: DbConnection, + args: TimelineWindowRowsArgs & { page: ThreadTimelinePageRequest }, +): StoredEventRow[] { + if (args.page.kind !== "latest") { + return [...args.rows]; + } + + const openTaskRows = listOpenBackgroundTaskStateRowsForThread(db, { + threadId: args.threadId, + }); + if (openTaskRows.length === 0) { + return [...args.rows]; + } + + const selectedRowIds = new Set(args.rows.map((row) => row.id)); + const projectedOpenTaskRows = openTaskRows.map((row) => + selectedRowIds.has(row.id) + ? row + : { + ...row, + // Injected open-task rows feed active cards, not exact transcript + // replay. A task whose latest state is only item/started remains + // turn-scoped in storage, so replay it as the thread-scoped progress + // snapshot that background-task projections already accept. + type: "item/backgroundTask/progress" as const, + scopeKind: "thread" as const, + turnId: null, + }, + ); + return mergeStoredEventRowsById([...projectedOpenTaskRows, ...args.rows]); +} + interface ResolveTimelineSegmentWindowArgs { page: ThreadTimelinePageRequest; threadId: string; @@ -636,15 +670,19 @@ function selectStandardTimelineEventRows( const beforeSequence = window.beforeSequence; const sequenceStart = window.sequenceStart; - const selectedRows = ensureTimelineWindowBackgroundTaskStateRows(db, { + const selectedRows = ensureLatestTimelineOpenBackgroundTaskStateRows(db, { + page, threadId: thread.id, - rows: ensureTimelineWindowTurnStartedRows(db, { + rows: ensureTimelineWindowBackgroundTaskStateRows(db, { threadId: thread.id, - rows: listStoredTimelineWindowEventRows(db, { - beforeSequence, - excludedTypes: THREAD_TIMELINE_EXCLUDED_EVENT_TYPES, - sequenceStart, + rows: ensureTimelineWindowTurnStartedRows(db, { threadId: thread.id, + rows: listStoredTimelineWindowEventRows(db, { + beforeSequence, + excludedTypes: THREAD_TIMELINE_EXCLUDED_EVENT_TYPES, + sequenceStart, + threadId: thread.id, + }), }), }), }); diff --git a/apps/server/test/public/public-thread-data.test.ts b/apps/server/test/public/public-thread-data.test.ts index cae3485c7..09dd8eee9 100644 --- a/apps/server/test/public/public-thread-data.test.ts +++ b/apps/server/test/public/public-thread-data.test.ts @@ -207,6 +207,201 @@ describe("public thread data routes", () => { }); }); + it.each([ + { + activeList: "workflow" as const, + taskId: "task:wf-window", + taskType: "local_workflow" as const, + workflowName: "fixture-window", + }, + { + activeList: "backgroundCommand" as const, + taskId: "task:bash-window", + taskType: "local_bash" as const, + workflowName: null, + }, + ])( + "surfaces active $taskType background tasks outside the latest timeline window", + async ({ activeList, taskId, taskType, workflowName }) => { + await withTestHarness(async (harness) => { + const { environment, thread } = seedThreadFixture(harness); + const clientTurnRequestData = (value: number, text: string) => ({ + direction: "outbound" as const, + requestId: encodeClientTurnRequestIdNumber({ value }), + source: "tell" as const, + initiator: "user" as const, + senderThreadId: null, + input: [{ type: "text" as const, text }], + target: { kind: "new-turn" as const }, + request: { + method: "turn/start" as const, + params: {}, + }, + execution: { + model: "gpt-4o-mini", + reasoningLevel: "medium", + permissionMode: "full" as const, + serviceTier: "default" as const, + source: "client/turn/requested" as const, + }, + }); + const agentMessageData = (id: string, text: string) => ({ + item: { + type: "agentMessage" as const, + id, + text, + }, + }); + const workflowTaskData = { + item: { + type: "backgroundTask" as const, + id: taskId, + taskType, + description: "windowed background task", + status: "pending" as const, + taskStatus: "running" as const, + skipTranscript: false, + ...(workflowName ? { workflowName } : {}), + }, + }; + + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + sequence: 1, + type: "client/turn/requested", + scope: threadScope(), + data: clientTurnRequestData(1, "Start workflow"), + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 2, + type: "turn/started", + scope: turnScope("turn-1"), + data: {}, + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 3, + type: "item/started", + scope: turnScope("turn-1"), + data: workflowTaskData, + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 4, + type: "turn/completed", + scope: turnScope("turn-1"), + data: { status: "completed" }, + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + sequence: 5, + type: "client/turn/requested", + scope: threadScope(), + data: clientTurnRequestData(2, "Middle turn"), + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 6, + type: "turn/started", + scope: turnScope("turn-2"), + data: {}, + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 7, + type: "item/completed", + scope: turnScope("turn-2"), + data: agentMessageData("assistant-2", "Middle done."), + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 8, + type: "turn/completed", + scope: turnScope("turn-2"), + data: { status: "completed" }, + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + sequence: 9, + type: "client/turn/requested", + scope: threadScope(), + data: clientTurnRequestData(3, "Latest turn"), + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 10, + type: "turn/started", + scope: turnScope("turn-3"), + data: {}, + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 11, + type: "item/completed", + scope: turnScope("turn-3"), + data: agentMessageData("assistant-3", "Latest done."), + }); + seedEvent(harness.deps, { + threadId: thread.id, + environmentId: environment.id, + providerThreadId: "provider-thread-1", + sequence: 12, + type: "turn/completed", + scope: turnScope("turn-3"), + data: { status: "completed" }, + }); + + const timelineResponse = await harness.app.request( + `/api/v1/threads/${thread.id}/timeline?segmentLimit=1`, + ); + const timelineBody = await timelineResponse.text(); + expect(timelineResponse.status, timelineBody).toBe(200); + const timeline = threadTimelineResponseSchema.parse( + JSON.parse(timelineBody), + ); + + if (activeList === "workflow") { + expect(timeline.activeWorkflow).toMatchObject({ + itemId: taskId, + status: "pending", + taskStatus: "running", + workflowName, + }); + expect(timeline.activeBackgroundCommands).toEqual([]); + } else { + expect(timeline.activeWorkflow).toBeNull(); + expect(timeline.activeBackgroundCommands).toHaveLength(1); + expect(timeline.activeBackgroundCommands[0]).toMatchObject({ + itemId: taskId, + status: "pending", + taskStatus: "running", + taskType, + }); + } + }); + }, + ); + it("uses uploaded project attachments for localFile prompt input and timeline metadata", async () => { await withTestHarness(async (harness) => { const { host } = seedHostSession(harness.deps, { diff --git a/packages/agent-runtime/src/claude-code/bridge/__tests__/bridge.test.ts b/packages/agent-runtime/src/claude-code/bridge/__tests__/bridge.test.ts index 6bbe35392..6dd47fabf 100644 --- a/packages/agent-runtime/src/claude-code/bridge/__tests__/bridge.test.ts +++ b/packages/agent-runtime/src/claude-code/bridge/__tests__/bridge.test.ts @@ -1793,6 +1793,57 @@ describe("bridge", () => { } }); + it("keeps a live session open when thread/resume targets the same provider thread", async () => { + const bridge = createBridgeJsonRpcTestHarness(handleLine); + const queries: ControlledClaudeQuery[] = []; + queryMock.mockImplementation(() => { + const query = createControlledClaudeQuery(); + queries.push(query); + return query; + }); + + try { + const threadId = "thread-idempotent-resume"; + const providerThreadId = "provider-thread-idempotent-resume"; + bridge.sendRequest(1, "thread/resume", { + workflowsEnabled: false, + claudeCodeMockCliTraffic: DEFAULT_CLAUDE_CODE_MOCK_CLI_TRAFFIC_CONFIG, + cwd: "/tmp/worktree", + instructionMode: "append", + permissionEscalation: "ask", + permissionMode: "default", + providerThreadId, + threadId, + }); + await bridge.waitForResponse(1); + + bridge.sendRequest(2, "thread/resume", { + workflowsEnabled: false, + claudeCodeMockCliTraffic: DEFAULT_CLAUDE_CODE_MOCK_CLI_TRAFFIC_CONFIG, + cwd: "/tmp/worktree", + instructionMode: "append", + permissionEscalation: "ask", + permissionMode: "default", + providerThreadId, + threadId, + }); + const resumeResponse = await bridge.waitForResponse(2); + + expect(getProviderThreadIdFromResult(resumeResponse)).toBe( + providerThreadId, + ); + expect(queries).toHaveLength(1); + expect(queries[0]?.close).not.toHaveBeenCalled(); + + bridge.sendRequest(3, "thread/stop", { threadId }); + await bridge.flushWork(); + queries[0]?.finish(); + await bridge.waitForResponse(3); + } finally { + bridge.restore(); + } + }); + it("resumes a Claude session when follow-up arrives after an SDK stream error", async () => { const bridge = createBridgeJsonRpcTestHarness(handleLine); const queries: ControlledClaudeQuery[] = []; diff --git a/packages/agent-runtime/src/claude-code/bridge/bridge.ts b/packages/agent-runtime/src/claude-code/bridge/bridge.ts index 24c50ae96..c976ad808 100644 --- a/packages/agent-runtime/src/claude-code/bridge/bridge.ts +++ b/packages/agent-runtime/src/claude-code/bridge/bridge.ts @@ -1216,6 +1216,22 @@ async function handleThreadResume( const existing = sessions.get(threadId); if (existing) { + const existingProviderThreadId = + existing.providerThreadId ?? existing.session.getSessionId(); + if ( + existingProviderThreadId !== undefined && + requestedProviderThreadId !== undefined && + requestedProviderThreadId === existingProviderThreadId && + !existing.closing && + !existing.streamEnded + ) { + sendResult(id, { + threadId, + providerThreadId: existingProviderThreadId, + }); + sendThreadIdentity(threadId, existingProviderThreadId); + return; + } await closeThreadSession({ graceful: false, message: "Thread session replaced while awaiting permission approval", diff --git a/packages/db/src/data/events.ts b/packages/db/src/data/events.ts index e16202f92..d12055a87 100644 --- a/packages/db/src/data/events.ts +++ b/packages/db/src/data/events.ts @@ -840,6 +840,10 @@ export interface ListOpenBackgroundTaskItemRowsForHostArgs { hostId: string; } +export interface ListOpenBackgroundTaskStateRowsForThreadArgs { + threadId: string; +} + export interface OpenBackgroundTaskItemRow { /** Raw data JSON of the latest lifecycle row; carries the item payload. */ data: string; @@ -1947,6 +1951,7 @@ export function listOpenBackgroundTaskItemRowsForHost( SELECT MAX(latest.sequence) FROM events latest WHERE latest.thread_id = ${events.threadId} + AND latest.item_kind = 'backgroundTask' AND latest.item_id = ${events.itemId} AND latest.type IN (${startedType}, ${progressType}) )`, @@ -1960,6 +1965,57 @@ export function listOpenBackgroundTaskItemRowsForHost( ); } +/** + * Latest lifecycle row per open backgroundTask item on one thread. Used by the + * latest timeline response so active workflow/background-command cards do not + * depend on the open task's start/progress rows still being inside the selected + * timeline window. + */ +export function listOpenBackgroundTaskStateRowsForThread( + db: DbQueryConnection, + args: ListOpenBackgroundTaskStateRowsForThreadArgs, +): StoredEventRow[] { + const startedType = "item/started" satisfies ThreadEventType; + const progressType = + "item/backgroundTask/progress" satisfies ThreadEventType; + const completedType = + "item/backgroundTask/completed" satisfies ThreadEventType; + const settled = alias(events, "settled_background_task"); + + return db + .select(storedEventRowFields) + .from(events) + .where( + and( + eq(events.threadId, args.threadId), + eq(events.itemKind, "backgroundTask"), + inArray(events.type, [startedType, progressType]), + isNotNull(events.itemId), + notExists( + db + .select({ one: sql`1` }) + .from(settled) + .where( + and( + eq(settled.threadId, events.threadId), + eq(settled.itemId, events.itemId), + eq(settled.type, completedType), + ), + ), + ), + sql`${events.sequence} = ( + SELECT MAX(latest.sequence) + FROM events latest + WHERE latest.thread_id = ${events.threadId} + AND latest.item_id = ${events.itemId} + AND latest.type IN (${startedType}, ${progressType}) + )`, + ), + ) + .orderBy(events.sequence) + .all(); +} + /** * Each item/backgroundTask/progress row carries the full superseding task * snapshot, and the turn-scoped item/started anchors the row's sequence range diff --git a/packages/db/src/data/index.ts b/packages/db/src/data/index.ts index 59c688566..0a51b4c28 100644 --- a/packages/db/src/data/index.ts +++ b/packages/db/src/data/index.ts @@ -243,6 +243,7 @@ export { listStoredTurnStartedRowsByTurnIdsUpToSequence, getLatestThreadInterruptedReason, listLatestBackgroundTaskStateRowsByItemIds, + listOpenBackgroundTaskStateRowsForThread, listOpenBackgroundTaskItemRowsForHost, listThreadIdsWithLatestHostDaemonRestartInterruption, listThreadTurnInterruptionEventStates, @@ -277,6 +278,7 @@ export type { ListStoredTurnStartedRowsByTurnIdsUpToSequenceArgs, MissingStoredTurnStartedDetails, PruneContextWindowUsageEventsBeforeSequenceArgs, + ListOpenBackgroundTaskStateRowsForThreadArgs, ListOpenBackgroundTaskItemRowsForHostArgs, OpenBackgroundTaskItemRow, PruneBackgroundTaskProgressEventsArgs, diff --git a/packages/db/test/data/events.test.ts b/packages/db/test/data/events.test.ts index 52c57014a..6d34a3b49 100644 --- a/packages/db/test/data/events.test.ts +++ b/packages/db/test/data/events.test.ts @@ -32,6 +32,7 @@ import { listStoredTurnInputAcceptedRowsByClientRequestIds, MissingStoredTurnStartedError, listLatestBackgroundTaskStateRowsByItemIds, + listOpenBackgroundTaskStateRowsForThread, listOpenBackgroundTaskItemRowsForHost, listThreadTurnInterruptionEventStates, pruneBackgroundTaskProgressEvents, @@ -2824,6 +2825,25 @@ describe("events", () => { expect(JSON.parse(rows[0]!.data)).toMatchObject({ item: { taskStatus: "paused" }, }); + expect( + listOpenBackgroundTaskStateRowsForThread(db, { threadId: thread.id }).map( + (row) => ({ + data: JSON.parse(row.data), + itemId: row.itemId, + sequence: row.sequence, + type: row.type, + }), + ), + ).toEqual([ + { + data: expect.objectContaining({ + item: expect.objectContaining({ taskStatus: "paused" }), + }), + itemId: "task:wf-open", + sequence: 3, + type: "item/backgroundTask/progress", + }, + ]); const otherHost = upsertHost(db, noopNotifier, { name: "other-host",