Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .changeset/durable-suspended-flow-runs-1518.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
"@objectstack/service-automation": minor
---

Persist suspended flow runs so a durable pause survives a process restart (#1518).

`service-automation` kept suspended runs in an in-memory `Map` only, so a flow
paused at an `approval` / `wait` / `screen` node could never be resumed after the
process restarted — a hard blocker on hibernating/serverless hosts (e.g. the
Cloudflare Workers control plane), where the approval record persists but
`resume(runId)` had nothing to continue.

The engine now backs that map with a pluggable `SuspendedRunStore` (ADR-0019):

- **`SuspendedRunStore`** interface + two implementations — `InMemorySuspendedRunStore`
(the default; JSON round-trips so it faithfully mirrors a DB boundary) and
`ObjectStoreSuspendedRunStore`, which persists to a new **`sys_automation_run`**
system object via the ObjectQL engine. `AutomationServicePlugin` registers the
object and auto-enables the DB-backed store when an ObjectQL engine is present
(opt out with `suspendedRunStore: 'memory'`).
- **Durable suspend/resume** — a run is persisted on suspend and deleted on
terminal completion. `resume(runId)` rehydrates from the store when the run is
not in memory (cold boot), so a fully restarted kernel can continue from the
paused node down the correct branch and run the downstream nodes. The resumable
state (`variables` / `steps` / `context` / `screen`) round-trips through the
store, including nested objects.
- **Idempotent resume** — the suspension is consumed before downstream work runs,
plus an in-process guard rejects a concurrent duplicate `resume`, so a repeated
resume after a partial restart can't double-run side effects.
- Run ids are now process-unique (random component) so they don't collide with a
still-suspended run persisted by a previous process lifetime.

New exports: `SuspendedRun`, `SuspendedRunStore`, `StepLogEntry`,
`InMemorySuspendedRunStore`, `ObjectStoreSuspendedRunStore`,
`SuspendedRunStoreEngine`, `SysAutomationRun`, plus
`AutomationEngine.setSuspendedRunStore()` and `listSuspendedRunsDurable()`.
Existing service-automation and plugin-approvals tests pass unchanged.
136 changes: 136 additions & 0 deletions packages/services/service-automation/src/engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { LiteKernel } from '@objectstack/core';
import { AutomationEngine } from './engine.js';
import { AutomationServicePlugin } from './plugin.js';
import { registerScreenNodes } from './builtin/screen-nodes.js';
import { InMemorySuspendedRunStore } from './suspended-run-store.js';
import type { NodeExecutor } from './engine.js';
import type { IAutomationService } from '@objectstack/spec/contracts';

Expand Down Expand Up @@ -562,6 +563,141 @@ describe('AutomationEngine', () => {
expect(resumed.success).toBe(true);
expect(resumed.status).toBeUndefined();
});

// ── Durable persistence across a process restart (ADR-0019) ──
//
// A shared SuspendedRunStore stands in for a database: suspend on one
// engine instance, then resume on a brand-new instance backed by the
// same store — simulating a cold boot after the original process is gone.
describe('Durable persistence across process restart', () => {
function buildEngine(
store: InMemorySuspendedRunStore,
captured: { snapshot?: unknown; ran: string[] },
) {
const e = new AutomationEngine(createTestLogger(), store);
e.registerNodeExecutor({
type: 'pause_node',
async execute() {
// Snapshot a nested object + array so we can assert the
// variable map round-trips through the store.
return {
success: true,
suspend: true,
correlation: 'req_1',
output: { snapshot: { nested: { value: 42 }, arr: [1, 2, 3] } },
};
},
});
e.registerNodeExecutor({
type: 'branch_node',
async execute(node, variables) {
captured.ran.push(node.id);
captured.snapshot = variables.get('pause.snapshot');
return { success: true };
},
});
e.registerFlow('approval_flow', {
name: 'approval_flow', label: 'Approval Flow', type: 'autolaunched',
nodes: [
{ id: 'start', type: 'start', label: 'Start' },
{ id: 'pause', type: 'pause_node', label: 'Approval' },
{ id: 'approved', type: 'branch_node', label: 'Approved' },
{ id: 'rejected', type: 'branch_node', label: 'Rejected' },
{ id: 'end', type: 'end', label: 'End' },
],
edges: [
{ id: 'e1', source: 'start', target: 'pause' },
{ id: 'e2', source: 'pause', target: 'approved', label: 'approve' },
{ id: 'e3', source: 'pause', target: 'rejected', label: 'reject' },
{ id: 'e4', source: 'approved', target: 'end' },
{ id: 'e5', source: 'rejected', target: 'end' },
],
});
return e;
}

it('survives a full restart: suspend on one engine, resume on a fresh one', async () => {
const store = new InMemorySuspendedRunStore();

// Process lifetime #1 — the run suspends at the approval node.
const a = { snapshot: undefined as unknown, ran: [] as string[] };
const engineA = buildEngine(store, a);
const paused = await engineA.execute('approval_flow');
expect(paused.status).toBe('paused');
expect(paused.runId).toBeDefined();
expect(await store.list()).toHaveLength(1);

// Process lifetime #2 — a brand-new engine cold-boots. The run is
// NOT in its in-memory cache…
const b = { snapshot: undefined as unknown, ran: [] as string[] };
const engineB = buildEngine(store, b);
expect(engineB.listSuspendedRuns()).toHaveLength(0);
// …but it is visible and resumable via the durable store.
const durable = await engineB.listSuspendedRunsDurable();
expect(durable).toHaveLength(1);
expect(durable[0]).toMatchObject({
runId: paused.runId, flowName: 'approval_flow', nodeId: 'pause', correlation: 'req_1',
});

const resumed = await engineB.resume(paused.runId!, {
branchLabel: 'approve', output: { decision: 'approved' },
});
expect(resumed.success).toBe(true);
expect(resumed.status).toBeUndefined();
// Continued down the correct branch on the fresh engine.
expect(b.ran).toContain('approved');
expect(b.ran).not.toContain('rejected');
// Variables (nested object + array) round-tripped through the store.
expect(b.snapshot).toEqual({ nested: { value: 42 }, arr: [1, 2, 3] });
// The durable record is consumed on terminal completion.
expect(await store.list()).toHaveLength(0);
expect(await engineB.listSuspendedRunsDurable()).toHaveLength(0);
});

it('resume is idempotent: a duplicate resume does not double-run downstream', async () => {
const store = new InMemorySuspendedRunStore();
const a = { snapshot: undefined as unknown, ran: [] as string[] };
const engineA = buildEngine(store, a);
const paused = await engineA.execute('approval_flow');

// Fresh engine (restart) resumes once.
const b = { snapshot: undefined as unknown, ran: [] as string[] };
const engineB = buildEngine(store, b);
const first = await engineB.resume(paused.runId!, { branchLabel: 'approve' });
expect(first.success).toBe(true);
expect(b.ran.filter(x => x === 'approved')).toHaveLength(1);

// A second resume of the same run finds nothing — no double-run.
const second = await engineB.resume(paused.runId!, { branchLabel: 'approve' });
expect(second.success).toBe(false);
expect(second.error).toContain('No suspended run');
expect(b.ran.filter(x => x === 'approved')).toHaveLength(1);
});

it('listSuspendedRunsDurable falls back to the in-memory list with no store', async () => {
const e = new AutomationEngine(createTestLogger()); // no store
e.registerNodeExecutor({
type: 'pause_node',
async execute() { return { success: true, suspend: true, correlation: 'req_1' }; },
});
e.registerFlow('p', {
name: 'p', label: 'P', type: 'autolaunched',
nodes: [
{ id: 'start', type: 'start', label: 'Start' },
{ id: 'pause', type: 'pause_node', label: 'Pause' },
{ id: 'end', type: 'end', label: 'End' },
],
edges: [
{ id: 'e1', source: 'start', target: 'pause' },
{ id: 'e2', source: 'pause', target: 'end' },
],
});
const paused = await e.execute('p');
const durable = await e.listSuspendedRunsDurable();
expect(durable).toHaveLength(1);
expect(durable[0].runId).toBe(paused.runId);
});
});
});

describe('IAutomationService Contract', () => {
Expand Down
Loading