diff --git a/.changeset/durable-suspended-flow-runs-1518.md b/.changeset/durable-suspended-flow-runs-1518.md new file mode 100644 index 000000000..fe2601a9d --- /dev/null +++ b/.changeset/durable-suspended-flow-runs-1518.md @@ -0,0 +1,37 @@ +--- +"@objectstack/service-automation": minor +--- + +Persist suspended flow runs so a durable pause survives a process restart (#1518). + +`service-automation` kept suspended runs in an in-memory `Map` only, so a flow +paused at an `approval` / `wait` / `screen` node could never be resumed after the +process restarted — a hard blocker on hibernating/serverless hosts (e.g. the +Cloudflare Workers control plane), where the approval record persists but +`resume(runId)` had nothing to continue. + +The engine now backs that map with a pluggable `SuspendedRunStore` (ADR-0019): + +- **`SuspendedRunStore`** interface + two implementations — `InMemorySuspendedRunStore` + (the default; JSON round-trips so it faithfully mirrors a DB boundary) and + `ObjectStoreSuspendedRunStore`, which persists to a new **`sys_automation_run`** + system object via the ObjectQL engine. `AutomationServicePlugin` registers the + object and auto-enables the DB-backed store when an ObjectQL engine is present + (opt out with `suspendedRunStore: 'memory'`). +- **Durable suspend/resume** — a run is persisted on suspend and deleted on + terminal completion. `resume(runId)` rehydrates from the store when the run is + not in memory (cold boot), so a fully restarted kernel can continue from the + paused node down the correct branch and run the downstream nodes. The resumable + state (`variables` / `steps` / `context` / `screen`) round-trips through the + store, including nested objects. +- **Idempotent resume** — the suspension is consumed before downstream work runs, + plus an in-process guard rejects a concurrent duplicate `resume`, so a repeated + resume after a partial restart can't double-run side effects. 
+- Run ids now include a random component, so they stay unique across process + lifetimes and can't collide with a still-suspended run persisted before a restart. + +New exports: `SuspendedRun`, `SuspendedRunStore`, `StepLogEntry`, +`InMemorySuspendedRunStore`, `ObjectStoreSuspendedRunStore`, +`SuspendedRunStoreEngine`, `SysAutomationRun`, plus +`AutomationEngine.setSuspendedRunStore()` and `listSuspendedRunsDurable()`. +Existing service-automation and plugin-approvals tests pass unchanged. diff --git a/packages/services/service-automation/src/engine.test.ts b/packages/services/service-automation/src/engine.test.ts index ab75e42e0..0b83183e2 100644 --- a/packages/services/service-automation/src/engine.test.ts +++ b/packages/services/service-automation/src/engine.test.ts @@ -5,6 +5,7 @@ import { LiteKernel } from '@objectstack/core'; import { AutomationEngine } from './engine.js'; import { AutomationServicePlugin } from './plugin.js'; import { registerScreenNodes } from './builtin/screen-nodes.js'; +import { InMemorySuspendedRunStore } from './suspended-run-store.js'; import type { NodeExecutor } from './engine.js'; import type { IAutomationService } from '@objectstack/spec/contracts'; @@ -562,6 +563,141 @@ describe('AutomationEngine', () => { expect(resumed.success).toBe(true); expect(resumed.status).toBeUndefined(); }); + + // ── Durable persistence across a process restart (ADR-0019) ── + // + // A shared SuspendedRunStore stands in for a database: suspend on one + // engine instance, then resume on a brand-new instance backed by the + // same store — simulating a cold boot after the original process is gone. 
+ describe('Durable persistence across process restart', () => { + function buildEngine( + store: InMemorySuspendedRunStore, + captured: { snapshot?: unknown; ran: string[] }, + ) { + const e = new AutomationEngine(createTestLogger(), store); + e.registerNodeExecutor({ + type: 'pause_node', + async execute() { + // Snapshot a nested object + array so we can assert the + // variable map round-trips through the store. + return { + success: true, + suspend: true, + correlation: 'req_1', + output: { snapshot: { nested: { value: 42 }, arr: [1, 2, 3] } }, + }; + }, + }); + e.registerNodeExecutor({ + type: 'branch_node', + async execute(node, variables) { + captured.ran.push(node.id); + captured.snapshot = variables.get('pause.snapshot'); + return { success: true }; + }, + }); + e.registerFlow('approval_flow', { + name: 'approval_flow', label: 'Approval Flow', type: 'autolaunched', + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { id: 'pause', type: 'pause_node', label: 'Approval' }, + { id: 'approved', type: 'branch_node', label: 'Approved' }, + { id: 'rejected', type: 'branch_node', label: 'Rejected' }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'pause' }, + { id: 'e2', source: 'pause', target: 'approved', label: 'approve' }, + { id: 'e3', source: 'pause', target: 'rejected', label: 'reject' }, + { id: 'e4', source: 'approved', target: 'end' }, + { id: 'e5', source: 'rejected', target: 'end' }, + ], + }); + return e; + } + + it('survives a full restart: suspend on one engine, resume on a fresh one', async () => { + const store = new InMemorySuspendedRunStore(); + + // Process lifetime #1 — the run suspends at the approval node. 
+ const a = { snapshot: undefined as unknown, ran: [] as string[] }; + const engineA = buildEngine(store, a); + const paused = await engineA.execute('approval_flow'); + expect(paused.status).toBe('paused'); + expect(paused.runId).toBeDefined(); + expect(await store.list()).toHaveLength(1); + + // Process lifetime #2 — a brand-new engine cold-boots. The run is + // NOT in its in-memory cache… + const b = { snapshot: undefined as unknown, ran: [] as string[] }; + const engineB = buildEngine(store, b); + expect(engineB.listSuspendedRuns()).toHaveLength(0); + // …but it is visible and resumable via the durable store. + const durable = await engineB.listSuspendedRunsDurable(); + expect(durable).toHaveLength(1); + expect(durable[0]).toMatchObject({ + runId: paused.runId, flowName: 'approval_flow', nodeId: 'pause', correlation: 'req_1', + }); + + const resumed = await engineB.resume(paused.runId!, { + branchLabel: 'approve', output: { decision: 'approved' }, + }); + expect(resumed.success).toBe(true); + expect(resumed.status).toBeUndefined(); + // Continued down the correct branch on the fresh engine. + expect(b.ran).toContain('approved'); + expect(b.ran).not.toContain('rejected'); + // Variables (nested object + array) round-tripped through the store. + expect(b.snapshot).toEqual({ nested: { value: 42 }, arr: [1, 2, 3] }); + // The durable record is consumed on terminal completion. + expect(await store.list()).toHaveLength(0); + expect(await engineB.listSuspendedRunsDurable()).toHaveLength(0); + }); + + it('resume is idempotent: a duplicate resume does not double-run downstream', async () => { + const store = new InMemorySuspendedRunStore(); + const a = { snapshot: undefined as unknown, ran: [] as string[] }; + const engineA = buildEngine(store, a); + const paused = await engineA.execute('approval_flow'); + + // Fresh engine (restart) resumes once. 
+ const b = { snapshot: undefined as unknown, ran: [] as string[] }; + const engineB = buildEngine(store, b); + const first = await engineB.resume(paused.runId!, { branchLabel: 'approve' }); + expect(first.success).toBe(true); + expect(b.ran.filter(x => x === 'approved')).toHaveLength(1); + + // A second resume of the same run finds nothing — no double-run. + const second = await engineB.resume(paused.runId!, { branchLabel: 'approve' }); + expect(second.success).toBe(false); + expect(second.error).toContain('No suspended run'); + expect(b.ran.filter(x => x === 'approved')).toHaveLength(1); + }); + + it('listSuspendedRunsDurable falls back to the in-memory list with no store', async () => { + const e = new AutomationEngine(createTestLogger()); // no store + e.registerNodeExecutor({ + type: 'pause_node', + async execute() { return { success: true, suspend: true, correlation: 'req_1' }; }, + }); + e.registerFlow('p', { + name: 'p', label: 'P', type: 'autolaunched', + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { id: 'pause', type: 'pause_node', label: 'Pause' }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'pause' }, + { id: 'e2', source: 'pause', target: 'end' }, + ], + }); + const paused = await e.execute('p'); + const durable = await e.listSuspendedRunsDurable(); + expect(durable).toHaveLength(1); + expect(durable[0].runId).toBe(paused.runId); + }); + }); }); describe('IAutomationService Contract', () => { diff --git a/packages/services/service-automation/src/engine.ts b/packages/services/service-automation/src/engine.ts index 3146997ff..4b33b5e1b 100644 --- a/packages/services/service-automation/src/engine.ts +++ b/packages/services/service-automation/src/engine.ts @@ -190,9 +190,10 @@ export interface ConnectorDescriptor { // ─── Core Automation Engine ───────────────────────────────────────── /** - * Internal execution step log entry. + * Execution step log entry. 
Part of a {@link SuspendedRun}'s persisted state, so + * it survives serialization to a durable {@link SuspendedRunStore}. */ -interface StepLogEntry { +export interface StepLogEntry { nodeId: string; nodeType: string; nodeLabel?: string; @@ -242,12 +243,15 @@ function isSuspendSignal(err: unknown): err is FlowSuspendSignal { } /** - * A run paused at a node, awaiting {@link AutomationEngine.resume}. Held - * in-memory, matching the engine's existing in-memory run model — durable - * persistence of suspended runs across process restart is a follow-up tracked - * with run-state persistence generally (ADR-0019 §Consequences). + * A run paused at a node, awaiting {@link AutomationEngine.resume} (ADR-0019). + * + * Held in an in-memory hot cache and — when a {@link SuspendedRunStore} is + * configured — mirrored to durable storage so the pause survives a process + * restart. Every field is JSON-serializable (the engine's variable `Map` is + * snapshotted as a plain object) so the whole record round-trips through a + * store. */ -interface SuspendedRun { +export interface SuspendedRun { runId: string; flowName: string; flowVersion?: number; @@ -264,6 +268,28 @@ interface SuspendedRun { screen?: ScreenSpec; } +/** + * Pluggable durable store for suspended runs (ADR-0019). The engine persists a + * {@link SuspendedRun} on suspend and deletes it on terminal completion; on + * {@link AutomationEngine.resume} of a run not in the in-memory cache (e.g. + * after a process restart) it rehydrates from here. + * + * The default is purely in-memory (no store); a host wires a DB-backed store + * (`ObjectStoreSuspendedRunStore`, on `sys_automation_run`) for production / + * serverless deployments where the process hibernates between suspend and + * resume. + */ +export interface SuspendedRunStore { + /** Persist (insert or replace) a suspended run. */ + save(run: SuspendedRun): Promise; + /** Load a suspended run by id, or `null` if not stored. 
*/ + load(runId: string): Promise; + /** Remove a suspended run's durable record (idempotent). */ + delete(runId: string): Promise; + /** List all currently-stored suspended runs. */ + list(): Promise; +} + export class AutomationEngine implements IAutomationService { private flows = new Map(); private flowEnabled = new Map(); @@ -282,12 +308,88 @@ export class AutomationEngine implements IAutomationService { private executionLogs: ExecutionLogEntry[] = []; private maxLogSize = 1000; private logger: Logger; - private runCounter = 0; - /** Runs paused at a node, keyed by runId (ADR-0019). In-memory, see {@link SuspendedRun}. */ + /** + * Runs paused at a node, keyed by runId (ADR-0019). In-memory hot cache — + * mirrored to {@link store} when one is configured, so a pause survives a + * process restart. See {@link SuspendedRun}. + */ private suspendedRuns = new Map(); + /** + * Optional durable backing for {@link suspendedRuns}. When set, suspended + * runs are persisted on suspend and rehydrated on resume after a restart; + * when absent, behaviour is purely in-memory (the historical default). + */ + private store?: SuspendedRunStore; + /** + * Run ids currently mid-resume — an in-process idempotency guard so a + * duplicate `resume(runId)` can't re-enter and double-run side effects. + */ + private resuming = new Set(); - constructor(logger: Logger) { + constructor(logger: Logger, store?: SuspendedRunStore) { this.logger = logger; + this.store = store; + } + + /** + * Attach (or replace) the durable {@link SuspendedRunStore}. Used by the + * service plugin to upgrade the engine to DB-backed persistence once the + * ObjectQL engine is available (the engine is constructed earlier, during + * `init`, before services are wired). + */ + setSuspendedRunStore(store: SuspendedRunStore): void { + this.store = store; + } + + /** + * Generate a process-unique run id. 
Includes a random component so ids do + * not collide with runs persisted by a previous process lifetime (a plain + * incrementing counter would reissue `run_1` after a restart, clashing with + * a still-suspended durable run). + */ + private nextRunId(): string { + const g = globalThis as { crypto?: { randomUUID?: () => string } }; + const rand = g.crypto?.randomUUID + ? g.crypto.randomUUID() + : `${Date.now().toString(36)}${Math.random().toString(36).slice(2, 10)}`; + return `run_${rand}`; + } + + /** + * Persist a suspended run to the in-memory cache and (best-effort) the + * durable store. A store failure is logged but does not fail the run — the + * in-memory copy still allows in-process resume; only cross-restart + * durability is lost. + */ + private async persistSuspendedRun(run: SuspendedRun): Promise { + this.suspendedRuns.set(run.runId, run); + if (this.store) { + try { + await this.store.save(run); + } catch (err) { + this.logger.warn( + `[automation] failed to persist suspended run '${run.runId}' to durable store (kept in memory only): ${(err as Error).message}`, + ); + } + } + } + + /** + * Drop a suspended run from the in-memory cache and (best-effort) the + * durable store. Called once the run is claimed for resume or reaches a + * terminal state. + */ + private async forgetSuspendedRun(runId: string): Promise { + this.suspendedRuns.delete(runId); + if (this.store) { + try { + await this.store.delete(runId); + } catch (err) { + this.logger.warn( + `[automation] failed to delete suspended run '${runId}' from durable store: ${(err as Error).message}`, + ); + } + } } // ── Plugin Extension API ────────────────────────────── @@ -730,7 +832,7 @@ export class AutomationEngine implements IAutomationService { variables.set('previous', context.previous); } - const runId = `run_${++this.runCounter}`; + const runId = this.nextRunId(); // Expose the run id to executors (ADR-0019): a pausing node (e.g. 
Approval) // reads `$runId` to map its external state back to this run for resume. variables.set('$runId', runId); @@ -810,7 +912,7 @@ export class AutomationEngine implements IAutomationService { // caller can later `resume()` it. This is NOT a failure. if (isSuspendSignal(err)) { const durationMs = Date.now() - startTime; - this.suspendedRuns.set(runId, { + await this.persistSuspendedRun({ runId, flowName, flowVersion: flow.version, @@ -888,87 +990,129 @@ export class AutomationEngine implements IAutomationService { * returns `{ status: 'paused', runId }` afresh. */ async resume(runId: string, signal?: ResumeSignal): Promise { - const run = this.suspendedRuns.get(runId); - if (!run) { - return { success: false, error: `No suspended run '${runId}'` }; - } - const flow = this.flows.get(run.flowName); - if (!flow) { - return { success: false, error: `Flow '${run.flowName}' not found for run '${runId}'` }; + // Idempotency guard (set synchronously, before any await): reject a + // concurrent duplicate resume of the same run so side effects can't run + // twice. A duplicate that arrives *after* this one finishes finds no + // suspended run and returns the "no suspended run" error below. + if (this.resuming.has(runId)) { + return { success: false, error: `Run '${runId}' is already being resumed` }; } - const node = flow.nodes.find(n => n.id === run.nodeId); - if (!node) { - return { success: false, error: `Suspended node '${run.nodeId}' no longer exists in flow '${run.flowName}'` }; - } - // Consume the suspension — a run resumes exactly once per pause. - this.suspendedRuns.delete(runId); - - // Restore variable context and apply the resume signal's output as if it - // were the node's output, so downstream edges branch on it. 
- const variables = new Map(Object.entries(run.variables)); - if (signal?.output) { - for (const [key, value] of Object.entries(signal.output)) { - variables.set(`${run.nodeId}.${key}`, value); + this.resuming.add(runId); + try { + // Hot path: suspended in this process. Cold path: rehydrate from the + // durable store (e.g. the process restarted since the pause, ADR-0019). + let run = this.suspendedRuns.get(runId) ?? null; + if (!run && this.store) { + try { + run = await this.store.load(runId); + } catch (err) { + this.logger.warn( + `[automation] failed to load suspended run '${runId}' from durable store: ${(err as Error).message}`, + ); + } } - } - // Bare flow variables — a `screen` node's collected inputs land under - // their plain names so downstream `{var}` interpolation / conditions - // read them directly (e.g. `new_assignee` → update_record fields). - if (signal?.variables) { - for (const [key, value] of Object.entries(signal.variables)) { - variables.set(key, value); + if (!run) { + return { success: false, error: `No suspended run '${runId}'` }; + } + const flow = this.flows.get(run.flowName); + if (!flow) { + return { success: false, error: `Flow '${run.flowName}' not found for run '${runId}'` }; + } + const node = flow.nodes.find(n => n.id === run.nodeId); + if (!node) { + return { success: false, error: `Suspended node '${run.nodeId}' no longer exists in flow '${run.flowName}'` }; + } + // Consume the suspension *before* running downstream work — a run + // resumes exactly once per pause, and a duplicate resume after a + // partial restart must not double-run side effects. + await this.forgetSuspendedRun(runId); + + // Restore variable context and apply the resume signal's output as if it + // were the node's output, so downstream edges branch on it. 
+ const variables = new Map(Object.entries(run.variables)); + if (signal?.output) { + for (const [key, value] of Object.entries(signal.output)) { + variables.set(`${run.nodeId}.${key}`, value); + } + } + // Bare flow variables — a `screen` node's collected inputs land under + // their plain names so downstream `{var}` interpolation / conditions + // read them directly (e.g. `new_assignee` → update_record fields). + if (signal?.variables) { + for (const [key, value] of Object.entries(signal.variables)) { + variables.set(key, value); + } } - } - const steps = run.steps; - const context = run.context; + const steps = run.steps; + const context = run.context; - try { - await this.traverseNext(node, flow, variables, context, steps, signal?.branchLabel); + try { + await this.traverseNext(node, flow, variables, context, steps, signal?.branchLabel); - // Collect output variables - const output: Record = {}; - if (flow.variables) { - for (const v of flow.variables) { - if (v.isOutput) output[v.name] = variables.get(v.name); + // Collect output variables + const output: Record = {}; + if (flow.variables) { + for (const v of flow.variables) { + if (v.isOutput) output[v.name] = variables.get(v.name); + } } - } - const durationMs = Date.now() - run.startTime; - this.recordLog({ - id: runId, - flowName: run.flowName, - flowVersion: run.flowVersion, - status: 'completed', - startedAt: run.startedAt, - completedAt: new Date().toISOString(), - durationMs, - trigger: { - type: context.event ?? 'manual', - userId: context.userId, - object: context.object, - }, - steps, - output, - }); - return { success: true, output, durationMs }; - } catch (err: unknown) { - // Re-suspended at a downstream node: persist a fresh continuation. 
- if (isSuspendSignal(err)) { const durationMs = Date.now() - run.startTime; - this.suspendedRuns.set(runId, { - ...run, - nodeId: err.nodeId, - variables: Object.fromEntries(variables), + this.recordLog({ + id: runId, + flowName: run.flowName, + flowVersion: run.flowVersion, + status: 'completed', + startedAt: run.startedAt, + completedAt: new Date().toISOString(), + durationMs, + trigger: { + type: context.event ?? 'manual', + userId: context.userId, + object: context.object, + }, steps, - correlation: err.correlation, - screen: err.screen, + output, }); + return { success: true, output, durationMs }; + } catch (err: unknown) { + // Re-suspended at a downstream node: persist a fresh continuation. + if (isSuspendSignal(err)) { + const durationMs = Date.now() - run.startTime; + await this.persistSuspendedRun({ + ...run, + nodeId: err.nodeId, + variables: Object.fromEntries(variables), + steps, + correlation: err.correlation, + screen: err.screen, + }); + this.recordLog({ + id: runId, + flowName: run.flowName, + flowVersion: run.flowVersion, + status: 'paused', + startedAt: run.startedAt, + durationMs, + trigger: { + type: context.event ?? 'manual', + userId: context.userId, + object: context.object, + }, + steps, + }); + return { success: true, status: 'paused', runId, durationMs, screen: err.screen }; + } + + const errorMessage = err instanceof Error ? err.message : String(err); + const durationMs = Date.now() - run.startTime; this.recordLog({ id: runId, flowName: run.flowName, flowVersion: run.flowVersion, - status: 'paused', + status: 'failed', startedAt: run.startedAt, + completedAt: new Date().toISOString(), durationMs, trigger: { type: context.event ?? 
'manual', @@ -976,35 +1120,23 @@ export class AutomationEngine implements IAutomationService { object: context.object, }, steps, + error: errorMessage, }); - return { success: true, status: 'paused', runId, durationMs, screen: err.screen }; + return { success: false, error: errorMessage, durationMs }; } - - const errorMessage = err instanceof Error ? err.message : String(err); - const durationMs = Date.now() - run.startTime; - this.recordLog({ - id: runId, - flowName: run.flowName, - flowVersion: run.flowVersion, - status: 'failed', - startedAt: run.startedAt, - completedAt: new Date().toISOString(), - durationMs, - trigger: { - type: context.event ?? 'manual', - userId: context.userId, - object: context.object, - }, - steps, - error: errorMessage, - }); - return { success: false, error: errorMessage, durationMs }; + } finally { + this.resuming.delete(runId); } } /** * List the runs currently suspended awaiting {@link resume} (ADR-0019). * Backs operability surfaces such as a "pending approvals" view. + * + * Synchronous — reads the in-memory cache only, so after a process restart + * runs that suspended in a prior lifetime are not listed here even though + * they remain durably stored and resumable by id. Use + * {@link listSuspendedRunsDurable} to include those. */ listSuspendedRuns(): Array<{ runId: string; flowName: string; nodeId: string; correlation?: string }> { return [...this.suspendedRuns.values()].map(r => ({ @@ -1015,6 +1147,30 @@ export class AutomationEngine implements IAutomationService { })); } + /** + * Like {@link listSuspendedRuns} but includes runs held only in the durable + * {@link SuspendedRunStore} (e.g. suspended before a restart). The in-memory + * cache takes precedence on id collisions. Falls back to the in-memory list + * when no store is configured. 
+ */ + async listSuspendedRunsDurable(): Promise> { + const byId = new Map(); + if (this.store) { + try { + for (const r of await this.store.list()) { + byId.set(r.runId, { runId: r.runId, flowName: r.flowName, nodeId: r.nodeId, correlation: r.correlation }); + } + } catch (err) { + this.logger.warn(`[automation] failed to list suspended runs from durable store: ${(err as Error).message}`); + } + } + // In-memory entries win — they are the freshest copy. + for (const r of this.suspendedRuns.values()) { + byId.set(r.runId, { runId: r.runId, flowName: r.flowName, nodeId: r.nodeId, correlation: r.correlation }); + } + return [...byId.values()]; + } + /** * The screen a paused run is currently waiting on (screen-flow runtime), or * `null` if the run isn't suspended / didn't pause at a screen node. Lets a @@ -1592,7 +1748,7 @@ export class AutomationEngine implements IAutomationService { variables.set('$record', context.record); } - const runId = `run_${++this.runCounter}`; + const runId = this.nextRunId(); const startedAt = new Date().toISOString(); const steps: StepLogEntry[] = []; diff --git a/packages/services/service-automation/src/index.ts b/packages/services/service-automation/src/index.ts index c727f79c4..3ede8be07 100644 --- a/packages/services/service-automation/src/index.ts +++ b/packages/services/service-automation/src/index.ts @@ -12,8 +12,23 @@ export type { RegisteredConnector, ConnectorDescriptor, ConnectorActionDescriptor, + SuspendedRun, + SuspendedRunStore, + StepLogEntry, } from './engine.js'; +// Durable suspended-run persistence (ADR-0019). The in-memory store is the +// default; the ObjectQL-backed store persists pauses across process restarts. 
+export { + InMemorySuspendedRunStore, + ObjectStoreSuspendedRunStore, +} from './suspended-run-store.js'; +export type { SuspendedRunStoreEngine } from './suspended-run-store.js'; + +// The sys_automation_run object backing the durable store — registered by +// AutomationServicePlugin and exported for hosts wiring a custom store. +export { SysAutomationRun } from './sys-automation-run.object.js'; + // Kernel plugin — seeds all built-in nodes; this is the only plugin needed for // a fully-functional automation capability. export { AutomationServicePlugin } from './plugin.js'; diff --git a/packages/services/service-automation/src/plugin.ts b/packages/services/service-automation/src/plugin.ts index 0b16215d1..a3cf6a9d0 100644 --- a/packages/services/service-automation/src/plugin.ts +++ b/packages/services/service-automation/src/plugin.ts @@ -3,6 +3,8 @@ import type { Plugin, PluginContext } from '@objectstack/core'; import { AutomationEngine } from './engine.js'; import { installBuiltinNodes } from './builtin/index.js'; +import { SysAutomationRun } from './sys-automation-run.object.js'; +import { ObjectStoreSuspendedRunStore, type SuspendedRunStoreEngine } from './suspended-run-store.js'; /** * Configuration options for the AutomationServicePlugin. @@ -10,6 +12,17 @@ import { installBuiltinNodes } from './builtin/index.js'; export interface AutomationServicePluginOptions { /** Enable debug logging for flow execution */ debug?: boolean; + /** + * Durable suspended-run persistence (ADR-0019): + * - `'auto'` (default): persist to `sys_automation_run` via the ObjectQL + * engine when one is available, otherwise stay in-memory. + * - `'memory'`: never persist — keep suspended runs in memory only (the + * historical behaviour; suitable for tests / single-process dev). + * + * When persistence is on, a run paused at an approval / wait / screen node + * survives a process restart and can be resumed after a cold boot. 
+ */ + suspendedRunStore?: 'auto' | 'memory'; } /** @@ -62,6 +75,28 @@ export class AutomationServicePlugin implements Plugin { // Register as global service — other plugins access via ctx.getService('automation') ctx.registerService('automation', this.engine); + // Register the sys_automation_run object so suspended-run state migrates + // like other sys_* tables (ADR-0019). Best-effort: a host without the + // manifest service still runs in-memory. Skipped when persistence is off. + if ((this.options.suspendedRunStore ?? 'auto') !== 'memory') { + try { + ctx.getService<{ register(m: unknown): void }>('manifest').register({ + id: 'com.objectstack.service-automation', + name: 'Automation Service', + version: '1.0.0', + type: 'plugin', + scope: 'system', + defaultDatasource: 'cloud', + namespace: 'sys', + objects: [SysAutomationRun], + }); + } catch (err) { + ctx.logger.warn( + `[Automation] manifest service unavailable; sys_automation_run not registered (suspended runs stay in-memory): ${(err as Error).message}`, + ); + } + } + // Seed the platform's built-in node executors. A bare // `new AutomationServicePlugin()` is thus a self-contained automation // capability — no companion node-pack plugins required (ADR-0018). @@ -91,6 +126,23 @@ export class AutomationServicePlugin implements Plugin { `[Automation] Engine started with ${nodeTypes.length} node types: ${nodeTypes.join(', ') || '(none)'}`, ); + // Upgrade to durable suspended-run persistence when an ObjectQL engine is + // present (ADR-0019). The engine was constructed in init() before + // services were wired, so we attach the DB-backed store here. Without an + // engine (or with `suspendedRunStore: 'memory'`) the in-memory default + // stands — suspended runs simply don't survive a restart. + if ((this.options.suspendedRunStore ?? 
'auto') !== 'memory') { + let dataEngine: SuspendedRunStoreEngine | null = null; + try { dataEngine = ctx.getService('objectql'); } + catch { try { dataEngine = ctx.getService('data'); } catch { /* none */ } } + if (dataEngine && typeof dataEngine.find === 'function' && typeof dataEngine.insert === 'function') { + this.engine.setSuspendedRunStore(new ObjectStoreSuspendedRunStore(dataEngine, ctx.logger)); + ctx.logger.info('[Automation] Suspended-run persistence enabled (sys_automation_run)'); + } else { + ctx.logger.info('[Automation] No ObjectQL engine — suspended runs kept in-memory only'); + } + } + // Pull flow definitions from the ObjectQL schema registry. AppPlugin.init() // calls manifest.register(payload), which routes to ql.registerApp() and // stores each inline flow under type 'flow'. By the time start() runs, diff --git a/packages/services/service-automation/src/suspended-run-store.test.ts b/packages/services/service-automation/src/suspended-run-store.test.ts new file mode 100644 index 000000000..af606e071 --- /dev/null +++ b/packages/services/service-automation/src/suspended-run-store.test.ts @@ -0,0 +1,150 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { AutomationEngine } from './engine.js'; +import { ObjectStoreSuspendedRunStore, type SuspendedRunStoreEngine } from './suspended-run-store.js'; +import type { SuspendedRun } from './engine.js'; + +function createTestLogger() { + return { info: () => {}, warn: () => {}, error: () => {}, debug: () => {}, child: () => createTestLogger() } as any; +} + +/** + * Minimal in-memory ObjectQL-like engine: rows keyed by id, with `where` + * equality filtering. Stands in for the `sys_automation_run` table so we can + * exercise {@link ObjectStoreSuspendedRunStore} (and a restart through it) + * without a real driver. 
/**
 * Minimal in-memory ObjectQL-like engine used by the tests below.
 *
 * Rows are keyed by `id` in a plain `Map`; `find` supports equality-only
 * `where` filtering plus an optional numeric `limit`. It stands in for the
 * `sys_automation_run` table so {@link ObjectStoreSuspendedRunStore} (and a
 * simulated restart through it) can be exercised without a real driver.
 *
 * @returns The fake engine, with its backing `rows` map exposed for assertions.
 */
function createFakeEngine(): SuspendedRunStoreEngine & { rows: Map<string, any> } {
  const rows = new Map<string, any>();
  // A row matches when every key of `where` is strictly equal on the row;
  // a missing `where` matches everything.
  const matches = (row: any, where: any) =>
    !where || Object.entries(where).every(([k, v]) => row[k] === v);
  return {
    rows,
    async find(_object, options) {
      const where = options?.where;
      const out = [...rows.values()].filter(r => matches(r, where));
      return typeof options?.limit === 'number' ? out.slice(0, options.limit) : out;
    },
    async insert(_object, data) {
      // Store a shallow copy so later mutation of `data` cannot alter the row.
      rows.set(String(data.id), { ...data });
      return data;
    },
    async update(_object, data, options) {
      const id = options?.where?.id ?? data.id;
      const existing = rows.get(String(id)) ?? { id };
      // Merge semantics: columns absent from `data` keep their stored value.
      rows.set(String(id), { ...existing, ...data });
      return rows.get(String(id));
    },
    async delete(_object, options) {
      const id = options?.where?.id;
      rows.delete(String(id));
      return true;
    },
  };
}

/** Builds a fresh suspended-run fixture with nested variables, one step and a context. */
const baseRun = (): SuspendedRun => ({
  runId: 'run_abc',
  flowName: 'approval_flow',
  flowVersion: 3,
  nodeId: 'approve_step',
  variables: { $runId: 'run_abc', pause: { snapshot: { nested: { value: 42 }, arr: [1, 2, 3] } } },
  steps: [{ nodeId: 'start', nodeType: 'start', status: 'success', startedAt: '2026-01-01T00:00:00.000Z' }],
  context: { object: 'crm_deal', userId: 'u1', organizationId: 'org_1', record: { id: 'd1', amount: 100 } } as any,
  startedAt: '2026-01-01T00:00:00.000Z',
  startTime: 1735689600000,
  correlation: 'areq_1',
});

describe('ObjectStoreSuspendedRunStore', () => {
  it('round-trips a suspended run (nested variables, steps, context)', async () => {
    const engine = createFakeEngine();
    const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger());
    const run = baseRun();

    await store.save(run);
    // Persisted as a single row with JSON-encoded state columns.
    expect(engine.rows.size).toBe(1);
    const row = engine.rows.get('run_abc');
    expect(row).toMatchObject({
      id: 'run_abc', flow_name: 'approval_flow', flow_version: 3,
      node_id: 'approve_step', status: 'paused', correlation: 'areq_1',
      user_id: 'u1', organization_id: 'org_1',
    });
    expect(typeof row.variables_json).toBe('string');

    const loaded = await store.load('run_abc');
    expect(loaded).not.toBeNull();
    expect(loaded).toEqual(run);
  });

  it('upserts on re-save rather than duplicating', async () => {
    const engine = createFakeEngine();
    const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger());
    await store.save(baseRun());
    await store.save({ ...baseRun(), nodeId: 'second_step' });
    expect(engine.rows.size).toBe(1);
    expect((await store.load('run_abc'))?.nodeId).toBe('second_step');
  });

  it('deletes and lists paused runs', async () => {
    const engine = createFakeEngine();
    const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger());
    await store.save(baseRun());
    await store.save({ ...baseRun(), runId: 'run_def' });
    expect(await store.list()).toHaveLength(2);

    await store.delete('run_abc');
    expect(await store.load('run_abc')).toBeNull();
    const remaining = await store.list();
    expect(remaining.map(r => r.runId)).toEqual(['run_def']);
  });

  it('drives a suspend → restart → resume through the DB-backed store', async () => {
    const engine = createFakeEngine();
    const ran: string[] = [];

    // Each call builds a brand-new AutomationEngine over the SAME fake table,
    // which is how the test models "the process restarted".
    function build() {
      const e = new AutomationEngine(createTestLogger(), new ObjectStoreSuspendedRunStore(engine, createTestLogger()));
      e.registerNodeExecutor({
        type: 'pause_node',
        async execute() { return { success: true, suspend: true, correlation: 'areq_1' }; },
      });
      e.registerNodeExecutor({
        type: 'branch_node',
        async execute(node) { ran.push(node.id); return { success: true }; },
      });
      e.registerFlow('approval_flow', {
        name: 'approval_flow', label: 'Approval Flow', type: 'autolaunched',
        nodes: [
          { id: 'start', type: 'start', label: 'Start' },
          { id: 'pause', type: 'pause_node', label: 'Approval' },
          { id: 'approved', type: 'branch_node', label: 'Approved' },
          { id: 'rejected', type: 'branch_node', label: 'Rejected' },
          { id: 'end', type: 'end', label: 'End' },
        ],
        edges: [
          { id: 'e1', source: 'start', target: 'pause' },
          { id: 'e2', source: 'pause', target: 'approved', label: 'approve' },
          { id: 'e3', source: 'pause', target: 'rejected', label: 'reject' },
          { id: 'e4', source: 'approved', target: 'end' },
          { id: 'e5', source: 'rejected', target: 'end' },
        ],
      });
      return e;
    }

    const paused = await build().execute('approval_flow');
    expect(paused.status).toBe('paused');
    expect(engine.rows.size).toBe(1); // durably stored

    // Fresh engine over the same backing table — the run survives.
    const resumed = await build().resume(paused.runId!, { branchLabel: 'reject' });
    expect(resumed.success).toBe(true);
    expect(ran).toContain('rejected');
    expect(ran).not.toContain('approved');
    // Row removed on terminal completion.
    expect(engine.rows.size).toBe(0);
  });
});
/**
 * Durable persistence for suspended flow runs (ADR-0019).
 *
 * The engine keeps an in-memory map of paused runs; that map is lost on a
 * process restart (e.g. a hibernating Cloudflare Worker), so a run that paused
 * at an `approval` / `wait` / `screen` node could never be resumed afterwards.
 * A {@link SuspendedRunStore} backs the in-memory map with durable storage so a
 * cold-booted kernel can rehydrate and continue.
 *
 * Two implementations ship here:
 *  - {@link InMemorySuspendedRunStore} — a Map (the default behaviour, for
 *    tests / dev). It JSON round-trips on save/load so it exercises the same
 *    serialization boundary a DB store imposes.
 *  - {@link ObjectStoreSuspendedRunStore} — persists to the `sys_automation_run`
 *    object via the ObjectQL engine, for production / serverless hosts.
 */

const TABLE = 'sys_automation_run';
const SYSTEM_CTX = { isSystem: true, roles: [], permissions: [] } as const;
/** Upper bound on rows fetched by {@link ObjectStoreSuspendedRunStore.list}. */
const LIST_LIMIT = 1000;

/** Deep clone via JSON so a stored snapshot can't alias live engine state. */
function jsonClone<T>(value: T): T {
  return JSON.parse(JSON.stringify(value)) as T;
}

/**
 * Parse a JSON column that may already be an object (some drivers auto-parse).
 *
 * @param raw      Column value as returned by the engine.
 * @param fallback Value used when the column is empty, unparsable, or holds a
 *                 JSON `null`.
 * @returns The parsed value, the already-parsed object, or `fallback`.
 */
function parseJson<T>(raw: unknown, fallback: T): T {
  if (raw == null || raw === '') return fallback;
  if (typeof raw !== 'string') return raw as T;
  try {
    // A stored literal "null" must not leak out as `null` where callers
    // expect an object/array.
    return (JSON.parse(raw) as T | null) ?? fallback;
  } catch {
    return fallback;
  }
}

/**
 * Normalize a datetime column to an ISO-8601 string.
 *
 * Strings pass through untouched; `Date` instances and epoch numbers are
 * converted. Returns `undefined` for null/undefined or an invalid date so the
 * caller can apply its own default.
 */
function toIsoString(value: unknown): string | undefined {
  if (value == null) return undefined;
  if (value instanceof Date || typeof value === 'number') {
    const date = value instanceof Date ? value : new Date(value);
    return Number.isNaN(date.getTime()) ? undefined : date.toISOString();
  }
  return typeof value === 'string' ? value : String(value);
}

/**
 * Normalize an epoch-milliseconds column to a finite number.
 *
 * Accepts numbers, bigints and numeric strings (a driver may return large
 * integers in either of the latter forms — NOTE(review): confirm against the
 * drivers actually in use). Returns `undefined` when no finite number can be
 * recovered.
 */
function toEpochMs(value: unknown): number | undefined {
  if (typeof value === 'number') return Number.isFinite(value) ? value : undefined;
  if (typeof value === 'bigint') return Number(value);
  if (typeof value === 'string' && value.trim() !== '') {
    const parsed = Number(value);
    return Number.isFinite(parsed) ? parsed : undefined;
  }
  return undefined;
}

/**
 * In-memory {@link SuspendedRunStore}. Snapshots are JSON-cloned on the way in
 * and out, matching the serialize/deserialize boundary of a DB-backed store —
 * so a unit test can share one instance across two engine instances to simulate
 * a process restart (suspend on engine A, resume on engine B).
 */
export class InMemorySuspendedRunStore implements SuspendedRunStore {
  private readonly runs = new Map<string, SuspendedRun>();

  /** Store a detached copy of `run`, replacing any snapshot with the same id. */
  async save(run: SuspendedRun): Promise<void> {
    this.runs.set(run.runId, jsonClone(run));
  }

  /** Return a detached copy of the stored run, or `null` when unknown. */
  async load(runId: string): Promise<SuspendedRun | null> {
    const run = this.runs.get(runId);
    return run ? jsonClone(run) : null;
  }

  /** Remove the stored run; a no-op when the id is unknown. */
  async delete(runId: string): Promise<void> {
    this.runs.delete(runId);
  }

  /** Return detached copies of every stored run. */
  async list(): Promise<SuspendedRun[]> {
    return [...this.runs.values()].map(run => jsonClone(run));
  }
}

/**
 * Minimal ObjectQL engine surface the {@link ObjectStoreSuspendedRunStore} uses.
 * Matches the find/insert/update/delete shape exposed by the `objectql` service
 * (and mirrors `ApprovalEngine` in plugin-approvals).
 */
export interface SuspendedRunStoreEngine {
  find(object: string, options?: any): Promise<any[]>;
  insert(object: string, data: any, options?: any): Promise<any>;
  update(object: string, data: any, options?: any): Promise<any>;
  delete?(object: string, options?: any): Promise<any>;
}

/** The subset of the logger this module calls; both methods are optional. */
interface MinimalLogger {
  warn?: Logger['warn'];
  debug?: Logger['debug'];
}

/**
 * Durable {@link SuspendedRunStore} backed by the `sys_automation_run` object.
 *
 * Persists the resumable run state (`variables` / `steps` / `context` / `screen`)
 * JSON-serialized. The row is keyed by `runId`; only live pauses are stored.
 * Every engine call passes a system context — these are infrastructure rows
 * (the tenant is still captured in `organization_id` for scoping/observability).
 */
export class ObjectStoreSuspendedRunStore implements SuspendedRunStore {
  constructor(
    private readonly engine: SuspendedRunStoreEngine,
    private readonly logger?: MinimalLogger,
  ) {}

  /**
   * Upsert the run's row: update when a row with this id exists, insert
   * otherwise. A re-suspend (the run paused again at a downstream node) thus
   * updates the existing row rather than inserting a duplicate.
   */
  async save(run: SuspendedRun): Promise<void> {
    const now = new Date().toISOString();
    const row = this.serialize(run);
    const existing = await this.engine.find(TABLE, {
      where: { id: run.runId }, limit: 1, context: SYSTEM_CTX,
    });
    if (Array.isArray(existing) && existing[0]) {
      await this.engine.update(
        TABLE,
        { ...row, updated_at: now },
        { where: { id: run.runId }, context: SYSTEM_CTX },
      );
    } else {
      await this.engine.insert(
        TABLE,
        { ...row, created_at: now, updated_at: now },
        { context: SYSTEM_CTX },
      );
    }
  }

  /** Load and rebuild the run with this id, or `null` when no row exists. */
  async load(runId: string): Promise<SuspendedRun | null> {
    const rows = await this.engine.find(TABLE, {
      where: { id: runId }, limit: 1, context: SYSTEM_CTX,
    });
    const row = Array.isArray(rows) ? rows[0] : null;
    return row ? this.deserialize(row) : null;
  }

  /**
   * Delete the run's row. When the engine exposes no `delete`, the row is left
   * in place and a warning is logged instead of throwing.
   */
  async delete(runId: string): Promise<void> {
    if (typeof this.engine.delete !== 'function') {
      this.logger?.warn?.(
        `[automation] ObjectStoreSuspendedRunStore: engine has no delete(); suspended run '${runId}' row not removed`,
      );
      return;
    }
    await this.engine.delete(TABLE, { where: { id: runId }, context: SYSTEM_CTX });
  }

  /**
   * List rows whose status is `paused`, capped at {@link LIST_LIMIT}. A warning
   * is logged when the cap is reached, since further rows may exist.
   */
  async list(): Promise<SuspendedRun[]> {
    const rows = await this.engine.find(TABLE, {
      where: { status: 'paused' }, limit: LIST_LIMIT, context: SYSTEM_CTX,
    });
    const found = Array.isArray(rows) ? rows : [];
    if (found.length >= LIST_LIMIT) {
      this.logger?.warn?.(
        `[automation] ObjectStoreSuspendedRunStore: list() reached its limit of ${LIST_LIMIT} rows; result may be truncated`,
      );
    }
    return found.map(r => this.deserialize(r));
  }

  /** Flatten a run into a `sys_automation_run` row (state columns JSON-encoded). */
  private serialize(run: SuspendedRun): Record<string, unknown> {
    const ctx = (run.context ?? {}) as Record<string, unknown>;
    const org = ctx.organizationId ?? ctx.tenantId ?? null;
    return {
      id: run.runId,
      organization_id: org,
      flow_name: run.flowName,
      flow_version: run.flowVersion ?? null,
      node_id: run.nodeId,
      status: 'paused',
      correlation: run.correlation ?? null,
      user_id: ctx.userId ?? null,
      variables_json: JSON.stringify(run.variables ?? {}),
      steps_json: JSON.stringify(run.steps ?? []),
      context_json: JSON.stringify(run.context ?? {}),
      screen_json: run.screen ? JSON.stringify(run.screen) : null,
      started_at: run.startedAt,
      start_time: run.startTime ?? null,
    };
  }

  /**
   * Rebuild a run from a `sys_automation_run` row.
   *
   * `started_at` falls back to the current time when absent; `start_time` falls
   * back to the parsed `started_at`, then to the current time.
   */
  private deserialize(row: any): SuspendedRun {
    const startedAt = toIsoString(row.started_at) ?? new Date().toISOString();
    return {
      runId: String(row.id),
      flowName: String(row.flow_name ?? ''),
      flowVersion: row.flow_version ?? undefined,
      nodeId: String(row.node_id ?? ''),
      variables: parseJson<Record<string, unknown>>(row.variables_json, {}),
      steps: parseJson<SuspendedRun['steps']>(row.steps_json, []),
      context: parseJson<SuspendedRun['context']>(row.context_json, {}),
      startedAt,
      startTime: toEpochMs(row.start_time) ?? (Date.parse(startedAt) || Date.now()),
      correlation: row.correlation ?? undefined,
      screen: parseJson<SuspendedRun['screen']>(row.screen_json, undefined),
    };
  }
}
/**
 * sys_automation_run — durable state of a **suspended** automation flow run
 * (ADR-0019).
 *
 * One row exists per *currently* suspended run: `id` is the `runId`, and
 * `correlation` ties back to the pausing node's external state (e.g.
 * `sys_approval_request.id`, mirrored by `sys_approval_request.flow_run_id`).
 * The row is removed when the run resumes to completion or fails, so only
 * live pauses are stored.
 *
 * The resumable state (`variables` / `steps` / `context` / `screen`) is kept
 * JSON-serialized in the `*_json` columns.
 *
 * Writers: the automation engine's durable {@link SuspendedRunStore}.
 * Readers: operability surfaces (a "pending/suspended runs" view) and the
 * engine itself when resuming after a restart.
 *
 * @namespace sys
 */

/** Field-group labels shared by the field definitions below. */
const RUN_FIELD_GROUP = {
  system: 'System',
  identity: 'Identity',
  state: 'State',
} as const;

/** Maximum length applied to the short text columns. */
const RUN_TEXT_MAX_LENGTH = 255;

export const SysAutomationRun = ObjectSchema.create({
  name: 'sys_automation_run',
  label: 'Automation Run',
  pluralLabel: 'Automation Runs',
  icon: 'pause-circle',
  isSystem: true,
  managedBy: 'system',
  description: 'Durable state of a suspended automation flow run (ADR-0019)',
  displayNameField: 'id',
  titleFormat: '{flow_name} · {node_id}',
  compactLayout: ['flow_name', 'node_id', 'status', 'correlation', 'started_at', 'updated_at'],

  fields: {
    // ── System ──
    id: Field.text({
      label: 'Run ID',
      required: true,
      readonly: true,
      group: RUN_FIELD_GROUP.system,
    }),

    organization_id: Field.lookup('sys_organization', {
      label: 'Organization',
      required: false,
      group: RUN_FIELD_GROUP.system,
      description: 'Tenant that owns this run (propagated from the trigger context)',
    }),

    // ── Identity ──
    flow_name: Field.text({
      label: 'Flow',
      required: true,
      maxLength: RUN_TEXT_MAX_LENGTH,
      searchable: true,
      group: RUN_FIELD_GROUP.identity,
    }),

    flow_version: Field.number({
      label: 'Flow Version',
      required: false,
      group: RUN_FIELD_GROUP.identity,
    }),

    // ── State ──
    node_id: Field.text({
      label: 'Paused Node',
      required: true,
      maxLength: RUN_TEXT_MAX_LENGTH,
      description: 'Node the run is suspended at; resume continues from its out-edges.',
      group: RUN_FIELD_GROUP.state,
    }),

    status: Field.select(['paused'], {
      label: 'Status',
      required: true,
      defaultValue: 'paused',
      description: 'Only suspended runs are persisted; the row is deleted on terminal completion.',
      group: RUN_FIELD_GROUP.state,
    }),

    correlation: Field.text({
      label: 'Correlation',
      required: false,
      maxLength: RUN_TEXT_MAX_LENGTH,
      description: 'Correlation key from the pausing node (e.g. approval request id).',
      group: RUN_FIELD_GROUP.state,
    }),

    user_id: Field.text({
      label: 'User',
      required: false,
      maxLength: RUN_TEXT_MAX_LENGTH,
      description: 'User who triggered the run (from context.userId).',
      group: RUN_FIELD_GROUP.state,
    }),

    variables_json: Field.textarea({
      label: 'Variables',
      required: false,
      description: 'JSON snapshot of the flow variable map at suspend time.',
      group: RUN_FIELD_GROUP.state,
    }),

    steps_json: Field.textarea({
      label: 'Steps',
      required: false,
      description: 'JSON snapshot of the executed step logs so far.',
      group: RUN_FIELD_GROUP.state,
    }),

    context_json: Field.textarea({
      label: 'Context',
      required: false,
      description: 'JSON snapshot of the trigger / automation context.',
      group: RUN_FIELD_GROUP.state,
    }),

    screen_json: Field.textarea({
      label: 'Screen',
      required: false,
      description: 'JSON snapshot of the screen spec the run is waiting on (screen-flow runtime).',
      group: RUN_FIELD_GROUP.state,
    }),

    started_at: Field.datetime({
      label: 'Started At',
      required: true,
      group: RUN_FIELD_GROUP.state,
    }),

    start_time: Field.number({
      label: 'Start Time (epoch ms)',
      required: false,
      description: 'Epoch ms when the run started; used to compute duration on resume.',
      group: RUN_FIELD_GROUP.state,
    }),

    // ── Audit timestamps ──
    created_at: Field.datetime({
      label: 'Created At',
      required: true,
      defaultValue: 'NOW()',
      readonly: true,
      group: RUN_FIELD_GROUP.system,
    }),

    updated_at: Field.datetime({
      label: 'Updated At',
      required: false,
      group: RUN_FIELD_GROUP.system,
    }),
  },

  indexes: [
    // "Which runs are suspended for this flow?" — operability / resume sweeps.
    { fields: ['flow_name', 'status'] },
    { fields: ['status', 'updated_at'] },
    // Look up a suspended run by the pausing node's correlation key.
    { fields: ['correlation'] },
  ],
});