From 3e1c3b97067802352ddc69c9d052168f6c4541ff Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Wed, 20 May 2026 19:56:41 +0300 Subject: [PATCH] =?UTF-8?q?feat(0.14.0):=20runDurableTurn=20+=20DurableCha?= =?UTF-8?q?tTurnEngine=20=E2=80=94=20the=20durable=20chat=20layer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two reusable primitives so every product chat handler routes durability through one place instead of copy-pasting it four times. runDurableTurn (src/durable/turn.ts) — a streaming, backend-agnostic, checkpoint+replay durable turn. Generic over the event type; never inspects events, only forwards them and reads finalText() after drain. - Fresh run: producer runs, events forward live (streaming preserved), final text checkpointed on drain. - Replay: a completed turn re-emits cached text as one synthetic event; the producer is never constructed — no LLM call, no double-billing. - Mid-stream crash: a turn that died while streaming re-runs from the top (the substrate checkpoints JSON at step granularity — there is no partial-stream checkpoint; this is the honest durability ceiling). - One step, lease claimed once via startOrResume; concurrent workers on the same runId are rejected with DurableRunLeaseHeldError. DurableChatTurnEngine (src/durable/chat-engine.ts) — the framework-neutral chat-turn orchestrator. Owns what was duplicated across legal/gtm/creative/ tax: durable checkpointing, the NDJSON StreamEvent line protocol, the session.run.started/completed/failed lifecycle envelope, ordered persist/ post-process hooks, trace flush. Everything product-specific is a hook: produce / persistAssistantMessage / onTurnComplete / onEvent / transformFinalText / traceFlush. Takes resolved values (identity tuple, store, waitUntil) — never a Request or a Context — so React Router and Hono products use it identically. Replay skips persist + post-process so a retried turn never double-writes. 
Producer failure becomes an error + session.run.failed pair; the stream always closes; hook errors are swallowed + logged so a post-process failure never fails a streamed turn. 36 new tests (durable-turn 15, chat-engine 21), each run against the full InMemory / FileSystem / D1-over-better-sqlite3 store matrix. Total suite: 213 tests, typecheck + biome clean. --- package.json | 2 +- src/durable/chat-engine.ts | 299 ++++++++++++++++++++++++ src/durable/index.ts | 21 ++ src/durable/tests/chat-engine.test.ts | 287 +++++++++++++++++++++++ src/durable/tests/durable-turn.test.ts | 302 +++++++++++++++++++++++++ src/durable/turn.ts | 170 ++++++++++++++ 6 files changed, 1080 insertions(+), 1 deletion(-) create mode 100644 src/durable/chat-engine.ts create mode 100644 src/durable/tests/chat-engine.test.ts create mode 100644 src/durable/tests/durable-turn.test.ts create mode 100644 src/durable/turn.ts diff --git a/package.json b/package.json index 253e948..78bd775 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tangle-network/agent-runtime", - "version": "0.13.1", + "version": "0.14.0", "description": "Reusable runtime lifecycle for domain-specific agents.", "homepage": "https://github.com/tangle-network/agent-runtime#readme", "repository": { diff --git a/src/durable/chat-engine.ts b/src/durable/chat-engine.ts new file mode 100644 index 0000000..5df4d9f --- /dev/null +++ b/src/durable/chat-engine.ts @@ -0,0 +1,299 @@ +/** + * `DurableChatTurnEngine` — the framework-neutral chat-turn orchestrator every + * product chat handler routes through. It owns the parts that were copy-pasted + * across legal / gtm / creative / tax: durable checkpointing, the NDJSON + * `StreamEvent` line protocol, the `session.run.*` lifecycle vocabulary, the + * ordered persist / post-process hooks, and trace flush. Everything genuinely + * product-specific is a hook the product supplies. 
+ * + * What the engine owns: + * - durable turn (`runDurableTurn`): completed turns replay free, no re-bill + * - the `session.run.started` / `session.run.completed` / `session.run.failed` + * event envelope around the producer's events + * - NDJSON encoding into a `ReadableStream` (the body every + * product returns, React Router or Hono alike) + * - calling the product's persist / post-process hooks in the right order, + * after the stream drains, with the assembled final text + * - never throwing into the HTTP layer — a producer failure becomes an + * `error` + `session.run.failed` event pair, the stream still closes + * + * What the product supplies (`ChatTurnHooks`): + * - `produce` — build the backend stream for this turn (sandbox / router + * / tcloud / runtime — the engine does not care which) + * - `persistAssistantMessage` — write the assistant turn to the product DB + * - `onTurnComplete` (optional) — post-process (proposals, citations, …) + * - `onEvent` (optional) — per-event side-channel (e.g. DO broadcast) + * - `transformFinalText` (optional) — pre-persist transform (e.g. PII redact) + * + * Framework neutrality: the engine takes already-resolved values + * (`userId`, identity tuple, parsed message, a `DurableRunStore`, a + * `waitUntil`), never a `Request` or a `Context`. The product's thin route + * adapter does auth + parse + access-control, then calls `engine.runTurn(...)` + * and returns `result.body` as its platform `Response`. + */ + +import { deriveWorkerId } from './identity' +import { type DurableTurnProducer, runDurableTurn } from './turn' +import type { DurableRunManifest, DurableRunStore, RunRecord } from './types' + +/** The NDJSON line protocol every product chat client already speaks. */ +export interface ChatStreamEvent { + type: string + data?: Record +} + +/** Identity of a chat turn. `tenantId` is the workspace id for workspace- + * scoped products and the user id for session-scoped products. 
*/ +export interface ChatTurnIdentity { + tenantId: string + /** Thread / session id — the durable run is keyed on this + `turnIndex`. */ + sessionId: string + userId: string + /** Monotonic 0-based turn index within the session. */ + turnIndex: number +} + +export interface ChatTurnHooks { + /** + * Build the backend stream for this turn. The engine never inspects which + * backend this is — sandbox container, tcloud router, direct runtime, a + * test double — it only forwards the events and reads `finalText()`. + */ + produce(): DurableTurnProducer + /** + * Persist the completed assistant message to the product's own store. + * Called once, after the stream drains, on a fresh (non-replay) run. + * Receives the assembled (and `transformFinalText`-transformed) text. + */ + persistAssistantMessage(input: { + identity: ChatTurnIdentity + finalText: string + record: RunRecord | undefined + }): Promise + /** + * Optional post-processing after persistence — proposal extraction, + * citation validation, credit metering, etc. Product policy; the engine + * has no shared logic here. Errors are swallowed + logged (post-process + * must never fail the turn that already streamed successfully). + */ + onTurnComplete?(input: { identity: ChatTurnIdentity; finalText: string }): Promise + /** + * Optional per-event side channel (e.g. Durable Object broadcast). Runs + * for every event the engine emits, lifecycle envelope included. Errors + * are swallowed — a broadcast failure must not break the chat stream. + */ + onEvent?(event: ChatStreamEvent): void | Promise + /** + * Optional pre-persist transform of the final text (e.g. PII redaction). + * Affects only what is persisted; the live stream is never altered. + */ + transformFinalText?(text: string): string | Promise + /** + * Optional trace flush — resolves when OTLP export completes. The engine + * hands it to `waitUntil` so the worker isolate stays alive for the POST. 
+ */ + traceFlush?(): Promise +} + +export interface RunChatTurnInput { + identity: ChatTurnIdentity + /** The user's message for this turn. Hashed into the durable run identity. */ + userMessage: string + /** Product id for telemetry / the durable manifest (`legal-agent`, …). */ + projectId: string + /** Domain tag for the task spec (`legal`, `gtm`, …). */ + domain: string + /** Model id, when known — recorded on the manifest. */ + model?: string + store: DurableRunStore + hooks: ChatTurnHooks + /** Worker liveness hook (`ctx.waitUntil` / `executionCtx.waitUntil`). When + * omitted, trace flush is awaited inline before the stream closes. */ + waitUntil?: (p: Promise) => void + /** Stable per-isolate worker id. Defaults to a fresh `deriveWorkerId()`. */ + workerId?: string + /** Lease window in ms. Default 60_000. */ + leaseMs?: number + /** Optional structured logger for swallowed hook errors. */ + log?: (message: string, meta?: Record) => void +} + +export interface ChatTurnResult { + /** NDJSON body — return this as the platform `Response` body. */ + body: ReadableStream + /** Content type for the response. */ + contentType: 'application/x-ndjson' +} + +const encoder = new TextEncoder() + +function encodeLine(event: ChatStreamEvent): Uint8Array { + return encoder.encode(`${JSON.stringify(event)}\n`) +} + +/** + * The engine. One instance is stateless and reusable across requests — all + * per-turn state lives in `runTurn`'s closure. + */ +export class DurableChatTurnEngine { + /** + * Run one durable chat turn. Returns immediately with a `ReadableStream` + * body; the turn runs eagerly in the stream's `start`, not on pull. Never + * rejects — backend failures become `error` + `session.run.failed` events. + */ + runTurn(input: RunChatTurnInput): ChatTurnResult { + const workerId = input.workerId ?? deriveWorkerId() + const log = input.log ?? 
(() => undefined) + const { identity } = input + const runId = `chat:${identity.sessionId}:${identity.turnIndex}` + + const manifest: DurableRunManifest = { + projectId: input.projectId, + scenarioId: identity.sessionId, + task: { + id: `${input.projectId}:chat:${identity.sessionId}:${identity.turnIndex}`, + intent: `Run a ${input.domain} chat turn with workspace context.`, + domain: input.domain, + requiredKnowledge: [], + metadata: { + tenantId: identity.tenantId, + sessionId: identity.sessionId, + turnIndex: identity.turnIndex, + }, + }, + input: { + userMessage: input.userMessage, + model: input.model ?? null, + }, + tags: { + session_id: identity.sessionId, + tenant_id: identity.tenantId, + }, + } + + const body = new ReadableStream({ + start: async (controller) => { + const emit = async (event: ChatStreamEvent): Promise => { + controller.enqueue(encodeLine(event)) + if (input.hooks.onEvent) { + try { + await input.hooks.onEvent(event) + } catch (err) { + log('[chat-engine] onEvent hook threw', { + error: err instanceof Error ? err.message : String(err), + }) + } + } + } + + let turnFailed = false + try { + await emit({ + type: 'session.run.started', + data: { + sessionId: identity.sessionId, + tenantId: identity.tenantId, + turnIndex: identity.turnIndex, + }, + }) + + const turn = runDurableTurn({ + store: input.store, + runId, + manifest, + workerId, + leaseMs: input.leaseMs, + intent: `chat:turn-${identity.turnIndex}`, + produce: input.hooks.produce, + replayEvent: (finalText) => ({ type: 'result', data: { finalText } }), + accumulate: (event, current) => { + // Accumulate from the same event shapes products already emit: + // `message.part.updated` text deltas and a trailing `result`. + if (event.type === 'message.part.updated') { + const data = event.data ?? {} + const delta = typeof data.delta === 'string' ? 
data.delta : '' + const part = data.part as { type?: string; text?: string } | undefined + if (delta) return current + delta + if (part?.type === 'text' && typeof part.text === 'string') return part.text + return undefined + } + if (event.type === 'result') { + const data = event.data ?? {} + if (typeof data.finalText === 'string') return data.finalText + } + return undefined + }, + }) + + for await (const event of turn.stream) { + await emit(event) + } + + const rawFinal = turn.finalText() + const finalText = input.hooks.transformFinalText + ? await input.hooks.transformFinalText(rawFinal) + : rawFinal + + // Persist + post-process only on a fresh run. On replay the + // assistant message + side effects already landed on the first + // (completed) attempt — re-persisting would double-write. + if (!turn.replayed()) { + try { + await input.hooks.persistAssistantMessage({ + identity, + finalText, + record: turn.record(), + }) + } catch (err) { + log('[chat-engine] persistAssistantMessage threw', { + error: err instanceof Error ? err.message : String(err), + }) + } + if (input.hooks.onTurnComplete) { + try { + await input.hooks.onTurnComplete({ identity, finalText }) + } catch (err) { + log('[chat-engine] onTurnComplete threw', { + error: err instanceof Error ? err.message : String(err), + }) + } + } + } + + await emit({ + type: 'session.run.completed', + data: { sessionId: identity.sessionId, replayed: turn.replayed() }, + }) + } catch (err) { + turnFailed = true + const message = err instanceof Error ? err.message : String(err) + log('[chat-engine] turn failed', { error: message }) + await emit({ type: 'error', data: { message } }) + await emit({ + type: 'session.run.failed', + data: { sessionId: identity.sessionId, message }, + }) + } finally { + // Trace flush: hand to waitUntil so the isolate survives the POST; + // await inline when no waitUntil is available (local / tests). 
+ if (input.hooks.traceFlush) { + const flush = input.hooks.traceFlush().catch((err) => + log('[chat-engine] traceFlush threw', { + error: err instanceof Error ? err.message : String(err), + }), + ) + if (input.waitUntil) input.waitUntil(flush) + else await flush + } + controller.close() + void turnFailed + } + }, + }) + + return { body, contentType: 'application/x-ndjson' } + } +} + +/** Convenience singleton — the engine is stateless, one instance is enough. */ +export const durableChatTurnEngine = new DurableChatTurnEngine() diff --git a/src/durable/index.ts b/src/durable/index.ts index 8937826..ebcd53f 100644 --- a/src/durable/index.ts +++ b/src/durable/index.ts @@ -8,6 +8,18 @@ * See `./types.ts` for the full type contract and concurrency model. */ +// ── Durable chat-turn engine ────────────────────────────────────────── +// Framework-neutral chat-turn orchestrator: durable turn + NDJSON +// streaming + session.run.* lifecycle + product persist/post-process +// hooks. Every product chat handler routes through this. +export type { + ChatStreamEvent, + ChatTurnHooks, + ChatTurnIdentity, + ChatTurnResult, + RunChatTurnInput, +} from './chat-engine' +export { DurableChatTurnEngine, durableChatTurnEngine } from './chat-engine' export type { D1DatabaseLike, D1PreparedStatementLike } from './d1-store' export { D1DurableRunStore } from './d1-store' export { FileSystemDurableRunStore } from './file-system-store' @@ -20,6 +32,15 @@ export { runDurable } from './runner' // during one-time bootstrap. `src/durable/schema.sql` is the source of // truth; `schema.ts` is the build-bundled string that ships in dist/. export { DURABLE_SCHEMA_SQL, DURABLE_SCHEMA_VERSION } from './schema' +// ── Durable turn primitive ──────────────────────────────────────────── +// Streaming, backend-agnostic, checkpoint+replay durable turn. The single +// reusable primitive every product chat handler routes through. 
+export type { + DurableTurnHandle, + DurableTurnProducer, + RunDurableTurnOptions, +} from './turn' +export { runDurableTurn } from './turn' export type { DurableRunManifest, DurableRunStore, diff --git a/src/durable/tests/chat-engine.test.ts b/src/durable/tests/chat-engine.test.ts new file mode 100644 index 0000000..49ee72a --- /dev/null +++ b/src/durable/tests/chat-engine.test.ts @@ -0,0 +1,287 @@ +/** + * `DurableChatTurnEngine` tests — the orchestration contract every product + * chat handler depends on. Run against all three stores so the engine is + * proven on every backend. Covers: lifecycle envelope, NDJSON encoding, + * replay-skips-persist (no double-write), hook ordering, the failure + * envelope, the per-event side channel, and the pre-persist transform. + */ + +import { mkdtempSync, readFileSync, rmSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' + +import { afterEach, beforeEach, describe, expect, it } from 'vitest' + +import { + type ChatStreamEvent, + D1DurableRunStore, + DurableChatTurnEngine, + type DurableRunStore, + FileSystemDurableRunStore, + InMemoryDurableRunStore, +} from '../index' +import { createSqliteD1 } from './sqlite-d1-adapter' + +const SCHEMA_SQL = readFileSync(new URL('../schema.sql', import.meta.url), 'utf8') + +/** Drain an NDJSON ReadableStream into parsed events. 
*/ +async function drain(body: ReadableStream): Promise { + const reader = body.getReader() + const decoder = new TextDecoder() + let buffer = '' + const events: ChatStreamEvent[] = [] + for (;;) { + const { done, value } = await reader.read() + if (done) break + buffer += decoder.decode(value, { stream: true }) + let nl: number + while ((nl = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, nl).trim() + buffer = buffer.slice(nl + 1) + if (line) events.push(JSON.parse(line) as ChatStreamEvent) + } + } + if (buffer.trim()) events.push(JSON.parse(buffer.trim()) as ChatStreamEvent) + return events +} + +/** A producer that streams the given text as one part-update + a result. */ +function textProducer(text: string, onConstruct?: () => void) { + return () => { + onConstruct?.() + async function* stream(): AsyncGenerator { + yield { type: 'message.part.updated', data: { part: { type: 'text' }, delta: text } } + yield { type: 'result', data: { finalText: text } } + } + return { stream: stream(), finalText: () => text } + } +} + +const storeKinds = [ + { + name: 'InMemoryDurableRunStore', + factory: () => ({ store: new InMemoryDurableRunStore(), cleanup: () => undefined }), + }, + { + name: 'FileSystemDurableRunStore', + factory: () => { + const dir = mkdtempSync(join(tmpdir(), 'chat-engine-test-')) + return { + store: new FileSystemDurableRunStore(dir), + cleanup: () => rmSync(dir, { recursive: true, force: true }), + } + }, + }, + { + name: 'D1DurableRunStore (better-sqlite3)', + factory: () => { + const handle = createSqliteD1() + handle.raw.exec(SCHEMA_SQL) + return { + store: new D1DurableRunStore(handle.db), + cleanup: () => handle.close(), + } + }, + }, +] as const + +const IDENTITY = { tenantId: 'ws-1', sessionId: 'thread-1', userId: 'user-1', turnIndex: 0 } + +for (const kind of storeKinds) { + describe(`DurableChatTurnEngine / ${kind.name}`, () => { + let store: DurableRunStore + let cleanup: () => void + const engine = new DurableChatTurnEngine() 
+ + beforeEach(() => { + const made = kind.factory() + store = made.store + cleanup = made.cleanup + }) + + afterEach(async () => { + await store.close() + cleanup() + }) + + it('wraps the turn in the session.run.* lifecycle envelope', async () => { + const persisted: string[] = [] + const { body, contentType } = engine.runTurn({ + identity: IDENTITY, + userMessage: 'hello', + projectId: 'legal-agent', + domain: 'legal', + store, + hooks: { + produce: textProducer('Hi there.'), + persistAssistantMessage: async ({ finalText }) => { + persisted.push(finalText) + }, + }, + }) + expect(contentType).toBe('application/x-ndjson') + const events = await drain(body) + + expect(events[0]?.type).toBe('session.run.started') + expect(events.at(-1)?.type).toBe('session.run.completed') + expect(events.some((e) => e.type === 'message.part.updated')).toBe(true) + expect(events.some((e) => e.type === 'result')).toBe(true) + expect(persisted).toEqual(['Hi there.']) + }) + + it('runs hooks in order: persist → onTurnComplete, after the stream', async () => { + const order: string[] = [] + const { body } = engine.runTurn({ + identity: IDENTITY, + userMessage: 'q', + projectId: 'gtm-agent', + domain: 'gtm', + store, + hooks: { + produce: textProducer('answer'), + persistAssistantMessage: async () => { + order.push('persist') + }, + onTurnComplete: async () => { + order.push('postProcess') + }, + }, + }) + await drain(body) + expect(order).toEqual(['persist', 'postProcess']) + }) + + it('replay skips the producer AND skips persist/post-process (no double-write)', async () => { + let constructs = 0 + let persists = 0 + let postProcesses = 0 + const hooks = { + produce: textProducer('cached', () => (constructs += 1)), + persistAssistantMessage: async () => { + persists += 1 + }, + onTurnComplete: async () => { + postProcesses += 1 + }, + } + // First attempt — fresh. 
+ await drain( + engine.runTurn({ + identity: IDENTITY, + userMessage: 'q', + projectId: 'creative-agent', + domain: 'creative', + store, + hooks, + }).body, + ) + // Second attempt — same identity → replay. + const replayEvents = await drain( + engine.runTurn({ + identity: IDENTITY, + userMessage: 'q', + projectId: 'creative-agent', + domain: 'creative', + store, + hooks, + }).body, + ) + + expect(constructs).toBe(1) // producer built once + expect(persists).toBe(1) // assistant message persisted once + expect(postProcesses).toBe(1) // post-process ran once + expect(replayEvents.some((e) => e.type === 'result')).toBe(true) + const completed = replayEvents.find((e) => e.type === 'session.run.completed') + expect(completed?.data?.replayed).toBe(true) + }) + + it('a producer failure becomes error + session.run.failed, stream still closes', async () => { + const { body } = engine.runTurn({ + identity: IDENTITY, + userMessage: 'q', + projectId: 'tax-agent', + domain: 'tax', + store, + hooks: { + produce: () => { + async function* stream(): AsyncGenerator { + yield { type: 'message.part.updated', data: { delta: 'partial' } } + throw new Error('backend exploded') + } + return { stream: stream(), finalText: () => '' } + }, + persistAssistantMessage: async () => undefined, + }, + }) + const events = await drain(body) + expect(events[0]?.type).toBe('session.run.started') + const err = events.find((e) => e.type === 'error') + expect(err?.data?.message).toBe('backend exploded') + expect(events.at(-1)?.type).toBe('session.run.failed') + }) + + it('onEvent side channel receives every emitted event', async () => { + const broadcast: string[] = [] + const { body } = engine.runTurn({ + identity: IDENTITY, + userMessage: 'q', + projectId: 'gtm-agent', + domain: 'gtm', + store, + hooks: { + produce: textProducer('x'), + persistAssistantMessage: async () => undefined, + onEvent: (event) => { + broadcast.push(event.type) + }, + }, + }) + const events = await drain(body) + // Side 
channel saw exactly what the client saw. + expect(broadcast).toEqual(events.map((e) => e.type)) + }) + + it('transformFinalText alters what is persisted, not the live stream', async () => { + let persisted = '' + const { body } = engine.runTurn({ + identity: IDENTITY, + userMessage: 'q', + projectId: 'legal-agent', + domain: 'legal', + store, + hooks: { + produce: textProducer('SSN 123-45-6789'), + transformFinalText: (t) => t.replace(/\d{3}-\d{2}-\d{4}/, '[REDACTED]'), + persistAssistantMessage: async ({ finalText }) => { + persisted = finalText + }, + }, + }) + const events = await drain(body) + // Live stream still carries the raw text. + const result = events.find((e) => e.type === 'result') + expect(result?.data?.finalText).toBe('SSN 123-45-6789') + // Persisted copy is redacted. + expect(persisted).toBe('SSN [REDACTED]') + }) + + it('a throwing persist hook is swallowed — the turn still completes', async () => { + const { body } = engine.runTurn({ + identity: IDENTITY, + userMessage: 'q', + projectId: 'legal-agent', + domain: 'legal', + store, + hooks: { + produce: textProducer('ok'), + persistAssistantMessage: async () => { + throw new Error('db down') + }, + }, + }) + const events = await drain(body) + // Persist failure must not turn into session.run.failed. + expect(events.at(-1)?.type).toBe('session.run.completed') + }) + }) +} diff --git a/src/durable/tests/durable-turn.test.ts b/src/durable/tests/durable-turn.test.ts new file mode 100644 index 0000000..bfe3c5b --- /dev/null +++ b/src/durable/tests/durable-turn.test.ts @@ -0,0 +1,302 @@ +/** + * `runDurableTurn` tests — fresh run, replay, mid-stream crash re-run, + * concurrent-attempt rejection. Run identically against all three stores + * (InMemory / FileSystem / D1-over-sqlite) via the shared matrix, so the + * turn primitive is proven on every backend a product could use. 
+ */ + +import { mkdtempSync, readFileSync, rmSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' + +import { afterEach, beforeEach, describe, expect, it } from 'vitest' + +import { + D1DurableRunStore, + DurableRunLeaseHeldError, + type DurableRunManifest, + type DurableRunStore, + FileSystemDurableRunStore, + InMemoryDurableRunStore, + runDurableTurn, +} from '../index' +import { createSqliteD1 } from './sqlite-d1-adapter' + +const SCHEMA_SQL = readFileSync(new URL('../schema.sql', import.meta.url), 'utf8') + +interface FakeEvent { + type: string + text?: string +} + +function makeManifest(turnIndex = 0): DurableRunManifest { + return { + projectId: 'test-product', + scenarioId: 'thread-1', + task: { + id: `chat:thread-1:${turnIndex}`, + intent: 'unit-test chat turn', + domain: 'test', + requiredKnowledge: [], + metadata: { turnIndex }, + }, + input: { userMessage: `q-${turnIndex}` }, + } +} + +/** A producer that yields N text deltas then a final event. Records whether + * it was constructed so tests can prove the replay path skips it. 
*/ +function fakeProducer(opts: { chunks: string[]; onConstruct?: () => void; throwAfter?: number }) { + opts.onConstruct?.() + let assembled = '' + async function* stream(): AsyncGenerator { + let emitted = 0 + for (const chunk of opts.chunks) { + if (opts.throwAfter !== undefined && emitted >= opts.throwAfter) { + throw new Error('producer exploded mid-stream') + } + assembled += chunk + yield { type: 'delta', text: chunk } + emitted += 1 + } + yield { type: 'result', text: assembled } + } + return { + stream: stream(), + finalText: () => assembled, + } +} + +const storeKinds = [ + { + name: 'InMemoryDurableRunStore', + factory: () => ({ store: new InMemoryDurableRunStore(), cleanup: () => undefined }), + }, + { + name: 'FileSystemDurableRunStore', + factory: () => { + const dir = mkdtempSync(join(tmpdir(), 'durable-turn-test-')) + return { + store: new FileSystemDurableRunStore(dir), + cleanup: () => rmSync(dir, { recursive: true, force: true }), + } + }, + }, + { + name: 'D1DurableRunStore (better-sqlite3)', + factory: () => { + const handle = createSqliteD1() + handle.raw.exec(SCHEMA_SQL) + return { + store: new D1DurableRunStore(handle.db), + cleanup: () => handle.close(), + } + }, + }, +] as const + +for (const kind of storeKinds) { + describe(`runDurableTurn / ${kind.name}`, () => { + let store: DurableRunStore + let cleanup: () => void + + beforeEach(() => { + const made = kind.factory() + store = made.store + cleanup = made.cleanup + }) + + afterEach(async () => { + await store.close() + cleanup() + }) + + it('fresh run forwards producer events live and checkpoints final text', async () => { + let constructed = 0 + const handle = runDurableTurn({ + store, + runId: 'chat:thread-1:0', + manifest: makeManifest(0), + workerId: 'worker-a', + produce: () => + fakeProducer({ chunks: ['Hello', ', ', 'world'], onConstruct: () => (constructed += 1) }), + replayEvent: (text) => ({ type: 'result', text }), + }) + + const events: FakeEvent[] = [] + for await (const e 
of handle.stream) events.push(e) + + expect(constructed).toBe(1) + expect(handle.replayed()).toBe(false) + expect(events).toEqual([ + { type: 'delta', text: 'Hello' }, + { type: 'delta', text: ', ' }, + { type: 'delta', text: 'world' }, + { type: 'result', text: 'Hello, world' }, + ]) + expect(handle.finalText()).toBe('Hello, world') + expect(handle.record()?.status).toBe('completed') + }) + + it('replay emits one synthetic event and never constructs the producer', async () => { + // First attempt — completes, checkpoints. + const first = runDurableTurn({ + store, + runId: 'chat:thread-1:1', + manifest: makeManifest(1), + workerId: 'worker-a', + produce: () => fakeProducer({ chunks: ['cached ', 'answer'] }), + replayEvent: (text) => ({ type: 'result', text }), + }) + for await (const _ of first.stream) { + /* drain */ + } + expect(first.replayed()).toBe(false) + + // Second attempt — same runId. Producer must NOT be constructed. + let constructed = 0 + const replay = runDurableTurn({ + store, + runId: 'chat:thread-1:1', + manifest: makeManifest(1), + workerId: 'worker-b', + produce: () => fakeProducer({ chunks: ['x'], onConstruct: () => (constructed += 1) }), + replayEvent: (text) => ({ type: 'result', text }), + }) + const events: FakeEvent[] = [] + for await (const e of replay.stream) events.push(e) + + expect(constructed).toBe(0) + expect(replay.replayed()).toBe(true) + expect(replay.finalText()).toBe('cached answer') + expect(events).toEqual([{ type: 'result', text: 'cached answer' }]) + }) + + it('mid-stream crash re-runs the turn from the top (no partial replay)', async () => { + // First attempt explodes after 1 event. 
// ── continuation of the mid-stream-crash test (its `it(` opener precedes this chunk) ──
      // First attempt — the producer throws after one chunk, so the turn
      // fails mid-stream and nothing is checkpointed as completed.
      let firstConstructs = 0
      const first = runDurableTurn({
        store,
        runId: 'chat:thread-1:2',
        manifest: makeManifest(2),
        workerId: 'worker-a',
        produce: () =>
          fakeProducer({
            chunks: ['partial', '-lost'],
            throwAfter: 1,
            onConstruct: () => (firstConstructs += 1),
          }),
        replayEvent: (text) => ({ type: 'result', text }),
      })
      const firstEvents: FakeEvent[] = []
      await expect(
        (async () => {
          for await (const e of first.stream) firstEvents.push(e)
        })(),
      ).rejects.toThrow('producer exploded mid-stream')
      expect(firstConstructs).toBe(1)
      expect(first.record()?.status).toBe('failed')

      // Second attempt — the failed step is NOT replayed; the producer runs
      // again and the turn completes cleanly.
      let secondConstructs = 0
      const second = runDurableTurn({
        store,
        runId: 'chat:thread-1:2',
        manifest: makeManifest(2),
        workerId: 'worker-a',
        produce: () =>
          fakeProducer({ chunks: ['recovered'], onConstruct: () => (secondConstructs += 1) }),
        replayEvent: (text) => ({ type: 'result', text }),
      })
      const secondEvents: FakeEvent[] = []
      for await (const e of second.stream) secondEvents.push(e)

      expect(secondConstructs).toBe(1)
      expect(second.replayed()).toBe(false)
      expect(second.finalText()).toBe('recovered')
      expect(secondEvents).toEqual([
        { type: 'delta', text: 'recovered' },
        { type: 'result', text: 'recovered' },
      ])
    })

    it('the accumulate hook builds final text when the producer reports none', async () => {
      // Producer whose finalText() stays empty — accumulate must fill it.
      // The generator is typed as AsyncGenerator<FakeEvent> so the event type
      // flows into `accumulate`; a bare `AsyncGenerator` yields `unknown`.
      function emptyFinalProducer() {
        async function* stream(): AsyncGenerator<FakeEvent> {
          yield { type: 'delta', text: 'a' }
          yield { type: 'delta', text: 'b' }
        }
        return { stream: stream(), finalText: () => '' }
      }
      const handle = runDurableTurn({
        store,
        runId: 'chat:thread-1:3',
        manifest: makeManifest(3),
        workerId: 'worker-a',
        produce: emptyFinalProducer,
        replayEvent: (text) => ({ type: 'result', text }),
        accumulate: (event, current) =>
          event.type === 'delta' ? current + (event.text ?? '') : undefined,
      })
      for await (const _ of handle.stream) {
        /* drain */
      }
      expect(handle.finalText()).toBe('ab')

      // Replay must return the accumulated text.
      const replay = runDurableTurn({
        store,
        runId: 'chat:thread-1:3',
        manifest: makeManifest(3),
        workerId: 'worker-b',
        produce: emptyFinalProducer,
        replayEvent: (text) => ({ type: 'result', text }),
      })
      const events: FakeEvent[] = []
      for await (const e of replay.stream) events.push(e)
      expect(events).toEqual([{ type: 'result', text: 'ab' }])
    })

    it('rejects a concurrent attempt while the first turn holds the lease', async () => {
      // Start the first turn but do NOT drain it — lease stays held.
      const first = runDurableTurn({
        store,
        runId: 'chat:thread-1:4',
        manifest: makeManifest(4),
        workerId: 'worker-a',
        leaseMs: 60_000,
        produce: () => fakeProducer({ chunks: ['slow'] }),
        replayEvent: (text) => ({ type: 'result', text }),
      })
      // Pump the first stream just enough to claim the lease (startOrResume
      // runs on the first `.next()`).
      const firstIter = first.stream[Symbol.asyncIterator]()
      await firstIter.next()

      // Concurrent worker on the same runId must be rejected.
      const second = runDurableTurn({
        store,
        runId: 'chat:thread-1:4',
        manifest: makeManifest(4),
        workerId: 'worker-b',
        leaseMs: 60_000,
        produce: () => fakeProducer({ chunks: ['concurrent'] }),
        replayEvent: (text) => ({ type: 'result', text }),
      })
      await expect(
        (async () => {
          for await (const _ of second.stream) {
            /* drain */
          }
        })(),
      ).rejects.toBeInstanceOf(DurableRunLeaseHeldError)

      // Drain the first to release the lease cleanly. The iterator is wrapped
      // so `for await` resumes the already-started generator.
      for await (const _ of { [Symbol.asyncIterator]: () => firstIter }) {
        /* drain remainder */
      }
    })
  })
}
diff --git a/src/durable/turn.ts b/src/durable/turn.ts
new file mode 100644
index 0000000..1fdb6f8
--- /dev/null
+++ b/src/durable/turn.ts
@@ -0,0 +1,170 @@
/**
 * `runDurableTurn` — a streaming, backend-agnostic, checkpoint+replay durable
 * turn. The single reusable primitive every product's chat handler routes
 * through, so per-product durability code drops to zero.
 *
 * A **turn** is one request→response unit: a producer yields a stream of
 * events and, once drained, exposes the turn's final text. `runDurableTurn`
 * wraps that with a `DurableRunStore`:
 *
 * - **Fresh run** — no completed step for this `(runId)`. The producer
 *   runs; its events forward live to the caller (streaming preserved)
 *   while final text accumulates; on drain the text is checkpointed.
 *
 * - **Replay** — a completed step already exists (the worker died after
 *   the turn finished but before the response reached the client, and the
 *   client retried the same turn). The cached text is emitted as a single
 *   synthetic event; the producer is never constructed — no LLM call, no
 *   double-billing.
 *
 * - **Mid-stream crash** — a turn that died *while streaming* leaves step 0
 *   in `running`/`failed`. There is no partial-stream checkpoint (the
 *   substrate checkpoints JSON values at step granularity), so the turn
 *   re-runs from the top.
This is the honest durability ceiling: a
 * *completed* turn is free to replay; an *interrupted* turn re-runs.
 *
 * Generic over the event type `TEvent` so a product can stream its own NDJSON
 * shape or the runtime's `RuntimeStreamEvent` — `runDurableTurn` never
 * inspects events, it only forwards them and reads `finalText()` after drain.
 *
 * Lease: a turn is a single step, fast enough that the heartbeat in
 * `runDurable` is unnecessary — `runDurableTurn` claims the lease once via
 * `startOrResume` and releases it on `endRun`. Concurrent workers on the same
 * `runId` are rejected with `DurableRunLeaseHeldError` (the client retried
 * before the first attempt finished); callers surface that as "turn already
 * in flight."
 *
 * Early termination: when the consumer stops iterating before the stream
 * drains (`break` / `return()`), the step is marked failed and `endRun` is
 * still called, so the run is not left open until the lease window expires.
 */

import { canonicalHash } from './identity'
import type { DurableRunManifest, DurableRunStore, RunRecord } from './types'

/** The live side of a turn — what a fresh run produces. */
export interface DurableTurnProducer<TEvent> {
  /** The turn's event stream. Forwarded verbatim to the caller. */
  stream: AsyncGenerator<TEvent>
  /** The turn's final assistant text. Read once, after `stream` drains. */
  finalText(): string
}

export interface RunDurableTurnOptions<TEvent> {
  store: DurableRunStore
  /** Stable per-turn run id. Convention: `chat:<thread>:<turn>` (for example
   *  `chat:thread-1:2`). The same id on a retry is what enables replay. */
  runId: string
  manifest: DurableRunManifest
  /** Stable per-isolate worker id, used to claim the lease and end the run.
   *  Required: `runDurableTurn` applies no default, so the caller must always
   *  supply one. */
  workerId: string
  /** Lease window in ms. Default 60_000 — a turn rarely runs longer. */
  leaseMs?: number
  /** Human-readable step label. Default `turn`. */
  intent?: string
  /** Builds the live producer. Called exactly once, on a fresh run; never
   *  called on the replay path. */
  produce: () => DurableTurnProducer<TEvent>
  /** Synthesizes the single event emitted on the replay path from the
   *  cached final text (e.g. a product's `{ type: 'result', data: {...} }`). */
  replayEvent: (finalText: string) => TEvent
  /** Optional live accumulator. When the producer's `finalText()` is only
   *  valid after drain, this lets `runDurableTurn` also observe each event
   *  to build the text — return the running text or `undefined` to ignore
   *  an event. When omitted, `producer.finalText()` is the sole source. */
  accumulate?: (event: TEvent, current: string) => string | undefined
}

export interface DurableTurnHandle<TEvent> {
  /** Drop-in stream. Fresh runs forward producer events live; replays emit
   *  exactly one `replayEvent(cachedText)`. Single-use: it is one generator
   *  instance created when the handle is built. */
  stream: AsyncGenerator<TEvent>
  /** The turn's final text. Valid after `stream` drains. */
  finalText(): string
  /** True iff this turn replayed a cached result (no producer ran). Valid
   *  after `stream` drains. */
  replayed(): boolean
  /** The durable `RunRecord` for this turn. Valid after `stream` drains. */
  record(): RunRecord | undefined
}

/** A turn is exactly one durable step; it always lives at index 0. */
const STEP_INDEX = 0

/** Narrows a checkpointed step result to its `finalText`; `''` when the
 *  result is absent or does not carry a string `finalText`. */
function cachedFinalText(result: unknown): string {
  if (typeof result !== 'object' || result === null) return ''
  const text = (result as { finalText?: unknown }).finalText
  return typeof text === 'string' ? text : ''
}

/** Best-effort message for an arbitrary thrown value. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err)
}

/**
 * Runs one chat turn durably against `options.store`.
 *
 * Nothing happens until the returned `stream` is first pulled: the lease is
 * claimed (`startOrResume`) on the first `next()`, so store errors from that
 * call surface to whoever iterates the stream.
 *
 * - If step 0 is already `completed`, the cached text is emitted once through
 *   `replayEvent` and `produce` is never called.
 * - Otherwise the producer runs, each event is forwarded as it arrives, and the
 *   final text is checkpointed with `completeStep` once the stream drains.
 *
 * @param options Store, identity and hooks for the turn; see
 *   {@link RunDurableTurnOptions}.
 * @returns A handle whose `stream` drives the turn; `finalText()`,
 *   `replayed()` and `record()` are meaningful once the stream has drained.
 * @throws Rethrows (from the stream) any error raised by the producer, the
 *   hooks, or the store. A producer error is recorded with `failStep` and
 *   `endRun({ status: 'failed' })` before it is rethrown.
 */
export function runDurableTurn<TEvent>(
  options: RunDurableTurnOptions<TEvent>,
): DurableTurnHandle<TEvent> {
  const { store, runId, manifest, workerId } = options
  const leaseMs = options.leaseMs ?? 60_000
  const intent = options.intent ?? 'turn'
  const inputHash = canonicalHash(manifest.input)

  let accumulated = ''
  let didReplay = false
  let finalRecord: RunRecord | undefined

  async function* stream(): AsyncGenerator<TEvent> {
    const { completedSteps } = await store.startOrResume({
      runId,
      manifest,
      workerId,
      leaseMs,
    })
    const prior = completedSteps.find((s) => s.stepIndex === STEP_INDEX)

    if (prior && prior.status === 'completed') {
      // ── Replay path — producer never constructed ──────────────────
      didReplay = true
      accumulated = cachedFinalText(prior.result)
      try {
        yield options.replayEvent(accumulated)
      } finally {
        // Runs even when the consumer stops right after the replay event (or
        // `replayEvent` throws), so the run claimed above is always ended.
        finalRecord = await store.endRun({ runId, workerId, status: 'completed' })
      }
      return
    }

    // ── Fresh run — produce live, forward, checkpoint ────────────────
    await store.beginStep({
      runId,
      stepIndex: STEP_INDEX,
      intent,
      kind: 'llm',
      inputHash,
    })

    // `settled` flips once the step's fate has been decided (checkpointed, or
    // failure handling has started). It keeps a later error from calling
    // `failStep` on an already-completed step, and lets `finally` detect a
    // consumer that walked away mid-stream.
    let settled = false
    try {
      const producer = options.produce()
      for await (const event of producer.stream) {
        if (options.accumulate) {
          const next = options.accumulate(event, accumulated)
          if (typeof next === 'string') accumulated = next
        }
        yield event
      }
      // Producer's own finalText wins when it is populated; otherwise the
      // live accumulator's value stands.
      const producerText = producer.finalText()
      if (producerText) accumulated = producerText
      await store.completeStep({
        runId,
        stepIndex: STEP_INDEX,
        result: { finalText: accumulated },
      })
      settled = true
      finalRecord = await store.endRun({
        runId,
        workerId,
        status: 'completed',
        outcome: { notes: intent, metadata: { chars: accumulated.length } },
      })
    } catch (err) {
      // Only fail the step when it was not checkpointed: if `endRun` threw
      // after `completeStep`, the completed checkpoint must stay intact so a
      // retry replays instead of re-running the producer.
      if (!settled) {
        settled = true
        await store.failStep({
          runId,
          stepIndex: STEP_INDEX,
          error: { message: describeError(err) },
        })
        finalRecord = await store.endRun({ runId, workerId, status: 'failed' })
      }
      throw err
    } finally {
      // Reached with `settled === false` only when the consumer ended the
      // generator early (`break` / `return()`): neither the success path nor
      // the catch ran. Record the interruption so the run is not left open.
      if (!settled) {
        await store.failStep({
          runId,
          stepIndex: STEP_INDEX,
          error: { message: 'turn abandoned: consumer stopped before the stream drained' },
        })
        finalRecord = await store.endRun({ runId, workerId, status: 'failed' })
      }
    }
  }

  return {
    stream: stream(),
    finalText: () => accumulated,
    replayed: () => didReplay,
    record: () => finalRecord,
  }
}