diff --git a/.changeset/adr-0018-m3-http-notify.md b/.changeset/adr-0018-m3-http-notify.md new file mode 100644 index 000000000..db43c79d4 --- /dev/null +++ b/.changeset/adr-0018-m3-http-notify.md @@ -0,0 +1,36 @@ +--- +"@objectstack/service-messaging": minor +"@objectstack/service-automation": minor +"@objectstack/spec": minor +--- + +ADR-0018 M3: unified `http` / `notify` executors backed by a generic HTTP outbox. + +Promotes a reliable outbound-HTTP delivery outbox into `service-messaging` (the +raw-callout counterpart to the notification outbox) and routes the Flow `http` +node through it — closing the "`http_request` is a bare `fetch()` with no retry" +gap. The five divergent outbound verbs collapse onto canonical `http` / `notify`. + +**`@objectstack/service-messaging` (additive):** + +- `IHttpOutbox` / `HttpDelivery` generic raw-callout shape + (`source` / `refId` / `dedupKey` / `label` / `signingSecret`), `SqlHttpOutbox` + over a new `sys_http_delivery` object, `MemoryHttpOutbox`, `HttpDispatcher` + (per-partition cluster lock, claim/ack/retry/dead-letter), and a shared + `sendOnce` + 7-step jittered retry schedule. +- `MessagingService` gains `setHttpOutbox()` / `isHttpDeliveryReady()` / + `enqueueHttp()`; the plugin wires the outbox + dispatcher at `kernel:ready`. + +**`@objectstack/service-automation`:** + +- Canonical `http` executor — `durable: true` enqueues onto the messaging HTTP + outbox (retry/dead-letter); otherwise an inline `fetch()` preserving + `http_request`'s request/response semantics. +- `engine.registerNodeAlias()` — registers a delegating executor + a + `deprecated` / `aliasOf` descriptor. `http_request` / `http_call` / `webhook` + are now deprecated aliases of `http`; existing flows keep running. +- `notify` descriptor marked `needsOutbox` (its delivery is outbox-backed). + +**`@objectstack/spec`:** `flow.zod` adds `http` to the builtin node-type seed set. + +`plugin-webhooks` cut-over to the shared outbox is a deliberate follow-up. diff --git a/packages/services/service-automation/src/builtin/http-nodes.test.ts b/packages/services/service-automation/src/builtin/http-nodes.test.ts new file mode 100644 index 000000000..24409e8de --- /dev/null +++ b/packages/services/service-automation/src/builtin/http-nodes.test.ts @@ -0,0 +1,161 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { AutomationEngine } from '../engine.js'; +import { registerHttpNodes } from './http-nodes.js'; + +function createTestLogger() { + return { + info: () => {}, + warn: () => {}, + error: () => {}, + debug: () => {}, + child: () => createTestLogger(), + } as any; +} + +interface HttpSurface { + isHttpDeliveryReady?(): boolean; + enqueueHttp?(input: any): Promise; +} + +function createCtx(messaging?: HttpSurface) { + return { + logger: createTestLogger(), + getService(name: string) { + if (name === 'messaging') return messaging; + return undefined; + }, + } as any; +} + +function httpFlow(type: 'http' | 'http_request' | 'http_call' | 'webhook', config: Record) { + return { + name: 'http_flow', + label: 'HTTP Flow', + type: 'autolaunched' as const, + variables: [ + { name: 'host', type: 'text' as const, isInput: true }, + { name: 'http.status', type: 'number' as const, isOutput: true }, + ], + nodes: [ + { id: 'start', type: 'start' as const, label: 'Start' }, + { id: 'http', type: type as any, label: 'HTTP', config }, + { id: 'end', type: 'end' as const, label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'http' }, + { id: 'e2', source: 'http', target: 'end' }, + ], + }; +} + +describe('http (canonical node) + deprecated aliases', () => { + it('publishes a builtin io descriptor flagged needsOutbox', () => { + const engine = new AutomationEngine(createTestLogger()); + registerHttpNodes(engine, createCtx()); + const d = engine.getActionDescriptor('http'); + expect(d?.source).toBe('builtin'); + expect(d?.category).toBe('io'); + expect(d?.needsOutbox).toBe(true); + expect(d?.paradigms).toEqual(expect.arrayContaining(['flow', 'workflow_rule', 'approval'])); + }); + + it('registers http_request/http_call/webhook as deprecated aliases of http', () => { + const engine = new AutomationEngine(createTestLogger()); + registerHttpNodes(engine, createCtx()); + for (const alias of ['http_request', 'http_call', 'webhook']) { + expect(engine.getRegisteredNodeTypes()).toContain(alias); + const d = engine.getActionDescriptor(alias); + expect(d?.deprecated).toBe(true); + expect(d?.aliasOf).toBe('http'); + } + }); + + describe('durable mode', () => { + it('enqueues onto the messaging HTTP outbox and returns a deliveryId', async () => { + const enqueued: any[] = []; + const messaging: HttpSurface = { + isHttpDeliveryReady: () => true, + async enqueueHttp(input) { + enqueued.push(input); + return 'dlv_1'; + }, + }; + const engine = new AutomationEngine(createTestLogger()); + registerHttpNodes(engine, createCtx(messaging)); + engine.registerFlow( + 'http_flow', + httpFlow('http', { url: 'https://example.test/hook', durable: true, body: { a: 1 } }), + ); + + const result = await engine.execute('http_flow'); + expect(result.success).toBe(true); + expect(enqueued).toHaveLength(1); + expect(enqueued[0]).toMatchObject({ + source: 'flow', + url: 'https://example.test/hook', + method: 'POST', + payload: { a: 1 }, + }); + }); + + it('degrades to an inline fetch when no HTTP outbox is wired', async () => { + const fetchMock = vi.fn(async () => ({ ok: true, status: 200, async json() { return { ok: true }; }, async text() { return ''; } })); + vi.stubGlobal('fetch', fetchMock); + const messaging: HttpSurface = { isHttpDeliveryReady: () => false }; + const engine = new AutomationEngine(createTestLogger()); + registerHttpNodes(engine, createCtx(messaging)); + engine.registerFlow('http_flow', httpFlow('http', { url: 'https://x/y', durable: true })); + + const result = await engine.execute('http_flow'); + expect(result.success).toBe(true); + expect(fetchMock).toHaveBeenCalledOnce(); + }); + }); + + describe('request/response mode (default)', () => { + let fetchMock: any; + beforeEach(() => { + fetchMock = vi.fn(async () => ({ + ok: true, + status: 201, + async json() { return { created: true }; }, + async text() { return ''; }, + })); + vi.stubGlobal('fetch', fetchMock); + }); + afterEach(() => vi.unstubAllGlobals()); + + it('runs an inline fetch and returns response + status', async () => { + const engine = new AutomationEngine(createTestLogger()); + registerHttpNodes(engine, createCtx()); + engine.registerFlow('http_flow', httpFlow('http', { url: 'https://api.test/items', method: 'POST', body: { n: 1 } })); + + const result = await engine.execute('http_flow'); + expect(result.success).toBe(true); + expect(fetchMock).toHaveBeenCalledOnce(); + const [, init] = fetchMock.mock.calls[0]; + expect(init.method).toBe('POST'); + expect(JSON.parse(init.body)).toEqual({ n: 1 }); + }); + + it('fails the step when url is missing', async () => { + const engine = new AutomationEngine(createTestLogger()); + registerHttpNodes(engine, createCtx()); + engine.registerFlow('http_flow', httpFlow('http', { method: 'GET' })); + const result = await engine.execute('http_flow'); + expect(result.success).toBe(false); + expect(result.error).toContain('url'); + }); + + it('a legacy http_request node still runs (via the alias → http)', async () => { + const engine = new AutomationEngine(createTestLogger()); + registerHttpNodes(engine, createCtx()); + engine.registerFlow('http_flow', httpFlow('http_request', { url: 'https://legacy.test', method: 'GET' })); + const result = await engine.execute('http_flow'); + expect(result.success).toBe(true); + expect(fetchMock).toHaveBeenCalledOnce(); + }); + }); +}); diff --git a/packages/services/service-automation/src/builtin/http-nodes.ts b/packages/services/service-automation/src/builtin/http-nodes.ts index 20309b675..db01909a4 100644 --- a/packages/services/service-automation/src/builtin/http-nodes.ts +++ b/packages/services/service-automation/src/builtin/http-nodes.ts @@ -2,60 +2,182 @@ import type { PluginContext } from '@objectstack/core'; import { defineActionDescriptor } from '@objectstack/spec/automation'; +import { randomUUID } from 'node:crypto'; import type { AutomationEngine } from '../engine.js'; +import { interpolate } from './template.js'; /** - * HTTP built-in node — `http_request` (foundational outbound I/O). + * HTTP built-in node — canonical `http` (ADR-0018 M3) + deprecated aliases. * - * Part of the platform baseline, so the core {@link AutomationServicePlugin} - * seeds it directly (ADR-0018). Its generic-dispatch sibling `connector_action` - * (see {@link ./connector-nodes.ts}) is now also baseline: where `http_request` - * calls a raw URL, `connector_action` invokes a registered connector's action, - * with concrete connectors contributed by plugins via `engine.registerConnector()` - * (ADR-0018 §Addendum). + * `http` is the single outbound-callout verb the platform offers Flow, Workflow + * Rules and Approval. It replaces the five divergent names (`http_request` / + * `http_call` / `webhook` / …) which are kept as **deprecated aliases** of + * `http` for back-compat (registered via {@link AutomationEngine.registerNodeAlias}). * - * ADR-0018 §M3 target: route `http_request` through the service-messaging - * outbox (retry / idempotency / dead-letter) under the canonical `http` type. - * Today it is a bare `fetch()`. + * Two execution modes: + * + * - **Durable (`config.durable: true`)** — fire-and-forget callout enqueued + * onto the `service-messaging` HTTP outbox (`sys_http_delivery`), inheriting + * retry / idempotency / dead-letter. The flow gets back `{ deliveryId }` and + * does NOT block on the response. This closes the "`http_request` is a bare + * fetch with no retry" reliability gap (ADR-0018 §4). When no messaging HTTP + * outbox is wired the node degrades to the inline call below. + * + * - **Request/response (default)** — a synchronous `fetch()` returning + * `{ response, status }` to the flow, preserving the historical `http_request` + * behavior so existing flows that read the response keep working. (The ADR's + * `isAsync` suspend-and-resume variant is future work.) */ + +/** Structural view of `service-messaging`'s HTTP outbox surface (ADR-0018 M3). */ +interface MessagingHttpSurface { + isHttpDeliveryReady?(): boolean; + enqueueHttp?(input: { + source: string; + refId: string; + dedupKey: string; + label?: string; + url: string; + method?: string; + headers?: Record; + signingSecret?: string; + timeoutMs?: number; + payload: unknown; + }): Promise; +} + +const HTTP_TYPE = 'http' as const; + export function registerHttpNodes(engine: AutomationEngine, ctx: PluginContext): void { - // http_request node executor - engine.registerNodeExecutor({ - type: 'http_request', - descriptor: defineActionDescriptor({ - type: 'http_request', version: '1.0.0', name: 'HTTP Request', - description: 'Call an external HTTP endpoint. (ADR-0018: migrates to outbox-backed `http`.)', - icon: 'globe', category: 'io', source: 'builtin', - // ADR-0018 §M3 target: route via service-messaging outbox for - // retry/idempotency/dead-letter. Today this is a bare fetch(). - needsOutbox: false, supportsRetry: true, - paradigms: ['flow', 'workflow_rule', 'approval'], - }), - async execute(node, _variables, _context) { - const config = node.config as Record | undefined; - const url = config?.url as string | undefined; - const method = (config?.method as string) ?? 'GET'; - const headers = config?.headers as Record | undefined; - const body = config?.body; - - if (!url) { - return { success: false, error: 'http_request: url is required' }; + const getMessaging = (): MessagingHttpSurface | undefined => { + try { + return ctx.getService('messaging'); + } catch { + return undefined; + } + }; + + engine.registerNodeExecutor({ + type: HTTP_TYPE, + descriptor: defineActionDescriptor({ + type: HTTP_TYPE, + version: '1.0.0', + name: 'HTTP', + description: + 'Call an external HTTP endpoint. With `durable: true`, the call is enqueued on the ' + + 'messaging outbox with retry / dead-letter; otherwise it runs inline and returns the response.', + icon: 'globe', + category: 'io', + source: 'builtin', + // Capable of outbox-backed durable delivery (used when durable:true + // and the messaging HTTP outbox is wired). + needsOutbox: true, + supportsRetry: true, + paradigms: ['flow', 'workflow_rule', 'approval'], + configSchema: { + type: 'object', + required: ['url'], + properties: { + url: { type: 'string', description: 'Target URL' }, + method: { type: 'string', description: 'HTTP method (default GET; POST when durable)' }, + headers: { type: 'object', description: 'Request headers' }, + body: { description: 'Request body (JSON-serialised)' }, + durable: { + type: 'boolean', + description: 'Fire-and-forget via the durable outbox (retry/dead-letter) instead of inline request/response', + }, + timeoutMs: { type: 'number', description: 'Per-request timeout (ms)' }, + signingSecret: { type: 'string', description: 'HMAC-SHA256 secret → X-Objectstack-Signature' }, + }, + }, + }), + async execute(node, variables, context) { + const raw = (node.config ?? {}) as Record; + const cfg = interpolate(raw, variables, context) as Record; + + const url = cfg.url as string | undefined; + if (!url) return { success: false, error: 'http: url is required' }; + + const durable = cfg.durable === true; + const headers = cfg.headers as Record | undefined; + const body = cfg.body; + const timeoutMs = typeof cfg.timeoutMs === 'number' ? cfg.timeoutMs : undefined; + const signingSecret = cfg.signingSecret as string | undefined; + + // ── Durable mode: enqueue onto the messaging HTTP outbox ────────── + if (durable) { + const messaging = getMessaging(); + if (messaging?.isHttpDeliveryReady?.() && messaging.enqueueHttp) { + try { + const deliveryId = await messaging.enqueueHttp({ + source: 'flow', + refId: node.id, + dedupKey: randomUUID(), + label: `flow:${node.id}`, + url, + method: (cfg.method as string) ?? 'POST', + headers, + signingSecret, + timeoutMs, + payload: body ?? {}, + }); + return { success: true, output: { deliveryId, enqueued: true } }; + } catch (err) { + return { success: false, error: `http (durable) failed to enqueue: ${(err as Error).message}` }; + } } + // No outbox available — degrade to a best-effort inline call. + ctx.logger.warn( + `[http] node '${node.id}' requested durable delivery but no messaging HTTP outbox is wired; falling back to inline fetch`, + ); + } + // ── Request/response mode (default; preserves http_request) ─────── + const method = (cfg.method as string) ?? 'GET'; + const controller = new AbortController(); + const timer = timeoutMs ? setTimeout(() => controller.abort(), timeoutMs) : undefined; + try { const response = await fetch(url, { method, headers, - body: body ? JSON.stringify(body) : undefined, + body: body !== undefined && body !== null ? JSON.stringify(body) : undefined, + signal: controller.signal, }); - const data = await response.json(); - + const data = await readBody(response); return { success: response.ok, output: { response: data, status: response.status }, error: response.ok ? undefined : `HTTP ${response.status}`, }; - }, - }); + } catch (err) { + const e = err as { name?: string; message?: string }; + const msg = e?.name === 'AbortError' ? `timeout after ${timeoutMs}ms` : e?.message ?? String(err); + return { success: false, error: `http: ${msg}` }; + } finally { + if (timer) clearTimeout(timer); + } + }, + }); + + // ADR-0018 M3: collapse the divergent outbound verbs onto `http`. Old saved + // flows / workflow rules / approval actions keep running via these aliases. + engine.registerNodeAlias('http_request', HTTP_TYPE, { name: 'HTTP Request', needsOutbox: true }); + engine.registerNodeAlias('http_call', HTTP_TYPE, { name: 'HTTP Call', needsOutbox: true }); + engine.registerNodeAlias('webhook', HTTP_TYPE, { name: 'Webhook', needsOutbox: true }); + + ctx.logger.info('[HTTP] http executor registered (+ deprecated aliases: http_request, http_call, webhook)'); +} - ctx.logger.info('[HTTP] 1 built-in node executor registered (http_request)'); +/** Read a response body as JSON, falling back to text (empty body → null). */ +async function readBody(response: { json(): Promise; text(): Promise }): Promise { + try { + return await response.json(); + } catch { + try { + const text = await response.text(); + return text || null; + } catch { + return null; + } + } } diff --git a/packages/services/service-automation/src/builtin/notify-node.ts b/packages/services/service-automation/src/builtin/notify-node.ts index 085f63478..2d1d0a596 100644 --- a/packages/services/service-automation/src/builtin/notify-node.ts +++ b/packages/services/service-automation/src/builtin/notify-node.ts @@ -64,6 +64,9 @@ export function registerNotifyNode(engine: AutomationEngine, ctx: PluginContext) description: 'Send an outbound notification to users via the messaging service (inbox / email / push / …).', icon: 'bell', category: 'io', source: 'builtin', supportsRetry: true, + // Delivery is outbox-backed inside the messaging service (ADR-0030 + // emit → sys_notification_delivery), so it inherits retry/dead-letter. + needsOutbox: true, paradigms: ['flow', 'workflow_rule', 'approval'], }), async execute(node, variables, context) { diff --git a/packages/services/service-automation/src/engine.ts b/packages/services/service-automation/src/engine.ts index 08f5772a6..278674f3f 100644 --- a/packages/services/service-automation/src/engine.ts +++ b/packages/services/service-automation/src/engine.ts @@ -4,7 +4,7 @@ import type { FlowParsed, FlowNodeParsed, FlowEdgeParsed } from '@objectstack/sp import type { ExecutionLog, ActionDescriptor } from '@objectstack/spec/automation'; import type { AutomationContext, AutomationResult, ResumeSignal, IAutomationService, ScreenSpec } from '@objectstack/spec/contracts'; import type { Logger } from '@objectstack/spec/contracts'; -import { FlowSchema, FLOW_STRUCTURAL_NODE_TYPES } from '@objectstack/spec/automation'; +import { FlowSchema, FLOW_STRUCTURAL_NODE_TYPES, defineActionDescriptor } from '@objectstack/spec/automation'; import type { Connector } from '@objectstack/spec/integration'; import { ConnectorSchema } from '@objectstack/spec/integration'; // Static import (not a lazy `require`): the engine ships as ESM ("type":"module"), @@ -316,6 +316,63 @@ export class AutomationEngine implements IAutomationService { this.logger.info(`Node executor registered: ${executor.type}`); } + /** + * Register a **deprecated alias** of a canonical node type (ADR-0018 M3). + * + * The alias is a real registered executor, so old saved flows whose nodes + * use the alias type keep validating and running with no migration. At + * execute time it delegates to the canonical executor (resolved live, so the + * canonical may be registered before or after the alias), logging a one-time + * deprecation warning. Its published descriptor is flagged `deprecated` + + * `aliasOf` so the designer palette can hide or mark it while the canonical + * type is the one offered for new authoring. + * + * This is how ADR-0018 collapses the five outbound verbs onto `http` / + * `notify`: `http_request` / `http_call` / `webhook` become aliases of + * `http`. + */ + registerNodeAlias( + alias: string, + canonicalType: string, + meta?: { name?: string; category?: ActionDescriptor['category']; paradigms?: ActionDescriptor['paradigms']; needsOutbox?: boolean }, + ): void { + const engine = this; + let warned = false; + this.registerNodeExecutor({ + type: alias, + descriptor: defineActionDescriptor({ + type: alias, + version: '1.0.0', + name: meta?.name ?? alias, + description: `Deprecated alias of '${canonicalType}' (ADR-0018 M3). Author new flows with '${canonicalType}'.`, + category: meta?.category ?? 'io', + source: 'builtin', + paradigms: meta?.paradigms ?? ['flow', 'workflow_rule', 'approval'], + supportsRetry: true, + needsOutbox: meta?.needsOutbox ?? false, + deprecated: true, + aliasOf: canonicalType, + }), + async execute(node, variables, context) { + if (!warned) { + warned = true; + engine.logger.warn( + `Node type '${alias}' is deprecated; use '${canonicalType}' (ADR-0018 M3). Existing flows keep running via the alias.`, + ); + } + const target = engine.nodeExecutors.get(canonicalType); + if (!target) { + return { + success: false, + error: `alias '${alias}' → '${canonicalType}': canonical executor not registered`, + }; + } + return target.execute(node, variables, context); + }, + }); + this.logger.info(`Node alias registered: ${alias} → ${canonicalType} (deprecated)`); + } + /** Unregister a node executor (hot-unplug) */ unregisterNodeExecutor(type: string): void { const executor = this.nodeExecutors.get(type); diff --git a/packages/services/service-messaging/src/http-dispatcher.ts b/packages/services/service-messaging/src/http-dispatcher.ts new file mode 100644 index 000000000..689219fc1 --- /dev/null +++ b/packages/services/service-messaging/src/http-dispatcher.ts @@ -0,0 +1,205 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +import type { DispatchCluster, DispatchLockHandle } from './dispatcher.js'; +import { classifyAttempt, sendOnce, type FetchImpl } from './http-sender.js'; +import type { HttpDelivery, IHttpOutbox } from './http-outbox.js'; + +/** + * HttpDispatcher (ADR-0018 M3) — drains the generic outbound-HTTP outbox + * (`sys_http_delivery`) and POSTs each row, retrying with backoff and + * dead-lettering once the budget is exhausted. + * + * Structurally identical to `NotificationDispatcher` / `WebhookDispatcher`: an + * interval loop walks `partitionCount` partitions, each guarded by a + * per-partition cluster lock; within a held partition it claims a batch + * (`pending → in_flight`), sends, and acks. Partition affinity is on the + * delivery's `refId`, preserving in-order delivery per source anchor. + * + * At-least-once: if the POST succeeds but the ack write fails, the row reverts + * to pending after the claim TTL and is re-posted. Receivers MUST be idempotent + * on the `X-Objectstack-Delivery` (== row id) header. + */ + +const SINGLE_NODE_CLUSTER: DispatchCluster = { + lock: { + async acquire() { + return { release() {}, isHeld: () => true, renew() {} }; + }, + }, +}; + +export interface HttpDispatcherLogger { + warn: (msg: string, meta?: any) => void; + info?: (msg: string, meta?: any) => void; +} + +export interface HttpDispatcherOptions { + /** Stable id identifying this dispatcher node. */ + nodeId: string; + /** Outbox backend. */ + outbox: IHttpOutbox; + /** Cross-node coordination. Defaults to a single-node always-grant lock. */ + cluster?: DispatchCluster; + /** Partitions to split work across (must match the outbox's). Default 8. */ + partitionCount?: number; + /** Max rows to claim from each partition per tick. Default 32. */ + batchSize?: number; + /** Tick interval in ms. Default 500. */ + intervalMs?: number; + /** Per-partition lock TTL. Default = 5 × intervalMs. */ + lockTtlMs?: number; + /** Visibility timeout for claimed rows. Default = 2 × lockTtlMs. */ + claimTtlMs?: number; + /** Override `globalThis.fetch` (tests). */ + fetchImpl?: FetchImpl; + /** RNG override for the retry-jitter schedule (tests). */ + rng?: () => number; + /** Injectable clock (ms) for deterministic tests. Defaults to Date.now. */ + now?: () => number; + /** Logger callback (optional). */ + logger?: HttpDispatcherLogger; + /** Hook fired after every attempt — observability hook. */ + onAttempt?: (delivery: HttpDelivery, success: boolean) => void; +} + +export class HttpDispatcher { + private readonly opts: Required< + Omit + > & + Pick & { + cluster: DispatchCluster; + }; + private timer: ReturnType | undefined; + private running = false; + private inflightTick: Promise | undefined; + + constructor(options: HttpDispatcherOptions) { + const intervalMs = options.intervalMs ?? 500; + const lockTtlMs = options.lockTtlMs ?? intervalMs * 5; + this.opts = { + nodeId: options.nodeId, + outbox: options.outbox, + cluster: options.cluster ?? SINGLE_NODE_CLUSTER, + partitionCount: options.partitionCount ?? 8, + batchSize: options.batchSize ?? 32, + intervalMs, + lockTtlMs, + claimTtlMs: options.claimTtlMs ?? lockTtlMs * 2, + fetchImpl: options.fetchImpl, + rng: options.rng, + now: options.now, + logger: options.logger, + onAttempt: options.onAttempt, + }; + } + + /** Begin the periodic loop. Safe to call once; subsequent calls are no-ops. */ + start(): void { + if (this.running) return; + this.running = true; + this.scheduleTick(); + this.timer = setInterval(() => this.scheduleTick(), this.opts.intervalMs); + this.timer.unref?.(); + } + + /** Stop the loop and wait for the in-flight tick to drain. */ + async stop(): Promise { + if (!this.running) return; + this.running = false; + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + if (this.inflightTick) { + try { + await this.inflightTick; + } catch { + /* swallow — already logged */ + } + } + } + + /** Run one full tick (all partitions). Exposed for deterministic tests. */ + async tick(): Promise { + await this.runTick(); + } + + private scheduleTick(): void { + if (this.inflightTick) return; + this.inflightTick = this.runTick() + .catch((err) => { + this.opts.logger?.warn?.('http-dispatcher: tick failed', { + nodeId: this.opts.nodeId, + error: (err as Error)?.message ?? String(err), + }); + }) + .finally(() => { + this.inflightTick = undefined; + }); + } + + private async runTick(): Promise { + const partitionCount = this.opts.partitionCount; + const offset = stableNodeOffset(this.opts.nodeId, partitionCount); + for (let step = 0; step < partitionCount; step++) { + const i = (offset + step) % partitionCount; + await this.runPartition(i); + } + } + + private async runPartition(index: number): Promise { + const key = `http.dispatcher.partition.${index}`; + const handle: DispatchLockHandle | null = await this.opts.cluster.lock.acquire(key, { + ttlMs: this.opts.lockTtlMs, + waitMs: 0, + }); + if (!handle) return; + + try { + const claimed = await this.opts.outbox.claim({ + nodeId: this.opts.nodeId, + limit: this.opts.batchSize, + partition: { index, count: this.opts.partitionCount }, + claimTtlMs: this.opts.claimTtlMs, + now: this.opts.now?.(), + }); + if (claimed.length === 0) return; + await handle.renew?.(this.opts.lockTtlMs); + for (const row of claimed) { + if (handle.isHeld && !handle.isHeld()) break; + await this.processRow(row); + } + } finally { + await handle.release(); + } + } + + private async processRow(row: HttpDelivery): Promise { + const fetchImpl = (this.opts.fetchImpl ?? (globalThis.fetch as unknown as FetchImpl)) as + | FetchImpl + | undefined; + if (!fetchImpl) { + this.opts.logger?.warn?.('http-dispatcher: no fetch impl available', { rowId: row.id }); + await this.opts.outbox.ack(row.id, { + success: false, + error: 'no fetch implementation', + durationMs: 0, + dead: true, + }); + return; + } + const outcome = await sendOnce(row, fetchImpl); + const result = classifyAttempt(outcome, row.attempts, this.opts.now?.() ?? Date.now(), this.opts.rng); + await this.opts.outbox.ack(row.id, result); + this.opts.onAttempt?.(row, result.success); + } +} + +/** Spread starting partition per node so nodes don't serialise on partition 0. */ +function stableNodeOffset(nodeId: string, partitionCount: number): number { + let h = 0; + for (let i = 0; i < nodeId.length; i++) { + h = (h * 31 + nodeId.charCodeAt(i)) | 0; + } + return Math.abs(h) % partitionCount; +} diff --git a/packages/services/service-messaging/src/http-outbox.test.ts b/packages/services/service-messaging/src/http-outbox.test.ts new file mode 100644 index 000000000..ad3affcff --- /dev/null +++ b/packages/services/service-messaging/src/http-outbox.test.ts @@ -0,0 +1,125 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { MemoryHttpOutbox } from './memory-http-outbox.js'; +import { HttpDispatcher } from './http-dispatcher.js'; +import type { FetchImpl } from './http-sender.js'; + +/** A fetch stub that records calls and returns a scripted sequence of responses. */ +function makeFetch(responses: Array<{ ok: boolean; status: number; body?: string } | 'throw'>): { + impl: FetchImpl; + calls: Array<{ url: string; method: string; headers: Record; body: string }>; +} { + const calls: Array<{ url: string; method: string; headers: Record; body: string }> = []; + let i = 0; + const impl: FetchImpl = async (url, init) => { + calls.push({ url, method: init.method, headers: init.headers, body: init.body }); + const r = responses[Math.min(i, responses.length - 1)]; + i++; + if (r === 'throw') throw new Error('network down'); + return { ok: r.ok, status: r.status, async text() { return r.body ?? ''; } }; + }; + return { impl, calls }; +} + +describe('MemoryHttpOutbox', () => { + it('dedups on (source, dedupKey)', async () => { + const outbox = new MemoryHttpOutbox(); + const a = await outbox.enqueue({ source: 'flow', refId: 'f1', dedupKey: 'k1', url: 'https://x', payload: {} }); + const b = await outbox.enqueue({ source: 'flow', refId: 'f1', dedupKey: 'k1', url: 'https://x', payload: {} }); + expect(a).toBe(b); + // Different source, same dedupKey → distinct row. + const c = await outbox.enqueue({ source: 'webhook', refId: 'f1', dedupKey: 'k1', url: 'https://x', payload: {} }); + expect(c).not.toBe(a); + expect(await outbox.list()).toHaveLength(2); + }); + + it('claim is exclusive and partition-filtered', async () => { + const outbox = new MemoryHttpOutbox(); + await outbox.enqueue({ source: 'flow', refId: 'a', dedupKey: '1', url: 'https://x', payload: {} }); + const first = await outbox.claim({ nodeId: 'n1', limit: 10, claimTtlMs: 1000 }); + expect(first).toHaveLength(1); + // Second claim sees nothing — the row is in_flight. + const second = await outbox.claim({ nodeId: 'n2', limit: 10, claimTtlMs: 1000 }); + expect(second).toHaveLength(0); + }); +}); + +describe('HttpDispatcher', () => { + it('delivers a pending row on success', async () => { + const outbox = new MemoryHttpOutbox(); + await outbox.enqueue({ + source: 'flow', + refId: 'r1', + dedupKey: 'd1', + label: 'flow:node1', + url: 'https://example.test/hook', + method: 'POST', + payload: { hello: 'world' }, + }); + const { impl, calls } = makeFetch([{ ok: true, status: 200, body: 'ok' }]); + const dispatcher = new HttpDispatcher({ nodeId: 'n1', outbox, fetchImpl: impl, partitionCount: 1 }); + + await dispatcher.tick(); + + expect(calls).toHaveLength(1); + expect(calls[0].url).toBe('https://example.test/hook'); + expect(calls[0].headers['X-Objectstack-Event']).toBe('flow:node1'); + expect(JSON.parse(calls[0].body)).toEqual({ hello: 'world' }); + const rows = await outbox.list(); + expect(rows[0].status).toBe('success'); + expect(rows[0].attempts).toBe(1); + expect(rows[0].responseCode).toBe(200); + }); + + it('schedules a retry on a 500 (stays pending with nextRetryAt)', async () => { + const outbox = new MemoryHttpOutbox(); + await outbox.enqueue({ source: 'flow', refId: 'r', dedupKey: 'd', url: 'https://x', payload: {} }); + const { impl } = makeFetch([{ ok: false, status: 500, body: 'boom' }]); + const dispatcher = new HttpDispatcher({ + nodeId: 'n1', + outbox, + fetchImpl: impl, + partitionCount: 1, + rng: () => 0.5, + now: () => 1_000, + }); + + await dispatcher.tick(); + + const rows = await outbox.list(); + expect(rows[0].status).toBe('pending'); + expect(rows[0].attempts).toBe(1); + expect(rows[0].nextRetryAt).toBeGreaterThan(1_000); + }); + + it('dead-letters a permanent 4xx failure immediately', async () => { + const outbox = new MemoryHttpOutbox(); + await outbox.enqueue({ source: 'flow', refId: 'r', dedupKey: 'd', url: 'https://x', payload: {} }); + const { impl } = makeFetch([{ ok: false, status: 400, body: 'bad request' }]); + const dispatcher = new HttpDispatcher({ nodeId: 'n1', outbox, fetchImpl: impl, partitionCount: 1 }); + + await dispatcher.tick(); + + const rows = await outbox.list(); + expect(rows[0].status).toBe('dead'); + }); + + it('adds an HMAC signature header when signingSecret is set', async () => { + const outbox = new MemoryHttpOutbox(); + await outbox.enqueue({ + source: 'webhook', + refId: 'w1', + dedupKey: 'e1', + url: 'https://x', + signingSecret: 's3cr3t', + payload: { a: 1 }, + }); + const { impl, calls } = makeFetch([{ ok: true, status: 204 }]); + const dispatcher = new HttpDispatcher({ nodeId: 'n1', outbox, fetchImpl: impl, partitionCount: 1 }); + + await dispatcher.tick(); + + expect(calls[0].headers['X-Objectstack-Signature']).toMatch(/^sha256=/); + }); +}); diff --git a/packages/services/service-messaging/src/http-outbox.ts b/packages/services/service-messaging/src/http-outbox.ts new file mode 100644 index 000000000..f8d95df25 --- /dev/null +++ b/packages/services/service-messaging/src/http-outbox.ts @@ -0,0 +1,184 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * Generic outbound-HTTP delivery outbox (ADR-0018 M3). + * + * This is the *raw HTTP callout* counterpart to the notification outbox + * (`outbox.ts`, which is recipient/channel-centric). It stores rows that must + * be POSTed (or any method) exactly once — modulo at-least-once + receiver-side + * idempotency — with retry / backoff / dead-letter handled by the shared + * {@link HttpDispatcher}. + * + * It generalises the original `plugin-webhooks` outbox so two callers share one + * reliable substrate: + * - the Flow `http` node executor (`@objectstack/service-automation`), and + * - webhook fan-out (`@objectstack/plugin-webhooks`), + * + * which is exactly the "build the reliability machinery once, reuse it + * everywhere" decision in ADR-0018 §4. Webhook-specific concepts collapse onto + * generic fields: `webhookId`→`refId`, `eventId`→`dedupKey`, `eventType`→`label`, + * `secret`→`signingSecret`. + */ + +export type HttpDeliveryStatus = + | 'pending' + | 'in_flight' + | 'success' + | 'failed' + | 'dead'; + +export interface HttpDelivery { + /** UUID — also doubles as the receiver-side idempotency key (`X-Objectstack-Delivery`). */ + id: string; + /** + * Provenance domain — e.g. `'webhook'` | `'flow'`. Combined with `dedupKey` + * for the uniqueness constraint, and used (with `refId`) for partition + * affinity so rows from one source/anchor stay in-order. + */ + source: string; + /** + * Partition / ordering anchor within `source` — the webhook id, the flow id, + * etc. `hash(refId) mod partitionCount` picks the partition. + */ + refId: string; + /** UNIQUE(source, dedup_key) prevents double-enqueue. */ + dedupKey: string; + /** + * Human/diagnostic label, e.g. an event type (`data.record.created`) or a + * `flow:node` id. Surfaced on the `X-Objectstack-Event` header when present. + */ + label?: string; + /** Destination URL (snapshotted on enqueue — config edits don't rewrite live rows). */ + url: string; + /** HTTP method — defaults to POST. */ + method?: string; + /** Custom headers. */ + headers?: Record; + /** HMAC-SHA256 secret. If present, an `X-Objectstack-Signature` header is added. */ + signingSecret?: string; + /** Per-request timeout in ms. */ + timeoutMs?: number; + /** JSON-serialisable body. */ + payload: unknown; + + /** Lifecycle state. */ + status: HttpDeliveryStatus; + /** Number of attempts made so far (0 before first attempt). */ + attempts: number; + /** Node id currently working on this row, when `status = in_flight`. */ + claimedBy?: string; + /** Wall-clock ms when the row was claimed. */ + claimedAt?: number; + /** Earliest ms at which this row becomes eligible for the next attempt. */ + nextRetryAt?: number; + /** Wall-clock ms of the last attempt (success or fail). */ + lastAttemptedAt?: number; + /** HTTP status code from the most recent attempt. */ + responseCode?: number; + /** Truncated response body for diagnostics. */ + responseBody?: string; + /** Last transport / timeout error message. */ + error?: string; + + createdAt: number; + updatedAt: number; +} + +export interface EnqueueHttpInput { + source: string; + refId: string; + dedupKey: string; + label?: string; + url: string; + method?: string; + headers?: Record; + signingSecret?: string; + timeoutMs?: number; + payload: unknown; +} + +export interface HttpClaimOptions { + /** Identifier of the node doing the claim (for `claimedBy`). */ + nodeId: string; + /** Max rows to claim per call. */ + limit: number; + /** + * Partition assignment for this worker. Only rows whose + * `hash(refId) mod count === index` are claimed. Omit to claim across all + * partitions (single-node mode). + */ + partition?: { index: number; count: number }; + /** Visibility timeout — claimed rows revert to pending after this many ms. */ + claimTtlMs: number; + /** "Now" reference, ms since epoch. Defaults to Date.now(). */ + now?: number; +} + +export interface HttpAckSuccess { + success: true; + httpStatus: number; + responseBody?: string; + durationMs: number; +} + +export interface HttpAckFailure { + success: false; + httpStatus?: number; + responseBody?: string; + error?: string; + durationMs: number; + /** Computed by the dispatcher per the retry schedule, or undefined for dead. */ + nextRetryAt?: number; + /** Marks the row terminal — no more attempts. */ + dead?: boolean; +} + +export type HttpAckResult = HttpAckSuccess | HttpAckFailure; + +/** + * Error raised by `IHttpOutbox.redeliver` when the requested row is either + * missing or in a non-terminal state. + */ +export class HttpRedeliverError extends Error { + constructor( + message: string, + readonly code: 'not_found' | 'not_eligible', + ) { + super(message); + this.name = 'HttpRedeliverError'; + } +} + +/** + * Pluggable storage backend for outbound-HTTP delivery rows. Implementations + * MUST make `claim()` atomic across concurrent callers — that property is the + * exactly-once guarantee. + */ +export interface IHttpOutbox { + /** + * Insert a new delivery row. Implementations MUST treat `(source, dedupKey)` + * as unique and silently converge duplicates. Returns the row id (existing + * or new). + */ + enqueue(input: EnqueueHttpInput): Promise; + + /** + * Atomically claim up to `limit` rows whose `nextRetryAt <= now` (or null) + * and matching the partition predicate. Claimed rows MUST be marked + * `in_flight` so concurrent claimers don't see them. + */ + claim(opts: HttpClaimOptions): Promise; + + /** Record the outcome of an attempt. */ + ack(id: string, result: HttpAckResult): Promise; + + /** Snapshot accessor for tests / admin tooling. */ + list(filter?: { status?: HttpDeliveryStatus; source?: string }): Promise; + + /** + * Reset a terminal row (`success` / `failed` / `dead`) back to `pending` so + * the dispatcher re-sends it. Resets `attempts=0`; URL / payload / secret are + * NOT touched (byte-for-byte replay). Throws {@link HttpRedeliverError}. + */ + redeliver(id: string): Promise; +} diff --git a/packages/services/service-messaging/src/http-sender.ts b/packages/services/service-messaging/src/http-sender.ts new file mode 100644 index 000000000..b77ed4b6c --- /dev/null +++ b/packages/services/service-messaging/src/http-sender.ts @@ -0,0 +1,184 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +import { createHmac, randomUUID } from 'node:crypto'; +import type { HttpAckResult, HttpDelivery } from './http-outbox.js'; + +/** + * Pure HTTP transport for the generic outbound-delivery outbox (ADR-0018 M3). + * + * Lifted and generalised from `plugin-webhooks/src/http-sender.ts`: a single + * stateless attempt (`sendOnce`) plus the retry-schedule classifier + * (`classifyAttempt`). The dispatcher owns claim/ack; this module owns the wire. + */ + +/** Default per-request timeout. */ +export const DEFAULT_HTTP_TIMEOUT_MS = 15_000; + +/** Truncate response bodies to keep storage cost predictable. */ +const RESPONSE_BODY_CAP = 16 * 1024; + +export type FetchImpl = ( + input: string, + init: { + method: string; + headers: Record; + body: string; + signal: AbortSignal; + }, +) => Promise<{ + ok: boolean; + status: number; + text(): Promise; +}>; + +/** Single HTTP attempt classified to an ack shape (without nextRetryAt). */ +export type HttpAttemptOutcome = + | { success: true; httpStatus: number; responseBody?: string; durationMs: number } + | { + success: false; + retriable: boolean; + httpStatus?: number; + responseBody?: string; + error?: string; + durationMs: number; + }; + +/** + * Send one HTTP attempt for the delivery. Pure (no DB writes) so the dispatcher + * owns retry-schedule + ack logic. + * + * - 2xx → success + * - 4xx (except 408/429) → permanent failure (retriable = false → dead) + * - 408, 429, 5xx, transport → retriable + */ +export async function sendOnce( + delivery: HttpDelivery, + fetchImpl: FetchImpl, +): Promise { + const body = + typeof delivery.payload === 'string' + ? delivery.payload + : JSON.stringify(delivery.payload ?? null); + + const headers: Record = { + 'Content-Type': 'application/json', + 'User-Agent': 'ObjectStack-Http/1.0', + 'X-Objectstack-Delivery': delivery.id, + 'X-Objectstack-Attempt': String(delivery.attempts + 1), + ...(delivery.label ? { 'X-Objectstack-Event': delivery.label } : {}), + ...(delivery.headers ?? {}), + }; + if (delivery.signingSecret) { + const sig = createHmac('sha256', delivery.signingSecret).update(body).digest('hex'); + headers['X-Objectstack-Signature'] = `sha256=${sig}`; + } + + const timeoutMs = delivery.timeoutMs ?? DEFAULT_HTTP_TIMEOUT_MS; + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + const start = Date.now(); + try { + const res = await fetchImpl(delivery.url, { + method: delivery.method ?? 'POST', + headers, + body, + signal: controller.signal, + }); + clearTimeout(timer); + const responseText = await safeReadBody(res); + const durationMs = Date.now() - start; + if (res.ok) { + return { success: true, httpStatus: res.status, responseBody: responseText, durationMs }; + } + const retriable = res.status === 408 || res.status === 429 || res.status >= 500; + return { + success: false, + retriable, + httpStatus: res.status, + responseBody: responseText, + error: `HTTP ${res.status}`, + durationMs, + }; + } catch (err: unknown) { + clearTimeout(timer); + const durationMs = Date.now() - start; + const e = err as { name?: string; message?: string }; + const error = e?.name === 'AbortError' ? `timeout after ${timeoutMs}ms` : e?.message ?? String(err); + return { success: false, retriable: true, error, durationMs }; + } +} + +async function safeReadBody(res: { text(): Promise }): Promise { + try { + const text = await res.text(); + return text.length > RESPONSE_BODY_CAP ? text.slice(0, RESPONSE_BODY_CAP) : text; + } catch { + return undefined; + } +} + +/** + * Stripe-style retry schedule. Returns the next delay (ms) given how many + * attempts have already happened, or `null` once the budget is exhausted. + * + * 1→~1s · 2→~10s · 3→~1m · 4→~10m · 5→~1h · 6→~6h · 7→~24h · 8+→dead + * + * Each delay is multiplied by jitter ∈ [0.8, 1.2). + */ +export function nextHttpRetryDelayMs( + attemptsSoFar: number, + rng: () => number = Math.random, +): number | null { + const SCHEDULE = [1_000, 10_000, 60_000, 600_000, 3_600_000, 21_600_000, 86_400_000]; + if (attemptsSoFar < 1 || attemptsSoFar > SCHEDULE.length) return null; + const base = SCHEDULE[attemptsSoFar - 1]; + const jitter = 0.8 + rng() * 0.4; + return Math.floor(base * jitter); +} + +/** + * Compose an {@link HttpAckResult} from an outcome, applying the retry schedule + * on retriable failures. + */ +export function classifyAttempt( + outcome: HttpAttemptOutcome, + attemptsSoFar: number, + now: number = Date.now(), + rng?: () => number, +): HttpAckResult { + if (outcome.success) return outcome; + if (!outcome.retriable) { + return { + success: false, + httpStatus: outcome.httpStatus, + responseBody: outcome.responseBody, + error: outcome.error, + durationMs: outcome.durationMs, + dead: true, + }; + } + const delay = nextHttpRetryDelayMs(attemptsSoFar + 1, rng); + if (delay === null) { + return { + success: false, + httpStatus: outcome.httpStatus, + responseBody: outcome.responseBody, + error: outcome.error, + durationMs: outcome.durationMs, + dead: true, + }; + } + return { + success: false, + httpStatus: outcome.httpStatus, + responseBody: outcome.responseBody, + error: outcome.error, + durationMs: outcome.durationMs, + nextRetryAt: now + delay, + }; +} + +/** Generate a fresh delivery id (UUID v4). Exposed for tests. */ +export function newDeliveryId(): string { + return randomUUID(); +} diff --git a/packages/services/service-messaging/src/index.ts b/packages/services/service-messaging/src/index.ts index af6e2203a..cf5c2a661 100644 --- a/packages/services/service-messaging/src/index.ts +++ b/packages/services/service-messaging/src/index.ts @@ -105,6 +105,34 @@ export type { export { NotificationRetention, DEFAULT_RETENTION_TARGETS } from './retention.js'; export type { NotificationRetentionOptions, RetentionTarget, PruneOutcome } from './retention.js'; +// Generic outbound-HTTP delivery outbox (ADR-0018 M3) — the raw-callout +// counterpart to the notification outbox, shared by the Flow `http` node and +// webhook fan-out. +export type { + IHttpOutbox, + HttpDelivery as HttpDeliveryRecord, + HttpDeliveryStatus, + EnqueueHttpInput, + HttpClaimOptions, + HttpAckResult, + HttpAckSuccess, + HttpAckFailure, +} from './http-outbox.js'; +export { HttpRedeliverError } from './http-outbox.js'; +export { SqlHttpOutbox, type SqlHttpOutboxOptions } from './sql-http-outbox.js'; +export { MemoryHttpOutbox } from './memory-http-outbox.js'; +export { HttpDispatcher } from './http-dispatcher.js'; +export type { HttpDispatcherOptions, HttpDispatcherLogger } from './http-dispatcher.js'; +export { + sendOnce as sendHttpOnce, + classifyAttempt as classifyHttpAttempt, + nextHttpRetryDelayMs, + newDeliveryId as newHttpDeliveryId, + DEFAULT_HTTP_TIMEOUT_MS, + type FetchImpl, + type HttpAttemptOutcome, +} from './http-sender.js'; + // Objects (metadata definitions) export { InboxMessage, @@ -113,4 +141,6 @@ export { NotificationPreference, NotificationSubscription, NotificationTemplate, + HttpDelivery, + SYS_HTTP_DELIVERY, } from './objects/index.js'; diff --git a/packages/services/service-messaging/src/memory-http-outbox.ts b/packages/services/service-messaging/src/memory-http-outbox.ts new file mode 100644 index 000000000..8944830e2 --- /dev/null +++ b/packages/services/service-messaging/src/memory-http-outbox.ts @@ -0,0 +1,148 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +import { randomUUID } from 'node:crypto'; +import { hashPartition } from './backoff.js'; +import { + HttpRedeliverError, + type EnqueueHttpInput, + type HttpAckResult, + type HttpClaimOptions, + type HttpDelivery, + type HttpDeliveryStatus, + type IHttpOutbox, +} from './http-outbox.js'; + +/** + * In-memory {@link IHttpOutbox} for tests and single-process development. + * Mirrors `MemoryWebhookOutbox`: atomic-claim semantics come for free from the + * single-threaded event loop operating on one `Map`. Two instances do NOT share + * state — pass the same instance to both dispatchers to simulate one DB. + */ +export class MemoryHttpOutbox implements IHttpOutbox { + private readonly rows = new Map(); + /** Dedup index keyed by `${source}::${dedupKey}` -> row id. */ + private readonly dedup = new Map(); + + async enqueue(input: EnqueueHttpInput): Promise { + const dedupKey = `${input.source}::${input.dedupKey}`; + const existing = this.dedup.get(dedupKey); + if (existing) return existing; + + const id = randomUUID(); + const now = Date.now(); + const row: HttpDelivery = { + id, + source: input.source, + refId: input.refId, + dedupKey: input.dedupKey, + label: input.label, + url: input.url, + method: input.method ?? 'POST', + headers: input.headers, + signingSecret: input.signingSecret, + timeoutMs: input.timeoutMs, + payload: input.payload, + status: 'pending', + attempts: 0, + createdAt: now, + updatedAt: now, + }; + this.rows.set(id, row); + this.dedup.set(dedupKey, id); + return id; + } + + async claim(opts: HttpClaimOptions): Promise { + const now = opts.now ?? Date.now(); + const claimed: HttpDelivery[] = []; + + for (const row of this.rows.values()) { + if ( + row.status === 'in_flight' && + row.claimedAt !== undefined && + now - row.claimedAt > opts.claimTtlMs + ) { + row.status = 'pending'; + row.claimedBy = undefined; + row.claimedAt = undefined; + row.updatedAt = now; + } + } + + for (const row of this.rows.values()) { + if (claimed.length >= opts.limit) break; + if (row.status !== 'pending') continue; + if (row.nextRetryAt !== undefined && row.nextRetryAt > now) continue; + if (opts.partition) { + const p = hashPartition(row.refId, opts.partition.count); + if (p !== opts.partition.index) continue; + } + row.status = 'in_flight'; + row.claimedBy = opts.nodeId; + row.claimedAt = now; + row.updatedAt = now; + claimed.push({ ...row }); + } + return claimed; + } + + async ack(id: string, result: HttpAckResult): Promise { + const row = this.rows.get(id); + if (!row) return; + const now = Date.now(); + row.attempts += 1; + row.lastAttemptedAt = now; + row.updatedAt = now; + row.claimedBy = undefined; + row.claimedAt = undefined; + row.responseCode = result.httpStatus; + row.responseBody = result.responseBody; + + let status: HttpDeliveryStatus; + if (result.success) { + status = 'success'; + row.nextRetryAt = undefined; + row.error = undefined; + } else if (result.dead) { + status = 'dead'; + row.error = result.error; + row.nextRetryAt = undefined; + } else { + status = 'pending'; + row.error = result.error; + row.nextRetryAt = result.nextRetryAt; + } + row.status = status; + } + + async list(filter?: { status?: HttpDeliveryStatus; source?: string }): Promise { + let all = Array.from(this.rows.values()).map((r) => ({ ...r })); + if (filter?.status) all = all.filter((r) => r.status === filter.status); + if (filter?.source) all = all.filter((r) => r.source === filter.source); + return all; + } + + async redeliver(id: string): Promise { + const row = this.rows.get(id); + if (!row) { + throw new HttpRedeliverError(`Delivery row '${id}' not found`, 'not_found'); + } + if (row.status !== 'success' && row.status !== 'failed' && row.status !== 'dead') { + throw new HttpRedeliverError( + `Delivery row '${id}' is '${row.status}', expected one of: success, failed, dead`, + 'not_eligible', + ); + } + const now = Date.now(); + row.status = 'pending'; + row.attempts = 0; + row.claimedBy = undefined; + row.claimedAt = undefined; + row.nextRetryAt = undefined; + row.error = undefined; + row.responseCode = undefined; + row.responseBody = undefined; + row.updatedAt = now; + return { ...row }; + } +} diff --git a/packages/services/service-messaging/src/messaging-service-plugin.ts b/packages/services/service-messaging/src/messaging-service-plugin.ts index f26f464e9..11461060d 100644 --- a/packages/services/service-messaging/src/messaging-service-plugin.ts +++ b/packages/services/service-messaging/src/messaging-service-plugin.ts @@ -6,7 +6,9 @@ import type { IDataEngine } from '@objectstack/spec/contracts'; import { MessagingService } from './messaging-service.js'; import { createInboxChannel } from './inbox-channel.js'; import { SqlNotificationOutbox } from './sql-outbox.js'; +import { SqlHttpOutbox } from './sql-http-outbox.js'; import { NotificationDispatcher, type DispatchCluster } from './dispatcher.js'; +import { HttpDispatcher } from './http-dispatcher.js'; import { NotificationRetention } from './retention.js'; import { createEmailChannel } from './email-channel.js'; import { NotificationTemplateStore } from './template-renderer.js'; @@ -17,6 +19,7 @@ import { NotificationPreference, NotificationSubscription, NotificationTemplate, + HttpDelivery, } from './objects/index.js'; export interface MessagingServicePluginOptions { @@ -85,6 +88,7 @@ export class MessagingServicePlugin implements Plugin { private readonly options: Required; private dispatcher?: NotificationDispatcher; + private httpDispatcher?: HttpDispatcher; private retentionTimer?: ReturnType; constructor(options: MessagingServicePluginOptions = {}) { @@ -142,6 +146,7 @@ export class MessagingServicePlugin implements Plugin { NotificationPreference, NotificationSubscription, NotificationTemplate, + HttpDelivery, ], navigationContributions: [ { @@ -212,6 +217,24 @@ export class MessagingServicePlugin implements Plugin { ctx.logger.info( `[messaging] reliable delivery on (outbox + dispatcher, ${this.options.partitionCount} partitions${cluster ? ', clustered' : ', single-node'})`, ); + + // ADR-0018 M3: generic outbound-HTTP outbox + dispatcher. Backs + // the Flow `http` node (and, going forward, webhook fan-out) with + // the same retry / dead-letter substrate as notifications. + const httpOutbox = new SqlHttpOutbox(engine, { partitionCount: this.options.partitionCount }); + service.setHttpOutbox(httpOutbox); + this.httpDispatcher = new HttpDispatcher({ + nodeId: `http-${process.pid}-${randomUUID().slice(0, 8)}`, + outbox: httpOutbox, + cluster, + partitionCount: this.options.partitionCount, + intervalMs: this.options.dispatchIntervalMs, + logger: ctx.logger, + }); + this.httpDispatcher.start(); + ctx.logger.info( + `[messaging] HTTP delivery on (sys_http_delivery outbox + dispatcher, ${this.options.partitionCount} partitions)`, + ); }); } @@ -246,6 +269,8 @@ export class MessagingServicePlugin implements Plugin { async stop(): Promise { await this.dispatcher?.stop(); this.dispatcher = undefined; + await this.httpDispatcher?.stop(); + this.httpDispatcher = undefined; if (this.retentionTimer) { clearInterval(this.retentionTimer); this.retentionTimer = undefined; diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index 8054a9a38..65c083116 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -9,6 +9,7 @@ import type { import { RecipientResolver } from './recipient-resolver.js'; import { PreferenceResolver, type PreferenceTarget } from './preference-resolver.js'; import type { INotificationOutbox } from './outbox.js'; +import type { EnqueueHttpInput, IHttpOutbox } from './http-outbox.js'; /** The L2 event object every `emit()` writes one row to (ADR-0030). */ export const NOTIFICATION_EVENT_OBJECT = 'sys_notification'; @@ -125,6 +126,7 @@ export class MessagingService { private readonly resolver: RecipientResolver; private readonly preferences: PreferenceResolver; private outbox?: INotificationOutbox; + private httpOutbox?: IHttpOutbox; constructor(private readonly ctx: MessagingServiceContext) { this.now = ctx.now ?? (() => new Date().toISOString()); @@ -150,6 +152,35 @@ export class MessagingService { this.outbox = outbox; } + /** + * Attach the generic outbound-HTTP delivery outbox (ADR-0018 M3). Wired by + * the plugin at `kernel:ready` once the data engine is resolvable. Once set, + * {@link enqueueHttp} persists durable rows the {@link HttpDispatcher} + * drains with retry / dead-letter; the Flow `http` node enqueues through it. + */ + setHttpOutbox(outbox: IHttpOutbox): void { + this.httpOutbox = outbox; + } + + /** + * Whether durable HTTP delivery is available. Callers (e.g. the `http` node) + * fall back to a direct, non-durable send when this is `false`. + */ + isHttpDeliveryReady(): boolean { + return this.httpOutbox !== undefined; + } + + /** + * Enqueue a durable outbound-HTTP delivery (ADR-0018 M3). Returns the row id. + * Throws if no HTTP outbox is wired — guard with {@link isHttpDeliveryReady}. + */ + async enqueueHttp(input: EnqueueHttpInput): Promise { + if (!this.httpOutbox) { + throw new Error('messaging: HTTP delivery outbox not configured (no data engine / reliableDelivery off)'); + } + return this.httpOutbox.enqueue(input); + } + /** Register a channel implementation. A duplicate id warns and replaces. */ registerChannel(channel: MessagingChannel): void { if (this.channels.has(channel.id)) { diff --git a/packages/services/service-messaging/src/objects/http-delivery.object.ts b/packages/services/service-messaging/src/objects/http-delivery.object.ts new file mode 100644 index 000000000..deb09a9f0 --- /dev/null +++ b/packages/services/service-messaging/src/objects/http-delivery.object.ts @@ -0,0 +1,167 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +import { Field, ObjectSchema } from '@objectstack/spec/data'; + +/** + * `sys_http_delivery` — durable outbox row for one outbound-HTTP attempt + * (ADR-0018 M3). + * + * The raw-callout counterpart to `sys_notification_delivery` (recipient/channel + * deliveries). Shared by the Flow `http` node executor and webhook fan-out so + * both inherit retry / idempotency / dead-letter from one substrate. Generalises + * `sys_webhook_delivery`: `webhook_id`→`ref_id`, `event_id`→`dedup_key`, + * `event_type`→`label`, `secret`→`signing_secret`. + * + * Designed for the SqlHttpOutbox claim algorithm: + * 1. Producers INSERT pending rows (dedup'd by `(source, dedup_key)`). + * 2. The per-partition lock-holder runs: + * SELECT id WHERE status='pending' AND partition_key=? AND (next_retry_at <= now OR null) + * UPDATE SET status='in_flight' WHERE id IN (...) AND status='pending' ← atomic claim + * POST to target URL + * UPDATE SET status=success/pending/dead, attempts=attempts+1, ... + * + * `partition_key` is precomputed on enqueue (`hash(ref_id) mod N`) so the + * dispatcher filters cheaply without DB-side hash functions. + * + * @namespace sys + */ +export const HttpDelivery = ObjectSchema.create({ + name: 'sys_http_delivery', + label: 'HTTP Delivery', + pluralLabel: 'HTTP Deliveries', + icon: 'globe', + isSystem: true, + managedBy: 'system', + userActions: { create: false, edit: false, delete: false, import: false }, + description: + 'Durable outbox row for one outbound-HTTP attempt (ADR-0018). Managed by @objectstack/service-messaging; do not write directly.', + displayNameField: 'id', + titleFormat: '{label} → {url}', + compactLayout: ['source', 'url', 'status', 'attempts', 'next_retry_at'], + + listViews: { + recent: { + type: 'grid', + name: 'recent', + label: 'Recent', + data: { provider: 'object', object: 'sys_http_delivery' }, + columns: ['source', 'label', 'url', 'status', 'attempts', 'response_code', 'updated_at'], + sort: [{ field: 'updated_at', order: 'desc' }], + pagination: { pageSize: 50 }, + }, + failures: { + type: 'grid', + name: 'failures', + label: 'Failures', + data: { provider: 'object', object: 'sys_http_delivery' }, + columns: ['source', 'url', 'status', 'attempts', 'response_code', 'error', 'updated_at'], + filter: [{ field: 'status', operator: 'in', value: ['failed', 'dead'] }], + sort: [{ field: 'updated_at', order: 'desc' }], + pagination: { pageSize: 50 }, + }, + pending: { + type: 'grid', + name: 'pending', + label: 'Pending', + data: { provider: 'object', object: 'sys_http_delivery' }, + columns: ['source', 'url', 'attempts', 'next_retry_at', 'updated_at'], + filter: [{ field: 'status', operator: 'equals', value: 'pending' }], + sort: [{ field: 'next_retry_at', order: 'asc' }], + pagination: { pageSize: 50 }, + }, + }, + + fields: { + id: Field.text({ + label: 'Delivery ID', + required: true, + maxLength: 64, + description: 'UUID — also doubles as the receiver-side idempotency key', + }), + + source: Field.text({ + label: 'Source', + required: true, + maxLength: 32, + description: "Provenance domain, e.g. 'webhook' | 'flow'. UNIQUE(source, dedup_key).", + }), + + ref_id: Field.text({ + label: 'Ref ID', + required: true, + maxLength: 128, + description: 'Partition/ordering anchor within source (webhook id, flow id, …)', + }), + + dedup_key: Field.text({ + label: 'Dedup Key', + required: true, + maxLength: 191, + description: 'UNIQUE(source, dedup_key) for at-most-once enqueue', + }), + + label: Field.text({ + label: 'Label', + required: false, + maxLength: 191, + description: 'Diagnostic label / event type — surfaced on X-Objectstack-Event', + }), + + url: Field.text({ + label: 'Target URL', + required: true, + maxLength: 2048, + description: 'Snapshotted at enqueue so config edits do not rewrite live rows', + }), + + method: Field.text({ label: 'Method', required: false, maxLength: 10 }), + headers_json: Field.textarea({ label: 'Headers JSON', required: false }), + signing_secret: Field.text({ label: 'HMAC Secret', required: false, maxLength: 256 }), + timeout_ms: Field.number({ label: 'Timeout (ms)', required: false }), + payload_json: Field.textarea({ label: 'Payload JSON', required: true }), + + partition_key: Field.number({ + label: 'Partition', + required: true, + description: 'hash(ref_id) mod partitionCount — precomputed for cheap WHERE', + }), + + status: Field.text({ + label: 'Status', + required: true, + defaultValue: 'pending', + maxLength: 16, + description: 'pending | in_flight | success | failed | dead', + }), + + attempts: Field.number({ + label: 'Attempts', + required: true, + defaultValue: 0, + description: 'Number of attempts made so far', + }), + + claimed_by: Field.text({ label: 'Claimed By', required: false, maxLength: 128 }), + claimed_at: Field.number({ label: 'Claimed At (ms)', required: false }), + next_retry_at: Field.number({ label: 'Next Retry At (ms)', required: false }), + last_attempted_at: Field.number({ label: 'Last Attempted At (ms)', required: false }), + response_code: Field.number({ label: 'HTTP Status', required: false }), + response_body: Field.textarea({ label: 'Response Body (capped)', required: false }), + error: Field.textarea({ label: 'Error', required: false }), + + created_at: Field.number({ label: 'Created At (ms)', required: true }), + updated_at: Field.number({ label: 'Updated At (ms)', required: true }), + }, + + indexes: [ + { fields: ['source', 'dedup_key'], unique: true }, + // Hot path: claim query + { fields: ['status', 'partition_key', 'next_retry_at'] }, + // Reaper: scan stale in_flight rows by claimed_at + { fields: ['status', 'claimed_at'] }, + { fields: ['source', 'ref_id'] }, + ], +}); + +/** Canonical object name — exported so SqlHttpOutbox callers can override. */ +export const SYS_HTTP_DELIVERY = 'sys_http_delivery' as const; diff --git a/packages/services/service-messaging/src/objects/index.ts b/packages/services/service-messaging/src/objects/index.ts index 81defeb49..fd92c2f7c 100644 --- a/packages/services/service-messaging/src/objects/index.ts +++ b/packages/services/service-messaging/src/objects/index.ts @@ -6,3 +6,4 @@ export { NotificationDelivery } from './notification-delivery.object.js'; export { NotificationPreference } from './notification-preference.object.js'; export { NotificationSubscription } from './notification-subscription.object.js'; export { NotificationTemplate } from './notification-template.object.js'; +export { HttpDelivery, SYS_HTTP_DELIVERY } from './http-delivery.object.js'; diff --git a/packages/services/service-messaging/src/sql-http-outbox.ts b/packages/services/service-messaging/src/sql-http-outbox.ts new file mode 100644 index 000000000..e71e4052f --- /dev/null +++ b/packages/services/service-messaging/src/sql-http-outbox.ts @@ -0,0 +1,277 @@ +// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. + +import { randomUUID } from 'node:crypto'; +import type { IDataEngine } from '@objectstack/spec/contracts'; +import { hashPartition } from './backoff.js'; +import { + HttpRedeliverError, + type EnqueueHttpInput, + type HttpAckResult, + type HttpClaimOptions, + type HttpDelivery, + type HttpDeliveryStatus, + type IHttpOutbox, +} from './http-outbox.js'; +import { SYS_HTTP_DELIVERY } from './objects/http-delivery.object.js'; + +export interface SqlHttpOutboxOptions { + /** + * Total partition count — MUST match the dispatcher's `partitionCount`. + * Used at enqueue time to precompute `partition_key`. + */ + partitionCount: number; + /** Object name to read/write. Defaults to `sys_http_delivery`. */ + objectName?: string; +} + +interface DeliveryRow { + id: string; + source: string; + ref_id: string; + dedup_key: string; + label?: string | null; + url: string; + method?: string | null; + headers_json?: string | null; + signing_secret?: string | null; + timeout_ms?: number | null; + payload_json: string; + partition_key: number; + status: HttpDeliveryStatus; + attempts: number; + claimed_by?: string | null; + claimed_at?: number | null; + next_retry_at?: number | null; + last_attempted_at?: number | null; + response_code?: number | null; + response_body?: string | null; + error?: string | null; + created_at: number; + updated_at: number; +} + +/** + * Durable {@link IHttpOutbox} backed by ObjectQL — the production storage impl + * for the generic outbound-HTTP outbox (ADR-0018 M3). Works against any + * registered driver through the driver-agnostic `IDataEngine` API. + * + * Mirrors `SqlWebhookOutbox` exactly (cluster-lock + atomic + * `UPDATE WHERE status='pending'` for the exactly-once claim; precomputed + * `partition_key`; SELECT-then-INSERT dedup converging on the unique index). + * Dedup uniqueness is `(source, dedup_key)`; partition affinity is on `ref_id`. + */ +export class SqlHttpOutbox implements IHttpOutbox { + private readonly objectName: string; + private readonly partitionCount: number; + + constructor( + private readonly engine: IDataEngine, + opts: SqlHttpOutboxOptions, + ) { + if (opts.partitionCount <= 0) { + throw new Error('SqlHttpOutbox: partitionCount must be > 0'); + } + this.objectName = opts.objectName ?? SYS_HTTP_DELIVERY; + this.partitionCount = opts.partitionCount; + } + + async enqueue(input: EnqueueHttpInput): Promise { + const existing = await this.engine.findOne(this.objectName, { + where: { source: input.source, dedup_key: input.dedupKey }, + fields: ['id'], + }); + if (existing?.id) return existing.id as string; + + const id = randomUUID(); + const now = Date.now(); + const row: DeliveryRow = { + id, + source: input.source, + ref_id: input.refId, + dedup_key: input.dedupKey, + label: input.label, + url: input.url, + method: input.method ?? 'POST', + headers_json: input.headers ? JSON.stringify(input.headers) : undefined, + signing_secret: input.signingSecret, + timeout_ms: input.timeoutMs, + payload_json: JSON.stringify(input.payload ?? null), + partition_key: hashPartition(input.refId, this.partitionCount), + status: 'pending', + attempts: 0, + created_at: now, + updated_at: now, + }; + try { + await this.engine.insert(this.objectName, row); + return id; + } catch (err) { + const winner = await this.engine.findOne(this.objectName, { + where: { source: input.source, dedup_key: input.dedupKey }, + fields: ['id'], + }); + if (winner?.id) return winner.id as string; + throw err; + } + } + + async claim(opts: HttpClaimOptions): Promise { + const now = opts.now ?? Date.now(); + + // 1. Reap stale in_flight rows — visibility-timeout recovery. + await this.engine.update( + this.objectName, + { status: 'pending', claimed_by: null, claimed_at: null, updated_at: now }, + { + where: { + status: 'in_flight', + claimed_at: { $lt: now - opts.claimTtlMs }, + }, + multi: true, + }, + ); + + // 2. Pick candidate ids. + const partitionFilter = opts.partition ? { partition_key: opts.partition.index } : {}; + const candidates = await this.engine.find(this.objectName, { + where: { + status: 'pending', + ...partitionFilter, + $or: [{ next_retry_at: null }, { next_retry_at: { $lte: now } }], + }, + fields: ['id'], + limit: opts.limit, + }); + if (candidates.length === 0) return []; + + const ids = (candidates as Array<{ id: string }>).map((c) => c.id); + + // 3. Atomic claim. WHERE status='pending' rejects rows another worker took. + await this.engine.update( + this.objectName, + { status: 'in_flight', claimed_by: opts.nodeId, claimed_at: now, updated_at: now }, + { where: { id: { $in: ids }, status: 'pending' }, multi: true }, + ); + + // 4. Read back the rows we actually own. + const claimed = (await this.engine.find(this.objectName, { + where: { id: { $in: ids }, claimed_by: opts.nodeId, claimed_at: now, status: 'in_flight' }, + })) as DeliveryRow[]; + + return claimed.map((r) => this.toDelivery(r)); + } + + async ack(id: string, result: HttpAckResult): Promise { + const current = (await this.engine.findOne(this.objectName, { + where: { id }, + fields: ['attempts'], + })) as { attempts?: number } | null; + if (!current) return; + + const now = Date.now(); + let status: HttpDeliveryStatus; + let nextRetryAt: number | null; + let error: string | null; + + if (result.success) { + status = 'success'; + nextRetryAt = null; + error = null; + } else if (result.dead) { + status = 'dead'; + nextRetryAt = null; + error = result.error ?? null; + } else { + status = 'pending'; + nextRetryAt = result.nextRetryAt ?? null; + error = result.error ?? null; + } + + await this.engine.update( + this.objectName, + { + status, + attempts: (current.attempts ?? 0) + 1, + last_attempted_at: now, + claimed_by: null, + claimed_at: null, + response_code: result.httpStatus ?? null, + response_body: result.responseBody ?? null, + next_retry_at: nextRetryAt, + error, + updated_at: now, + }, + { where: { id }, multi: false }, + ); + } + + async list(filter?: { status?: HttpDeliveryStatus; source?: string }): Promise { + const where: Record = {}; + if (filter?.status) where.status = filter.status; + if (filter?.source) where.source = filter.source; + const rows = (await this.engine.find(this.objectName, { where })) as DeliveryRow[]; + return rows.map((r) => this.toDelivery(r)); + } + + async redeliver(id: string): Promise { + const current = (await this.engine.findOne(this.objectName, { where: { id } })) as DeliveryRow | null; + if (!current) { + throw new HttpRedeliverError(`Delivery row '${id}' not found`, 'not_found'); + } + if (current.status !== 'success' && current.status !== 'failed' && current.status !== 'dead') { + throw new HttpRedeliverError( + `Delivery row '${id}' is '${current.status}', expected one of: success, failed, dead`, + 'not_eligible', + ); + } + const now = Date.now(); + await this.engine.update( + this.objectName, + { + status: 'pending', + attempts: 0, + claimed_by: null, + claimed_at: null, + next_retry_at: null, + last_attempted_at: null, + response_code: null, + response_body: null, + error: null, + updated_at: now, + }, + { where: { id, status: { $in: ['success', 'failed', 'dead'] } }, multi: false }, + ); + const after = (await this.engine.findOne(this.objectName, { where: { id } })) as DeliveryRow | null; + if (!after || after.status !== 'pending') { + throw new HttpRedeliverError(`Delivery row '${id}' state changed during redeliver`, 'not_eligible'); + } + return this.toDelivery(after); + } + + private toDelivery(r: DeliveryRow): HttpDelivery { + return { + id: r.id, + source: r.source, + refId: r.ref_id, + dedupKey: r.dedup_key, + label: r.label ?? undefined, + url: r.url, + method: r.method ?? undefined, + headers: r.headers_json ? JSON.parse(r.headers_json) : undefined, + signingSecret: r.signing_secret ?? undefined, + timeoutMs: r.timeout_ms ?? undefined, + payload: JSON.parse(r.payload_json), + status: r.status, + attempts: r.attempts, + claimedBy: r.claimed_by ?? undefined, + claimedAt: r.claimed_at ?? undefined, + nextRetryAt: r.next_retry_at ?? undefined, + lastAttemptedAt: r.last_attempted_at ?? undefined, + responseCode: r.response_code ?? undefined, + responseBody: r.response_body ?? undefined, + error: r.error ?? undefined, + createdAt: r.created_at, + updatedAt: r.updated_at, + }; + } +} diff --git a/packages/spec/src/automation/flow.zod.ts b/packages/spec/src/automation/flow.zod.ts index 405785525..ff15f819a 100644 --- a/packages/spec/src/automation/flow.zod.ts +++ b/packages/spec/src/automation/flow.zod.ts @@ -29,7 +29,8 @@ export const FlowNodeAction = z.enum([ 'update_record', // CRUD: Update 'delete_record', // CRUD: Delete 'get_record', // CRUD: Get/Query - 'http_request', // Webhook/API Call + 'http', // Outbound HTTP callout (ADR-0018 M3) — canonical; outbox-backed when durable + 'http_request', // Deprecated alias of `http` (ADR-0018 M3) 'notify', // Outbound notification (ADR-0012) — dispatched via the messaging service 'script', // Custom Script (JS/TS) 'screen', // Screen / User-Input Element