From 00b2e1464fe0096408f7bf33c95445cecbef27b9 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Wed, 20 May 2026 20:41:20 +0300 Subject: [PATCH] feat(0.15.0): cross-worker sandbox-reconnect durability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A 15-minute agentic sandbox turn must survive the Cloudflare worker isolate dying mid-turn. `runDurableTurn` already replays a *completed* turn, but an *interrupted* one re-runs from the top — the producer's `streamPrompt` generator died with the isolate. The sandbox container is orchestrator-managed and outlives the worker. `runReconnectableTurn` checkpoints a `RunHandle` — `{ kind, sandboxId, sessionId, runId, status, cursor }` — at turn start. On a retry that finds a `running` handle, a fresh worker calls a product-supplied `reconnect(handle)` callback (which wires the sandbox SDK's event-replay endpoint) instead of re-prompting. tcloud products omit `reconnect` and fall through to a clean re-run. The handle is checkpointed as a completed step at index 0; the turn runs at index 1. This reuses the existing `completeStep` JSON-result path with zero schema change — a completed step is the only shape `startOrResume` returns to a retry, and the handle must be readable while the turn step is still `running`. Tests cover fresh / reconnected / replayed / rerun / reconnect-failure across the InMemory / FileSystem / D1 store matrix. 
--- package.json | 2 +- src/durable/index.ts | 13 + src/durable/run-handle.ts | 328 +++++++++++++++++ src/durable/tests/run-handle.test.ts | 506 +++++++++++++++++++++++++++ 4 files changed, 848 insertions(+), 1 deletion(-) create mode 100644 src/durable/run-handle.ts create mode 100644 src/durable/tests/run-handle.test.ts diff --git a/package.json b/package.json index 78bd775..8539602 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tangle-network/agent-runtime", - "version": "0.14.0", + "version": "0.15.0", "description": "Reusable runtime lifecycle for domain-specific agents.", "homepage": "https://github.com/tangle-network/agent-runtime#readme", "repository": { diff --git a/src/durable/index.ts b/src/durable/index.ts index ebcd53f..3b28f61 100644 --- a/src/durable/index.ts +++ b/src/durable/index.ts @@ -41,6 +41,19 @@ export type { RunDurableTurnOptions, } from './turn' export { runDurableTurn } from './turn' +// ── Cross-worker sandbox-reconnect durability ───────────────────────── +// Checkpoints a substrate run handle at turn start so a fresh worker can +// re-attach to an in-flight sandbox run instead of re-running a long turn. +export type { + ReconnectableTurnHandle, + ReconnectableTurnMode, + ReconnectableTurnProducer, + ReconnectProduce, + ReconnectableProduce, + RunHandle, + RunReconnectableTurnOptions, +} from './run-handle' +export { runReconnectableTurn } from './run-handle' export type { DurableRunManifest, DurableRunStore, diff --git a/src/durable/run-handle.ts b/src/durable/run-handle.ts new file mode 100644 index 0000000..c27fd81 --- /dev/null +++ b/src/durable/run-handle.ts @@ -0,0 +1,328 @@ +/** + * `runReconnectableTurn` — cross-worker sandbox-reconnect durability. + * + * `runDurableTurn` survives a worker crash *after* a turn completes: the + * cached final text replays. 
It cannot survive a crash *during* a 15-minute + * agentic turn — the producer's generator died with the isolate, so a fresh + * worker re-runs the whole turn. + * + * The fix exploits a property of sandbox-backed turns: the work runs inside + * an orchestrator-managed sandbox container that OUTLIVES the worker isolate. + * The `@tangle-network/sandbox` SDK's `streamPrompt` already auto-reconnects + * to the runtime event-replay endpoint when the SSE stream drops within a + * live worker. The gap is cross-worker: when the isolate itself dies, the + * generator is gone and a fresh worker has no pointer to the in-flight run. + * + * `runReconnectableTurn` closes that gap by checkpointing a **run handle** — + * `{ sandboxId, sessionId, runId }` — the instant the turn starts, before any + * events stream. On a retry that finds a `running` handle, a fresh worker + * calls the product-supplied `reconnect(handle)` callback (which wires the + * SDK's replay endpoint) instead of `produce`. The sandbox runtime IS the + * durable engine; this primitive only remembers the pointer. + * + * Substrate split: + * - sandbox products supply `produce` + `reconnect`. `produce` registers a + * handle via `register(handle)` once `streamPrompt` yields the run id; + * `reconnect` re-attaches to that run from a fresh process. + * - tcloud products supply `produce` only and omit `reconnect`. With no + * reconnect path a `running` handle on retry is treated as a stale lease + * and the turn re-runs — identical to `runDurableTurn` semantics. + * + * Storage: the handle is checkpointed as a *completed* step at index 0 (the + * handle step). The actual turn runs at step index 1. This reuses the + * existing `completeStep` JSON-result path with zero schema change — a + * completed step is the only step shape `startOrResume` returns to a retry, + * and it must be readable while the turn step is still `running`. 
Adding a + * `durable_steps` column would force a migration across all three stores and + * a new store method; the handle-step approach needs neither. + */ + +import { canonicalHash } from './identity' +import type { DurableTurnProducer } from './turn' +import type { DurableRunManifest, DurableRunStore, RunRecord, StepRecord } from './types' + +/** + * A pointer to a substrate run that outlives the worker isolate. Persisted at + * turn start so a fresh worker can re-attach instead of re-running. + */ +export interface RunHandle { + /** Which substrate owns the run. `sandbox` runs are reconnectable; `tcloud` + * runs are not (no cross-process replay endpoint). */ + kind: 'sandbox' | 'tcloud' + /** Orchestrator-managed sandbox id. Stable across worker isolates. Present + * for `kind: 'sandbox'`. */ + sandboxId?: string + /** Sandbox conversation/session id — the `sessionId` passed to + * `streamPrompt`. Lets the reconnect target the right thread. */ + sessionId?: string + /** The in-flight run id — the sandbox SDK's `executionId`, captured from + * the first `execution.started` SSE frame. The replay endpoint keys on it: + * `GET {runtimeUrl}/agents/run/{runId}/events?lastEventId=`. */ + runId?: string + /** Lifecycle of the substrate run as last observed by this primitive. */ + status: 'running' | 'completed' | 'failed' + /** Last SSE event id yielded — the replay cursor. A reconnect resumes from + * here so already-delivered events are not re-emitted. Undefined until the + * first frame with an `id:` arrives. */ + cursor?: string +} + +/** A producer for a reconnectable turn. Extends the plain turn producer with a + * `register` hook the producer calls once the substrate run id is known. */ +export interface ReconnectableTurnProducer extends DurableTurnProducer { + /** NOTE(review): `runReconnectableTurn` hands `register` to `produce` through its context argument and never assigns this field — confirm who sets it. 
The producer + * invokes it as soon as `streamPrompt` yields `execution.started` so the + * handle is checkpointed mid-turn; it may call it again with an updated + * `cursor` as events stream. Each call overwrites the prior handle. */ + register?: (handle: RunHandle) => void +} + +/** What a product supplies to build the live producer for a reconnectable + * turn. `register` is injected by `runReconnectableTurn`. */ +export type ReconnectableProduce = (ctx: { + /** Checkpoint the run handle. Call once `streamPrompt` yields the run id; + * call again to advance `cursor`. */ + register: (handle: RunHandle) => void +}) => DurableTurnProducer + +/** What a product supplies to re-attach to an in-flight substrate run from a + * fresh worker. Receives the handle checkpointed by the original turn. The + * returned producer's `stream` is the replay stream; `finalText()` is the + * reconnected turn's final text once the stream drains. */ +export type ReconnectProduce = (handle: RunHandle) => DurableTurnProducer + +export interface RunReconnectableTurnOptions { + store: DurableRunStore + /** Stable per-turn run id. The same id on a retry is what enables both + * replay (completed turn) and reconnect (in-flight turn). */ + runId: string + manifest: DurableRunManifest + /** Stable per-isolate worker id. */ + workerId: string + /** Lease window in ms. Default 900_000 (15 min) — a reconnectable turn is + * exactly the long-running case, so the lease must outlast the turn. */ + leaseMs?: number + /** Human-readable step label. Default `turn`. */ + intent?: string + /** Builds the live producer for a fresh turn. Called once, on a fresh run. */ + produce: ReconnectableProduce + /** Re-attaches to an in-flight substrate run. Called on a retry that finds + * a `running` handle. Omit for substrates with no cross-process replay + * (tcloud) — a `running` handle then falls through to a re-run. 
*/ + reconnect?: ReconnectProduce + /** Synthesizes the single replay event from cached final text — used when a + * retry finds a *completed* turn (worker died after the turn finished). */ + replayEvent: (finalText: string) => TEvent + /** Optional live accumulator — see `RunDurableTurnOptions.accumulate`. */ + accumulate?: (event: TEvent, current: string) => string | undefined +} + +/** How the turn resolved. `rerun` is the live-produce path taken when a prior handle step exists but was not reconnected. */ +export type ReconnectableTurnMode = 'fresh' | 'reconnected' | 'replayed' | 'rerun' + +export interface ReconnectableTurnHandle { + /** Drop-in stream. Fresh/rerun forward producer events; reconnected forwards + * the replay stream; replayed emits one `replayEvent(cachedText)`. */ + stream: AsyncGenerator + /** Final text. Valid after `stream` drains. */ + finalText(): string + /** Which path ran. Valid after `stream` drains. */ + mode(): ReconnectableTurnMode + /** The run handle as last checkpointed. If the producer never registered one, this is the + * `{ kind: 'tcloud' }` placeholder; `undefined` only when no handle step was stored. Valid after `stream` drains. */ + handle(): RunHandle | undefined + /** The durable `RunRecord` for this turn. Valid after `stream` drains. */ + record(): RunRecord | undefined +} + +/** Step 0 holds the run handle; step 1 holds the turn's final text. */ +const HANDLE_STEP = 0 +const TURN_STEP = 1 + +/** True when the handle's `status` is `running`. The caller separately checks for a reconnect callback; without one + * the turn must re-run. NOTE(review): `kind` and `runId` are not checked, so the `{ kind: 'tcloud', status: 'running' }` placeholder also passes and would reach `reconnect` without a run id — confirm this is intended. A `completed`/`failed` handle is informational only. 
*/ +function isReconnectable(handle: RunHandle | undefined): handle is RunHandle { + return handle?.status === 'running' +} + +function readHandleStep(steps: ReadonlyArray): RunHandle | undefined { + const step = steps.find((s) => s.stepIndex === HANDLE_STEP) + if (!step || step.status !== 'completed') return undefined + const result = step.result as { handle?: RunHandle } | undefined + return result?.handle +} + +function readTurnStep(steps: ReadonlyArray): { finalText: string } | undefined { + const step = steps.find((s) => s.stepIndex === TURN_STEP) + if (!step || step.status !== 'completed') return undefined + const result = step.result as { finalText?: string } | undefined + return { finalText: result?.finalText ?? '' } +} + +export function runReconnectableTurn( + options: RunReconnectableTurnOptions, +): ReconnectableTurnHandle { + const { store, runId, manifest, workerId } = options + const leaseMs = options.leaseMs ?? 900_000 + const intent = options.intent ?? 'turn' + const inputHash = canonicalHash(manifest.input) + + let accumulated = '' + let mode: ReconnectableTurnMode = 'fresh' + let currentHandle: RunHandle | undefined + let finalRecord: RunRecord | undefined + // Serializes handle-step writes. `register` may fire repeatedly as the + // substrate streams; each write chains onto the prior so they land in call + // order and never interleave with the terminal completed-handle write. + let handleWrites: Promise = Promise.resolve() + + /** Drain a producer's stream: forward events, accumulate text, checkpoint + * the turn step on clean drain, fail it on throw. Shared by the fresh, + * rerun, and reconnected paths — all three differ only in which producer + * they hand in and whether the turn step was already begun. 
*/ + async function* drainAndCheckpoint( + producer: DurableTurnProducer, + ): AsyncGenerator { + try { + for await (const event of producer.stream) { + if (options.accumulate) { + const next = options.accumulate(event, accumulated) + if (typeof next === 'string') accumulated = next + } + yield event + } + const producerText = producer.finalText() + if (producerText) accumulated = producerText + // Drain in-flight `register` writes BEFORE any further store write: + // both touch the run record, and concurrent writers corrupt stores + // that commit via tmp-file+rename. + await handleWrites + await store.completeStep({ + runId, + stepIndex: TURN_STEP, + result: { finalText: accumulated }, + }) + // Flip the handle to `completed` LAST so a late retry sees `completed` + // — never a stale `running` it would wrongly reconnect to. + if (currentHandle && currentHandle.status === 'running') { + currentHandle = { ...currentHandle, status: 'completed' } + await store.completeStep({ + runId, + stepIndex: HANDLE_STEP, + result: { handle: currentHandle }, + }) + } + finalRecord = await store.endRun({ + runId, + workerId, + status: 'completed', + outcome: { notes: intent, metadata: { chars: accumulated.length, mode } }, + }) + } catch (err) { + // Drain in-flight `register` writes before failing so no handle write + // outlives the turn (it would race store teardown). The handle stays + // `running` — exactly what a reconnecting retry needs. + await handleWrites + await store.failStep({ + runId, + stepIndex: TURN_STEP, + error: { message: err instanceof Error ? 
err.message : String(err) }, + }) + finalRecord = await store.endRun({ runId, workerId, status: 'failed' }) + throw err + } + } + + async function* stream(): AsyncGenerator { + const { completedSteps } = await store.startOrResume({ + runId, + manifest, + workerId, + leaseMs, + }) + + // ── Replay path — the turn already finished ───────────────────────── + const turnStep = readTurnStep(completedSteps) + if (turnStep) { + mode = 'replayed' + accumulated = turnStep.finalText + currentHandle = readHandleStep(completedSteps) + yield options.replayEvent(accumulated) + finalRecord = await store.endRun({ runId, workerId, status: 'completed' }) + return + } + + // ── Reconnect path — an in-flight handle survived a worker crash ──── + const priorHandle = readHandleStep(completedSteps) + if (isReconnectable(priorHandle) && options.reconnect) { + mode = 'reconnected' + currentHandle = priorHandle + // The turn step may be `running`/`failed` from the dead worker. begin + // bumps its attempt count; the reconnected stream re-checkpoints it. + await store.beginStep({ + runId, + stepIndex: TURN_STEP, + intent, + kind: 'llm', + inputHash, + }) + yield* drainAndCheckpoint(options.reconnect(priorHandle)) + return + } + + // ── Fresh / rerun path — produce live ─────────────────────────────── + // A `running` handle with no reconnect callback (tcloud, or a sandbox + // product that did not wire reconnect) cannot resume — it re-runs. + mode = priorHandle ? 'rerun' : 'fresh' + + // The handle step is checkpointed up front as a placeholder so a retry + // that crashes before `register` still sees the run shape. `register` + // overwrites it with the real pointer once the substrate yields the id. 
+ const placeholder: RunHandle = { kind: 'tcloud', status: 'running' } + await store.beginStep({ + runId, + stepIndex: HANDLE_STEP, + intent: `${intent}:handle`, + kind: 'logic', + inputHash, + }) + await store.completeStep({ + runId, + stepIndex: HANDLE_STEP, + result: { handle: placeholder }, + }) + currentHandle = placeholder + + const register = (handle: RunHandle): void => { + currentHandle = handle + // The handle step is already `completed`; each `register` overwrites its + // result. Writes chain through `handleWrites` so they persist in call + // order and `drainAndCheckpoint` can await them before the terminal + // completed-handle write. A crash before a write lands loses only the + // pointer refinement, not correctness — a non-reconnectable handle + // re-runs. A rejected write must not abort the turn, so it is absorbed. + handleWrites = handleWrites + .then(() => + store.completeStep({ runId, stepIndex: HANDLE_STEP, result: { handle } }), + ) + .catch(() => undefined) + } + + await store.beginStep({ + runId, + stepIndex: TURN_STEP, + intent, + kind: 'llm', + inputHash, + }) + yield* drainAndCheckpoint(options.produce({ register })) + } + + return { + stream: stream(), + finalText: () => accumulated, + mode: () => mode, + handle: () => currentHandle, + record: () => finalRecord, + } +} diff --git a/src/durable/tests/run-handle.test.ts b/src/durable/tests/run-handle.test.ts new file mode 100644 index 0000000..36c30e4 --- /dev/null +++ b/src/durable/tests/run-handle.test.ts @@ -0,0 +1,506 @@ +/** + * `runReconnectableTurn` tests — the cross-worker sandbox-reconnect path. + * + * Run identically against InMemory / FileSystem / D1-over-sqlite so the + * handle registry is proven on every store a product could deploy on. 
+ * + * Each test names the regression it defends: + * - "fresh turn registers a handle" — handle is checkpointed at step 0 + * - "retry with running handle reconnects" — fresh worker calls reconnect, + * NOT produce; the 15-min turn is + * not re-run from the top + * - "completed handle replays" — a finished turn replays cached + * text, neither produce nor + * reconnect runs + * - "running handle, no reconnect = rerun" — tcloud substrate (no replay + * endpoint) falls through to a + * clean re-run + * - "reconnect failure fails the run" — a replay-endpoint error is not + * swallowed; the turn step fails + * - "register advances the cursor" — the last-observed SSE cursor is + * persisted for the replay GET + */ + +import { mkdtempSync, readFileSync, rmSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' + +import { afterEach, beforeEach, describe, expect, it } from 'vitest' + +import { + D1DurableRunStore, + type DurableRunManifest, + type DurableRunStore, + FileSystemDurableRunStore, + InMemoryDurableRunStore, + type ReconnectableProduce, + type ReconnectProduce, + type RunHandle, + runReconnectableTurn, +} from '../index' +import { createSqliteD1 } from './sqlite-d1-adapter' + +const SCHEMA_SQL = readFileSync(new URL('../schema.sql', import.meta.url), 'utf8') + +interface FakeEvent { + type: string + text?: string + runId?: string +} + +function makeManifest(turnIndex = 0): DurableRunManifest { + return { + projectId: 'test-product', + scenarioId: 'thread-1', + task: { + id: `chat:thread-1:${turnIndex}`, + intent: 'unit-test reconnectable turn', + domain: 'test', + requiredKnowledge: [], + metadata: { turnIndex }, + }, + input: { userMessage: `q-${turnIndex}` }, + } +} + +/** + * A fake sandbox producer. Emits an `execution.started` event carrying a run + * id (the producer registers the handle on it, mirroring how a real product + * watches `streamPrompt`), then text deltas, then a result. 
`throwAt` makes + * it explode after that many *yielded* events to simulate a mid-turn crash. + */ +function sandboxProducer(opts: { + sandboxId: string + sessionId: string + runId: string + chunks: string[] + register: (handle: RunHandle) => void + onConstruct?: () => void + throwAt?: number +}) { + opts.onConstruct?.() + let assembled = '' + async function* stream(): AsyncGenerator { + let emitted = 0 + // streamPrompt yields execution.started first — the product registers the + // run handle here, mid-turn, before any expensive work. + opts.register({ + kind: 'sandbox', + sandboxId: opts.sandboxId, + sessionId: opts.sessionId, + runId: opts.runId, + status: 'running', + }) + yield { type: 'execution.started', runId: opts.runId } + emitted += 1 + for (let i = 0; i < opts.chunks.length; i++) { + if (opts.throwAt !== undefined && emitted >= opts.throwAt) { + throw new Error('worker isolate died mid-turn') + } + assembled += opts.chunks[i] + // advance the replay cursor as frames arrive + opts.register({ + kind: 'sandbox', + sandboxId: opts.sandboxId, + sessionId: opts.sessionId, + runId: opts.runId, + status: 'running', + cursor: `evt-${i}`, + }) + yield { type: 'delta', text: opts.chunks[i] } + emitted += 1 + } + yield { type: 'result', text: assembled } + } + return { stream: stream(), finalText: () => assembled } +} + +const storeKinds = [ + { + name: 'InMemoryDurableRunStore', + factory: () => ({ store: new InMemoryDurableRunStore(), cleanup: () => undefined }), + }, + { + name: 'FileSystemDurableRunStore', + factory: () => { + const dir = mkdtempSync(join(tmpdir(), 'run-handle-test-')) + return { + store: new FileSystemDurableRunStore(dir), + cleanup: () => rmSync(dir, { recursive: true, force: true }), + } + }, + }, + { + name: 'D1DurableRunStore (better-sqlite3)', + factory: () => { + const handle = createSqliteD1() + handle.raw.exec(SCHEMA_SQL) + return { + store: new D1DurableRunStore(handle.db), + cleanup: () => handle.close(), + } + }, + }, +] as const + 
+for (const kind of storeKinds) { + describe(`runReconnectableTurn / ${kind.name}`, () => { + let store: DurableRunStore + let cleanup: () => void + + beforeEach(() => { + const made = kind.factory() + store = made.store + cleanup = made.cleanup + }) + + afterEach(async () => { + await store.close() + cleanup() + }) + + it('a fresh turn registers a run handle and checkpoints final text', async () => { + const handle = runReconnectableTurn({ + store, + runId: 'chat:thread-1:0', + manifest: makeManifest(0), + workerId: 'worker-a', + produce: ({ register }) => + sandboxProducer({ + sandboxId: 'sbx-1', + sessionId: 'sess-1', + runId: 'run-1', + chunks: ['Hello', ', ', 'world'], + register, + }), + replayEvent: (text) => ({ type: 'result', text }), + }) + + const events: FakeEvent[] = [] + for await (const e of handle.stream) events.push(e) + + expect(handle.mode()).toBe('fresh') + expect(handle.finalText()).toBe('Hello, world') + expect(handle.record()?.status).toBe('completed') + // handle was registered, and the final state reflects turn completion + const h = handle.handle() + expect(h).toBeDefined() + expect(h?.kind).toBe('sandbox') + expect(h?.sandboxId).toBe('sbx-1') + expect(h?.sessionId).toBe('sess-1') + expect(h?.runId).toBe('run-1') + expect(h?.status).toBe('completed') + // the handle is durably persisted at step 0 + const persisted = await store.loadStep('chat:thread-1:0', 0) + expect(persisted?.status).toBe('completed') + expect((persisted?.result as { handle: RunHandle }).handle.sandboxId).toBe('sbx-1') + }) + + it('a retry with a running handle reconnects instead of re-running produce', async () => { + const runId = 'chat:thread-1:1' + const manifest = makeManifest(1) + + // ── Attempt 1: worker-a starts the turn, crashes mid-stream ─────── + let firstProduces = 0 + const first = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-a', + produce: ({ register }) => { + firstProduces += 1 + return sandboxProducer({ + sandboxId: 'sbx-2', 
+ sessionId: 'sess-2', + runId: 'run-2', + chunks: ['long', '-turn', '-work'], + register, + throwAt: 2, // dies after execution.started + 1 delta + }) + }, + replayEvent: (text) => ({ type: 'result', text }), + }) + await expect( + (async () => { + for await (const _ of first.stream) { + /* drain until the crash */ + } + })(), + ).rejects.toThrow('worker isolate died mid-turn') + expect(firstProduces).toBe(1) + // the handle survived the crash in `running` state + const survived = await store.loadStep(runId, 0) + expect((survived?.result as { handle: RunHandle }).handle.status).toBe('running') + + // ── Attempt 2: worker-b retries — must reconnect, NOT produce ───── + let secondProduces = 0 + let reconnectedWith: RunHandle | undefined + const reconnect: ReconnectProduce = (h) => { + reconnectedWith = h + // the replay endpoint streams the rest of the turn from the cursor + let assembled = '[replayed] full answer' + async function* stream(): AsyncGenerator { + yield { type: 'delta', text: '[replayed] full answer' } + yield { type: 'result', text: assembled } + } + return { stream: stream(), finalText: () => assembled } + } + const second = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-b', + produce: ({ register }) => { + secondProduces += 1 + return sandboxProducer({ + sandboxId: 'sbx-2', + sessionId: 'sess-2', + runId: 'run-2', + chunks: ['should', '-not', '-run'], + register, + }) + }, + reconnect, + replayEvent: (text) => ({ type: 'result', text }), + }) + const events: FakeEvent[] = [] + for await (const e of second.stream) events.push(e) + + // produce was NOT called again — the 15-min turn was not restarted + expect(secondProduces).toBe(0) + expect(second.mode()).toBe('reconnected') + // reconnect received the handle the crashed worker checkpointed + expect(reconnectedWith?.sandboxId).toBe('sbx-2') + expect(reconnectedWith?.runId).toBe('run-2') + expect(reconnectedWith?.cursor).toBe('evt-0') // last cursor before crash + 
expect(second.finalText()).toBe('[replayed] full answer') + expect(second.record()?.status).toBe('completed') + expect(events).toEqual([ + { type: 'delta', text: '[replayed] full answer' }, + { type: 'result', text: '[replayed] full answer' }, + ]) + }) + + it('a completed handle replays cached text — neither produce nor reconnect runs', async () => { + const runId = 'chat:thread-1:2' + const manifest = makeManifest(2) + + // Attempt 1 completes cleanly. + const first = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-a', + produce: ({ register }) => + sandboxProducer({ + sandboxId: 'sbx-3', + sessionId: 'sess-3', + runId: 'run-3', + chunks: ['cached ', 'result'], + register, + }), + replayEvent: (text) => ({ type: 'result', text }), + }) + for await (const _ of first.stream) { + /* drain */ + } + expect(first.mode()).toBe('fresh') + + // Attempt 2 — same runId. Worker died after the turn finished but before + // the response reached the client. Must replay, run nothing. 
+ let produces = 0 + let reconnects = 0 + const second = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-b', + produce: ({ register }) => { + produces += 1 + return sandboxProducer({ + sandboxId: 'sbx-3', + sessionId: 'sess-3', + runId: 'run-3', + chunks: ['x'], + register, + }) + }, + reconnect: () => { + reconnects += 1 + async function* stream(): AsyncGenerator { + yield { type: 'result', text: 'wrong' } + } + return { stream: stream(), finalText: () => 'wrong' } + }, + replayEvent: (text) => ({ type: 'result', text }), + }) + const events: FakeEvent[] = [] + for await (const e of second.stream) events.push(e) + + expect(produces).toBe(0) + expect(reconnects).toBe(0) + expect(second.mode()).toBe('replayed') + expect(second.finalText()).toBe('cached result') + expect(events).toEqual([{ type: 'result', text: 'cached result' }]) + }) + + it('a running handle with no reconnect callback falls through to a re-run', async () => { + const runId = 'chat:thread-1:3' + const manifest = makeManifest(3) + + // Attempt 1 — a tcloud-style turn (no reconnect) crashes mid-stream. + const tcloudProduce: ReconnectableProduce = ({ register }) => { + register({ kind: 'tcloud', status: 'running' }) + let assembled = '' + async function* stream(): AsyncGenerator { + assembled += 'partial' + yield { type: 'delta', text: 'partial' } + throw new Error('tcloud turn crashed') + } + return { stream: stream(), finalText: () => assembled } + } + const first = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-a', + produce: tcloudProduce, + replayEvent: (text) => ({ type: 'result', text }), + // no reconnect — tcloud has no cross-process replay endpoint + }) + await expect( + (async () => { + for await (const _ of first.stream) { + /* drain */ + } + })(), + ).rejects.toThrow('tcloud turn crashed') + + // Attempt 2 — running handle exists but reconnect is absent: re-run. 
+ let produces = 0 + const second = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-a', + produce: ({ register }) => { + produces += 1 + register({ kind: 'tcloud', status: 'running' }) + let assembled = '' + async function* stream(): AsyncGenerator { + assembled += 'recovered' + yield { type: 'delta', text: 'recovered' } + yield { type: 'result', text: assembled } + } + return { stream: stream(), finalText: () => assembled } + }, + replayEvent: (text) => ({ type: 'result', text }), + }) + const events: FakeEvent[] = [] + for await (const e of second.stream) events.push(e) + + expect(produces).toBe(1) // re-ran from the top + expect(second.mode()).toBe('rerun') + expect(second.finalText()).toBe('recovered') + expect(events).toEqual([ + { type: 'delta', text: 'recovered' }, + { type: 'result', text: 'recovered' }, + ]) + }) + + it('a reconnect-stream failure fails the run — the error is not swallowed', async () => { + const runId = 'chat:thread-1:4' + const manifest = makeManifest(4) + + // Attempt 1 crashes mid-turn, leaving a running handle. + const first = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-a', + produce: ({ register }) => + sandboxProducer({ + sandboxId: 'sbx-5', + sessionId: 'sess-5', + runId: 'run-5', + chunks: ['a', 'b'], + register, + throwAt: 2, + }), + replayEvent: (text) => ({ type: 'result', text }), + }) + await expect( + (async () => { + for await (const _ of first.stream) { + /* drain */ + } + })(), + ).rejects.toThrow('worker isolate died mid-turn') + + // Attempt 2 reconnects, but the replay endpoint itself errors. 
+ const second = runReconnectableTurn({ + store, + runId, + manifest, + workerId: 'worker-b', + produce: ({ register }) => + sandboxProducer({ + sandboxId: 'sbx-5', + sessionId: 'sess-5', + runId: 'run-5', + chunks: ['x'], + register, + }), + reconnect: () => { + async function* stream(): AsyncGenerator { + yield { type: 'delta', text: 'partial-replay' } + throw new Error('replay endpoint returned 410 Gone') + } + return { stream: stream(), finalText: () => '' } + }, + replayEvent: (text) => ({ type: 'result', text }), + }) + await expect( + (async () => { + for await (const _ of second.stream) { + /* drain */ + } + })(), + ).rejects.toThrow('replay endpoint returned 410 Gone') + expect(second.mode()).toBe('reconnected') + expect(second.record()?.status).toBe('failed') + // the turn step is marked failed; the step-0 handle stays `running`, so a further retry that supplies `reconnect` reconnects again (it re-runs only when `reconnect` is omitted) + const turnStep = await store.loadStep(runId, 1) + expect(turnStep?.status).toBe('failed') + }) + + it('register advances the persisted cursor as frames stream', async () => { + const runId = 'chat:thread-1:5' + const handle = runReconnectableTurn({ + store, + runId, + manifest: makeManifest(5), + workerId: 'worker-a', + produce: ({ register }) => + sandboxProducer({ + sandboxId: 'sbx-6', + sessionId: 'sess-6', + runId: 'run-6', + chunks: ['one', 'two', 'three'], + register, + }), + replayEvent: (text) => ({ type: 'result', text }), + }) + for await (const _ of handle.stream) { + /* drain */ + } + // three deltas streamed → last cursor observed was evt-2; after + // completion the persisted handle is flipped to `completed`. + const persisted = await store.loadStep(runId, 0) + const h = (persisted?.result as { handle: RunHandle }).handle + expect(h.status).toBe('completed') + expect(h.cursor).toBe('evt-2') + expect(h.runId).toBe('run-6') + }) + }) +}