From 08e8784da063a5a58c6c965926ae35fc68d5e7a5 Mon Sep 17 00:00:00 2001 From: Michael Yong Date: Wed, 17 Jun 2026 04:16:33 +0000 Subject: [PATCH] Add threadless terminal session plumbing --- .../thread/terminal/terminal-websocket-url.ts | 37 +- .../cache-owners/terminal-cache-owner.ts | 6 + apps/app/src/lib/api.ts | 100 +++++ .../src/terminals/terminal-manager.test.ts | 285 +++++++----- .../src/terminals/terminal-manager.ts | 95 +++- .../test/command/environment-dispatch.test.ts | 12 +- apps/server/src/routes/environments.ts | 43 +- apps/server/src/routes/terminals.ts | 41 ++ apps/server/src/server.ts | 29 ++ .../terminals/terminal-session-lifecycle.ts | 407 +++++++++++++++--- apps/server/src/ws/terminal-protocol.ts | 4 +- .../public/public-thread-terminals.test.ts | 232 +++++++++- .../0042_threadless_terminal_sessions.sql | 68 +++ packages/db/drizzle/meta/_journal.json | 7 + packages/db/src/data/index.ts | 19 + packages/db/src/data/terminal-sessions.ts | 260 ++++++++++- packages/db/src/schema.ts | 12 +- .../db/test/data/terminal-sessions.test.ts | 118 +++++ packages/db/test/migrate.test.ts | 54 ++- packages/host-daemon-contract/src/session.ts | 21 +- .../test/contract.test.ts | 24 +- packages/server-contract/src/api/terminals.ts | 72 +++- packages/server-contract/src/common.ts | 6 + packages/server-contract/src/public-api.ts | 93 +++- .../server-contract/test/contract.test.ts | 44 ++ 25 files changed, 1831 insertions(+), 258 deletions(-) create mode 100644 apps/server/src/routes/terminals.ts create mode 100644 packages/db/drizzle/0042_threadless_terminal_sessions.sql diff --git a/apps/app/src/components/thread/terminal/terminal-websocket-url.ts b/apps/app/src/components/thread/terminal/terminal-websocket-url.ts index 187868959..d62e4a2b6 100644 --- a/apps/app/src/components/thread/terminal/terminal-websocket-url.ts +++ b/apps/app/src/components/thread/terminal/terminal-websocket-url.ts @@ -5,6 +5,14 @@ interface BuildTerminalWebSocketUrlArgs { threadId: string; } +interface BuildThreadlessTerminalWebSocketUrlArgs { + terminalId: string; +} + +interface BuildTerminalSessionWebSocketUrlArgs { + terminalId: string; +} + function buildTerminalWebSocketPath({ terminalId, threadId, @@ -14,10 +22,13 @@ function buildTerminalWebSocketPath({ )}`; } -export function buildTerminalWebSocketUrl( - args: BuildTerminalWebSocketUrlArgs, -): string { - const path = buildTerminalWebSocketPath(args); +function buildThreadlessTerminalWebSocketPath({ + terminalId, +}: BuildThreadlessTerminalWebSocketUrlArgs): string { + return `/ws/terminals/${encodeURIComponent(terminalId)}`; +} + +function buildWebSocketUrl(path: string): string { const devWebSocketUrl = buildDevWebSocketUrl({ path }); if (devWebSocketUrl !== undefined) { return devWebSocketUrl; @@ -26,3 +37,21 @@ export function buildTerminalWebSocketUrl( const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; return `${protocol}//${window.location.host}${path}`; } + +export function buildTerminalWebSocketUrl( + args: BuildTerminalWebSocketUrlArgs, +): string { + return buildWebSocketUrl(buildTerminalWebSocketPath(args)); +} + +export function buildThreadlessTerminalWebSocketUrl( + args: BuildThreadlessTerminalWebSocketUrlArgs, +): string { + return buildWebSocketUrl(buildThreadlessTerminalWebSocketPath(args)); +} + +export function buildTerminalSessionWebSocketUrl( + args: BuildTerminalSessionWebSocketUrlArgs, +): string { + return buildWebSocketUrl(buildThreadlessTerminalWebSocketPath(args)); +} diff --git a/apps/app/src/hooks/cache-owners/terminal-cache-owner.ts b/apps/app/src/hooks/cache-owners/terminal-cache-owner.ts index 5d6d53eb7..3ea2a0f8d 100644 --- a/apps/app/src/hooks/cache-owners/terminal-cache-owner.ts +++ b/apps/app/src/hooks/cache-owners/terminal-cache-owner.ts @@ -58,6 +58,9 @@ export function applyThreadTerminalSessionUpsert({ queryClient, session, }: TerminalSessionCacheArgs): void { + if (session.threadId === null) { + return; + } queryClient.setQueryData( threadTerminalsQueryKey(session.threadId), (current) => upsertTerminalSession(current, session), @@ -72,6 +75,9 @@ export function applyThreadTerminalSessionClose({ session, terminalId, }: CloseTerminalSessionCacheArgs): void { + if (session.threadId === null) { + return; + } queryClient.setQueryData( threadTerminalsQueryKey(session.threadId), (current) => diff --git a/apps/app/src/lib/api.ts b/apps/app/src/lib/api.ts index d1914b810..374220ab4 100644 --- a/apps/app/src/lib/api.ts +++ b/apps/app/src/lib/api.ts @@ -31,7 +31,9 @@ import type { EnvironmentDiffFileResponse, EnvironmentStatusResponse, EnvironmentPullRequestResponse, + EnvironmentTerminalListResponse, CreateThreadRequest, + CreateEnvironmentTerminalRequest, CreateThreadTerminalRequest, ProjectBranchesResponse, ProjectResponse, @@ -67,13 +69,19 @@ import type { ThreadTimelineResponse, TimelineTurnSummaryDetailsRequest, TimelineTurnSummaryDetailsResponse, + CloseTerminalRequest, CloseThreadTerminalRequest, + CloseEnvironmentTerminalRequest, + CreateTerminalRequest, ResolvePendingInteractionRequest, UpdateEnvironmentRequest, + UpdateEnvironmentTerminalRequest, + UpdateTerminalRequest, UpdateProjectRequest, UpdateThreadRequest, UpdateThreadTerminalRequest, UpdateProjectSourceRequest, + TerminalListResponse, UploadedPromptAttachment, ThreadStorageFileListResponse, ThreadStoragePathListResponse, @@ -1067,6 +1075,48 @@ export async function updateThread( ); } +export async function listTerminals( + signal?: AbortSignal, +): Promise { + return request( + apiClient.terminals.$get(undefined, requestOptions(signal)), + ); +} + +export async function createTerminal( + req: CreateTerminalRequest, +): Promise { + return request( + apiClient.terminals.$post({ + json: req, + }), + ); +} + +export async function renameTerminal( + terminalId: string, + req: UpdateTerminalRequest, +): Promise { + return request( + apiClient.terminals[":terminalId"].$patch({ + param: { terminalId }, + json: req, + }), + ); +} + +export async function closeTerminal( + terminalId: string, + req: CloseTerminalRequest, +): Promise { + return request( + apiClient.terminals[":terminalId"].close.$post({ + param: { terminalId }, + json: req, + }), + ); +} + export async function listThreadTerminals( id: string, signal?: AbortSignal, @@ -1314,6 +1364,56 @@ export async function updateEnvironment( ); } +export async function listEnvironmentTerminals( + id: string, + signal?: AbortSignal, +): Promise { + return request( + apiClient.environments[":id"].terminals.$get( + { param: { id } }, + requestOptions(signal), + ), + ); +} + +export async function createEnvironmentTerminal( + id: string, + req: CreateEnvironmentTerminalRequest, +): Promise { + return request( + apiClient.environments[":id"].terminals.$post({ + param: { id }, + json: req, + }), + ); +} + +export async function renameEnvironmentTerminal( + id: string, + terminalId: string, + req: UpdateEnvironmentTerminalRequest, +): Promise { + return request( + apiClient.environments[":id"].terminals[":terminalId"].$patch({ + param: { id, terminalId }, + json: req, + }), + ); +} + +export async function closeEnvironmentTerminal( + id: string, + terminalId: string, + req: CloseEnvironmentTerminalRequest, +): Promise { + return request( + apiClient.environments[":id"].terminals[":terminalId"].close.$post({ + param: { id, terminalId }, + json: req, + }), + ); +} + export async function getEnvironmentWorkStatus( environmentId: string, mergeBaseBranch?: string, diff --git a/apps/host-daemon/src/terminals/terminal-manager.test.ts b/apps/host-daemon/src/terminals/terminal-manager.test.ts index c0ca78aed..5010491af 100644 --- a/apps/host-daemon/src/terminals/terminal-manager.test.ts +++ b/apps/host-daemon/src/terminals/terminal-manager.test.ts @@ -112,7 +112,9 @@ class FakeTerminalPty implements TerminalPtyProcess { private readonly dataListeners: ((data: string) => void)[]; private readonly exitListeners: ((event: TerminalPtyExit) => void)[]; private readonly registeredDataListeners: ((data: string) => void)[]; - private readonly registeredExitListeners: ((event: TerminalPtyExit) => void)[]; + private readonly registeredExitListeners: (( + event: TerminalPtyExit, + ) => void)[]; constructor() { this.killCalls = []; @@ -363,10 +365,13 @@ async function openTerminal( requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -411,9 +416,43 @@ describe("TerminalManager", () => { title: "zsh", }), ); - await expect(harness.runtimeManager.evictIdleEnvironments()).resolves.toEqual( - [], + await expect( + harness.runtimeManager.evictIdleEnvironments(), + ).resolves.toEqual([]); + }); + + it("opens a PTY in a host path without an environment", async () => { + const cwd = await makeTempDir("bb-terminal-host-path-"); + const harness = createHarness(); + + await harness.manager.handleMessage({ + type: "terminal.open", + requestId: "open-host-path", + terminalId: "term-host-path", + target: { + kind: "host_path", + cwd, + }, + cols: 80, + rows: 24, + }); + + expect(harness.adapter.spawned).toHaveLength(1); + expect(harness.adapter.spawned[0]?.args).toMatchObject({ + cols: 80, + cwd, + rows: 24, + }); + expect(harness.messages).toContainEqual( + expect.objectContaining({ + type: "terminal.opened", + terminalId: "term-host-path", + initialCwd: cwd, + }), ); + await expect( + harness.runtimeManager.evictIdleEnvironments(), + ).resolves.toEqual([]); }); it("closes a terminal after an in-progress open finishes", async () => { @@ -431,10 +470,13 @@ describe("TerminalManager", () => { requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -490,10 +532,13 @@ describe("TerminalManager", () => { requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -545,10 +590,13 @@ describe("TerminalManager", () => { requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -591,10 +639,13 @@ describe("TerminalManager", () => { requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -606,10 +657,13 @@ describe("TerminalManager", () => { requestId: "open-2", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -657,10 +711,13 @@ describe("TerminalManager", () => { requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -672,10 +729,13 @@ describe("TerminalManager", () => { requestId: "open-2", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -729,10 +789,13 @@ describe("TerminalManager", () => { requestId: "open-stale", terminalId: "term-stale", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/stale-terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/stale-terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -975,9 +1038,9 @@ describe("TerminalManager", () => { closeReason: "user", }, ]); - await expect(harness.runtimeManager.evictIdleEnvironments()).resolves.toEqual( - ["env-1"], - ); + await expect( + harness.runtimeManager.evictIdleEnvironments(), + ).resolves.toEqual(["env-1"]); expect(harness.runtime.shutdown).toHaveBeenCalledTimes(1); }); @@ -1063,10 +1126,13 @@ describe("TerminalManager", () => { requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + workspaceProvisionType: "unmanaged", + }, }, cols: 100, rows: 30, @@ -1084,74 +1150,73 @@ describe("TerminalManager", () => { ]); }); - it( - "runs commands in one persistent shell from the workspace cwd", - async () => { - if (process.platform === "win32") { - return; - } - - const workspacePath = await makeTempDir("bb-terminal-manager-real-"); - const targetPath = await makeTempDir("bb-terminal-manager-target-"); - const expectedWorkspacePath = await fs.realpath(workspacePath); - const expectedTargetPath = await fs.realpath(targetPath); - const messages: HostDaemonDaemonWsMessage[] = []; - const runtimeManager = new RuntimeManager({ - createRuntime: () => createFakeRuntime(), - provisionWorkspace: async () => createFakeWorkspace(workspacePath), - }); - const manager = new TerminalManager({ - logger: { - debug: vi.fn(), - error: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - }, - resolveShell: async () => "/bin/sh", - runtimeManager, - sendMessage: (message) => { - messages.push(message); - return true; - }, - }); + it("runs commands in one persistent shell from the workspace cwd", async () => { + if (process.platform === "win32") { + return; + } + + const workspacePath = await makeTempDir("bb-terminal-manager-real-"); + const targetPath = await makeTempDir("bb-terminal-manager-target-"); + const expectedWorkspacePath = await fs.realpath(workspacePath); + const expectedTargetPath = await fs.realpath(targetPath); + const messages: HostDaemonDaemonWsMessage[] = []; + const runtimeManager = new RuntimeManager({ + createRuntime: () => createFakeRuntime(), + provisionWorkspace: async () => createFakeWorkspace(workspacePath), + }); + const manager = new TerminalManager({ + logger: { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }, + resolveShell: async () => "/bin/sh", + runtimeManager, + sendMessage: (message) => { + messages.push(message); + return true; + }, + }); - await manager.handleMessage({ - type: "terminal.open", - requestId: "open-real", - terminalId: "term-real", - threadId: "thr-real", + await manager.handleMessage({ + type: "terminal.open", + requestId: "open-real", + terminalId: "term-real", + threadId: "thr-real", + target: { + kind: "workspace", environmentId: "env-real", workspaceContext: { workspacePath, workspaceProvisionType: "unmanaged", }, - cols: 100, - rows: 30, - }); - await manager.handleMessage({ - type: "terminal.input", - terminalId: "term-real", - dataBase64: Buffer.from( - [ - 'printf "__PWD1:%s\\n" "$(pwd -P)"', - `cd ${shellQuote(targetPath)}`, - 'printf "__PWD2:%s\\n" "$(pwd -P)"', - "", - ].join("\n"), - "utf8", - ).toString("base64"), - }); - - await waitForOutputContaining({ - messages, - text: `__PWD1:${expectedWorkspacePath}`, - }); - await waitForOutputContaining({ - messages, - text: `__PWD2:${expectedTargetPath}`, - }); - await manager.shutdownAll(); - }, - 10_000, - ); + }, + cols: 100, + rows: 30, + }); + await manager.handleMessage({ + type: "terminal.input", + terminalId: "term-real", + dataBase64: Buffer.from( + [ + 'printf "__PWD1:%s\\n" "$(pwd -P)"', + `cd ${shellQuote(targetPath)}`, + 'printf "__PWD2:%s\\n" "$(pwd -P)"', + "", + ].join("\n"), + "utf8", + ).toString("base64"), + }); + + await waitForOutputContaining({ + messages, + text: `__PWD1:${expectedWorkspacePath}`, + }); + await waitForOutputContaining({ + messages, + text: `__PWD2:${expectedTargetPath}`, + }); + await manager.shutdownAll(); + }, 10_000); }); diff --git a/apps/host-daemon/src/terminals/terminal-manager.ts b/apps/host-daemon/src/terminals/terminal-manager.ts index d6e85696a..5763c328c 100644 --- a/apps/host-daemon/src/terminals/terminal-manager.ts +++ b/apps/host-daemon/src/terminals/terminal-manager.ts @@ -1,5 +1,5 @@ import { accessSync, chmodSync, constants, existsSync } from "node:fs"; -import { access } from "node:fs/promises"; +import { access, stat } from "node:fs/promises"; import { createRequire } from "node:module"; import path from "node:path"; import { spawn as spawnPty } from "node-pty"; @@ -90,7 +90,7 @@ interface TerminalSession { closeReason: TerminalSessionCloseReason | null; cols: number; disposables: TerminalPtyDisposable[]; - environmentId: string; + environmentId: string | null; nextSeq: number; pty: TerminalPtyProcess; rows: number; @@ -132,6 +132,11 @@ interface ResizeTerminalArgs { terminalId: string; } +interface ResolvedTerminalOpenTarget { + cwd: string; + environmentId: string | null; +} + interface FinishTerminalSessionArgs { closeReason: TerminalSessionCloseReason; exitCode: number | null; @@ -317,6 +322,25 @@ function terminalTitleFromShell(shell: string): string { return path.basename(shell) || "Terminal"; } +function terminalEnvironmentIdFromOpenMessage( + message: TerminalOpenMessage, +): string | null { + return message.target.kind === "workspace" + ? message.target.environmentId + : null; +} + +async function requireTerminalCwd(cwd: string): Promise { + if (!path.isAbsolute(cwd)) { + throw new Error("Terminal cwd must be an absolute path"); + } + const info = await stat(cwd); + if (!info.isDirectory()) { + throw new Error(`Terminal cwd is not a directory: ${cwd}`); + } + return cwd; +} + function createTerminalOperationCompletion(): TerminalOperationCompletion { let resolveCompletion: () => void = () => { throw new Error("Terminal operation completion resolver was not set"); @@ -334,7 +358,10 @@ export class TerminalManager { private readonly scrollbackMaxBytes: number; private readonly scrollbackMaxChunks: number; private readonly terminalOperations = new Map>(); - private readonly openingTerminalEnvironmentIds = new Map(); + private readonly openingTerminalEnvironmentIds = new Map< + string, + string | null + >(); private readonly sessions = new Map(); constructor(private readonly options: TerminalManagerOptions) { @@ -450,21 +477,17 @@ export class TerminalManager { return; } + const openingEnvironmentId = terminalEnvironmentIdFromOpenMessage(message); this.openingTerminalEnvironmentIds.set( message.terminalId, - message.environmentId, + openingEnvironmentId, ); try { - const entry = await requireResolvedWorkspaceForCommand({ - dataDir: this.options.dataDir, - environmentId: message.environmentId, - runtimeManager: this.options.runtimeManager, - workspaceContext: message.workspaceContext, - }); + const target = await this.resolveTerminalOpenTarget(message); const shell = await this.resolveShell(); const pty = this.ptyAdapter.spawn({ cols: message.cols, - cwd: entry.path, + cwd: target.cwd, env: buildTerminalEnv({ shellEnv: this.options.runtimeManager.getShellEnv(), terminalId: message.terminalId, @@ -477,7 +500,7 @@ export class TerminalManager { closeReason: null, cols: message.cols, disposables: [], - environmentId: message.environmentId, + environmentId: target.environmentId, nextSeq: 0, pty, rows: message.rows, @@ -486,10 +509,12 @@ export class TerminalManager { terminalId: message.terminalId, }; this.sessions.set(message.terminalId, session); - this.options.runtimeManager.markTerminalActive( - message.environmentId, - message.terminalId, - ); + if (target.environmentId !== null) { + this.options.runtimeManager.markTerminalActive( + target.environmentId, + message.terminalId, + ); + } session.disposables.push( pty.onData((data) => this.handleTerminalOutput(session, data)), pty.onExit((event) => { @@ -518,7 +543,7 @@ export class TerminalManager { terminalId: message.terminalId, shell, title: terminalTitleFromShell(shell), - initialCwd: entry.path, + initialCwd: target.cwd, cols: message.cols, rows: message.rows, }); @@ -537,13 +562,37 @@ export class TerminalManager { } finally { if ( this.openingTerminalEnvironmentIds.get(message.terminalId) === - message.environmentId + openingEnvironmentId ) { this.openingTerminalEnvironmentIds.delete(message.terminalId); } } } + private async resolveTerminalOpenTarget( + message: TerminalOpenMessage, + ): Promise { + switch (message.target.kind) { + case "workspace": { + const entry = await requireResolvedWorkspaceForCommand({ + dataDir: this.options.dataDir, + environmentId: message.target.environmentId, + runtimeManager: this.options.runtimeManager, + workspaceContext: message.target.workspaceContext, + }); + return { + cwd: entry.path, + environmentId: message.target.environmentId, + }; + } + case "host_path": + return { + cwd: await requireTerminalCwd(message.target.cwd), + environmentId: null, + }; + } + } + private attachTerminal(message: TerminalAttachMessage): void { const session = this.sessions.get(message.terminalId); if (!session) { @@ -689,10 +738,12 @@ export class TerminalManager { return; } this.sessions.delete(args.session.terminalId); - this.options.runtimeManager.markTerminalInactive( - args.session.environmentId, - args.session.terminalId, - ); + if (args.session.environmentId !== null) { + this.options.runtimeManager.markTerminalInactive( + args.session.environmentId, + args.session.terminalId, + ); + } for (const disposable of args.session.disposables) { disposable.dispose(); } diff --git a/apps/host-daemon/test/command/environment-dispatch.test.ts b/apps/host-daemon/test/command/environment-dispatch.test.ts index b4851e047..2078d6e13 100644 --- a/apps/host-daemon/test/command/environment-dispatch.test.ts +++ b/apps/host-daemon/test/command/environment-dispatch.test.ts @@ -768,10 +768,13 @@ describe("environment command dispatch", () => { requestId: "open-1", terminalId: "term-1", threadId: "thr-1", - environmentId: "env-1", - workspaceContext: { - workspacePath: "/tmp/env-1", - workspaceProvisionType: "managed-worktree", + target: { + kind: "workspace", + environmentId: "env-1", + workspaceContext: { + workspacePath: "/tmp/env-1", + workspaceProvisionType: "managed-worktree", + }, }, cols: 100, rows: 30, @@ -889,5 +892,4 @@ describe("environment command dispatch", () => { expect(retryResult).toEqual({}); }); - }); diff --git a/apps/server/src/routes/environments.ts b/apps/server/src/routes/environments.ts index f8c0705d4..80a33b513 100644 --- a/apps/server/src/routes/environments.ts +++ b/apps/server/src/routes/environments.ts @@ -211,6 +211,39 @@ export function registerEnvironmentRoutes(app: Hono, deps: AppDeps): void { return context.json(updated); }); + get(routes.terminals, (context) => { + const sessions = deps.terminalSessions.listEnvironmentTerminals( + context.req.param("id"), + ); + return context.json({ sessions }); + }); + + post(routes.createTerminal, async (context, payload) => { + const session = await deps.terminalSessions.createEnvironmentTerminal({ + payload, + environmentId: context.req.param("id"), + }); + return context.json(session, 201); + }); + + patch(routes.updateTerminal, (context, payload) => { + const session = deps.terminalSessions.renameEnvironmentTerminal({ + payload, + environmentId: context.req.param("id"), + terminalId: context.req.param("terminalId"), + }); + return context.json(session); + }); + + post(routes.closeTerminal, (context, payload) => { + const session = deps.terminalSessions.closeEnvironmentTerminal({ + payload, + environmentId: context.req.param("id"), + terminalId: context.req.param("terminalId"), + }); + return context.json(session); + }); + post(routes.archiveThreads, (context) => { const environment = requireEnvironment(deps.db, context.req.param("id")); if (!isWorktreeEnvironment(environment)) { @@ -327,10 +360,7 @@ export function registerEnvironmentRoutes(app: Hono, deps: AppDeps): void { }); get(routes.diffFiles, async (context, query) => { - const target = resolveGitDiffWorkspaceTarget( - deps, - context.req.param("id"), - ); + const target = resolveGitDiffWorkspaceTarget(deps, context.req.param("id")); if (target === null) { return context.json(NON_GIT_DIFF_NOT_APPLICABLE); } @@ -391,10 +421,7 @@ export function registerEnvironmentRoutes(app: Hono, deps: AppDeps): void { }); post(routes.diffPatch, async (context, payload) => { - const target = resolveGitDiffWorkspaceTarget( - deps, - context.req.param("id"), - ); + const target = resolveGitDiffWorkspaceTarget(deps, context.req.param("id")); if (target === null) { return context.json(NON_GIT_DIFF_NOT_APPLICABLE); } diff --git a/apps/server/src/routes/terminals.ts b/apps/server/src/routes/terminals.ts new file mode 100644 index 000000000..e159aa286 --- /dev/null +++ b/apps/server/src/routes/terminals.ts @@ -0,0 +1,41 @@ +import { + publicApiRoutes, + typedRoutes, + type PublicApiSchema, +} from "@bb/server-contract"; +import type { Hono } from "hono"; +import type { AppDeps } from "../types.js"; +import { ApiError } from "../errors.js"; + +export function registerTerminalRoutes(app: Hono, deps: AppDeps): void { + const { get, patch, post } = typedRoutes(app, { + onValidationError: (msg) => new ApiError(400, "invalid_request", msg), + }); + const routes = publicApiRoutes.terminals; + + get(routes.list, (context) => { + const sessions = deps.terminalSessions.listTerminals(); + return context.json({ sessions }); + }); + + post(routes.create, async (context, payload) => { + const session = await deps.terminalSessions.createTerminal({ payload }); + return context.json(session, 201); + }); + + patch(routes.update, (context, payload) => { + const session = deps.terminalSessions.renameTerminal({ + payload, + terminalId: context.req.param("terminalId"), + }); + return context.json(session); + }); + + post(routes.close, (context, payload) => { + const session = deps.terminalSessions.closeTerminal({ + payload, + terminalId: context.req.param("terminalId"), + }); + return context.json(session); + }); +} diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index fe325c771..2a5a09128 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -16,6 +16,7 @@ import { registerHostRoutes } from "./routes/hosts.js"; import { registerProjectRoutes } from "./routes/projects.js"; import { registerAutomationRoutes } from "./routes/automations.js"; import { registerSystemRoutes } from "./routes/system.js"; +import { registerTerminalRoutes } from "./routes/terminals.js"; import { registerThreadRoutes } from "./routes/threads/index.js"; import { registerInternalEventRoutes } from "./internal/events.js"; import { registerInternalHostRoutes } from "./internal/hosts.js"; @@ -246,6 +247,7 @@ export function createApp( registerAutomationRoutes(publicApi, deps); registerFileRoutes(publicApi, deps); registerHostRoutes(publicApi, deps); + registerTerminalRoutes(publicApi, deps); registerEnvironmentRoutes(publicApi, deps); registerThreadRoutes(publicApi, deps); registerSystemRoutes(publicApi, deps); @@ -272,6 +274,33 @@ export function createApp( })), ); + app.get( + "/ws/terminals/:terminalId", + upgradeWebSocket((context) => { + const terminalId = context.req.param("terminalId"); + return { + onOpen: (_event, socket) => + onTerminalSocketOpen(deps, { + socket, + terminalId, + threadId: null, + }), + onMessage: (event, socket) => + onTerminalSocketMessage(deps, { + raw: event.data, + socket, + terminalId, + threadId: null, + }), + onClose: (_event, socket) => + onTerminalSocketClose(deps, { + socket, + terminalId, + }), + }; + }), + ); + app.get( "/ws/threads/:threadId/terminals/:terminalId", upgradeWebSocket((context) => { diff --git a/apps/server/src/services/terminals/terminal-session-lifecycle.ts b/apps/server/src/services/terminals/terminal-session-lifecycle.ts index 97b58f09a..2fb74f786 100644 --- a/apps/server/src/services/terminals/terminal-session-lifecycle.ts +++ b/apps/server/src/services/terminals/terminal-session-lifecycle.ts @@ -1,20 +1,27 @@ import { randomUUID } from "node:crypto"; import { createTerminalSession, + getTerminalSession, getTerminalSessionForThread, + getThreadlessTerminalSessionForEnvironment, listTerminalSessionsByEnvironment, listTerminalSessionsByThread, + listThreadlessTerminalSessionsByEnvironment, + listVisibleTerminalSessions, listVisibleTerminalSessionsByThread, + listVisibleThreadlessTerminalSessionsByEnvironment, markDaemonTerminalSessionExited, markDaemonTerminalSessionsDisconnected, markEnvironmentTerminalSessionsExited, markHostDisconnectedTerminalSessionsExited, markTerminalSessionExited, markTerminalSessionRunning, - markTerminalSessionUserInput, + markTerminalSessionUserInputById, markThreadTerminalSessionsExited, - updateTerminalSessionSize, + updateTerminalSessionSizeById, updateTerminalSessionTitle, + updateTerminalSessionTitleById, + updateThreadlessTerminalSessionTitle, type TerminalSessionRow, } from "@bb/db"; import type { TerminalSessionCloseReason } from "@bb/domain"; @@ -23,17 +30,25 @@ import type { HostDaemonServerWsMessage, } from "@bb/host-daemon-contract"; import type { + CloseEnvironmentTerminalRequest, + CloseTerminalRequest, CloseThreadTerminalRequest, + CreateTerminalRequest, + CreateEnvironmentTerminalRequest, CreateThreadTerminalRequest, TerminalClientMessage, + TerminalCreateTarget, TerminalOutputChunk, TerminalSession, + UpdateEnvironmentTerminalRequest, + UpdateTerminalRequest, UpdateThreadTerminalRequest, } from "@bb/server-contract"; import { ApiError } from "../../errors.js"; import type { AppDeps, ServerLogger } from "../../types.js"; import { requireConnectedHostSession, + requireEnvironment, requirePublicThread, requireReadyEnvironment, } from "../lib/entity-lookup.js"; @@ -84,7 +99,7 @@ interface PendingTerminalAttach { daemonSessionId: string; socket: TerminalClientSocket; terminalId: string; - threadId: string; + threadId: string | null; timeout: ReturnType; } @@ -99,7 +114,7 @@ interface WaitForTerminalAttachArgs { requestId: string; socket: TerminalClientSocket; terminalId: string; - threadId: string; + threadId: string | null; } interface ResolvePendingOpenArgs { @@ -153,10 +168,22 @@ interface TerminalDaemonCloseTarget { terminalId: string; } +type TerminalDaemonOpenTarget = Extract< + HostDaemonServerWsMessage, + { type: "terminal.open" } +>["target"]; + +interface ResolvedTerminalLaunchTarget { + daemonTarget: TerminalDaemonOpenTarget; + environmentId: string | null; + hostId: string; + initialCwd: string; +} + interface AttachBrowserTerminalArgs { socket: TerminalClientSocket; terminalId: string; - threadId: string; + threadId: string | null; } interface DetachBrowserTerminalArgs { @@ -168,13 +195,20 @@ interface HandleBrowserTerminalMessageArgs { message: TerminalClientMessage; socket: TerminalClientSocket; terminalId: string; - threadId: string; + threadId: string | null; } interface GetRunningBrowserTerminalArgs { socket: TerminalClientSocket; terminalId: string; - threadId: string; + threadId: string | null; +} + +interface GetBrowserTerminalSessionArgs { + reportMissing?: boolean; + socket: TerminalClientSocket; + terminalId: string; + threadId: string | null; } interface SendTerminalSocketErrorArgs { @@ -196,7 +230,6 @@ interface RejectPendingAttachesForTerminalArgs { interface CloseStaleOpenedTerminalArgs { daemonSessionId: string; terminalId: string; - threadId: string; } interface PublishLifecycleTerminalExitsForSessionsArgs { @@ -224,18 +257,61 @@ interface CreateThreadTerminalArgs { threadId: string; } +interface CreateTerminalArgs { + payload: CreateTerminalRequest; +} + +interface CreateTerminalForTargetArgs { + payload: CreateThreadTerminalRequest; + target: TerminalCreateTarget; + threadId: string | null; + title: string; +} + +interface CreateEnvironmentTerminalArgs { + environmentId: string; + payload: CreateEnvironmentTerminalRequest; +} + interface RenameThreadTerminalArgs { payload: UpdateThreadTerminalRequest; terminalId: string; threadId: string; } +interface RenameTerminalArgs { + payload: UpdateTerminalRequest; + terminalId: string; +} + +interface RenameEnvironmentTerminalArgs { + environmentId: string; + payload: UpdateEnvironmentTerminalRequest; + terminalId: string; +} + interface CloseThreadTerminalArgs { payload: CloseThreadTerminalRequest; terminalId: string; threadId: string; } +interface CloseTerminalArgs { + payload: CloseTerminalRequest; + terminalId: string; +} + +interface CloseEnvironmentTerminalArgs { + environmentId: string; + payload: CloseEnvironmentTerminalRequest; + terminalId: string; +} + +interface CloseTerminalSessionArgs { + current: TerminalSessionRow; + payload: CloseTerminalRequest; +} + interface CloseDeletedThreadTerminalsArgs { threadId: string; } @@ -332,6 +408,28 @@ export class TerminalSessionLifecycle { ); } + listEnvironmentTerminals(environmentId: string): TerminalSession[] { + requireEnvironment(this.options.db, environmentId); + return listVisibleThreadlessTerminalSessionsByEnvironment( + this.options.db, + environmentId, + ).map(toTerminalSession); + } + + listTerminals(): TerminalSession[] { + return listVisibleTerminalSessions(this.options.db).map(toTerminalSession); + } + + async createTerminal(args: CreateTerminalArgs): Promise { + const existingSessions = listVisibleTerminalSessions(this.options.db); + return this.createTerminalForTarget({ + payload: args.payload, + target: args.payload.target, + threadId: null, + title: `Terminal ${existingSessions.length + 1}`, + }); + } + async createThreadTerminal( args: CreateThreadTerminalArgs, ): Promise { @@ -341,39 +439,59 @@ export class TerminalSessionLifecycle { threadEnvironmentUnavailableDetails("never_attached", null), ); } - const environment = requireReadyEnvironment( + const existingSessions = listTerminalSessionsByThread( + this.options.db, + thread.id, + ); + return this.createTerminalForTarget({ + payload: args.payload, + target: { kind: "environment", environmentId: thread.environmentId }, + threadId: thread.id, + title: `Terminal ${existingSessions.length + 1}`, + }); + } + + async createEnvironmentTerminal( + args: CreateEnvironmentTerminalArgs, + ): Promise { + const existingSessions = listThreadlessTerminalSessionsByEnvironment( this.options.db, - thread.environmentId, + args.environmentId, ); + return this.createTerminalForTarget({ + payload: args.payload, + target: { kind: "environment", environmentId: args.environmentId }, + threadId: null, + title: `Terminal ${existingSessions.length + 1}`, + }); + } + + private async createTerminalForTarget( + args: CreateTerminalForTargetArgs, + ): Promise { + const launchTarget = this.resolveTerminalLaunchTarget(args.target); const daemonSession = requireConnectedHostSession( this.options, - environment.hostId, + launchTarget.hostId, ); - const target = requireWorkspaceCommandTarget(environment); - const existingSessions = listTerminalSessionsByThread( - this.options.db, - thread.id, - ); - const title = `Terminal ${existingSessions.length + 1}`; const startingSession = createTerminalSession(this.options.db, { cols: args.payload.cols, daemonSessionId: daemonSession.id, - environmentId: environment.id, - hostId: environment.hostId, - initialCwd: environment.path, + environmentId: launchTarget.environmentId, + hostId: launchTarget.hostId, + initialCwd: launchTarget.initialCwd, rows: args.payload.rows, status: "starting", - threadId: thread.id, - title, + threadId: args.threadId, + title: args.title, }); const requestId = randomUUID(); const openMessage: HostDaemonServerWsMessage = { type: "terminal.open", requestId, terminalId: startingSession.id, - threadId: thread.id, - environmentId: target.environmentId, - workspaceContext: target.workspaceContext, + ...(args.threadId !== null ? { threadId: args.threadId } : {}), + target: launchTarget.daemonTarget, cols: args.payload.cols, rows: args.payload.rows, }; @@ -395,7 +513,7 @@ export class TerminalSessionLifecycle { closeReason: "daemon-disconnect", }); if (exited) { - this.notifyThreadTerminalsChanged(exited.threadId); + this.notifyTerminalSessionChanged(exited); } throw new ApiError( 502, @@ -418,7 +536,7 @@ export class TerminalSessionLifecycle { closeReason: "open-timeout", }); if (exited) { - this.notifyThreadTerminalsChanged(exited.threadId); + this.notifyTerminalSessionChanged(exited); } this.options.hub.sendDaemonSessionMessage(daemonSession.id, { type: "terminal.close", @@ -435,7 +553,7 @@ export class TerminalSessionLifecycle { closeReason: "process-exit", }); if (exited) { - this.notifyThreadTerminalsChanged(exited.threadId); + this.notifyTerminalSessionChanged(exited); } } throw error; @@ -453,7 +571,6 @@ export class TerminalSessionLifecycle { this.closeStaleOpenedTerminal({ daemonSessionId: daemonSession.id, terminalId: startingSession.id, - threadId: thread.id, }); throw new ApiError( 409, @@ -461,10 +578,44 @@ export class TerminalSessionLifecycle { "Terminal session was cancelled before it opened", ); } - this.notifyThreadTerminalsChanged(runningSession.threadId); + this.notifyTerminalSessionChanged(runningSession); return toTerminalSession(runningSession); } + private resolveTerminalLaunchTarget( + target: TerminalCreateTarget, + ): ResolvedTerminalLaunchTarget { + switch (target.kind) { + case "environment": { + const environment = requireReadyEnvironment( + this.options.db, + target.environmentId, + ); + const workspaceTarget = requireWorkspaceCommandTarget(environment); + return { + daemonTarget: { + kind: "workspace", + environmentId: workspaceTarget.environmentId, + workspaceContext: workspaceTarget.workspaceContext, + }, + environmentId: environment.id, + hostId: workspaceTarget.hostId, + initialCwd: workspaceTarget.workspaceContext.workspacePath, + }; + } + case "host_path": + return { + daemonTarget: { + kind: "host_path", + cwd: target.cwd, + }, + environmentId: null, + hostId: target.hostId, + initialCwd: target.cwd, + }; + } + } + renameThreadTerminal(args: RenameThreadTerminalArgs): TerminalSession { requirePublicThread(this.options.db, args.threadId); const renamed = updateTerminalSessionTitle(this.options.db, { @@ -479,7 +630,52 @@ export class TerminalSessionLifecycle { "Terminal session not found", ); } - this.notifyThreadTerminalsChanged(renamed.threadId); + this.notifyTerminalSessionChanged(renamed); + const session = toTerminalSession(renamed); + this.options.hub.sendTerminalClientMessage(renamed.id, { + type: "session-updated", + session, + }); + return session; + } + + renameTerminal(args: RenameTerminalArgs): TerminalSession { + const renamed = updateTerminalSessionTitleById(this.options.db, { + terminalId: args.terminalId, + title: args.payload.title, + }); + if (!renamed) { + throw new ApiError( + 404, + "terminal_not_found", + "Terminal session not found", + ); + } + this.notifyTerminalSessionChanged(renamed); + const session = toTerminalSession(renamed); + this.options.hub.sendTerminalClientMessage(renamed.id, { + type: "session-updated", + session, + }); + return session; + } + + renameEnvironmentTerminal( + args: RenameEnvironmentTerminalArgs, + ): TerminalSession { + requireEnvironment(this.options.db, args.environmentId); + const renamed = updateThreadlessTerminalSessionTitle(this.options.db, { + environmentId: args.environmentId, + terminalId: args.terminalId, + title: args.payload.title, + }); + if (!renamed) { + throw new ApiError( + 404, + "terminal_not_found", + "Terminal session not found", + ); + } const session = toTerminalSession(renamed); this.options.hub.sendTerminalClientMessage(renamed.id, { type: "session-updated", @@ -501,6 +697,57 @@ export class TerminalSessionLifecycle { "Terminal session not found", ); } + return this.closeTerminalSession({ + current, + payload: args.payload, + }); + } + + closeTerminal(args: CloseTerminalArgs): TerminalSession { + const current = getTerminalSession(this.options.db, { + terminalId: args.terminalId, + }); + if (!current) { + throw new ApiError( + 404, + "terminal_not_found", + "Terminal session not found", + ); + } + return this.closeTerminalSession({ + current, + payload: args.payload, + }); + } + + closeEnvironmentTerminal( + args: CloseEnvironmentTerminalArgs, + ): TerminalSession { + requireEnvironment(this.options.db, args.environmentId); + const current = getThreadlessTerminalSessionForEnvironment( + this.options.db, + { + environmentId: args.environmentId, + terminalId: args.terminalId, + }, + ); + if (!current) { + throw new ApiError( + 404, + "terminal_not_found", + "Terminal session not found", + ); + } + return this.closeTerminalSession({ + current, + payload: args.payload, + }); + } + + private closeTerminalSession( + args: CloseTerminalSessionArgs, + ): TerminalSession { + const current = args.current; if (current.status === "exited") { return toTerminalSession(current); } @@ -601,10 +848,9 @@ export class TerminalSessionLifecycle { } attachBrowserTerminal(args: AttachBrowserTerminalArgs): void { - requirePublicThread(this.options.db, args.threadId); - const current = getTerminalSessionForThread(this.options.db, { - terminalId: args.terminalId, - threadId: args.threadId, + const current = this.getBrowserTerminalSession({ + ...args, + reportMissing: false, }); if (!current) { throw new ApiError( @@ -694,11 +940,13 @@ export class TerminalSessionLifecycle { this.resizeBrowserTerminal(args); return; case "close": - this.closeThreadTerminal({ - threadId: args.threadId, - terminalId: args.terminalId, - payload: { mode: "force", reason: args.message.reason }, - }); + const current = this.getBrowserTerminalSession(args); + if (current) { + this.closeTerminalSession({ + current, + payload: { mode: "force", reason: args.message.reason }, + }); + } return; } } @@ -731,7 +979,7 @@ export class TerminalSessionLifecycle { closeReason: args.message.closeReason, }); if (exited) { - this.notifyThreadTerminalsChanged(exited.threadId); + this.notifyTerminalSessionChanged(exited); const session = toTerminalSession(exited); this.options.hub.sendTerminalClientMessage(exited.id, { type: "exited", @@ -813,9 +1061,8 @@ export class TerminalSessionLifecycle { } private closeStaleOpenedTerminal(args: CloseStaleOpenedTerminalArgs): void { - const current = getTerminalSessionForThread(this.options.db, { + const current = getTerminalSession(this.options.db, { terminalId: args.terminalId, - threadId: args.threadId, }); this.options.hub.sendDaemonSessionMessage(args.daemonSessionId, { type: "terminal.close", @@ -849,7 +1096,7 @@ export class TerminalSessionLifecycle { private notifyExitedTerminalSession( args: NotifyExitedTerminalSessionArgs, ): void { - this.notifyThreadTerminalsChanged(args.session.threadId); + this.notifyTerminalSessionChanged(args.session); this.options.hub.sendTerminalClientMessage(args.session.id, { type: "exited", session: toTerminalSession(args.session), @@ -871,13 +1118,12 @@ export class TerminalSessionLifecycle { if (!current) { return; } - const markedInput = markTerminalSessionUserInput(this.options.db, { + const markedInput = markTerminalSessionUserInputById(this.options.db, { terminalId: current.id, - threadId: args.threadId, }); if (markedInput) { const session = toTerminalSession(markedInput); - this.notifyThreadTerminalsChanged(markedInput.threadId); + this.notifyTerminalSessionChanged(markedInput); this.options.hub.sendTerminalClientMessage(markedInput.id, { type: "session-updated", session, @@ -915,15 +1161,14 @@ export class TerminalSessionLifecycle { current.cols !== args.message.cols || current.rows !== args.message.rows ) { - const resized = updateTerminalSessionSize(this.options.db, { + const resized = updateTerminalSessionSizeById(this.options.db, { cols: args.message.cols, rows: args.message.rows, terminalId: current.id, - threadId: args.threadId, }); if (resized) { const session = toTerminalSession(resized); - this.notifyThreadTerminalsChanged(resized.threadId); + this.notifyTerminalSessionChanged(resized); this.options.hub.sendTerminalClientMessage(resized.id, { type: "session-updated", session, @@ -954,17 +1199,8 @@ export class TerminalSessionLifecycle { private getRunningBrowserTerminal( args: GetRunningBrowserTerminalArgs, ): RunningBrowserTerminalSession | null { - requirePublicThread(this.options.db, args.threadId); - const current = getTerminalSessionForThread(this.options.db, { - terminalId: args.terminalId, - threadId: args.threadId, - }); + const current = this.getBrowserTerminalSession(args); if (!current) { - this.sendTerminalSocketError({ - socket: args.socket, - code: "terminal_not_found", - message: "Terminal session not found", - }); return null; } if (!isRunningBrowserTerminalSession(current)) { @@ -978,6 +1214,34 @@ export class TerminalSessionLifecycle { return current; } + private getBrowserTerminalSession( + args: GetBrowserTerminalSessionArgs, + ): TerminalSessionRow | null { + let current: TerminalSessionRow | null; + if (args.threadId === null) { + current = getTerminalSession(this.options.db, { + terminalId: args.terminalId, + }); + } else { + requirePublicThread(this.options.db, args.threadId); + current = getTerminalSessionForThread(this.options.db, { + terminalId: args.terminalId, + threadId: args.threadId, + }); + } + if (!current) { + if (args.reportMissing !== false) { + this.sendTerminalSocketError({ + socket: args.socket, + code: "terminal_not_found", + message: "Terminal session not found", + }); + } + return null; + } + return current; + } + private disconnectDaemonSessionTerminals( args: DisconnectDaemonSessionTerminalsArgs, ): void { @@ -1004,7 +1268,7 @@ export class TerminalSessionLifecycle { { terminalId: session.id, sessionId: args.daemonSessionId }, "Terminal session disconnected with daemon session", ); - this.notifyThreadTerminalsChanged(session.threadId); + this.notifyTerminalSessionChanged(session); this.options.hub.sendTerminalClientMessage(session.id, { type: "session-updated", session: toTerminalSession(session), @@ -1098,10 +1362,15 @@ export class TerminalSessionLifecycle { clearTimeout(pending.timeout); this.pendingAttaches.delete(args.message.requestId); - const current = getTerminalSessionForThread(this.options.db, { - terminalId: pending.terminalId, - threadId: pending.threadId, - }); + const current = + pending.threadId === null + ? getTerminalSession(this.options.db, { + terminalId: pending.terminalId, + }) + : getTerminalSessionForThread(this.options.db, { + terminalId: pending.terminalId, + threadId: pending.threadId, + }); if (!current) { this.sendTerminalSocketError({ socket: pending.socket, @@ -1203,7 +1472,11 @@ export class TerminalSessionLifecycle { }); } - private notifyThreadTerminalsChanged(threadId: string): void { - this.options.hub.notifyThread(threadId, ["terminals-changed"]); + private notifyTerminalSessionChanged( + session: Pick, + ): void { + if (session.threadId !== null) { + this.options.hub.notifyThread(session.threadId, ["terminals-changed"]); + } } } diff --git a/apps/server/src/ws/terminal-protocol.ts b/apps/server/src/ws/terminal-protocol.ts index fcd98e494..c246753b7 100644 --- a/apps/server/src/ws/terminal-protocol.ts +++ b/apps/server/src/ws/terminal-protocol.ts @@ -16,14 +16,14 @@ interface TerminalSocket { interface TerminalSocketOpenArgs { socket: TerminalSocket; terminalId: string; - threadId: string; + threadId: string | null; } interface TerminalSocketMessageArgs { raw: unknown; socket: TerminalSocket; terminalId: string; - threadId: string; + threadId: string | null; } interface TerminalSocketCloseArgs { diff --git a/apps/server/test/public/public-thread-terminals.test.ts b/apps/server/test/public/public-thread-terminals.test.ts index 1bf8d4b4f..4c80e114d 100644 --- a/apps/server/test/public/public-thread-terminals.test.ts +++ b/apps/server/test/public/public-thread-terminals.test.ts @@ -1,6 +1,7 @@ import { createTerminalSession, getTerminalSessionForThread, + getThreadlessTerminalSessionForEnvironment, listTerminalSessionsByThread, markDaemonTerminalSessionsDisconnected, markEnvironmentTerminalSessionsExited, @@ -15,6 +16,8 @@ import { } from "@bb/host-daemon-contract"; import { apiErrorSchema, + environmentTerminalListResponseSchema, + terminalListResponseSchema, terminalServerMessageSchema, type TerminalServerMessage, terminalSessionSchema, @@ -173,9 +176,61 @@ async function startPendingTerminalOpen( }; } +async function startPendingEnvironmentTerminalOpen( + fixture: TerminalRouteFixture, +): Promise { + const responsePromise = Promise.resolve( + fixture.harness.app.request( + `/api/v1/environments/${fixture.environment.id}/terminals`, + { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ cols: 100, rows: 30 }), + }, + ), + ); + const openMessage = await waitForDaemonMessage(fixture.socket); + if (openMessage.type !== "terminal.open") { + throw new Error(`Expected terminal.open, received ${openMessage.type}`); + } + return { + openMessage, + responsePromise, + }; +} + +async function startPendingStandaloneTerminalOpen( + fixture: TerminalRouteFixture, +): Promise { + const responsePromise = Promise.resolve( + fixture.harness.app.request("/api/v1/terminals", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + cols: 100, + rows: 30, + target: { + kind: "host_path", + hostId: fixture.host.id, + cwd: "/tmp/standalone-terminal", + }, + }), + }), + ); + const openMessage = await waitForDaemonMessage(fixture.socket); + if (openMessage.type !== "terminal.open") { + throw new Error(`Expected terminal.open, received ${openMessage.type}`); + } + return { + openMessage, + responsePromise, + }; +} + function acknowledgeTerminalOpen( fixture: TerminalRouteFixture, openMessage: TerminalOpenMessage, + initialCwd = "/tmp/terminal-workspace", ): void { fixture.harness.deps.terminalSessions.handleDaemonTerminalMessage({ hostId: fixture.host.id, @@ -186,7 +241,7 @@ function acknowledgeTerminalOpen( terminalId: openMessage.terminalId, shell: "/bin/zsh", title: "zsh", - initialCwd: "/tmp/terminal-workspace", + initialCwd, cols: 100, rows: 30, }, @@ -254,6 +309,172 @@ describe("public thread terminal routes", () => { ]); }); + it("creates and lists threadless terminal sessions for an environment", async () => { + const fixture = await createTerminalRouteFixture(); + harnesses.push(fixture.harness); + + const pending = await startPendingEnvironmentTerminalOpen(fixture); + + expect(pending.openMessage).not.toHaveProperty("threadId"); + expect(pending.openMessage.target).toMatchObject({ + kind: "workspace", + environmentId: fixture.environment.id, + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + }, + }); + acknowledgeTerminalOpen(fixture, pending.openMessage); + const response = await pending.responsePromise; + + expect(response.status).toBe(201); + const created = terminalSessionSchema.parse(await readJson(response)); + expect(created).toMatchObject({ + environmentId: fixture.environment.id, + threadId: null, + status: "running", + title: "zsh", + }); + + const environmentListResponse = await fixture.harness.app.request( + `/api/v1/environments/${fixture.environment.id}/terminals`, + ); + expect(environmentListResponse.status).toBe(200); + const environmentList = environmentTerminalListResponseSchema.parse( + await readJson(environmentListResponse), + ); + expect(environmentList.sessions).toEqual([ + expect.objectContaining({ + id: created.id, + threadId: null, + }), + ]); + + const threadListResponse = await fixture.harness.app.request( + `/api/v1/threads/${fixture.thread.id}/terminals`, + ); + const threadList = threadTerminalListResponseSchema.parse( + await readJson(threadListResponse), + ); + expect(threadList.sessions).toEqual([]); + }); + + it("creates and lists terminal sessions for a host path without an environment", async () => { + const fixture = await createTerminalRouteFixture(); + harnesses.push(fixture.harness); + + const pending = await startPendingStandaloneTerminalOpen(fixture); + + expect(pending.openMessage).not.toHaveProperty("threadId"); + expect(pending.openMessage.target).toEqual({ + kind: "host_path", + cwd: "/tmp/standalone-terminal", + }); + acknowledgeTerminalOpen( + fixture, + pending.openMessage, + "/tmp/standalone-terminal", + ); + const response = await pending.responsePromise; + + expect(response.status).toBe(201); + const created = terminalSessionSchema.parse(await readJson(response)); + expect(created).toMatchObject({ + environmentId: null, + hostId: fixture.host.id, + initialCwd: "/tmp/standalone-terminal", + threadId: null, + status: "running", + }); + + const listResponse = await fixture.harness.app.request( + "/api/v1/terminals", + ); + expect(listResponse.status).toBe(200); + const list = terminalListResponseSchema.parse(await readJson(listResponse)); + expect(list.sessions).toEqual([ + expect.objectContaining({ + id: created.id, + environmentId: null, + threadId: null, + }), + ]); + }); + + it("attaches browser sockets to threadless terminal sessions", async () => { + const fixture = await createTerminalRouteFixture(); + harnesses.push(fixture.harness); + const stored = createTerminalSession(fixture.harness.db, { + cols: 120, + daemonSessionId: fixture.session.id, + environmentId: fixture.environment.id, + hostId: fixture.host.id, + initialCwd: fixture.environment.path ?? "/tmp/terminal-workspace", + rows: 32, + status: "running", + threadId: null, + title: "Terminal 1", + }); + const browserSocket = createFakeBrowserSocket(); + + fixture.harness.deps.terminalSessions.attachBrowserTerminal({ + socket: browserSocket, + terminalId: stored.id, + threadId: null, + }); + const attachMessage = await waitForDaemonMessage(fixture.socket); + expect(attachMessage).toMatchObject({ + type: "terminal.attach", + terminalId: stored.id, + }); + if (attachMessage.type !== "terminal.attach") { + throw new Error(`Expected terminal.attach, received ${attachMessage.type}`); + } + + fixture.harness.deps.terminalSessions.handleDaemonTerminalMessage({ + hostId: fixture.host.id, + sessionId: fixture.session.id, + message: { + type: "terminal.replay", + requestId: attachMessage.requestId, + terminalId: stored.id, + chunks: [], + nextSeq: 0, + }, + }); + expect(readBrowserMessages(browserSocket)).toContainEqual( + expect.objectContaining({ + type: "attached", + session: expect.objectContaining({ + id: stored.id, + threadId: null, + }), + }), + ); + + fixture.harness.deps.terminalSessions.handleBrowserTerminalMessage({ + socket: browserSocket, + terminalId: stored.id, + threadId: null, + message: { + type: "input", + dataBase64: Buffer.from("pwd\n").toString("base64"), + }, + }); + const inputMessage = await waitForDaemonMessage(fixture.socket, 1); + expect(inputMessage).toMatchObject({ + type: "terminal.input", + terminalId: stored.id, + }); + expect( + getThreadlessTerminalSessionForEnvironment(fixture.harness.db, { + environmentId: fixture.environment.id, + terminalId: stored.id, + }), + ).toMatchObject({ + lastUserInputAt: expect.any(Number), + }); + }); + it("rejects terminal creation when the thread has no environment", async () => { const harness = await createTestAppHarness(); harnesses.push(harness); @@ -305,11 +526,14 @@ describe("public thread terminal routes", () => { } expect(openMessage).toMatchObject({ cols: 100, - environmentId: fixture.environment.id, rows: 30, threadId: fixture.thread.id, - workspaceContext: { - workspacePath: "/tmp/terminal-workspace", + target: { + kind: "workspace", + environmentId: fixture.environment.id, + workspaceContext: { + workspacePath: "/tmp/terminal-workspace", + }, }, }); diff --git a/packages/db/drizzle/0042_threadless_terminal_sessions.sql b/packages/db/drizzle/0042_threadless_terminal_sessions.sql new file mode 100644 index 000000000..07649bb51 --- /dev/null +++ b/packages/db/drizzle/0042_threadless_terminal_sessions.sql @@ -0,0 +1,68 @@ +CREATE TABLE `__new_terminal_sessions` ( + `id` text PRIMARY KEY NOT NULL, + `thread_id` text, + `environment_id` text, + `host_id` text NOT NULL, + `daemon_session_id` text, + `title` text NOT NULL, + `initial_cwd` text NOT NULL, + `cols` integer NOT NULL, + `rows` integer NOT NULL, + `status` text NOT NULL, + `exit_code` integer, + `close_reason` text, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL, + `last_user_input_at` integer, + FOREIGN KEY (`thread_id`) REFERENCES `threads`(`id`) ON UPDATE no action ON DELETE cascade, + FOREIGN KEY (`environment_id`) REFERENCES `environments`(`id`) ON UPDATE no action ON DELETE cascade, + FOREIGN KEY (`host_id`) REFERENCES `hosts`(`id`) ON UPDATE no action ON DELETE cascade, + FOREIGN KEY (`daemon_session_id`) REFERENCES `host_daemon_sessions`(`id`) ON UPDATE no action ON DELETE set null +); +--> statement-breakpoint +INSERT INTO `__new_terminal_sessions` ( + `id`, + `thread_id`, + `environment_id`, + `host_id`, + `daemon_session_id`, + `title`, + `initial_cwd`, + `cols`, + `rows`, + `status`, + `exit_code`, + `close_reason`, + `created_at`, + `updated_at`, + `last_user_input_at` +) +SELECT + `id`, + `thread_id`, + `environment_id`, + `host_id`, + `daemon_session_id`, + `title`, + `initial_cwd`, + `cols`, + `rows`, + `status`, + `exit_code`, + `close_reason`, + `created_at`, + `updated_at`, + `last_user_input_at` +FROM `terminal_sessions`; +--> statement-breakpoint +DROP TABLE `terminal_sessions`; +--> statement-breakpoint +ALTER TABLE `__new_terminal_sessions` RENAME TO `terminal_sessions`; +--> statement-breakpoint +CREATE INDEX `terminal_sessions_thread_status_updated_idx` ON `terminal_sessions` (`thread_id`,`status`,`updated_at`); +--> statement-breakpoint +CREATE INDEX `terminal_sessions_environment_status_idx` ON `terminal_sessions` (`environment_id`,`status`); +--> statement-breakpoint +CREATE INDEX `terminal_sessions_host_status_idx` ON `terminal_sessions` (`host_id`,`status`); +--> statement-breakpoint +CREATE INDEX `terminal_sessions_daemon_session_idx` ON `terminal_sessions` (`daemon_session_id`); diff --git a/packages/db/drizzle/meta/_journal.json b/packages/db/drizzle/meta/_journal.json index 22a5cc02e..aa92c5691 100644 --- a/packages/db/drizzle/meta/_journal.json +++ b/packages/db/drizzle/meta/_journal.json @@ -295,6 +295,13 @@ "when": 1781660000003, "tag": "0041_add_automations", "breakpoints": true + }, + { + "idx": 42, + "version": "6", + "when": 1781660000004, + "tag": "0042_threadless_terminal_sessions", + "breakpoints": true } ] } diff --git a/packages/db/src/data/index.ts b/packages/db/src/data/index.ts index 59c688566..3cd933402 100644 --- a/packages/db/src/data/index.ts +++ b/packages/db/src/data/index.ts @@ -292,10 +292,15 @@ export type { export { createTerminalSession, + getTerminalSession, getTerminalSessionForThread, + getThreadlessTerminalSessionForEnvironment, listTerminalSessionsByEnvironment, listTerminalSessionsByThread, + listThreadlessTerminalSessionsByEnvironment, + listVisibleTerminalSessions, listVisibleTerminalSessionsByThread, + listVisibleThreadlessTerminalSessionsByEnvironment, markDaemonTerminalSessionExited, markDaemonTerminalSessionsDisconnected, markEnvironmentTerminalSessionsExited, @@ -303,13 +308,21 @@ export { markTerminalSessionExited, markTerminalSessionRunning, markTerminalSessionUserInput, + markTerminalSessionUserInputById, + markThreadlessTerminalSessionUserInput, markThreadTerminalSessionsExited, updateTerminalSessionSize, + updateTerminalSessionSizeById, updateTerminalSessionTitle, + updateTerminalSessionTitleById, + updateThreadlessTerminalSessionSize, + updateThreadlessTerminalSessionTitle, } from "./terminal-sessions.js"; export type { CreateTerminalSessionInput, + GetTerminalSessionArgs, GetTerminalSessionForThreadArgs, + GetThreadlessTerminalSessionForEnvironmentArgs, MarkDaemonTerminalSessionExitedArgs, MarkDaemonTerminalSessionsDisconnectedArgs, MarkEnvironmentTerminalSessionsExitedArgs, @@ -317,10 +330,16 @@ export type { MarkTerminalSessionExitedArgs, MarkTerminalSessionRunningArgs, MarkTerminalSessionUserInputArgs, + MarkTerminalSessionUserInputByIdArgs, + MarkThreadlessTerminalSessionUserInputArgs, MarkThreadTerminalSessionsExitedArgs, TerminalSessionRow, UpdateTerminalSessionSizeArgs, + UpdateTerminalSessionSizeByIdArgs, UpdateTerminalSessionTitleArgs, + UpdateTerminalSessionTitleByIdArgs, + UpdateThreadlessTerminalSessionSizeArgs, + UpdateThreadlessTerminalSessionTitleArgs, } from "./terminal-sessions.js"; export { diff --git a/packages/db/src/data/terminal-sessions.ts b/packages/db/src/data/terminal-sessions.ts index de0ba1208..2ff3cea47 100644 --- a/packages/db/src/data/terminal-sessions.ts +++ b/packages/db/src/data/terminal-sessions.ts @@ -15,21 +15,30 @@ export type TerminalSessionRow = typeof terminalSessions.$inferSelect; export interface CreateTerminalSessionInput { cols: number; daemonSessionId: string | null; - environmentId: string; + environmentId: string | null; hostId: string; initialCwd: string; now?: number; rows: number; status: TerminalSessionStatus; - threadId: string; + threadId: string | null; title: string; } +export interface GetTerminalSessionArgs { + terminalId: string; +} + export interface GetTerminalSessionForThreadArgs { terminalId: string; threadId: string; } +export interface GetThreadlessTerminalSessionForEnvironmentArgs { + environmentId: string; + terminalId: string; +} + export interface UpdateTerminalSessionTitleArgs { now?: number; terminalId: string; @@ -37,6 +46,19 @@ export interface UpdateTerminalSessionTitleArgs { title: string; } +export interface UpdateTerminalSessionTitleByIdArgs { + now?: number; + terminalId: string; + title: string; +} + +export interface UpdateThreadlessTerminalSessionTitleArgs { + environmentId: string; + now?: number; + terminalId: string; + title: string; +} + export interface MarkTerminalSessionRunningArgs { cols: number; daemonSessionId: string; @@ -55,6 +77,21 @@ export interface UpdateTerminalSessionSizeArgs { threadId: string; } +export interface UpdateTerminalSessionSizeByIdArgs { + cols: number; + now?: number; + rows: number; + terminalId: string; +} + +export interface UpdateThreadlessTerminalSessionSizeArgs { + cols: number; + environmentId: string; + now?: number; + rows: number; + terminalId: string; +} + export interface MarkTerminalSessionExitedArgs { closeReason: TerminalSessionCloseReason; exitCode: number | null; @@ -68,6 +105,17 @@ export interface MarkTerminalSessionUserInputArgs { threadId: string; } +export interface MarkTerminalSessionUserInputByIdArgs { + now?: number; + terminalId: string; +} + +export interface MarkThreadlessTerminalSessionUserInputArgs { + environmentId: string; + now?: number; + terminalId: string; +} + export interface MarkDaemonTerminalSessionExitedArgs { closeReason: TerminalSessionCloseReason; daemonSessionId: string; @@ -165,6 +213,52 @@ export function listVisibleTerminalSessionsByThread( .all(); } +export function listVisibleTerminalSessions( + db: TerminalSessionReadConnection, +): TerminalSessionRow[] { + return db + .select() + .from(terminalSessions) + .where(inArray(terminalSessions.status, NON_TERMINAL_SESSION_STATUSES)) + .orderBy(asc(terminalSessions.createdAt), asc(terminalSessions.id)) + .all(); +} + +export function listThreadlessTerminalSessionsByEnvironment( + db: TerminalSessionReadConnection, + environmentId: string, +): TerminalSessionRow[] { + return db + .select() + .from(terminalSessions) + .where( + and( + eq(terminalSessions.environmentId, environmentId), + isNull(terminalSessions.threadId), + ), + ) + .orderBy(asc(terminalSessions.createdAt), asc(terminalSessions.id)) + .all(); +} + +export function listVisibleThreadlessTerminalSessionsByEnvironment( + db: TerminalSessionReadConnection, + environmentId: string, +): TerminalSessionRow[] { + return db + .select() + .from(terminalSessions) + .where( + and( + eq(terminalSessions.environmentId, environmentId), + isNull(terminalSessions.threadId), + inArray(terminalSessions.status, NON_TERMINAL_SESSION_STATUSES), + ), + ) + .orderBy(asc(terminalSessions.createdAt), asc(terminalSessions.id)) + .all(); +} + export function listTerminalSessionsByEnvironment( db: TerminalSessionReadConnection, environmentId: string, @@ -177,6 +271,19 @@ export function listTerminalSessionsByEnvironment( .all(); } +export function getTerminalSession( + db: TerminalSessionReadConnection, + args: GetTerminalSessionArgs, +): TerminalSessionRow | null { + return ( + db + .select() + .from(terminalSessions) + .where(eq(terminalSessions.id, args.terminalId)) + .get() ?? null + ); +} + export function getTerminalSessionForThread( db: TerminalSessionReadConnection, args: GetTerminalSessionForThreadArgs, @@ -195,6 +302,25 @@ export function getTerminalSessionForThread( ); } +export function getThreadlessTerminalSessionForEnvironment( + db: TerminalSessionReadConnection, + args: GetThreadlessTerminalSessionForEnvironmentArgs, +): TerminalSessionRow | null { + return ( + db + .select() + .from(terminalSessions) + .where( + and( + eq(terminalSessions.id, args.terminalId), + eq(terminalSessions.environmentId, args.environmentId), + isNull(terminalSessions.threadId), + ), + ) + .get() ?? null + ); +} + export function updateTerminalSessionTitle( db: TerminalSessionWriteConnection, args: UpdateTerminalSessionTitleArgs, @@ -217,6 +343,46 @@ export function updateTerminalSessionTitle( ); } +export function updateTerminalSessionTitleById( + db: TerminalSessionWriteConnection, + args: UpdateTerminalSessionTitleByIdArgs, +): TerminalSessionRow | null { + return ( + db + .update(terminalSessions) + .set({ + title: args.title, + updatedAt: args.now ?? Date.now(), + }) + .where(eq(terminalSessions.id, args.terminalId)) + .returning() + .get() ?? null + ); +} + +export function updateThreadlessTerminalSessionTitle( + db: TerminalSessionWriteConnection, + args: UpdateThreadlessTerminalSessionTitleArgs, +): TerminalSessionRow | null { + return ( + db + .update(terminalSessions) + .set({ + title: args.title, + updatedAt: args.now ?? Date.now(), + }) + .where( + and( + eq(terminalSessions.id, args.terminalId), + eq(terminalSessions.environmentId, args.environmentId), + isNull(terminalSessions.threadId), + ), + ) + .returning() + .get() ?? null + ); +} + export function markTerminalSessionRunning( db: TerminalSessionWriteConnection, args: MarkTerminalSessionRunningArgs, @@ -271,6 +437,48 @@ export function updateTerminalSessionSize( ); } +export function updateTerminalSessionSizeById( + db: TerminalSessionWriteConnection, + args: UpdateTerminalSessionSizeByIdArgs, +): TerminalSessionRow | null { + return ( + db + .update(terminalSessions) + .set({ + cols: args.cols, + rows: args.rows, + updatedAt: args.now ?? Date.now(), + }) + .where(eq(terminalSessions.id, args.terminalId)) + .returning() + .get() ?? null + ); +} + +export function updateThreadlessTerminalSessionSize( + db: TerminalSessionWriteConnection, + args: UpdateThreadlessTerminalSessionSizeArgs, +): TerminalSessionRow | null { + return ( + db + .update(terminalSessions) + .set({ + cols: args.cols, + rows: args.rows, + updatedAt: args.now ?? Date.now(), + }) + .where( + and( + eq(terminalSessions.id, args.terminalId), + eq(terminalSessions.environmentId, args.environmentId), + isNull(terminalSessions.threadId), + ), + ) + .returning() + .get() ?? null + ); +} + export function markTerminalSessionUserInput( db: TerminalSessionWriteConnection, args: MarkTerminalSessionUserInputArgs, @@ -295,6 +503,54 @@ export function markTerminalSessionUserInput( ); } +export function markTerminalSessionUserInputById( + db: TerminalSessionWriteConnection, + args: MarkTerminalSessionUserInputByIdArgs, +): TerminalSessionRow | null { + const now = args.now ?? Date.now(); + return ( + db + .update(terminalSessions) + .set({ + lastUserInputAt: now, + updatedAt: now, + }) + .where( + and( + eq(terminalSessions.id, args.terminalId), + isNull(terminalSessions.lastUserInputAt), + ), + ) + .returning() + .get() ?? null + ); +} + +export function markThreadlessTerminalSessionUserInput( + db: TerminalSessionWriteConnection, + args: MarkThreadlessTerminalSessionUserInputArgs, +): TerminalSessionRow | null { + const now = args.now ?? Date.now(); + return ( + db + .update(terminalSessions) + .set({ + lastUserInputAt: now, + updatedAt: now, + }) + .where( + and( + eq(terminalSessions.id, args.terminalId), + eq(terminalSessions.environmentId, args.environmentId), + isNull(terminalSessions.threadId), + isNull(terminalSessions.lastUserInputAt), + ), + ) + .returning() + .get() ?? null + ); +} + export function markTerminalSessionExited( db: TerminalSessionWriteConnection, args: MarkTerminalSessionExitedArgs, diff --git a/packages/db/src/schema.ts b/packages/db/src/schema.ts index 2856f3837..36be25e98 100644 --- a/packages/db/src/schema.ts +++ b/packages/db/src/schema.ts @@ -547,12 +547,12 @@ export const terminalSessions = sqliteTable( "terminal_sessions", { id: text("id").primaryKey(), - threadId: text("thread_id") - .notNull() - .references(() => threads.id, { onDelete: "cascade" }), - environmentId: text("environment_id") - .notNull() - .references(() => environments.id, { onDelete: "cascade" }), + threadId: text("thread_id").references(() => threads.id, { + onDelete: "cascade", + }), + environmentId: text("environment_id").references(() => environments.id, { + onDelete: "cascade", + }), hostId: text("host_id") .notNull() .references(() => hosts.id, { onDelete: "cascade" }), diff --git a/packages/db/test/data/terminal-sessions.test.ts b/packages/db/test/data/terminal-sessions.test.ts index b8779afc6..62b3dafba 100644 --- a/packages/db/test/data/terminal-sessions.test.ts +++ b/packages/db/test/data/terminal-sessions.test.ts @@ -4,11 +4,17 @@ import { migrate } from "../../src/migrate.js"; import { noopNotifier } from "../../src/notifier.js"; import { createTerminalSession, + getThreadlessTerminalSessionForEnvironment, + listThreadlessTerminalSessionsByEnvironment, listTerminalSessionsByThread, + listVisibleTerminalSessions, listVisibleTerminalSessionsByThread, + listVisibleThreadlessTerminalSessionsByEnvironment, markDaemonTerminalSessionsDisconnected, markEnvironmentTerminalSessionsExited, + markThreadlessTerminalSessionUserInput, markTerminalSessionUserInput, + markTerminalSessionUserInputById, markTerminalSessionRunning, markThreadTerminalSessionsExited, } from "../../src/data/terminal-sessions.js"; @@ -100,7 +106,119 @@ function createStartingTerminal(fixture: TerminalSessionFixture) { }); } +function createStartingThreadlessTerminal(fixture: TerminalSessionFixture) { + return createTerminalSession(fixture.db, { + cols: 80, + daemonSessionId: fixture.session.id, + environmentId: fixture.environment.id, + hostId: fixture.host.id, + initialCwd: "/tmp/workspace", + rows: 24, + status: "starting", + threadId: null, + title: "Terminal 1", + }); +} + +function createStartingStandaloneTerminal(fixture: TerminalSessionFixture) { + return createTerminalSession(fixture.db, { + cols: 80, + daemonSessionId: fixture.session.id, + environmentId: null, + hostId: fixture.host.id, + initialCwd: "/tmp", + rows: 24, + status: "starting", + threadId: null, + title: "Terminal 1", + }); +} + describe("terminal sessions", () => { + it("keeps threadless environment terminals out of thread terminal queries", () => { + const fixture = setup(); + const threadTerminal = createStartingTerminal(fixture); + const threadlessTerminal = createStartingThreadlessTerminal(fixture); + + expect(listTerminalSessionsByThread(fixture.db, fixture.thread.id)).toEqual([ + expect.objectContaining({ id: threadTerminal.id }), + ]); + expect( + listThreadlessTerminalSessionsByEnvironment( + fixture.db, + fixture.environment.id, + ), + ).toEqual([expect.objectContaining({ id: threadlessTerminal.id })]); + expect( + getThreadlessTerminalSessionForEnvironment(fixture.db, { + environmentId: fixture.environment.id, + terminalId: threadTerminal.id, + }), + ).toBeNull(); + }); + + it("keeps standalone host terminals visible without thread or environment ownership", () => { + const fixture = setup(); + const terminal = createStartingStandaloneTerminal(fixture); + + expect(listVisibleTerminalSessions(fixture.db)).toEqual([ + expect.objectContaining({ + id: terminal.id, + environmentId: null, + threadId: null, + }), + ]); + + const firstInput = markTerminalSessionUserInputById(fixture.db, { + terminalId: terminal.id, + now: 10, + }); + const secondInput = markTerminalSessionUserInputById(fixture.db, { + terminalId: terminal.id, + now: 20, + }); + + expect(firstInput).toMatchObject({ + id: terminal.id, + lastUserInputAt: 10, + }); + expect(secondInput).toBeNull(); + }); + + it("marks a threadless terminal dirty on first user input only", () => { + const fixture = setup(); + const terminal = createStartingThreadlessTerminal(fixture); + + const firstInput = markThreadlessTerminalSessionUserInput(fixture.db, { + environmentId: fixture.environment.id, + terminalId: terminal.id, + now: 10, + }); + const secondInput = markThreadlessTerminalSessionUserInput(fixture.db, { + environmentId: fixture.environment.id, + terminalId: terminal.id, + now: 20, + }); + + expect(firstInput).toMatchObject({ + id: terminal.id, + lastUserInputAt: 10, + updatedAt: 10, + }); + expect(secondInput).toBeNull(); + expect( + listVisibleThreadlessTerminalSessionsByEnvironment( + fixture.db, + fixture.environment.id, + ), + ).toEqual([ + expect.objectContaining({ + id: terminal.id, + lastUserInputAt: 10, + }), + ]); + }); + it("marks only the expected starting daemon session running", () => { const fixture = setup(); const terminal = createStartingTerminal(fixture); diff --git a/packages/db/test/migrate.test.ts b/packages/db/test/migrate.test.ts index 7f9fe8d3e..af7583f8f 100644 --- a/packages/db/test/migrate.test.ts +++ b/packages/db/test/migrate.test.ts @@ -73,8 +73,8 @@ interface MigratedThreadProvenanceRow { interface MigratedTerminalSessionRow { id: string; - threadId: string; - environmentId: string; + threadId: string | null; + environmentId: string | null; hostId: string; daemonSessionId: string | null; title: string; @@ -186,7 +186,10 @@ const __dirname = dirname(fileURLToPath(import.meta.url)); const latestMigrationWhen = Math.max( ...( JSON.parse( - readFileSync(resolve(__dirname, "../drizzle/meta/_journal.json"), "utf-8"), + readFileSync( + resolve(__dirname, "../drizzle/meta/_journal.json"), + "utf-8", + ), ) as { entries: { when: number }[] } ).entries.map((entry) => entry.when), ); @@ -230,10 +233,12 @@ const branchLocalThreadSearchRowidFtsMigrationWhen = 1781403656071; const rowidThreadSearchMigrationHash = "025358fe89253aec7f5bd970dc3eb88d0e834f0d58fb9d75329a5d39899340f4"; const eventLargeValuesMigrationWhen = 1781403656069; +const eventLargeValuesRestoreMigrationWhen = 1781557200000; const cleanupModeDropMigrationWhen = 1781557300000; const stopRequestedAtDropMigrationWhen = 1781557400000; const cleanupRequestedAtDropMigrationWhen = 1781557500000; const threadSourceOriginMigrationWhen = 1781660000000; +const threadlessTerminalSessionsMigrationWhen = 1781660000004; const eventLargeValuesPreOptimizationHash = "bc111f5134183c37cf135af70231ec5a79823f9868818fdd8377e1ab3c05a23f"; const queuedMessageSortKeyMigrationPath = resolve( @@ -2746,11 +2751,22 @@ describe("migrate", () => { const terminalSessionColumns = db.$client .prepare<[], TableInfoRow>("PRAGMA table_info(terminal_sessions)") - .all() - .map((column) => column.name); - expect(terminalSessionColumns).not.toContain("current_cwd"); - expect(terminalSessionColumns).not.toContain("last_connected_at"); - expect(terminalSessionColumns).not.toContain("exited_at"); + .all(); + const terminalSessionColumnNames = terminalSessionColumns.map( + (column) => column.name, + ); + expect(terminalSessionColumnNames).not.toContain("current_cwd"); + expect(terminalSessionColumnNames).not.toContain("last_connected_at"); + expect(terminalSessionColumnNames).not.toContain("exited_at"); + expect( + terminalSessionColumns.find((column) => column.name === "thread_id") + ?.notnull, + ).toBe(0); + expect( + terminalSessionColumns.find( + (column) => column.name === "environment_id", + )?.notnull, + ).toBe(0); const hostDaemonSessionColumns = db.$client .prepare<[], TableInfoRow>("PRAGMA table_info(host_daemon_sessions)") @@ -2855,8 +2871,28 @@ describe("migrate", () => { markEventLargeValuesMigrationUnapplied(db); migrate(db); + const reappliedMigrationCreatedAts = db.$client + .prepare<[number, number], MigrationCreatedAtRow>( + ` + SELECT created_at AS createdAt + FROM __drizzle_migrations + WHERE created_at IN (?, ?) + ORDER BY created_at + `, + ) + .all( + eventLargeValuesRestoreMigrationWhen, + threadlessTerminalSessionsMigrationWhen, + ) + .map((row) => row.createdAt); + expect(reappliedMigrationCreatedAts).toContain( + eventLargeValuesRestoreMigrationWhen, + ); + expect(reappliedMigrationCreatedAts).toContain( + threadlessTerminalSessionsMigrationWhen, + ); // The restore migration and every migration after it re-apply, so the - // latest applied migration is the most recent in the journal (0041). + // latest applied migration is the most recent in the journal. expect(readLatestAppliedMigrationCreatedAt(db)).toBe(latestMigrationWhen); expect(readTableNames(db)).not.toContain("event_large_values"); diff --git a/packages/host-daemon-contract/src/session.ts b/packages/host-daemon-contract/src/session.ts index c8b8b0073..e5004081e 100644 --- a/packages/host-daemon-contract/src/session.ts +++ b/packages/host-daemon-contract/src/session.ts @@ -358,14 +358,29 @@ export const hostDaemonTerminalOutputChunkSchema = z }) .strict(); +const hostDaemonTerminalOpenTargetSchema = z.discriminatedUnion("kind", [ + z + .object({ + kind: z.literal("workspace"), + environmentId: z.string().min(1), + workspaceContext: workspaceContextSchema, + }) + .strict(), + z + .object({ + kind: z.literal("host_path"), + cwd: z.string().min(1), + }) + .strict(), +]); + const hostDaemonTerminalOpenMessageSchema = z .object({ type: z.literal("terminal.open"), requestId: terminalRequestIdSchema, terminalId: terminalIdSchema, - threadId: z.string().min(1), - environmentId: z.string().min(1), - workspaceContext: workspaceContextSchema, + threadId: z.string().min(1).optional(), + target: hostDaemonTerminalOpenTargetSchema, cols: terminalColsSchema, rows: terminalRowsSchema, }) diff --git a/packages/host-daemon-contract/test/contract.test.ts b/packages/host-daemon-contract/test/contract.test.ts index 041e55afc..831ddc652 100644 --- a/packages/host-daemon-contract/test/contract.test.ts +++ b/packages/host-daemon-contract/test/contract.test.ts @@ -2403,10 +2403,26 @@ describe("host-daemon session schemas", () => { requestId: "request-1", terminalId: "term_123", threadId: "thr_123", - environmentId: "env_123", - workspaceContext: { - workspacePath: "/tmp/workspace", - workspaceProvisionType: "unmanaged", + target: { + kind: "workspace", + environmentId: "env_123", + workspaceContext: { + workspacePath: "/tmp/workspace", + workspaceProvisionType: "unmanaged", + }, + }, + cols: TERMINAL_COLS_MAX, + rows: TERMINAL_ROWS_MAX, + }).success, + ).toBe(true); + expect( + hostDaemonServerWsMessageSchema.safeParse({ + type: "terminal.open", + requestId: "request-1", + terminalId: "term_123", + target: { + kind: "host_path", + cwd: "/tmp/workspace", }, cols: TERMINAL_COLS_MAX, rows: TERMINAL_ROWS_MAX, diff --git a/packages/server-contract/src/api/terminals.ts b/packages/server-contract/src/api/terminals.ts index a59d5a3a2..fe3c607a1 100644 --- a/packages/server-contract/src/api/terminals.ts +++ b/packages/server-contract/src/api/terminals.ts @@ -9,8 +9,8 @@ import { export const terminalSessionSchema = z.object({ id: z.string().min(1), - threadId: z.string().min(1), - environmentId: z.string().min(1), + threadId: z.string().min(1).nullable(), + environmentId: z.string().min(1).nullable(), hostId: z.string().min(1), title: z.string().min(1), initialCwd: z.string().min(1), @@ -25,42 +25,102 @@ export const terminalSessionSchema = z.object({ }); export type TerminalSession = z.infer; -export const threadTerminalListResponseSchema = z.object({ +export const terminalListResponseSchema = z.object({ sessions: z.array(terminalSessionSchema), }); +export type TerminalListResponse = z.infer; + +export const threadTerminalListResponseSchema = terminalListResponseSchema; export type ThreadTerminalListResponse = z.infer< typeof threadTerminalListResponseSchema >; -export const createThreadTerminalRequestSchema = z +export const environmentTerminalListResponseSchema = terminalListResponseSchema; +export type EnvironmentTerminalListResponse = z.infer< + typeof environmentTerminalListResponseSchema +>; + +export const terminalCreateTargetSchema = z.discriminatedUnion("kind", [ + z + .object({ + kind: z.literal("environment"), + environmentId: z.string().min(1), + }) + .strict(), + z + .object({ + kind: z.literal("host_path"), + hostId: z.string().min(1), + cwd: z.string().trim().min(1), + }) + .strict(), +]); +export type TerminalCreateTarget = z.infer; + +export const createTerminalRequestSchema = z + .object({ + cols: terminalColsSchema, + rows: terminalRowsSchema, + target: terminalCreateTargetSchema, + }) + .strict(); +export type CreateTerminalRequest = z.infer; + +export const createScopedTerminalRequestSchema = z .object({ cols: terminalColsSchema, rows: terminalRowsSchema, }) .strict(); + +export const createThreadTerminalRequestSchema = + createScopedTerminalRequestSchema; export type CreateThreadTerminalRequest = z.infer< typeof createThreadTerminalRequestSchema >; -export const closeThreadTerminalRequestSchema = z +export const createEnvironmentTerminalRequestSchema = + createScopedTerminalRequestSchema; +export type CreateEnvironmentTerminalRequest = z.infer< + typeof createEnvironmentTerminalRequestSchema +>; + +export const closeTerminalRequestSchema = z .object({ mode: z.enum(["force", "if-clean"]), reason: z.literal("user"), }) .strict(); +export type CloseTerminalRequest = z.infer; + +export const closeThreadTerminalRequestSchema = closeTerminalRequestSchema; export type CloseThreadTerminalRequest = z.infer< typeof closeThreadTerminalRequestSchema >; -export const updateThreadTerminalRequestSchema = z +export const closeEnvironmentTerminalRequestSchema = closeTerminalRequestSchema; +export type CloseEnvironmentTerminalRequest = z.infer< + typeof closeEnvironmentTerminalRequestSchema +>; + +export const updateTerminalRequestSchema = z .object({ title: z.string().trim().min(1).max(200), }) .strict(); +export type UpdateTerminalRequest = z.infer; + +export const updateThreadTerminalRequestSchema = updateTerminalRequestSchema; export type UpdateThreadTerminalRequest = z.infer< typeof updateThreadTerminalRequestSchema >; +export const updateEnvironmentTerminalRequestSchema = + updateTerminalRequestSchema; +export type UpdateEnvironmentTerminalRequest = z.infer< + typeof updateEnvironmentTerminalRequestSchema +>; + export const terminalOutputChunkSchema = z .object({ seq: z.number().int().nonnegative(), diff --git a/packages/server-contract/src/common.ts b/packages/server-contract/src/common.ts index d9fc1827d..be4bb0230 100644 --- a/packages/server-contract/src/common.ts +++ b/packages/server-contract/src/common.ts @@ -16,6 +16,12 @@ export type PathThreadAndFilePath = { export type PathThreadAndTerminal = { param: { id: string; terminalId: string }; }; +export type PathEnvironmentAndTerminal = { + param: { id: string; terminalId: string }; +}; +export type PathTerminal = { + param: { terminalId: string }; +}; export type PathProjectAutomationId = { param: { id: string; automationId: string }; }; diff --git a/packages/server-contract/src/public-api.ts b/packages/server-contract/src/public-api.ts index 74129784d..4650b5a8c 100644 --- a/packages/server-contract/src/public-api.ts +++ b/packages/server-contract/src/public-api.ts @@ -25,12 +25,14 @@ import { } from "@bb/hono-typed-routes"; import type { EmptyInput, + PathEnvironmentAndTerminal, PathId, PathProjectAutomationId, PathProjectId, PathThreadAndFilePath, PathThreadAndQueuedMessage, PathThreadAndTerminal, + PathTerminal, } from "./common.js"; import type { Automation, @@ -38,11 +40,13 @@ import type { AutomationRunListResponse, AutomationRunResponse, AutomationsOverviewResponse, - CreateAutomationRequest, - RunAutomationRequest, - UpdateAutomationRequest, + CloseEnvironmentTerminalRequest, + CloseTerminalRequest, CloseThreadTerminalRequest, + CreateAutomationRequest, CommandListResponse, + CreateEnvironmentTerminalRequest, + CreateTerminalRequest, CreateProjectRequest, CreateProjectSourceRequest, CreateQueuedMessageRequest, @@ -66,6 +70,7 @@ import type { EnvironmentPullRequestResponse, EnvironmentStatusQuery, EnvironmentStatusResponse, + EnvironmentTerminalListResponse, ProjectAttachmentContentQuery, ProjectAttachmentUploadForm, ProjectBranchesQuery, @@ -83,6 +88,7 @@ import type { ReorderProjectRequest, ReorderQueuedMessageRequest, ResolvePendingInteractionRequest, + RunAutomationRequest, SendMessageRequest, SendQueuedMessageRequest, SendQueuedMessageResponse, @@ -95,6 +101,7 @@ import type { SystemVersionResponse, SystemVoiceTranscriptionForm, SystemVoiceTranscriptionResponse, + TerminalListResponse, TerminalSession, ThreadArchiveAllResponse, ThreadChildSummaryResponse, @@ -123,6 +130,9 @@ import type { TimelineTurnSummaryDetailsQuery, TimelineTurnSummaryDetailsResponse, UpdateEnvironmentRequest, + UpdateEnvironmentTerminalRequest, + UpdateTerminalRequest, + UpdateAutomationRequest, UpdateProjectRequest, UpdateProjectSourceRequest, UpdateThreadRequest, @@ -133,10 +143,12 @@ import type { } from "./api-types.js"; import { automationRunListQuerySchema, - createAutomationRequestSchema, - runAutomationRequestSchema, - updateAutomationRequestSchema, + closeEnvironmentTerminalRequestSchema, + closeTerminalRequestSchema, closeThreadTerminalRequestSchema, + createAutomationRequestSchema, + createEnvironmentTerminalRequestSchema, + createTerminalRequestSchema, createProjectRequestSchema, createProjectSourceRequestSchema, createQueuedMessageRequestSchema, @@ -162,6 +174,7 @@ import { reorderProjectRequestSchema, reorderQueuedMessageRequestSchema, resolvePendingInteractionRequestSchema, + runAutomationRequestSchema, sendMessageRequestSchema, sendQueuedMessageRequestSchema, systemExecutionOptionsQuerySchema, @@ -177,7 +190,10 @@ import { threadStoragePathsQuerySchema, threadTimelineQuerySchema, timelineTurnSummaryDetailsQuerySchema, + updateAutomationRequestSchema, + updateEnvironmentTerminalRequestSchema, updateEnvironmentRequestSchema, + updateTerminalRequestSchema, updateProjectRequestSchema, updateProjectSourceRequestSchema, updateThreadRequestSchema, @@ -345,6 +361,39 @@ export const publicApiRoutes = { }), }, + terminals: { + list: defineRoute({ + path: "/terminals", + method: "get", + request: noRequest(), + response: jsonResponse(), + }), + create: defineRoute({ + path: "/terminals", + method: "post", + request: jsonRequest( + createTerminalRequestSchema, + ), + response: jsonResponse({ status: 201 }), + }), + update: defineRoute({ + path: "/terminals/:terminalId", + method: "patch", + request: jsonRequest( + updateTerminalRequestSchema, + ), + response: jsonResponse(), + }), + close: defineRoute({ + path: "/terminals/:terminalId/close", + method: "post", + request: jsonRequest( + closeTerminalRequestSchema, + ), + response: jsonResponse(), + }), + }, + environments: { get: defineRoute({ path: "/environments/:id", @@ -447,6 +496,38 @@ export const publicApiRoutes = { request: noRequest(), response: jsonResponse(), }), + terminals: defineRoute({ + path: "/environments/:id/terminals", + method: "get", + request: noRequest(), + response: jsonResponse(), + }), + createTerminal: defineRoute({ + path: "/environments/:id/terminals", + method: "post", + request: jsonRequest( + createEnvironmentTerminalRequestSchema, + ), + response: jsonResponse({ status: 201 }), + }), + updateTerminal: defineRoute({ + path: "/environments/:id/terminals/:terminalId", + method: "patch", + request: jsonRequest< + PathEnvironmentAndTerminal, + UpdateEnvironmentTerminalRequest + >(updateEnvironmentTerminalRequestSchema), + response: jsonResponse(), + }), + closeTerminal: defineRoute({ + path: "/environments/:id/terminals/:terminalId/close", + method: "post", + request: jsonRequest< + PathEnvironmentAndTerminal, + CloseEnvironmentTerminalRequest + >(closeEnvironmentTerminalRequestSchema), + response: jsonResponse(), + }), }, threads: { diff --git a/packages/server-contract/test/contract.test.ts b/packages/server-contract/test/contract.test.ts index 988eff1a6..8dc21299e 100644 --- a/packages/server-contract/test/contract.test.ts +++ b/packages/server-contract/test/contract.test.ts @@ -12,6 +12,7 @@ import { TERMINAL_DATA_MAX_BASE64_LENGTH, TERMINAL_DATA_MAX_BYTES, TERMINAL_ROWS_MAX, + createTerminalRequestSchema, createThreadTerminalRequestSchema, createQueuedMessageRequestSchema, createProjectSourceRequestSchema, @@ -27,6 +28,7 @@ import { sendMessageRequestSchema, terminalClientMessageSchema, terminalOutputChunkSchema, + terminalSessionSchema, threadListResponseSchema, threadPendingInteractionsResponseSchema, timelineTurnSummaryDetailsResponseSchema, @@ -483,7 +485,49 @@ describe("git branch name contract", () => { }); describe("public terminal contracts", () => { + it("allows terminal session responses without thread or environment ownership", () => { + expect( + terminalSessionSchema.safeParse({ + id: "term_1", + threadId: null, + environmentId: null, + hostId: "host_1", + title: "Terminal 1", + initialCwd: "/tmp/workspace", + cols: 80, + rows: 24, + status: "running", + exitCode: null, + closeReason: null, + createdAt: 1, + updatedAt: 1, + lastUserInputAt: null, + }).success, + ).toBe(true); + }); + it("bounds terminal dimensions", () => { + expect( + createTerminalRequestSchema.safeParse({ + cols: TERMINAL_COLS_MAX, + rows: TERMINAL_ROWS_MAX, + target: { + kind: "environment", + environmentId: "env_1", + }, + }).success, + ).toBe(true); + expect( + createTerminalRequestSchema.safeParse({ + cols: TERMINAL_COLS_MAX, + rows: TERMINAL_ROWS_MAX, + target: { + kind: "host_path", + hostId: "host_1", + cwd: "/tmp/workspace", + }, + }).success, + ).toBe(true); expect( createThreadTerminalRequestSchema.safeParse({ cols: TERMINAL_COLS_MAX,