From 7a3bc625a8bfbc65387d53e8db2e54fb54691575 Mon Sep 17 00:00:00 2001 From: Jack Zhuang <277994282+os-zhuang@users.noreply.github.com> Date: Tue, 2 Jun 2026 02:44:33 +0800 Subject: [PATCH] feat(webhooks,messaging)!: webhooks deliver via the shared HTTP outbox (ADR-0018 M3 Phase 5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit plugin-webhooks no longer ships its own delivery engine. The durable outbox, cluster-coordinated dispatcher, retry/backoff/dead-letter, and retention are removed and replaced by the generic sys_http_delivery outbox + HttpDispatcher in service-messaging (promoted in #1483). Webhooks keep only their domain concerns: the sys_webhook config object, the AutoEnqueuer, and the redeliver endpoint. service-messaging: - MessagingService gains redeliverHttp(id) + listHttp(filter) over the HTTP outbox plugin-webhooks (BREAKING): - requires MessagingServicePlugin (declared dependency) - AutoEnqueuer enqueues source='webhook' rows via messaging.enqueueHttp (dedupKey ':', refId=webhookId, label=eventType, signingSecret=secret) — fire-and-forget, off the write path - redeliver endpoint backed by messaging.redeliverHttp - removed sys_webhook_delivery object; Setup nav points at sys_http_delivery - deleted outbox/sql-outbox/memory-outbox/dispatcher/http-sender/partition/ retention + their tests; dropped the ./sql subpath export - WebhookOutboxPluginOptions reduced to { autoEnqueue } - AutoEnqueuer constructor takes an HttpEnqueueFn instead of IWebhookOutbox No data migration (system not yet live). Tests: plugin-webhooks auto-enqueuer (10) rewired to the shared enqueue; service-messaging 108 green. Co-Authored-By: Claude Opus 4.8 --- .../adr-0018-m5-webhooks-shared-outbox.md | 32 ++ packages/plugins/plugin-webhooks/package.json | 11 +- .../plugin-webhooks/src/auto-enqueuer.test.ts | 199 +++---- .../plugin-webhooks/src/auto-enqueuer.ts | 65 ++- .../plugin-webhooks/src/dispatcher.test.ts | 324 ------------ .../plugins/plugin-webhooks/src/dispatcher.ts | 218 -------- .../plugin-webhooks/src/http-sender.ts | 187 ------- packages/plugins/plugin-webhooks/src/index.ts | 53 +- .../plugin-webhooks/src/memory-outbox.test.ts | 86 --- .../plugin-webhooks/src/memory-outbox.ts | 155 ------ .../plugins/plugin-webhooks/src/outbox.ts | 175 ------- .../plugins/plugin-webhooks/src/partition.ts | 19 - .../plugin-webhooks/src/retention.test.ts | 116 ----- .../plugins/plugin-webhooks/src/retention.ts | 144 ----- .../plugins/plugin-webhooks/src/schema.ts | 27 +- .../plugin-webhooks/src/sql-outbox.test.ts | 490 ------------------ .../plugins/plugin-webhooks/src/sql-outbox.ts | 343 ------------ .../src/sys-webhook-delivery.object.ts | 224 -------- .../src/webhook-outbox-plugin.ts | 376 +++----------- .../plugins/plugin-webhooks/tsup.config.ts | 2 +- .../src/messaging-service.ts | 21 +- pnpm-lock.yaml | 10 +- 22 files changed, 282 insertions(+), 2995 deletions(-) create mode 100644 .changeset/adr-0018-m5-webhooks-shared-outbox.md delete mode 100644 packages/plugins/plugin-webhooks/src/dispatcher.test.ts delete mode 100644 packages/plugins/plugin-webhooks/src/dispatcher.ts delete mode 100644 packages/plugins/plugin-webhooks/src/http-sender.ts delete mode 100644 packages/plugins/plugin-webhooks/src/memory-outbox.test.ts delete mode 100644 packages/plugins/plugin-webhooks/src/memory-outbox.ts delete mode 100644 packages/plugins/plugin-webhooks/src/outbox.ts delete mode 100644 packages/plugins/plugin-webhooks/src/partition.ts delete mode 100644 packages/plugins/plugin-webhooks/src/retention.test.ts delete mode 100644 packages/plugins/plugin-webhooks/src/retention.ts delete mode 100644 packages/plugins/plugin-webhooks/src/sql-outbox.test.ts delete mode 100644 packages/plugins/plugin-webhooks/src/sql-outbox.ts delete mode 100644 packages/plugins/plugin-webhooks/src/sys-webhook-delivery.object.ts diff --git a/.changeset/adr-0018-m5-webhooks-shared-outbox.md b/.changeset/adr-0018-m5-webhooks-shared-outbox.md new file mode 100644 index 000000000..2c0922910 --- /dev/null +++ b/.changeset/adr-0018-m5-webhooks-shared-outbox.md @@ -0,0 +1,32 @@ +--- +"@objectstack/plugin-webhooks": major +"@objectstack/service-messaging": minor +--- + +ADR-0018 M3 (Phase 5): `plugin-webhooks` now delivers through the shared +`service-messaging` HTTP outbox instead of its own. + +The webhook delivery substrate — durable outbox, cluster-coordinated dispatcher, +retry/backoff/dead-letter, retention — is removed from `plugin-webhooks` and +replaced by the generic `sys_http_delivery` outbox + `HttpDispatcher` in +`@objectstack/service-messaging`. Webhooks keep only their domain concerns: the +`sys_webhook` config object, the `AutoEnqueuer` (now enqueues `source: 'webhook'` +rows via `messaging.enqueueHttp`), and the redeliver admin endpoint (now backed +by `messaging.redeliverHttp`). + +**`@objectstack/service-messaging`:** `MessagingService` gains `redeliverHttp(id)` +and `listHttp(filter)` over the HTTP outbox. + +**`@objectstack/plugin-webhooks` — BREAKING:** + +- Now **requires** `MessagingServicePlugin` (declared as a plugin dependency). +- Removed exports: `WebhookDispatcher`, `MemoryWebhookOutbox`, `SqlWebhookOutbox` + (and the `./sql` subpath), `DeliveryRetentionSweeper`, `hashPartition`, + `sendOnce` / `classifyAttempt` / `nextRetryDelayMs`, and the `IWebhookOutbox` / + `WebhookDelivery` / `EnqueueInput` / `AckResult` / `RedeliverError` types. +- Removed the `sys_webhook_delivery` object — webhook deliveries are now rows in + `sys_http_delivery` (`source = 'webhook'`). The Setup nav points there. +- `AutoEnqueuer`'s constructor takes an `HttpEnqueueFn` instead of an + `IWebhookOutbox`. +- `WebhookOutboxPluginOptions` reduced to `{ autoEnqueue }` (dispatcher / outbox / + retention / nodeId options removed — those now live on `MessagingServicePlugin`). diff --git a/packages/plugins/plugin-webhooks/package.json b/packages/plugins/plugin-webhooks/package.json index 32a3d28ff..cd292227e 100644 --- a/packages/plugins/plugin-webhooks/package.json +++ b/packages/plugins/plugin-webhooks/package.json @@ -12,11 +12,6 @@ "import": "./dist/index.js", "require": "./dist/index.cjs" }, - "./sql": { - "types": "./dist/sql-outbox.d.ts", - "import": "./dist/sql-outbox.js", - "require": "./dist/sql-outbox.cjs" - }, "./schema": { "types": "./dist/schema.d.ts", "import": "./dist/schema.js", @@ -29,10 +24,8 @@ }, "dependencies": { "@objectstack/core": "workspace:*", - "@objectstack/platform-objects": "workspace:*", - "@objectstack/service-cluster": "workspace:*", - "@objectstack/spec": "workspace:*", - "@objectstack/types": "workspace:*" + "@objectstack/service-messaging": "workspace:*", + "@objectstack/spec": "workspace:*" }, "devDependencies": { "@types/node": "^25.9.1", diff --git a/packages/plugins/plugin-webhooks/src/auto-enqueuer.test.ts b/packages/plugins/plugin-webhooks/src/auto-enqueuer.test.ts index a8c718fa8..e80406ef0 100644 --- a/packages/plugins/plugin-webhooks/src/auto-enqueuer.test.ts +++ b/packages/plugins/plugin-webhooks/src/auto-enqueuer.test.ts @@ -3,18 +3,17 @@ /** * AutoEnqueuer end-to-end test. * - * Verifies that the bridge between `IRealtimeService` (data events) and - * `IWebhookOutbox` (delivery rows) works as documented: + * Verifies the bridge between `IRealtimeService` (data events) and the shared + * `service-messaging` HTTP outbox (ADR-0018 M3 — enqueue via `messaging.enqueueHttp`): * * - On startup, subscription rules are loaded from the engine. * - `data.record.created/updated/deleted` events fan out to matching - * `sys_webhook` rows. + * `sys_webhook` rows, enqueued as `source: 'webhook'`. * - The `triggers` CSV column filters which actions fire. * - The `object_name` field scopes events to a specific object. * - Edits to `sys_webhook` self-heal the cache without restart. * - Enqueue is fire-and-forget (handler never throws or blocks). - * - The deterministic eventId means two replays of the same event - * produce one outbox row (dedup via the underlying outbox). + * - The deterministic dedupKey (`:`) collapses replays. */ import { describe, expect, it, vi } from 'vitest'; @@ -24,13 +23,31 @@ import type { RealtimeEventHandler, RealtimeEventPayload, } from '@objectstack/spec/contracts'; -import { AutoEnqueuer } from './auto-enqueuer.js'; -import { MemoryWebhookOutbox } from './memory-outbox.js'; +import type { EnqueueHttpInput } from '@objectstack/service-messaging'; +import { AutoEnqueuer, type HttpEnqueueFn } from './auto-enqueuer.js'; // --------------------------------------------------------------------------- // Fakes // --------------------------------------------------------------------------- +/** + * Records `enqueueHttp` calls and dedups on `(source, dedupKey)` — mirroring the + * shared outbox's UNIQUE constraint so the replay test still holds. + */ +function makeRecorder() { + const calls: EnqueueHttpInput[] = []; + const seen = new Map(); + const enqueue: HttpEnqueueFn = async (input) => { + const key = `${input.source}::${input.dedupKey}`; + const existing = seen.get(key); + if (existing) return existing; + seen.set(key, key); + calls.push(input); + return key; + }; + return { enqueue, calls }; +} + class FakeRealtime implements IRealtimeService { private subs = new Map(); private n = 0; @@ -42,7 +59,7 @@ class FakeRealtime implements IRealtimeService { await sub.handler(event); } } - async subscribe(channel: string, handler: any, opts?: any): Promise { + async subscribe(_channel: string, handler: any, opts?: any): Promise { const id = `s-${++this.n}`; this.subs.set(id, { handler, opts }); return id; @@ -62,9 +79,7 @@ class FakeEngine implements IDataEngine { async find(name: string, q?: any): Promise { const all = this.rows[name] ?? []; if (!q?.where) return all; - return all.filter((r) => - Object.entries(q.where).every(([k, v]) => r[k] === v), - ); + return all.filter((r) => Object.entries(q.where).every(([k, v]) => r[k] === v)); } async findOne(name: string, q?: any): Promise { return (await this.find(name, q))[0] ?? null; @@ -77,10 +92,7 @@ class FakeEngine implements IDataEngine { async update(name: string, data: any, opts?: any): Promise { const arr = this.rows[name] ?? []; for (const r of arr) { - if ( - opts?.where && - Object.entries(opts.where).every(([k, v]) => r[k] === v) - ) { + if (opts?.where && Object.entries(opts.where).every(([k, v]) => r[k] === v)) { Object.assign(r, data); } } @@ -90,11 +102,7 @@ class FakeEngine implements IDataEngine { const arr = this.rows[name] ?? []; const before = arr.length; this.rows[name] = arr.filter( - (r) => - !( - opts?.where && - Object.entries(opts.where).every(([k, v]) => r[k] === v) - ), + (r) => !(opts?.where && Object.entries(opts.where).every(([k, v]) => r[k] === v)), ); return { affected: before - this.rows[name].length }; } @@ -139,7 +147,6 @@ function event( } async function flush() { - // Let microtasks run — fire-and-forget enqueues return on next tick. await new Promise((r) => setTimeout(r, 0)); } @@ -151,48 +158,41 @@ describe('AutoEnqueuer', () => { it('enqueues a delivery when a matching data event fires', async () => { const engine = new FakeEngine({ sys_webhook: [webhook()] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); await realtime.publish(event('created', 'contact', { id: 'c-1', name: 'Alice' })); await flush(); - const rows = await outbox.list(); - expect(rows).toHaveLength(1); - expect(rows[0].url).toBe('https://hooks.example/wh'); - expect(rows[0].eventType).toBe('data.record.created'); - expect((rows[0].payload as any).recordId).toBe('c-1'); + expect(calls).toHaveLength(1); + expect(calls[0].source).toBe('webhook'); + expect(calls[0].refId).toBe('wh-1'); + expect(calls[0].url).toBe('https://hooks.example/wh'); + expect(calls[0].label).toBe('data.record.created'); + expect((calls[0].payload as any).recordId).toBe('c-1'); await ae.stop(); }); it('skips events for other objects', async () => { const engine = new FakeEngine({ sys_webhook: [webhook({ object_name: 'contact' })] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); await realtime.publish(event('created', 'lead', { id: 'l-1' })); await flush(); - expect(await outbox.list()).toHaveLength(0); + expect(calls).toHaveLength(0); await ae.stop(); }); it('respects the triggers CSV (create-only webhook ignores updates)', async () => { - const engine = new FakeEngine({ - sys_webhook: [webhook({ triggers: 'create' })], - }); + const engine = new FakeEngine({ sys_webhook: [webhook({ triggers: 'create' })] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); await realtime.publish(event('created', 'contact', { id: 'c-1' })); @@ -200,9 +200,8 @@ describe('AutoEnqueuer', () => { await realtime.publish(event('deleted', 'contact', { id: 'c-1' }, '2026-05-24T00:00:02.000Z')); await flush(); - const rows = await outbox.list(); - expect(rows).toHaveLength(1); - expect(rows[0].eventType).toBe('data.record.created'); + expect(calls).toHaveLength(1); + expect(calls[0].label).toBe('data.record.created'); await ae.stop(); }); @@ -214,18 +213,15 @@ describe('AutoEnqueuer', () => { ], }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); await realtime.publish(event('created', 'contact', { id: 'c-1' })); await flush(); - const rows = await outbox.list(); - expect(rows).toHaveLength(2); - expect(rows.map((r) => r.url).sort()).toEqual([ + expect(calls).toHaveLength(2); + expect(calls.map((r) => r.url).sort()).toEqual([ 'https://amplitude.test', 'https://slack.test', ]); @@ -233,58 +229,44 @@ describe('AutoEnqueuer', () => { }); it('skips inactive webhooks', async () => { - const engine = new FakeEngine({ - sys_webhook: [webhook({ active: false })], - }); + const engine = new FakeEngine({ sys_webhook: [webhook({ active: false })] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); await realtime.publish(event('created', 'contact', { id: 'c-1' })); await flush(); - expect(await outbox.list()).toHaveLength(0); + expect(calls).toHaveLength(0); await ae.stop(); }); it('skips manual-only webhooks (no triggers)', async () => { - const engine = new FakeEngine({ - sys_webhook: [webhook({ triggers: '' })], - }); + const engine = new FakeEngine({ sys_webhook: [webhook({ triggers: '' })] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); await realtime.publish(event('created', 'contact', { id: 'c-1' })); await flush(); - expect(await outbox.list()).toHaveLength(0); + expect(calls).toHaveLength(0); await ae.stop(); }); it('self-heals the cache when sys_webhook changes', async () => { - // Start with no webhooks; add one via the engine; the next event - // should be enqueued without an explicit refresh() call. const engine = new FakeEngine({ sys_webhook: [] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); await realtime.publish(event('created', 'contact', { id: 'c-1' })); await flush(); - expect(await outbox.list()).toHaveLength(0); + expect(calls).toHaveLength(0); - // Admin adds a webhook through the API and the engine publishes - // a data.record.created event for sys_webhook itself. await engine.insert('sys_webhook', webhook()); await realtime.publish({ type: 'data.record.created', @@ -295,56 +277,45 @@ describe('AutoEnqueuer', () => { await flush(); await flush(); // Two ticks: the self-heal handler itself awaits refresh - await realtime.publish( - event('created', 'contact', { id: 'c-2' }, '2026-05-24T00:01:01.000Z'), - ); + await realtime.publish(event('created', 'contact', { id: 'c-2' }, '2026-05-24T00:01:01.000Z')); await flush(); - const rows = await outbox.list(); - expect(rows).toHaveLength(1); - expect((rows[0].payload as any).recordId).toBe('c-2'); + expect(calls).toHaveLength(1); + expect((calls[0].payload as any).recordId).toBe('c-2'); await ae.stop(); }); - it('uses deterministic eventId so dedup catches replays', async () => { + it('uses a deterministic dedupKey so replays collapse', async () => { const engine = new FakeEngine({ sys_webhook: [webhook()] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - }); + const { enqueue, calls } = makeRecorder(); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); - // Publish identical event twice — outbox dedup must collapse. const evt = event('created', 'contact', { id: 'c-1' }); await realtime.publish(evt); await realtime.publish(evt); await flush(); - expect(await outbox.list()).toHaveLength(1); + expect(calls).toHaveLength(1); await ae.stop(); }); it('handler is fire-and-forget (publish does not block on enqueue)', async () => { const engine = new FakeEngine({ sys_webhook: [webhook()] }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); let slowResolve!: () => void; const blocker = new Promise((res) => { slowResolve = res; }); - - // Wrap outbox to make enqueue slow. - const slow: typeof outbox = Object.assign(outbox, { - enqueue: async (...args: Parameters) => { - await blocker; - return MemoryWebhookOutbox.prototype.enqueue.apply(outbox, args); - }, - }); - - const ae = new AutoEnqueuer(engine, realtime, slow, { - refreshIntervalMs: 0, - }); + const calls: EnqueueHttpInput[] = []; + const enqueue: HttpEnqueueFn = async (input) => { + await blocker; + calls.push(input); + return 'id'; + }; + + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0 }); await ae.start(); const before = Date.now(); @@ -354,7 +325,7 @@ describe('AutoEnqueuer', () => { slowResolve(); await flush(); - expect(await outbox.list()).toHaveLength(1); + expect(calls).toHaveLength(1); await ae.stop(); }); @@ -366,25 +337,21 @@ describe('AutoEnqueuer', () => { ], }); const realtime = new FakeRealtime(); - const outbox = new MemoryWebhookOutbox(); - const orig = outbox.enqueue.bind(outbox); - outbox.enqueue = vi.fn(async (input) => { - if (input.webhookId === 'wh-bad') throw new Error('boom'); - return orig(input); + const calls: EnqueueHttpInput[] = []; + const enqueue: HttpEnqueueFn = vi.fn(async (input) => { + if (input.refId === 'wh-bad') throw new Error('boom'); + calls.push(input); + return 'id'; }); const warn = vi.fn(); - const ae = new AutoEnqueuer(engine, realtime, outbox, { - refreshIntervalMs: 0, - logger: { warn }, - }); + const ae = new AutoEnqueuer(engine, realtime, enqueue, { refreshIntervalMs: 0, logger: { warn } }); await ae.start(); await realtime.publish(event('created', 'contact', { id: 'c-1' })); await flush(); - const rows = await outbox.list(); - expect(rows).toHaveLength(1); - expect(rows[0].url).toBe('https://good.test'); + expect(calls).toHaveLength(1); + expect(calls[0].url).toBe('https://good.test'); expect(warn).toHaveBeenCalled(); await ae.stop(); }); diff --git a/packages/plugins/plugin-webhooks/src/auto-enqueuer.ts b/packages/plugins/plugin-webhooks/src/auto-enqueuer.ts index 7668aa877..868a82782 100644 --- a/packages/plugins/plugin-webhooks/src/auto-enqueuer.ts +++ b/packages/plugins/plugin-webhooks/src/auto-enqueuer.ts @@ -1,7 +1,14 @@ // Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. import type { IDataEngine, IRealtimeService, RealtimeEventPayload } from '@objectstack/spec/contracts'; -import type { IWebhookOutbox } from './outbox.js'; +import type { EnqueueHttpInput } from '@objectstack/service-messaging'; + +/** + * Enqueue callback into the shared `service-messaging` HTTP outbox (ADR-0018 M3). + * The plugin supplies one bound to `messaging.enqueueHttp(...)`; webhooks no + * longer own a delivery outbox/dispatcher — they share the generic substrate. + */ +export type HttpEnqueueFn = (input: EnqueueHttpInput) => Promise; /** * Optional logger interface (subset of console / kernel logger). @@ -98,7 +105,7 @@ export class AutoEnqueuer { constructor( private readonly engine: IDataEngine, private readonly realtime: IRealtimeService, - private readonly outbox: IWebhookOutbox, + private readonly enqueue: HttpEnqueueFn, opts: AutoEnqueuerOptions = {}, ) { this.subscriptionsObject = opts.subscriptionsObject ?? 'sys_webhook'; @@ -274,32 +281,36 @@ export class AutoEnqueuer { for (const sub of subs) { if (!sub.triggers.has(trigger)) continue; - // Fire-and-forget — never await on the hot path. - void this.outbox - .enqueue({ - webhookId: sub.id, + // Fire-and-forget — never await on the hot path. Map the webhook + // delivery onto the generic HTTP-outbox shape (ADR-0018 M3): + // - source 'webhook' + dedupKey ':' preserves + // the old (event_id, webhook_id) at-most-once enqueue; + // - refId = webhookId keeps per-webhook partition affinity / ordering; + // - label = event type → X-Objectstack-Event header. + void this.enqueue({ + source: 'webhook', + refId: sub.id, + dedupKey: `${sub.id}:${eventId}`, + label: event.type, + url: sub.url, + method: sub.method, + headers: sub.headers, + signingSecret: sub.secret, + timeoutMs: sub.timeoutMs, + payload: { + object: event.object, + recordId, + action, + timestamp: event.timestamp, + ...payload, + }, + }).catch((err) => + this.logger.warn?.('[webhook-auto-enqueuer] enqueue failed', { + webhook: sub.name, eventId, - eventType: event.type, - url: sub.url, - method: sub.method, - headers: sub.headers, - secret: sub.secret, - timeoutMs: sub.timeoutMs, - payload: { - object: event.object, - recordId, - action, - timestamp: event.timestamp, - ...payload, - }, - }) - .catch((err) => - this.logger.warn?.('[webhook-auto-enqueuer] enqueue failed', { - webhook: sub.name, - eventId, - err: (err as Error)?.message ?? err, - }), - ); + err: (err as Error)?.message ?? err, + }), + ); } } diff --git a/packages/plugins/plugin-webhooks/src/dispatcher.test.ts b/packages/plugins/plugin-webhooks/src/dispatcher.test.ts deleted file mode 100644 index 4275f8f09..000000000 --- a/packages/plugins/plugin-webhooks/src/dispatcher.test.ts +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -/** - * Cross-node webhook dispatcher contract test. - * - * Builds two `WebhookDispatcher` instances that share one in-memory outbox - * AND one cluster `ILock`/`IPubSub` (simulating two nodes sharing one - * Redis/Postgres). Asserts: - * - * 1. Every enqueued delivery is POSTed *exactly once* (no double-fire). - * 2. Work is distributed across both nodes (no starvation). - * 3. 5xx responses are retried per the Stripe-style schedule. - * 4. 4xx (permanent) responses go straight to `dead`. - */ - -import { describe, expect, it } from 'vitest'; -import { - ComposedClusterService, - MemoryCounter, - MemoryKV, - MemoryLock, - MemoryPubSub, -} from '@objectstack/service-cluster'; -import type { IClusterService } from '@objectstack/spec/contracts'; -import { WebhookDispatcher } from './dispatcher.js'; -import type { FetchImpl } from './http-sender.js'; -import { MemoryWebhookOutbox } from './memory-outbox.js'; -import { hashPartition } from './partition.js'; - -interface SharedCluster { - nodeA: IClusterService; - nodeB: IClusterService; -} - -function makeSharedCluster(): SharedCluster { - // ONE lock + pubsub shared by both "nodes" — this is what makes the test - // a realistic cross-node simulation. - const lock = new MemoryLock(); - const pubsub = new MemoryPubSub(); - const kv = new MemoryKV(); - const counter = new MemoryCounter(); - return { - nodeA: new ComposedClusterService('node-A', 'memory', pubsub, lock, kv, counter), - nodeB: new ComposedClusterService('node-B', 'memory', pubsub, lock, kv, counter), - }; -} - -function makeFetchImpl(opts: { - status?: number; - log?: { url: string; deliveryId: string }[]; -}): FetchImpl { - const status = opts.status ?? 200; - return async (url, init) => { - opts.log?.push({ - url, - deliveryId: init.headers['X-Objectstack-Delivery'] ?? '', - }); - return { - ok: status >= 200 && status < 300, - status, - async text() { - return ''; - }, - }; - }; -} - -async function flushTicks(...dispatchers: WebhookDispatcher[]): Promise { - // Run several rounds with the nodes ticking *concurrently* so they - // genuinely contend for the cluster lock — sequential ticks would let - // whichever node ran first drain every partition. - for (let round = 0; round < 6; round++) { - await Promise.all(dispatchers.map((d) => d.tick())); - } -} - -describe('WebhookDispatcher cross-node', () => { - it('exactly-once: 50 deliveries across 2 nodes → 50 POSTs total', async () => { - const cluster = makeSharedCluster(); - const outbox = new MemoryWebhookOutbox(); - const log: { url: string; deliveryId: string }[] = []; - const fetchImpl = makeFetchImpl({ status: 200, log }); - - const partitionCount = 4; - const a = new WebhookDispatcher({ - nodeId: 'node-A', - cluster: cluster.nodeA, - outbox, - fetchImpl, - partitionCount, - intervalMs: 1_000_000, // disable timer; we drive with tick() - }); - const b = new WebhookDispatcher({ - nodeId: 'node-B', - cluster: cluster.nodeB, - outbox, - fetchImpl, - partitionCount, - intervalMs: 1_000_000, - }); - - for (let i = 0; i < 50; i++) { - await outbox.enqueue({ - webhookId: `wh-${i % 10}`, // 10 webhooks → spread across 4 partitions - eventId: `evt-${i}`, - eventType: 'data.record.created', - url: `https://example.test/${i}`, - payload: { i }, - }); - } - - await flushTicks(a, b); - - expect(log).toHaveLength(50); - const uniqueIds = new Set(log.map((l) => l.deliveryId)); - expect(uniqueIds.size).toBe(50); - - const success = await outbox.list({ status: 'success' }); - expect(success).toHaveLength(50); - }); - - it('partition affinity: dispatcher only claims rows for partitions it locked', async () => { - const cluster = makeSharedCluster(); - const outbox = new MemoryWebhookOutbox(); - const partitionCount = 8; - - // For each attempt, record (nodeId, partitionForWebhook). - const observed: { nodeId: string; partition: number }[] = []; - - const make = (nodeId: string, c: IClusterService) => - new WebhookDispatcher({ - nodeId, - cluster: c, - outbox, - fetchImpl: makeFetchImpl({ status: 200 }), - partitionCount, - intervalMs: 1_000_000, - onAttempt: (delivery) => { - observed.push({ - nodeId, - partition: hashPartition(delivery.webhookId, partitionCount), - }); - }, - }); - - const a = make('node-A', cluster.nodeA); - const b = make('node-B', cluster.nodeB); - - for (let i = 0; i < 30; i++) { - await outbox.enqueue({ - webhookId: `wh-${i % 5}`, - eventId: `evt-${i}`, - eventType: 't', - url: 'https://example.test/x', - payload: { i }, - }); - } - await flushTicks(a, b); - - expect(observed).toHaveLength(30); - // Each row's partition came from hash(webhookId, 8) — only 5 distinct - // webhook ids → at most 5 distinct partitions. - const partitionsTouched = new Set(observed.map((o) => o.partition)); - expect(partitionsTouched.size).toBeLessThanOrEqual(5); - }); - - it('load distribution: both nodes process some rows', async () => { - const cluster = makeSharedCluster(); - const outbox = new MemoryWebhookOutbox(); - const partitionCount = 8; - const counts: Record = { 'node-A': 0, 'node-B': 0 }; - - const make = (nodeId: string, c: IClusterService) => - new WebhookDispatcher({ - nodeId, - cluster: c, - outbox, - fetchImpl: makeFetchImpl({ status: 200 }), - partitionCount, - intervalMs: 1_000_000, - onAttempt: () => { - counts[nodeId] += 1; - }, - }); - - const a = make('node-A', cluster.nodeA); - const b = make('node-B', cluster.nodeB); - - // Lots of distinct webhookIds → spreads work across many partitions. - for (let i = 0; i < 200; i++) { - await outbox.enqueue({ - webhookId: `wh-${i}`, - eventId: `evt-${i}`, - eventType: 't', - url: 'https://example.test/x', - payload: { i }, - }); - } - await flushTicks(a, b); - - // Each node should have processed at least one row — proving the - // rotation/offset isn't pinning all work to node A. - expect(counts['node-A']).toBeGreaterThan(0); - expect(counts['node-B']).toBeGreaterThan(0); - expect(counts['node-A'] + counts['node-B']).toBe(200); - }); - - it('5xx is retried: row stays pending with future nextRetryAt', async () => { - const cluster = makeSharedCluster(); - const outbox = new MemoryWebhookOutbox(); - const a = new WebhookDispatcher({ - nodeId: 'node-A', - cluster: cluster.nodeA, - outbox, - fetchImpl: makeFetchImpl({ status: 503 }), - partitionCount: 1, - intervalMs: 1_000_000, - rng: () => 0.5, - }); - - await outbox.enqueue({ - webhookId: 'wh-x', - eventId: 'evt-x', - eventType: 't', - url: 'https://example.test/fail', - payload: {}, - }); - - await a.tick(); - const rows = await outbox.list(); - expect(rows[0].status).toBe('pending'); - expect(rows[0].attempts).toBe(1); - expect(rows[0].nextRetryAt).toBeGreaterThan(Date.now()); - }); - - it('4xx is permanent: row moves to dead', async () => { - const cluster = makeSharedCluster(); - const outbox = new MemoryWebhookOutbox(); - const a = new WebhookDispatcher({ - nodeId: 'node-A', - cluster: cluster.nodeA, - outbox, - fetchImpl: makeFetchImpl({ status: 404 }), - partitionCount: 1, - intervalMs: 1_000_000, - }); - - await outbox.enqueue({ - webhookId: 'wh-x', - eventId: 'evt-x', - eventType: 't', - url: 'https://example.test/missing', - payload: {}, - }); - - await a.tick(); - const dead = await outbox.list({ status: 'dead' }); - expect(dead).toHaveLength(1); - expect(dead[0].responseCode).toBe(404); - }); - - it('dedup: identical (eventId, webhookId) enqueues collapse to one row', async () => { - const outbox = new MemoryWebhookOutbox(); - const id1 = await outbox.enqueue({ - webhookId: 'wh-1', - eventId: 'evt-dup', - eventType: 't', - url: 'https://example.test/', - payload: {}, - }); - const id2 = await outbox.enqueue({ - webhookId: 'wh-1', - eventId: 'evt-dup', - eventType: 't', - url: 'https://example.test/', - payload: {}, - }); - expect(id1).toBe(id2); - const rows = await outbox.list(); - expect(rows).toHaveLength(1); - }); - - it('lock prevents same partition being claimed twice in a tick', async () => { - const cluster = makeSharedCluster(); - const outbox = new MemoryWebhookOutbox(); - const log: { url: string; deliveryId: string }[] = []; - const fetchImpl = makeFetchImpl({ status: 200, log }); - - // Single partition → both nodes contend for the same lock. - const a = new WebhookDispatcher({ - nodeId: 'node-A', - cluster: cluster.nodeA, - outbox, - fetchImpl, - partitionCount: 1, - intervalMs: 1_000_000, - }); - const b = new WebhookDispatcher({ - nodeId: 'node-B', - cluster: cluster.nodeB, - outbox, - fetchImpl, - partitionCount: 1, - intervalMs: 1_000_000, - }); - - for (let i = 0; i < 5; i++) { - await outbox.enqueue({ - webhookId: 'wh-1', - eventId: `evt-${i}`, - eventType: 't', - url: 'https://example.test/', - payload: { i }, - }); - } - - // Fire both ticks "simultaneously" — only one should claim the partition. - await Promise.all([a.tick(), b.tick()]); - - expect(log).toHaveLength(5); - const uniqueIds = new Set(log.map((l) => l.deliveryId)); - expect(uniqueIds.size).toBe(5); - }); -}); diff --git a/packages/plugins/plugin-webhooks/src/dispatcher.ts b/packages/plugins/plugin-webhooks/src/dispatcher.ts deleted file mode 100644 index 5b284ca03..000000000 --- a/packages/plugins/plugin-webhooks/src/dispatcher.ts +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -import type { IClusterService, LockHandle } from '@objectstack/spec/contracts'; -import type { FetchImpl } from './http-sender.js'; -import { classifyAttempt, sendOnce } from './http-sender.js'; -import type { IWebhookOutbox, WebhookDelivery } from './outbox.js'; - -/** - * Minimal logger surface — kernel's `Logger` is compatible (extra params - * accepted). Keeping it permissive avoids a hard dependency on the spec - * Logger interface here. - */ -export interface DispatcherLogger { - warn: (msg: string, meta?: any) => void; - info?: (msg: string, meta?: any) => void; -} - -export interface DispatcherOptions { - /** Stable id identifying this dispatcher node. */ - nodeId: string; - /** Cluster service providing `lock` (and optional metrics). */ - cluster: IClusterService; - /** Outbox backend. */ - outbox: IWebhookOutbox; - /** - * How many partitions to split work across. Each tick the dispatcher - * attempts to acquire each partition's lock independently — the node - * that wins owns that partition for the duration of the batch. - * - * Default: 8 (matches webhook-delivery.mdx §4 example). - */ - partitionCount?: number; - /** Max rows to claim from each partition per tick. Default 32. */ - batchSize?: number; - /** Tick interval in ms. Default 250. */ - intervalMs?: number; - /** Per-partition lock TTL. Default = 5 × intervalMs. */ - lockTtlMs?: number; - /** Visibility timeout for claimed rows. Default = 2 × lockTtlMs. */ - claimTtlMs?: number; - /** Override `globalThis.fetch` (tests). */ - fetchImpl?: FetchImpl; - /** Hook fired after every attempt — observability hook. */ - onAttempt?: (delivery: WebhookDelivery, success: boolean) => void; - /** RNG override for the retry-jitter schedule (tests). */ - rng?: () => number; - /** Logger callback (optional). */ - logger?: DispatcherLogger; -} - -/** - * Cross-node webhook dispatcher. - * - * **Design** — each tick the dispatcher iterates over `partitionCount` - * logical partitions. For each, it tries to acquire a cluster-scoped lock - * (`webhook.dispatcher.partition.{i}`) with a short TTL. If it wins the - * lock, it claims up to `batchSize` ready rows whose `hash(webhookId) mod - * partitionCount === i`, POSTs them, and acks. The lock is released - * immediately after the batch so other nodes can fairly rotate through. - * - * **Why per-partition locks rather than one global lock?** - * - * 1. Throughput — N nodes can process N partitions concurrently. - * 2. Partition affinity — rows for the same webhook always sort into the - * same partition, preserving in-order delivery per webhook. - * 3. Failure isolation — a stuck node only blocks its partition until the - * TTL elapses; other partitions keep moving. - * - * **At-least-once, not exactly-once.** Receivers MUST be idempotent on the - * `X-Objectstack-Delivery` (== row id) header. If the HTTP call succeeds - * but the ack write fails, the row reverts to pending after the claim TTL - * and will be re-posted. - */ -export class WebhookDispatcher { - private readonly opts: Required< - Omit - > & Pick; - private timer: ReturnType | undefined; - private running = false; - private inflightTick: Promise | undefined; - - constructor(options: DispatcherOptions) { - const intervalMs = options.intervalMs ?? 250; - const lockTtlMs = options.lockTtlMs ?? intervalMs * 5; - this.opts = { - nodeId: options.nodeId, - cluster: options.cluster, - outbox: options.outbox, - partitionCount: options.partitionCount ?? 8, - batchSize: options.batchSize ?? 32, - intervalMs, - lockTtlMs, - claimTtlMs: options.claimTtlMs ?? lockTtlMs * 2, - onAttempt: options.onAttempt, - fetchImpl: options.fetchImpl, - rng: options.rng, - logger: options.logger, - }; - } - - /** Begin the periodic loop. Safe to call once; subsequent calls are no-ops. */ - start(): void { - if (this.running) return; - this.running = true; - // Fire one tick immediately so single-row tests don't wait the interval. - this.scheduleTick(); - this.timer = setInterval(() => this.scheduleTick(), this.opts.intervalMs); - } - - /** Stop the loop and wait for the in-flight tick to drain. */ - async stop(): Promise { - if (!this.running) return; - this.running = false; - if (this.timer) { - clearInterval(this.timer); - this.timer = undefined; - } - if (this.inflightTick) { - try { - await this.inflightTick; - } catch { - /* swallow — already logged */ - } - } - } - - /** - * Run one full tick (all partitions, single attempt each). Exposed for - * deterministic tests that want to step the dispatcher manually. - */ - async tick(): Promise { - await this.runTick(); - } - - private scheduleTick(): void { - if (this.inflightTick) return; // skip if previous tick still running - this.inflightTick = this.runTick() - .catch((err) => { - this.opts.logger?.warn?.('webhook-dispatcher: tick failed', { - nodeId: this.opts.nodeId, - error: (err as Error)?.message ?? String(err), - }); - }) - .finally(() => { - this.inflightTick = undefined; - }); - } - - private async runTick(): Promise { - const partitionCount = this.opts.partitionCount; - // Walk partitions in a rotated order per node so contention spreads. - const offset = stableNodeOffset(this.opts.nodeId, partitionCount); - for (let step = 0; step < partitionCount; step++) { - const i = (offset + step) % partitionCount; - await this.runPartition(i); - } - } - - private async runPartition(index: number): Promise { - const key = `webhook.dispatcher.partition.${index}`; - const handle: LockHandle | null = await this.opts.cluster.lock.acquire(key, { - ttlMs: this.opts.lockTtlMs, - // waitMs=0 → fail-fast; we'll try this partition again next tick. - waitMs: 0, - }); - if (!handle) return; - - try { - const claimed = await this.opts.outbox.claim({ - nodeId: this.opts.nodeId, - limit: this.opts.batchSize, - partition: { index, count: this.opts.partitionCount }, - claimTtlMs: this.opts.claimTtlMs, - }); - if (claimed.length === 0) return; - // Renew before potentially long HTTP work — and bound batch time. - await handle.renew(this.opts.lockTtlMs); - for (const row of claimed) { - if (!handle.isHeld()) break; // lost the lock — abandon remaining rows - await this.processRow(row); - } - } finally { - await handle.release(); - } - } - - private async processRow(row: WebhookDelivery): Promise { - const fetchImpl = (this.opts.fetchImpl ?? (globalThis.fetch as unknown as FetchImpl)) as FetchImpl | undefined; - if (!fetchImpl) { - this.opts.logger?.warn?.('webhook-dispatcher: no fetch impl available', { - rowId: row.id, - }); - await this.opts.outbox.ack(row.id, { - success: false, - error: 'no fetch implementation', - durationMs: 0, - dead: true, - }); - return; - } - const outcome = await sendOnce(row, fetchImpl); - const result = classifyAttempt(outcome, row.attempts, Date.now(), this.opts.rng); - await this.opts.outbox.ack(row.id, result); - this.opts.onAttempt?.(row, result.success); - } -} - -/** - * Spread starting partition per node so a 2-node cluster with 8 partitions - * doesn't have both nodes serialise on partition 0 every tick. - */ -function stableNodeOffset(nodeId: string, partitionCount: number): number { - let h = 0; - for (let i = 0; i < nodeId.length; i++) { - h = (h * 31 + nodeId.charCodeAt(i)) | 0; - } - return Math.abs(h) % partitionCount; -} diff --git a/packages/plugins/plugin-webhooks/src/http-sender.ts b/packages/plugins/plugin-webhooks/src/http-sender.ts deleted file mode 100644 index dfb1be048..000000000 --- a/packages/plugins/plugin-webhooks/src/http-sender.ts +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -import { createHmac, randomUUID } from 'node:crypto'; -import type { WebhookDelivery, AckResult } from './outbox.js'; - -/** - * Default per-request timeout. Receivers SHOULD respond within ~30s; we - * cap aggressively to free dispatcher slots. - */ -export const DEFAULT_TIMEOUT_MS = 15_000; - -/** Truncate response bodies to keep storage cost predictable. */ -const RESPONSE_BODY_CAP = 16 * 1024; - -export type FetchImpl = ( - input: string, - init: { - method: string; - headers: Record; - body: string; - signal: AbortSignal; - }, -) => Promise<{ - ok: boolean; - status: number; - text(): Promise; -}>; - -/** Single HTTP attempt classified to an `AckResult` shape (without nextRetryAt). */ -export type AttemptOutcome = - | { success: true; httpStatus: number; responseBody?: string; durationMs: number } - | { - success: false; - retriable: boolean; - httpStatus?: number; - responseBody?: string; - error?: string; - durationMs: number; - }; - -/** - * Send one HTTP attempt for the delivery. Pure (no DB writes) so the - * dispatcher owns retry-schedule + ack logic. - * - * - 2xx → success - * - 4xx (except 408/429) → permanent failure (retriable = false → goes to `dead`) - * - 408, 429, 5xx, transport → retriable - */ -export async function sendOnce( - delivery: WebhookDelivery, - fetchImpl: FetchImpl, -): Promise { - const body = - typeof delivery.payload === 'string' - ? delivery.payload - : JSON.stringify(delivery.payload); - - const headers: Record = { - 'Content-Type': 'application/json', - 'User-Agent': 'ObjectStack-Webhooks/1.0', - 'X-Objectstack-Event': delivery.eventType, - 'X-Objectstack-Delivery': delivery.id, - 'X-Objectstack-Attempt': String(delivery.attempts + 1), - ...(delivery.headers ?? {}), - }; - if (delivery.secret) { - const sig = createHmac('sha256', delivery.secret).update(body).digest('hex'); - headers['X-Objectstack-Signature'] = `sha256=${sig}`; - } - - const timeoutMs = delivery.timeoutMs ?? DEFAULT_TIMEOUT_MS; - const controller = new AbortController(); - const timer = setTimeout(() => controller.abort(), timeoutMs); - const start = Date.now(); - try { - const res = await fetchImpl(delivery.url, { - method: delivery.method ?? 'POST', - headers, - body, - signal: controller.signal, - }); - clearTimeout(timer); - const responseText = await safeReadBody(res); - const durationMs = Date.now() - start; - if (res.ok) { - return { success: true, httpStatus: res.status, responseBody: responseText, durationMs }; - } - const retriable = res.status === 408 || res.status === 429 || res.status >= 500; - return { - success: false, - retriable, - httpStatus: res.status, - responseBody: responseText, - error: `HTTP ${res.status}`, - durationMs, - }; - } catch (err: unknown) { - clearTimeout(timer); - const durationMs = Date.now() - start; - const e = err as { name?: string; message?: string }; - const error = e?.name === 'AbortError' ? `timeout after ${timeoutMs}ms` : e?.message ?? String(err); - return { success: false, retriable: true, error, durationMs }; - } -} - -async function safeReadBody(res: { text(): Promise }): Promise { - try { - const text = await res.text(); - return text.length > RESPONSE_BODY_CAP ? text.slice(0, RESPONSE_BODY_CAP) : text; - } catch { - return undefined; - } -} - -/** - * Stripe-style retry schedule. Returns the next `nextRetryAt` ms (relative - * to `now`) given how many attempts have already happened, or `null` if - * the row should be moved to `dead`. - * - * attempt 1 fails -> retry in ~1s - * attempt 2 fails -> ~10s - * attempt 3 fails -> ~1m - * attempt 4 fails -> ~10m - * attempt 5 fails -> ~1h - * attempt 6 fails -> ~6h - * attempt 7 fails -> ~24h - * attempt 8+ fails -> dead - * - * Each delay is multiplied by jitter ∈ [0.8, 1.2]. - */ -export function nextRetryDelayMs( - attemptsSoFar: number, - rng: () => number = Math.random, -): number | null { - const SCHEDULE = [1_000, 10_000, 60_000, 600_000, 3_600_000, 21_600_000, 86_400_000]; - if (attemptsSoFar < 1 || attemptsSoFar > SCHEDULE.length) return null; - const base = SCHEDULE[attemptsSoFar - 1]; - const jitter = 0.8 + rng() * 0.4; - return Math.floor(base * jitter); -} - -/** - * Compose an `AckResult` from an `AttemptOutcome`, applying the retry - * schedule on retriable failures. - */ -export function classifyAttempt( - outcome: AttemptOutcome, - attemptsSoFar: number, - now: number = Date.now(), - rng?: () => number, -): AckResult { - if (outcome.success) return outcome; - if (!outcome.retriable) { - return { - success: false, - httpStatus: outcome.httpStatus, - responseBody: outcome.responseBody, - error: outcome.error, - durationMs: outcome.durationMs, - dead: true, - }; - } - const delay = nextRetryDelayMs(attemptsSoFar + 1, rng); - if (delay === null) { - return { - success: false, - httpStatus: outcome.httpStatus, - responseBody: outcome.responseBody, - error: outcome.error, - durationMs: outcome.durationMs, - dead: true, - }; - } - return { - success: false, - httpStatus: outcome.httpStatus, - responseBody: outcome.responseBody, - error: outcome.error, - durationMs: outcome.durationMs, - nextRetryAt: now + delay, - }; -} - -/** Generate a fresh delivery id (UUID v4). Exposed for tests. */ -export function newDeliveryId(): string { - return randomUUID(); -} diff --git a/packages/plugins/plugin-webhooks/src/index.ts b/packages/plugins/plugin-webhooks/src/index.ts index 3dee4f32c..988d0d930 100644 --- a/packages/plugins/plugin-webhooks/src/index.ts +++ b/packages/plugins/plugin-webhooks/src/index.ts @@ -3,23 +3,21 @@ /** * @objectstack/plugin-webhooks * - * Persistent, cluster-aware webhook outbox + dispatcher. + * Webhook fan-out on top of the shared outbound-HTTP delivery substrate + * (ADR-0018 M3). The durable outbox, cluster-coordinated dispatcher, retry / + * backoff / dead-letter, and retention all live in + * `@objectstack/service-messaging` (`sys_http_delivery` + `HttpDispatcher`). * - * Implements stages 3–5 of the pipeline in - * `content/docs/concepts/webhook-delivery.mdx` (Persist · Dispatch · - * Retry). Stages 1–2 (Event capture · Match) integrate via the - * `webhook.outbox.enqueue()` service consumers call after persistence. + * This package ships only the webhook-specific concerns: + * - the `sys_webhook` configuration object, + * - the {@link AutoEnqueuer} that turns `data.record.*` events into outbox + * rows (`source: 'webhook'`), + * - the redeliver admin endpoint. * - * The first real cross-node consumer of `cluster.lock`. + * **Requires** `MessagingServicePlugin` (a foundational, always-on capability). * * ## Subpath exports - * - `@objectstack/plugin-webhooks/sql` — `SqlWebhookOutbox` (production - * storage; durable rows via ObjectQL / any driver) - * - `@objectstack/plugin-webhooks/schema` — `SysWebhookDelivery` object - * schema to register in `defineStack({ objects: [...] })` - * - * The main entry intentionally ships only the `MemoryWebhookOutbox` so - * downstream bundles don't pay for the SQL impl unless they import it. + * - `@objectstack/plugin-webhooks/schema` — `SysWebhook` object schema. */ export { @@ -27,30 +25,5 @@ export { type WebhookOutboxPluginOptions, } from './webhook-outbox-plugin.js'; -export { WebhookDispatcher, type DispatcherOptions } from './dispatcher.js'; -export { MemoryWebhookOutbox } from './memory-outbox.js'; -export { AutoEnqueuer, type AutoEnqueuerOptions } from './auto-enqueuer.js'; -export { - DeliveryRetentionSweeper, - type DeliveryRetentionOptions, -} from './retention.js'; -export { hashPartition } from './partition.js'; -export { - sendOnce, - classifyAttempt, - nextRetryDelayMs, - DEFAULT_TIMEOUT_MS, - type AttemptOutcome, - type FetchImpl, -} from './http-sender.js'; -export type { - AckFailure, - AckResult, - AckSuccess, - ClaimOptions, - DeliveryStatus, - EnqueueInput, - IWebhookOutbox, - WebhookDelivery, -} from './outbox.js'; -export { RedeliverError } from './outbox.js'; +export { AutoEnqueuer, type AutoEnqueuerOptions, type HttpEnqueueFn } from './auto-enqueuer.js'; +export { SysWebhook } from './sys-webhook.object.js'; diff --git a/packages/plugins/plugin-webhooks/src/memory-outbox.test.ts b/packages/plugins/plugin-webhooks/src/memory-outbox.test.ts deleted file mode 100644 index 06fa0a738..000000000 --- a/packages/plugins/plugin-webhooks/src/memory-outbox.test.ts +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -/** - * MemoryWebhookOutbox — focused tests for behaviours not already covered - * via `dispatcher.test.ts`. Today that's just `redeliver()` — the rest of - * the contract is exercised end-to-end through the dispatcher path. - */ - -import { describe, expect, it } from 'vitest'; -import { MemoryWebhookOutbox } from './memory-outbox.js'; -import type { EnqueueInput } from './outbox.js'; - -function input(webhookId: string, eventId: string): EnqueueInput { - return { - webhookId, - eventId, - eventType: 'data.record.created', - url: 'https://example.test/hook', - payload: { hello: 'world' }, - }; -} - -describe('MemoryWebhookOutbox.redeliver', () => { - it('resets a success row back to pending with attempts=0', async () => { - const outbox = new MemoryWebhookOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id, { success: true, httpStatus: 200, durationMs: 1 }); - - const row = await outbox.redeliver(id); - expect(row.status).toBe('pending'); - expect(row.attempts).toBe(0); - expect(row.claimedBy).toBeUndefined(); - expect(row.claimedAt).toBeUndefined(); - expect(row.nextRetryAt).toBeUndefined(); - expect(row.error).toBeUndefined(); - expect(row.responseCode).toBeUndefined(); - expect(row.responseBody).toBeUndefined(); - expect(row.url).toBe('https://example.test/hook'); - expect(row.payload).toEqual({ hello: 'world' }); - }); - - it('resets a dead row and makes it claimable again', async () => { - const outbox = new MemoryWebhookOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id, { - success: false, - error: 'final', - dead: true, - durationMs: 5, - }); - - await outbox.redeliver(id); - const claimed = await outbox.claim({ - nodeId: 'B', - limit: 10, - claimTtlMs: 60_000, - }); - expect(claimed.map((r) => r.id)).toContain(id); - }); - - it('throws not_found when row does not exist', async () => { - const outbox = new MemoryWebhookOutbox(); - await expect(outbox.redeliver('missing')).rejects.toMatchObject({ - code: 'not_found', - }); - }); - - it('throws not_eligible for pending rows', async () => { - const outbox = new MemoryWebhookOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await expect(outbox.redeliver(id)).rejects.toMatchObject({ - code: 'not_eligible', - }); - }); - - it('throws not_eligible for in_flight rows', async () => { - const outbox = new MemoryWebhookOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await expect(outbox.redeliver(id)).rejects.toMatchObject({ - code: 'not_eligible', - }); - }); -}); diff --git a/packages/plugins/plugin-webhooks/src/memory-outbox.ts b/packages/plugins/plugin-webhooks/src/memory-outbox.ts deleted file mode 100644 index 41c7c04b7..000000000 --- a/packages/plugins/plugin-webhooks/src/memory-outbox.ts +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -import { randomUUID } from 'node:crypto'; -import type { - AckResult, - ClaimOptions, - EnqueueInput, - DeliveryStatus, - IWebhookOutbox, - WebhookDelivery, -} from './outbox.js'; -import { RedeliverError } from './outbox.js'; -import { hashPartition } from './partition.js'; - -/** - * In-memory `IWebhookOutbox` for tests and single-process development. - * - * Implements the atomic-claim semantics by running its claim/ack logic - * synchronously (single-threaded JS event loop) inside one `Map`. Two - * `MemoryWebhookOutbox` instances do NOT share state — for the cross-node - * test the *same* instance is passed to both dispatchers (simulating one - * shared database). - * - * A production SQL-backed implementation will live in a sibling file and - * use `SELECT ... FOR UPDATE SKIP LOCKED`. - */ -export class MemoryWebhookOutbox implements IWebhookOutbox { - private readonly rows = new Map(); - /** Dedup index keyed by `${eventId}::${webhookId}` -> row id. */ - private readonly dedup = new Map(); - - async enqueue(input: EnqueueInput): Promise { - const dedupKey = `${input.eventId}::${input.webhookId}`; - const existing = this.dedup.get(dedupKey); - if (existing) return existing; - - const id = randomUUID(); - const now = Date.now(); - const row: WebhookDelivery = { - id, - webhookId: input.webhookId, - eventId: input.eventId, - eventType: input.eventType, - url: input.url, - method: input.method ?? 'POST', - headers: input.headers, - secret: input.secret, - timeoutMs: input.timeoutMs, - payload: input.payload, - status: 'pending', - attempts: 0, - createdAt: now, - updatedAt: now, - }; - this.rows.set(id, row); - this.dedup.set(dedupKey, id); - return id; - } - - async claim(opts: ClaimOptions): Promise { - const now = opts.now ?? Date.now(); - const claimed: WebhookDelivery[] = []; - - // First pass: reap expired in_flight rows (visibility timeout). - for (const row of this.rows.values()) { - if ( - row.status === 'in_flight' && - row.claimedAt !== undefined && - now - row.claimedAt > opts.claimTtlMs - ) { - row.status = 'pending'; - row.claimedBy = undefined; - row.claimedAt = undefined; - row.updatedAt = now; - } - } - - for (const row of this.rows.values()) { - if (claimed.length >= opts.limit) break; - if (row.status !== 'pending') continue; - if (row.nextRetryAt !== undefined && row.nextRetryAt > now) continue; - if (opts.partition) { - const p = hashPartition(row.webhookId, opts.partition.count); - if (p !== opts.partition.index) continue; - } - row.status = 'in_flight'; - row.claimedBy = opts.nodeId; - row.claimedAt = now; - row.updatedAt = now; - claimed.push({ ...row }); - } - return claimed; - } - - async ack(id: string, result: AckResult): Promise { - const row = this.rows.get(id); - if (!row) return; - const now = Date.now(); - row.attempts += 1; - row.lastAttemptedAt = now; - row.updatedAt = now; - row.claimedBy = undefined; - row.claimedAt = undefined; - row.responseCode = result.httpStatus; - row.responseBody = result.responseBody; - - let status: DeliveryStatus; - if (result.success) { - status = 'success'; - row.nextRetryAt = undefined; - row.error = undefined; - } else if (result.dead) { - status = 'dead'; - row.error = result.error; - row.nextRetryAt = undefined; - } else { - status = 'pending'; - row.error = result.error; - row.nextRetryAt = result.nextRetryAt; - } - row.status = status; - } - - async list(filter?: { status?: DeliveryStatus }): Promise { - const all = Array.from(this.rows.values()).map((r) => ({ ...r })); - return filter?.status ? all.filter((r) => r.status === filter.status) : all; - } - - async redeliver(id: string): Promise { - const row = this.rows.get(id); - if (!row) { - throw new RedeliverError( - `Delivery row '${id}' not found`, - 'not_found', - ); - } - if (row.status !== 'success' && row.status !== 'failed' && row.status !== 'dead') { - throw new RedeliverError( - `Delivery row '${id}' is '${row.status}', expected one of: success, failed, dead`, - 'not_eligible', - ); - } - const now = Date.now(); - row.status = 'pending'; - row.attempts = 0; - row.claimedBy = undefined; - row.claimedAt = undefined; - row.nextRetryAt = undefined; - row.error = undefined; - row.responseCode = undefined; - row.responseBody = undefined; - row.updatedAt = now; - return { ...row }; - } -} diff --git a/packages/plugins/plugin-webhooks/src/outbox.ts b/packages/plugins/plugin-webhooks/src/outbox.ts deleted file mode 100644 index 768d5e79d..000000000 --- a/packages/plugins/plugin-webhooks/src/outbox.ts +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -/** - * Webhook outbox contracts. - * - * The outbox stores webhook delivery rows that must be POSTed exactly once - * (modulo at-least-once + receiver-side idempotency). Implementations are - * pluggable so the same dispatcher can run against an in-memory test store - * or a SQL-backed production table. - * - * See `content/docs/concepts/webhook-delivery.mdx` §3.2 for the full schema. - */ - -export type DeliveryStatus = - | 'pending' - | 'in_flight' - | 'success' - | 'failed' - | 'dead'; - -export interface WebhookDelivery { - /** UUID — also doubles as the receiver-side idempotency key. */ - id: string; - /** FK to sys_webhook.id — opaque to the dispatcher; only used for hashing. */ - webhookId: string; - /** Origin event id. UNIQUE(event_id, webhook_id) prevents double-enqueue. */ - eventId: string; - /** Origin event type, e.g. `data.record.created`. */ - eventType: string; - /** Destination URL (snapshotted on enqueue — config edits don't rewrite live rows). */ - url: string; - /** HTTP method — defaults to POST. */ - method?: string; - /** Custom headers configured on the sink. */ - headers?: Record; - /** HMAC-SHA256 secret. If present, signature is added. */ - secret?: string; - /** Per-request timeout in ms. */ - timeoutMs?: number; - /** JSON-serialisable body. */ - payload: unknown; - - /** Lifecycle state. */ - status: DeliveryStatus; - /** Number of POST attempts made so far (0 before first attempt). */ - attempts: number; - /** Node id currently working on this row, when `status = in_flight`. */ - claimedBy?: string; - /** Wall-clock ms when the row was claimed. */ - claimedAt?: number; - /** Earliest ms at which this row becomes eligible for the next attempt. */ - nextRetryAt?: number; - /** Wall-clock ms of the last attempt (success or fail). */ - lastAttemptedAt?: number; - /** HTTP status code from the most recent attempt. */ - responseCode?: number; - /** Truncated response body for diagnostics. */ - responseBody?: string; - /** Last transport / timeout error message. */ - error?: string; - - createdAt: number; - updatedAt: number; -} - -export interface EnqueueInput { - webhookId: string; - eventId: string; - eventType: string; - url: string; - method?: string; - headers?: Record; - secret?: string; - timeoutMs?: number; - payload: unknown; -} - -export interface ClaimOptions { - /** Identifier of the node doing the claim (for `claimedBy`). */ - nodeId: string; - /** Max rows to claim per call. */ - limit: number; - /** - * Partition assignment for this worker. Only rows whose - * `hash(webhookId) mod count === index` are claimed. Omit to claim - * across all partitions (single-node mode). - */ - partition?: { index: number; count: number }; - /** Visibility timeout — claimed rows revert to pending after this many ms. */ - claimTtlMs: number; - /** "Now" reference, ms since epoch. Defaults to Date.now(). */ - now?: number; -} - -export interface AckSuccess { - success: true; - httpStatus: number; - responseBody?: string; - durationMs: number; -} - -export interface AckFailure { - success: false; - httpStatus?: number; - responseBody?: string; - error?: string; - durationMs: number; - /** Computed by the dispatcher per the retry schedule, or undefined for dead. */ - nextRetryAt?: number; - /** Marks the row terminal — no more attempts. */ - dead?: boolean; -} - -export type AckResult = AckSuccess | AckFailure; - -/** - * Error raised by `IWebhookOutbox.redeliver` when the requested row is - * either missing or in a non-terminal state. The dispatcher / admin UI - * surfaces this verbatim to the caller — never throw it for transient - * conditions (transport errors should bubble as native `Error`). - */ -export class RedeliverError extends Error { - constructor( - message: string, - readonly code: 'not_found' | 'not_eligible', - ) { - super(message); - this.name = 'RedeliverError'; - } -} - -/** - * Pluggable storage backend for delivery rows. Implementations MUST make - * `claim()` atomic across concurrent callers — that property is the - * exactly-once guarantee. - */ -export interface IWebhookOutbox { - /** - * Insert a new delivery row. Implementations MUST treat - * `(eventId, webhookId)` as unique and silently drop duplicates. - * Returns the row id (existing or new). - */ - enqueue(input: EnqueueInput): Promise; - - /** - * Atomically claim up to `limit` rows whose `nextRetryAt <= now` (or - * null) and matching the partition predicate. Claimed rows MUST be - * marked `in_flight` so concurrent claimers don't see them. - */ - claim(opts: ClaimOptions): Promise; - - /** Record the outcome of an attempt. */ - ack(id: string, result: AckResult): Promise; - - /** Snapshot accessor for tests / admin tooling. */ - list(filter?: { status?: DeliveryStatus }): Promise; - - /** - * Reset a terminal row back to `pending` so the dispatcher will pick - * it up again on its next tick. - * - * - Eligible source states: `success`, `failed`, `dead`. - * - Rejects `pending` / `in_flight` rows — replaying those would - * double-deliver because they're either already queued or actively - * being sent. - * - Resets `attempts=0` so the retry budget restarts. - * - Clears `claimed_by`, `claimed_at`, `next_retry_at`, `error`, - * `response_code`, `response_body`. URL / payload / secret are NOT - * touched — replay reproduces the original POST byte-for-byte. - * - * Throws `RedeliverError` with code `not_found` or `not_eligible`. - * Returns the post-reset row. - */ - redeliver(id: string): Promise; -} diff --git a/packages/plugins/plugin-webhooks/src/partition.ts b/packages/plugins/plugin-webhooks/src/partition.ts deleted file mode 100644 index 6a4766eca..000000000 --- a/packages/plugins/plugin-webhooks/src/partition.ts +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -/** - * Stable, framework-free partition hash. The dispatcher uses this to - * assign webhooks to partitions; the in-memory outbox uses the same hash - * to filter rows in `claim()`. Both call sites MUST agree, which is why - * this is a single shared helper. - * - * Uses a 32-bit FNV-1a variant — fast, no allocations, deterministic. - */ -export function hashPartition(key: string, count: number): number { - if (count <= 0) throw new Error('partition count must be > 0'); - let h = 0x811c9dc5; - for (let i = 0; i < key.length; i++) { - h ^= key.charCodeAt(i); - h = Math.imul(h, 0x01000193); - } - return Math.abs(h | 0) % count; -} diff --git a/packages/plugins/plugin-webhooks/src/retention.test.ts b/packages/plugins/plugin-webhooks/src/retention.test.ts deleted file mode 100644 index d76da21fb..000000000 --- a/packages/plugins/plugin-webhooks/src/retention.test.ts +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -/** - * DeliveryRetentionSweeper test. - * - * Verifies the retention policy applied to `sys_webhook_delivery`: - * - success rows older than `successTtlMs` are deleted - * - dead rows older than `deadTtlMs` are deleted - * - pending / in_flight / failed rows are NEVER auto-pruned - * - rows newer than the TTL stay - * - successTtlMs=0 disables the success sweep - */ - -import { describe, expect, it } from 'vitest'; -import type { IDataEngine } from '@objectstack/spec/contracts'; -import { DeliveryRetentionSweeper } from './retention.js'; - -class FakeEngine implements IDataEngine { - rows: any[] = []; - async find(): Promise { - return this.rows; - } - async findOne(): Promise { - return null; - } - async insert(_n: string, data: any): Promise { - const arr = Array.isArray(data) ? data : [data]; - for (const r of arr) this.rows.push(r); - return data; - } - async update(): Promise { - return { affected: 0 }; - } - async delete(_name: string, opts?: any): Promise { - const before = this.rows.length; - const where = opts?.where ?? {}; - this.rows = this.rows.filter((r) => { - if (where.status && r.status !== where.status) return true; - if (where.updated_at?.$lt != null && !(r.updated_at < where.updated_at.$lt)) - return true; - return false; - }); - return { affected: before - this.rows.length }; - } - async count(): Promise { - return this.rows.length; - } - async aggregate(): Promise { - return []; - } -} - -const HOUR = 60 * 60 * 1000; -const DAY = 24 * HOUR; - -describe('DeliveryRetentionSweeper', () => { - it('deletes old success rows past TTL', async () => { - const engine = new FakeEngine(); - const now = Date.now(); - engine.rows.push( - { id: 'a', status: 'success', updated_at: now - 8 * DAY }, - { id: 'b', status: 'success', updated_at: now - 6 * DAY }, // inside TTL - { id: 'c', status: 'success', updated_at: now - 30 * DAY }, - ); - const sweeper = new DeliveryRetentionSweeper(engine, { successTtlMs: 7 * DAY }); - const res = await sweeper.sweep(now); - expect(res.success).toBe(2); - expect(engine.rows.map((r) => r.id)).toEqual(['b']); - }); - - it('keeps pending / in_flight / failed rows regardless of age', async () => { - const engine = new FakeEngine(); - const now = Date.now(); - engine.rows.push( - { id: 'p', status: 'pending', updated_at: now - 100 * DAY }, - { id: 'i', status: 'in_flight', updated_at: now - 100 * DAY }, - { id: 'f', status: 'failed', updated_at: now - 100 * DAY }, - ); - const sweeper = new DeliveryRetentionSweeper(engine); - await sweeper.sweep(now); - expect(engine.rows).toHaveLength(3); - }); - - it('deletes old dead rows past deadTtlMs', async () => { - const engine = new FakeEngine(); - const now = Date.now(); - engine.rows.push( - { id: 'd1', status: 'dead', updated_at: now - 31 * DAY }, - { id: 'd2', status: 'dead', updated_at: now - 29 * DAY }, // inside TTL - ); - const sweeper = new DeliveryRetentionSweeper(engine, { deadTtlMs: 30 * DAY }); - const res = await sweeper.sweep(now); - expect(res.dead).toBe(1); - expect(engine.rows.map((r) => r.id)).toEqual(['d2']); - }); - - it('successTtlMs=0 disables the success sweep', async () => { - const engine = new FakeEngine(); - const now = Date.now(); - engine.rows.push({ id: 'a', status: 'success', updated_at: now - 100 * DAY }); - const sweeper = new DeliveryRetentionSweeper(engine, { successTtlMs: 0 }); - const res = await sweeper.sweep(now); - expect(res.success).toBe(0); - expect(engine.rows).toHaveLength(1); - }); - - it('deadTtlMs=0 disables the dead sweep', async () => { - const engine = new FakeEngine(); - const now = Date.now(); - engine.rows.push({ id: 'd', status: 'dead', updated_at: now - 100 * DAY }); - const sweeper = new DeliveryRetentionSweeper(engine, { deadTtlMs: 0 }); - const res = await sweeper.sweep(now); - expect(res.dead).toBe(0); - expect(engine.rows).toHaveLength(1); - }); -}); diff --git a/packages/plugins/plugin-webhooks/src/retention.ts b/packages/plugins/plugin-webhooks/src/retention.ts deleted file mode 100644 index 078d34578..000000000 --- a/packages/plugins/plugin-webhooks/src/retention.ts +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -import type { IDataEngine } from '@objectstack/spec/contracts'; -import { SYS_WEBHOOK_DELIVERY } from './schema.js'; - -interface OptionalLogger { - info?(msg: string, meta?: unknown): void; - warn?(msg: string, meta?: unknown): void; - debug?(msg: string, meta?: unknown): void; -} - -export interface DeliveryRetentionOptions { - /** - * Object name backing the outbox. Defaults to `sys_webhook_delivery`. - */ - objectName?: string; - - /** - * How long to keep `success` rows. Default 7 days. Set to `0` to - * disable the success sweep (keep forever — not recommended in - * production). - */ - successTtlMs?: number; - - /** - * How long to keep `dead` rows. Default 30 days. Set to `0` to - * keep forever. - */ - deadTtlMs?: number; - - /** - * How often to run the sweep. Default 1h. - */ - sweepIntervalMs?: number; - - logger?: OptionalLogger; -} - -const DEFAULTS = { - successTtlMs: 7 * 24 * 60 * 60 * 1000, - deadTtlMs: 30 * 24 * 60 * 60 * 1000, - sweepIntervalMs: 60 * 60 * 1000, -}; - -/** - * Periodically prunes `sys_webhook_delivery` rows so the table doesn't - * grow unbounded. - * - * Without this every successful POST would leave a permanent row. At - * even moderate scale (10 events/s × 3 webhooks = 30 rows/s = ~2.6M - * rows/day) the table becomes a problem within a week. - * - * Retention defaults mirror Stripe/GitHub: - * - `success`: 7 days - * - `dead`: 30 days (kept longer for audit & manual re-delivery) - * - `pending`/`in_flight`/`failed`: never auto-pruned (they're - * either live work or signal something needs human attention) - * - * Runs on whichever node holds the sweeper interval — it doesn't need - * a cluster lock because DELETE WHERE created_at < threshold is - * idempotent; multiple nodes running concurrently is wasteful but - * safe. - */ -export class DeliveryRetentionSweeper { - private readonly objectName: string; - private readonly successTtlMs: number; - private readonly deadTtlMs: number; - private readonly sweepIntervalMs: number; - private readonly logger: OptionalLogger; - private timer: ReturnType | undefined; - private running = false; - - constructor( - private readonly engine: IDataEngine, - opts: DeliveryRetentionOptions = {}, - ) { - this.objectName = opts.objectName ?? SYS_WEBHOOK_DELIVERY; - this.successTtlMs = opts.successTtlMs ?? DEFAULTS.successTtlMs; - this.deadTtlMs = opts.deadTtlMs ?? DEFAULTS.deadTtlMs; - this.sweepIntervalMs = opts.sweepIntervalMs ?? DEFAULTS.sweepIntervalMs; - this.logger = opts.logger ?? {}; - } - - start(): void { - if (this.running) return; - this.running = true; - // First sweep deferred by one interval — let the system boot first. - this.timer = setInterval(() => { - this.sweep().catch((err) => - this.logger.warn?.('[webhook-retention] sweep failed', err), - ); - }, this.sweepIntervalMs); - this.timer.unref?.(); - } - - stop(): void { - if (!this.running) return; - this.running = false; - if (this.timer) clearInterval(this.timer); - this.timer = undefined; - } - - /** Run one sweep immediately. Returns the number of rows deleted. */ - async sweep(now: number = Date.now()): Promise<{ success: number; dead: number }> { - let successDeleted = 0; - let deadDeleted = 0; - - if (this.successTtlMs > 0) { - try { - const res = await this.engine.delete(this.objectName, { - where: { - status: 'success', - updated_at: { $lt: now - this.successTtlMs }, - }, - }); - successDeleted = res?.affected ?? 0; - } catch (err) { - this.logger.warn?.('[webhook-retention] success sweep failed', err); - } - } - - if (this.deadTtlMs > 0) { - try { - const res = await this.engine.delete(this.objectName, { - where: { - status: 'dead', - updated_at: { $lt: now - this.deadTtlMs }, - }, - }); - deadDeleted = res?.affected ?? 0; - } catch (err) { - this.logger.warn?.('[webhook-retention] dead sweep failed', err); - } - } - - if (successDeleted + deadDeleted > 0) { - this.logger.info?.('[webhook-retention] sweep complete', { - success: successDeleted, - dead: deadDeleted, - }); - } - return { success: successDeleted, dead: deadDeleted }; - } -} diff --git a/packages/plugins/plugin-webhooks/src/schema.ts b/packages/plugins/plugin-webhooks/src/schema.ts index b817719ad..ca7c0822a 100644 --- a/packages/plugins/plugin-webhooks/src/schema.ts +++ b/packages/plugins/plugin-webhooks/src/schema.ts @@ -3,24 +3,19 @@ /** * Public schema subpath: `@objectstack/plugin-webhooks/schema`. * - * Thin re-export barrel kept stable across refactors. The actual object - * definitions live in `sys-webhook.object.ts` and - * `sys-webhook-delivery.object.ts` (matching the `*.object.ts` convention - * used everywhere else in the monorepo for `sys_*` schemas). + * Thin re-export barrel kept stable across refactors. The object definition + * lives in `sys-webhook.object.ts` (matching the `*.object.ts` convention used + * everywhere else in the monorepo for `sys_*` schemas). * - * `sys_webhook` moved here from `@objectstack/platform-objects` per - * ADR-0029 (K2.a) so this plugin owns both of its objects. + * `sys_webhook` moved here from `@objectstack/platform-objects` per ADR-0029 + * (K2.a) so this plugin owns its configuration object. Delivery telemetry is no + * longer a webhook-owned object: post-ADR-0018 M3 deliveries are rows in the + * shared `sys_http_delivery` outbox owned by `@objectstack/service-messaging`. * - * Note: callers that just need the runtime should import from the - * package root (`@objectstack/plugin-webhooks`), which auto-registers - * `sys_webhook` + `sys_webhook_delivery` via the plugin manifest. This - * subpath exists for the rare case where you want the schema without - * installing the dispatcher plugin (e.g. read-only inspection from a - * different runtime). + * Note: callers that just need the runtime should import from the package root + * (`@objectstack/plugin-webhooks`), which auto-registers `sys_webhook` via the + * plugin manifest. This subpath exists for read-only inspection from a + * different runtime. */ export { SysWebhook } from './sys-webhook.object.js'; -export { - SysWebhookDelivery, - SYS_WEBHOOK_DELIVERY, -} from './sys-webhook-delivery.object.js'; diff --git a/packages/plugins/plugin-webhooks/src/sql-outbox.test.ts b/packages/plugins/plugin-webhooks/src/sql-outbox.test.ts deleted file mode 100644 index dcc3bb4b5..000000000 --- a/packages/plugins/plugin-webhooks/src/sql-outbox.test.ts +++ /dev/null @@ -1,490 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -/** - * SqlWebhookOutbox contract test. - * - * Validates that `SqlWebhookOutbox` honours the same `IWebhookOutbox` - * semantics as `MemoryWebhookOutbox`, but on top of `IDataEngine`. We use - * a hand-rolled `FakeDataEngine` instead of booting ObjectQL + a real - * driver because: - * - * 1. The interesting bug surface is the *claim race* (UPDATE ... WHERE - * status='pending' must reject losers atomically). FakeDataEngine - * models this exactly. - * 2. Faster + zero glue. - * - * Coverage: - * - enqueue dedup (by event_id + webhook_id) - * - claim → ack happy path - * - claim ignores rows in other partitions - * - claim ignores rows whose next_retry_at is in the future - * - claim reaps stale in_flight rows past claim_ttl - * - ack(failure) increments attempts and schedules retry - * - ack(dead) marks terminal - * - concurrent claim() from many "workers" never double-claims a row - */ - -import { describe, expect, it } from 'vitest'; -import type { IDataEngine } from '@objectstack/spec/contracts'; -import { SqlWebhookOutbox } from './sql-outbox.js'; -import { hashPartition } from './partition.js'; -import type { EnqueueInput } from './outbox.js'; - -// --------------------------------------------------------------------------- -// FakeDataEngine — models the subset of ObjectQL semantics SqlWebhookOutbox -// relies on. Atomic per-call: every `update` claims the JS event loop until -// it returns, mirroring how a single SQL statement holds row locks. -// --------------------------------------------------------------------------- - -interface AnyRow { - [k: string]: any; -} - -class FakeDataEngine implements IDataEngine { - readonly tables = new Map(); - - private get(table: string): AnyRow[] { - if (!this.tables.has(table)) this.tables.set(table, []); - return this.tables.get(table)!; - } - - async find(table: string, q?: any): Promise { - const rows = this.get(table).filter((r) => matchWhere(r, q?.where)); - const limit = q?.limit ?? rows.length; - return rows.slice(0, limit).map((r) => projectFields(r, q?.fields)); - } - - async findOne(table: string, q?: any): Promise { - const rows = await this.find(table, { ...q, limit: 1 }); - return rows[0] ?? null; - } - - async insert(table: string, data: any): Promise { - const arr = Array.isArray(data) ? data : [data]; - for (const row of arr) { - // Enforce the unique index that the real SQL schema declares. - if ( - this.get(table).some( - (r) => - r.event_id === row.event_id && - r.webhook_id === row.webhook_id, - ) - ) { - throw new Error('UNIQUE constraint: event_id+webhook_id'); - } - this.get(table).push({ ...row }); - } - return arr; - } - - async update(table: string, data: any, opts?: any): Promise { - const rows = this.get(table); - let n = 0; - for (const r of rows) { - if (matchWhere(r, opts?.where)) { - Object.assign(r, data); - n += 1; - if (!opts?.multi) break; - } - } - return { affected: n }; - } - - async delete(table: string, opts?: any): Promise { - const rows = this.get(table); - const keep = rows.filter((r) => !matchWhere(r, opts?.where)); - const n = rows.length - keep.length; - this.tables.set(table, keep); - return { affected: n }; - } - - async count(table: string, q?: any): Promise { - return this.get(table).filter((r) => matchWhere(r, q?.where)).length; - } - - async aggregate(): Promise { - throw new Error('not implemented for tests'); - } -} - -function projectFields(row: AnyRow, fields?: string[]): AnyRow { - if (!fields || fields.length === 0) return { ...row }; - const out: AnyRow = {}; - for (const f of fields) out[f] = row[f]; - return out; -} - -function matchWhere(row: AnyRow, where: any): boolean { - if (!where || Object.keys(where).length === 0) return true; - for (const [key, cond] of Object.entries(where)) { - if (key === '$or') { - const arr = cond as any[]; - if (!arr.some((c) => matchWhere(row, c))) return false; - continue; - } - if (key === '$and') { - const arr = cond as any[]; - if (!arr.every((c) => matchWhere(row, c))) return false; - continue; - } - if (cond === null) { - if (row[key] != null) return false; - continue; - } - if (typeof cond === 'object' && !Array.isArray(cond)) { - for (const [op, val] of Object.entries(cond as any)) { - switch (op) { - case '$lt': - if (!(row[key] != null && row[key] < (val as any))) return false; - break; - case '$lte': - if (!(row[key] != null && row[key] <= (val as any))) return false; - break; - case '$gt': - if (!(row[key] != null && row[key] > (val as any))) return false; - break; - case '$gte': - if (!(row[key] != null && row[key] >= (val as any))) return false; - break; - case '$in': - if (!(val as any[]).includes(row[key])) return false; - break; - case '$ne': - if (row[key] === val) return false; - break; - default: - throw new Error(`FakeDataEngine: unsupported op ${op}`); - } - } - continue; - } - if (row[key] !== cond) return false; - } - return true; -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -const PARTITIONS = 8; - -function newOutbox() { - const engine = new FakeDataEngine(); - const outbox = new SqlWebhookOutbox(engine, { partitionCount: PARTITIONS }); - return { engine, outbox }; -} - -function input(webhookId: string, eventId: string): EnqueueInput { - return { - webhookId, - eventId, - eventType: 'data.record.created', - url: 'https://example.test/hook', - payload: { hello: 'world' }, - }; -} - -describe('SqlWebhookOutbox', () => { - it('enqueue inserts a row with precomputed partition_key', async () => { - const { engine, outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - - const stored = await engine.findOne('sys_webhook_delivery', { - where: { id }, - }); - expect(stored.partition_key).toBe(hashPartition('wh-1', PARTITIONS)); - expect(stored.status).toBe('pending'); - }); - - it('enqueue dedups by (event_id, webhook_id)', async () => { - const { outbox } = newOutbox(); - const a = await outbox.enqueue(input('wh-1', 'ev-1')); - const b = await outbox.enqueue(input('wh-1', 'ev-1')); - expect(a).toBe(b); - }); - - it('enqueue tolerates concurrent dup INSERTs via unique-index fallback', async () => { - const { engine, outbox } = newOutbox(); - // Pre-seed a winner row, then make the SqlOutbox think no row exists - // by inserting *after* its findOne — to simulate a real race we just - // call enqueue twice and confirm both return the same id. - const [a, b] = await Promise.all([ - outbox.enqueue(input('wh-1', 'ev-1')), - outbox.enqueue(input('wh-1', 'ev-1')), - ]); - expect(a).toBe(b); - const all = await engine.find('sys_webhook_delivery', {}); - expect(all).toHaveLength(1); - }); - - it('claim returns a row and marks it in_flight', async () => { - const { engine, outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - - const claimed = await outbox.claim({ - nodeId: 'node-A', - limit: 10, - claimTtlMs: 60_000, - }); - expect(claimed.map((c) => c.id)).toEqual([id]); - - const stored = await engine.findOne('sys_webhook_delivery', { - where: { id }, - }); - expect(stored.status).toBe('in_flight'); - expect(stored.claimed_by).toBe('node-A'); - }); - - it('claim filters by partition', async () => { - const { outbox } = newOutbox(); - // Find two webhook ids that fall in different partitions. - const ids: string[] = []; - for (let i = 0; i < 50 && ids.length < 2; i++) { - const wh = `wh-${i}`; - const p = hashPartition(wh, PARTITIONS); - if (ids.length === 0) ids.push(wh); - else if (hashPartition(ids[0], PARTITIONS) !== p) ids.push(wh); - } - const [whP0, whP1] = ids; - const p0 = hashPartition(whP0, PARTITIONS); - const p1 = hashPartition(whP1, PARTITIONS); - - await outbox.enqueue(input(whP0, 'ev-a')); - await outbox.enqueue(input(whP1, 'ev-b')); - - const claimed = await outbox.claim({ - nodeId: 'node-A', - limit: 10, - claimTtlMs: 60_000, - partition: { index: p0, count: PARTITIONS }, - }); - expect(claimed).toHaveLength(1); - expect(claimed[0].webhookId).toBe(whP0); - expect(p0).not.toBe(p1); - }); - - it('claim skips rows whose next_retry_at is in the future', async () => { - const { engine, outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - // Manually set a future retry. - await engine.update( - 'sys_webhook_delivery', - { next_retry_at: Date.now() + 60_000 }, - { where: { id } }, - ); - - const claimed = await outbox.claim({ - nodeId: 'node-A', - limit: 10, - claimTtlMs: 60_000, - }); - expect(claimed).toHaveLength(0); - }); - - it('claim reaps stale in_flight rows past claim_ttl', async () => { - const { engine, outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - // Manually pretend a dead worker claimed it 5 minutes ago. - await engine.update( - 'sys_webhook_delivery', - { - status: 'in_flight', - claimed_by: 'dead-node', - claimed_at: Date.now() - 300_000, - }, - { where: { id } }, - ); - - const claimed = await outbox.claim({ - nodeId: 'node-A', - limit: 10, - claimTtlMs: 60_000, - }); - expect(claimed.map((c) => c.id)).toEqual([id]); - expect(claimed[0].claimedBy).toBe('node-A'); - }); - - it('ack(success) marks success and increments attempts', async () => { - const { engine, outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id, { success: true, httpStatus: 200, durationMs: 12 }); - - const stored = await engine.findOne('sys_webhook_delivery', { - where: { id }, - }); - expect(stored.status).toBe('success'); - expect(stored.attempts).toBe(1); - expect(stored.claimed_by).toBeNull(); - }); - - it('ack(failure) schedules retry with status=pending', async () => { - const { engine, outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - const retryAt = Date.now() + 5_000; - await outbox.ack(id, { - success: false, - httpStatus: 503, - error: 'upstream', - nextRetryAt: retryAt, - durationMs: 15, - }); - - const stored = await engine.findOne('sys_webhook_delivery', { - where: { id }, - }); - expect(stored.status).toBe('pending'); - expect(stored.attempts).toBe(1); - expect(stored.next_retry_at).toBe(retryAt); - expect(stored.error).toBe('upstream'); - }); - - it('ack(dead) marks terminal', async () => { - const { engine, outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id, { - success: false, - httpStatus: 400, - error: 'bad request', - dead: true, - durationMs: 5, - }); - - const stored = await engine.findOne('sys_webhook_delivery', { - where: { id }, - }); - expect(stored.status).toBe('dead'); - expect(stored.next_retry_at).toBeNull(); - }); - - it('concurrent claim() never double-claims a row', async () => { - // 200 rows, 10 "workers" all racing on the same partition. Each row - // must be claimed by exactly one worker. - const { engine, outbox } = newOutbox(); - const target = hashPartition('wh-fixed', PARTITIONS); - for (let i = 0; i < 200; i++) { - await outbox.enqueue(input('wh-fixed', `ev-${i}`)); - } - - const workers = Array.from({ length: 10 }, (_, i) => - outbox.claim({ - nodeId: `worker-${i}`, - limit: 1000, - claimTtlMs: 60_000, - partition: { index: target, count: PARTITIONS }, - }), - ); - const results = await Promise.all(workers); - const allClaimed = results.flat(); - - // Total rows claimed equals 200 (no row missed) - expect(allClaimed.length).toBe(200); - // Each id appears exactly once across all workers - const ids = new Set(allClaimed.map((r) => r.id)); - expect(ids.size).toBe(200); - - // Every persisted row is now in_flight with claimed_by set - const stored = await engine.find('sys_webhook_delivery', {}); - for (const r of stored) { - expect(r.status).toBe('in_flight'); - expect(r.claimed_by).toMatch(/^worker-\d$/); - } - }); - - it('list filters by status', async () => { - const { outbox } = newOutbox(); - const id1 = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.enqueue(input('wh-2', 'ev-2')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id1, { success: true, httpStatus: 200, durationMs: 1 }); - - const success = await outbox.list({ status: 'success' }); - expect(success.map((r) => r.id)).toEqual([id1]); - - const inFlight = await outbox.list({ status: 'in_flight' }); - expect(inFlight).toHaveLength(1); - }); - - describe('redeliver', () => { - it('resets a success row back to pending with attempts=0', async () => { - const { outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id, { success: true, httpStatus: 200, durationMs: 5 }); - - const row = await outbox.redeliver(id); - expect(row.status).toBe('pending'); - expect(row.attempts).toBe(0); - expect(row.claimedBy).toBeUndefined(); - expect(row.claimedAt).toBeUndefined(); - expect(row.nextRetryAt).toBeUndefined(); - expect(row.error).toBeUndefined(); - expect(row.responseCode).toBeUndefined(); - expect(row.responseBody).toBeUndefined(); - // Original immutable fields preserved - expect(row.url).toBe('https://example.test/hook'); - expect(row.payload).toEqual({ hello: 'world' }); - }); - - it('resets a dead row back to pending and clears retry backoff', async () => { - const { outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id, { - success: false, - error: 'final', - dead: true, - durationMs: 5, - }); - - const row = await outbox.redeliver(id); - expect(row.status).toBe('pending'); - expect(row.attempts).toBe(0); - expect(row.error).toBeUndefined(); - expect(row.nextRetryAt).toBeUndefined(); - }); - - it('throws not_found when row does not exist', async () => { - const { outbox } = newOutbox(); - await expect(outbox.redeliver('missing')).rejects.toMatchObject({ - code: 'not_found', - }); - }); - - it('throws not_eligible for pending rows', async () => { - const { outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await expect(outbox.redeliver(id)).rejects.toMatchObject({ - code: 'not_eligible', - }); - }); - - it('throws not_eligible for in_flight rows', async () => { - const { outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await expect(outbox.redeliver(id)).rejects.toMatchObject({ - code: 'not_eligible', - }); - }); - - it('redelivered row is immediately claimable again', async () => { - const { outbox } = newOutbox(); - const id = await outbox.enqueue(input('wh-1', 'ev-1')); - await outbox.claim({ nodeId: 'A', limit: 10, claimTtlMs: 60_000 }); - await outbox.ack(id, { success: true, httpStatus: 200, durationMs: 1 }); - - await outbox.redeliver(id); - - const claimed = await outbox.claim({ - nodeId: 'B', - limit: 10, - claimTtlMs: 60_000, - }); - expect(claimed.map((r) => r.id)).toContain(id); - }); - }); -}); diff --git a/packages/plugins/plugin-webhooks/src/sql-outbox.ts b/packages/plugins/plugin-webhooks/src/sql-outbox.ts deleted file mode 100644 index 72f87c6dc..000000000 --- a/packages/plugins/plugin-webhooks/src/sql-outbox.ts +++ /dev/null @@ -1,343 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -import { randomUUID } from 'node:crypto'; -import type { IDataEngine } from '@objectstack/spec/contracts'; -import type { - AckResult, - ClaimOptions, - DeliveryStatus, - EnqueueInput, - IWebhookOutbox, - WebhookDelivery, -} from './outbox.js'; -import { RedeliverError } from './outbox.js'; -import { hashPartition } from './partition.js'; -import { SYS_WEBHOOK_DELIVERY } from './schema.js'; - -export interface SqlWebhookOutboxOptions { - /** - * Total partition count — MUST match the dispatcher's `partitionCount`. - * Used at enqueue time to precompute `partition_key`. - */ - partitionCount: number; - /** - * Object name to read/write. Defaults to `sys_webhook_delivery`. Override - * only if you've registered the schema under a different name. - */ - objectName?: string; -} - -interface DeliveryRow { - id: string; - webhook_id: string; - event_id: string; - event_type: string; - url: string; - method?: string | null; - headers_json?: string | null; - secret?: string | null; - timeout_ms?: number | null; - payload_json: string; - partition_key: number; - status: DeliveryStatus; - attempts: number; - claimed_by?: string | null; - claimed_at?: number | null; - next_retry_at?: number | null; - last_attempted_at?: number | null; - response_code?: number | null; - response_body?: string | null; - error?: string | null; - created_at: number; - updated_at: number; -} - -/** - * Durable `IWebhookOutbox` backed by ObjectQL — the production storage - * impl. Works against any registered driver (SQL, Turso, Mongo, in-memory) - * because everything goes through the driver-agnostic `IDataEngine` API. - * - * **Why no `FOR UPDATE SKIP LOCKED`?** ObjectQL is driver-agnostic — that - * SQL feature is Postgres-only. We get equivalent safety from two layers: - * - * 1. `cluster.lock` held per partition by the dispatcher (the primary - * mutex). One node owns one partition at a time → no two claimers. - * 2. Atomic `UPDATE WHERE status='pending'` (the backup). Even if two - * claimers slip through (e.g. admin reschedule + dispatcher), only - * the first UPDATE matches each row. - * - * **Why precompute `partition_key` on enqueue?** ObjectQL has no - * cross-driver `hash()` function in WHERE clauses. Storing the partition - * as a column makes the claim query a plain indexed lookup. - * - * **Dedup race**: SELECT-then-INSERT has a tiny window where two - * concurrent producers both miss the SELECT and both INSERT. The unique - * index `(event_id, webhook_id)` on the table catches it — the second - * INSERT errors, the producer ignores it. Receivers MUST be idempotent - * on the `X-Objectstack-Delivery` header anyway. - */ -export class SqlWebhookOutbox implements IWebhookOutbox { - private readonly objectName: string; - private readonly partitionCount: number; - - constructor( - private readonly engine: IDataEngine, - opts: SqlWebhookOutboxOptions, - ) { - if (opts.partitionCount <= 0) { - throw new Error('SqlWebhookOutbox: partitionCount must be > 0'); - } - this.objectName = opts.objectName ?? SYS_WEBHOOK_DELIVERY; - this.partitionCount = opts.partitionCount; - } - - async enqueue(input: EnqueueInput): Promise { - // Cheap pre-check to absorb most duplicates without hitting the - // unique-index error path. Race window with the INSERT below is - // intentional and documented. - const existing = await this.engine.findOne(this.objectName, { - where: { event_id: input.eventId, webhook_id: input.webhookId }, - fields: ['id'], - }); - if (existing?.id) return existing.id as string; - - const id = randomUUID(); - const now = Date.now(); - const row: Omit = { - id, - webhook_id: input.webhookId, - event_id: input.eventId, - event_type: input.eventType, - url: input.url, - method: input.method ?? 'POST', - headers_json: input.headers ? JSON.stringify(input.headers) : undefined, - secret: input.secret, - timeout_ms: input.timeoutMs, - payload_json: JSON.stringify(input.payload ?? null), - partition_key: hashPartition(input.webhookId, this.partitionCount), - status: 'pending', - attempts: 0, - created_at: now, - updated_at: now, - }; - try { - await this.engine.insert(this.objectName, row); - return id; - } catch (err) { - // Unique-index collision (dedup race) → look up the winner and - // return its id. Any other error propagates. - const winner = await this.engine.findOne(this.objectName, { - where: { event_id: input.eventId, webhook_id: input.webhookId }, - fields: ['id'], - }); - if (winner?.id) return winner.id as string; - throw err; - } - } - - async claim(opts: ClaimOptions): Promise { - const now = opts.now ?? Date.now(); - - // 1. Reap stale in_flight rows — visibility-timeout recovery. - await this.engine.update( - this.objectName, - { status: 'pending', claimed_by: null, claimed_at: null, updated_at: now }, - { - where: { - status: 'in_flight', - claimed_at: { $lt: now - opts.claimTtlMs }, - }, - multi: true, - }, - ); - - // 2. Pick candidate ids. - const partitionFilter = opts.partition - ? { partition_key: opts.partition.index } - : {}; - const candidates = await this.engine.find(this.objectName, { - where: { - status: 'pending', - ...partitionFilter, - // next_retry_at <= now OR null - $or: [ - { next_retry_at: null }, - { next_retry_at: { $lte: now } }, - ], - }, - fields: ['id'], - // No orderBy for portability — drivers handle the natural insert order. - limit: opts.limit, - }); - if (candidates.length === 0) return []; - - const ids = (candidates as Array<{ id: string }>).map((c) => c.id); - - // 3. Atomic claim. WHERE status='pending' rejects any rows another - // worker swept up between steps 2 and 3. - await this.engine.update( - this.objectName, - { - status: 'in_flight', - claimed_by: opts.nodeId, - claimed_at: now, - updated_at: now, - }, - { - where: { id: { $in: ids }, status: 'pending' }, - multi: true, - }, - ); - - // 4. Read back the rows we actually own. - const claimed = (await this.engine.find(this.objectName, { - where: { - id: { $in: ids }, - claimed_by: opts.nodeId, - claimed_at: now, - status: 'in_flight', - }, - })) as DeliveryRow[]; - - return claimed.map((r) => this.toDelivery(r)); - } - - async ack(id: string, result: AckResult): Promise { - // ObjectQL has no atomic $inc across drivers, so read-then-write. - // Safe enough: ack is single-writer per row (only the claimer acks). - const current = (await this.engine.findOne(this.objectName, { - where: { id }, - fields: ['attempts'], - })) as { attempts?: number } | null; - if (!current) return; - - const now = Date.now(); - let status: DeliveryStatus; - let nextRetryAt: number | null; - let error: string | null; - - if (result.success) { - status = 'success'; - nextRetryAt = null; - error = null; - } else if (result.dead) { - status = 'dead'; - nextRetryAt = null; - error = result.error ?? null; - } else { - status = 'pending'; - nextRetryAt = result.nextRetryAt ?? null; - error = result.error ?? null; - } - - await this.engine.update( - this.objectName, - { - status, - attempts: (current.attempts ?? 0) + 1, - last_attempted_at: now, - claimed_by: null, - claimed_at: null, - response_code: result.httpStatus ?? null, - response_body: result.responseBody ?? null, - next_retry_at: nextRetryAt, - error, - updated_at: now, - }, - { where: { id }, multi: false }, - ); - } - - async list(filter?: { status?: DeliveryStatus }): Promise { - const rows = (await this.engine.find(this.objectName, { - where: filter?.status ? { status: filter.status } : {}, - })) as DeliveryRow[]; - return rows.map((r) => this.toDelivery(r)); - } - - async redeliver(id: string): Promise { - const current = (await this.engine.findOne(this.objectName, { - where: { id }, - })) as DeliveryRow | null; - if (!current) { - throw new RedeliverError( - `Delivery row '${id}' not found`, - 'not_found', - ); - } - if ( - current.status !== 'success' && - current.status !== 'failed' && - current.status !== 'dead' - ) { - throw new RedeliverError( - `Delivery row '${id}' is '${current.status}', expected one of: success, failed, dead`, - 'not_eligible', - ); - } - const now = Date.now(); - // Guarded UPDATE — re-check status server-side so two concurrent - // redeliver calls cannot both flip the row, and so a dispatcher - // tick that flipped the row to in_flight between our SELECT and - // UPDATE cannot be clobbered. - await this.engine.update( - this.objectName, - { - status: 'pending', - attempts: 0, - claimed_by: null, - claimed_at: null, - next_retry_at: null, - last_attempted_at: null, - response_code: null, - response_body: null, - error: null, - updated_at: now, - }, - { - where: { - id, - status: { $in: ['success', 'failed', 'dead'] }, - }, - multi: false, - }, - ); - const after = (await this.engine.findOne(this.objectName, { - where: { id }, - })) as DeliveryRow | null; - if (!after || after.status !== 'pending') { - // Lost the race — another writer flipped the row. - throw new RedeliverError( - `Delivery row '${id}' state changed during redeliver`, - 'not_eligible', - ); - } - return this.toDelivery(after); - } - - private toDelivery(r: DeliveryRow): WebhookDelivery { - return { - id: r.id, - webhookId: r.webhook_id, - eventId: r.event_id, - eventType: r.event_type, - url: r.url, - method: r.method ?? undefined, - headers: r.headers_json ? JSON.parse(r.headers_json) : undefined, - secret: r.secret ?? undefined, - timeoutMs: r.timeout_ms ?? undefined, - payload: JSON.parse(r.payload_json), - status: r.status, - attempts: r.attempts, - claimedBy: r.claimed_by ?? undefined, - claimedAt: r.claimed_at ?? undefined, - nextRetryAt: r.next_retry_at ?? undefined, - lastAttemptedAt: r.last_attempted_at ?? undefined, - responseCode: r.response_code ?? undefined, - responseBody: r.response_body ?? undefined, - error: r.error ?? undefined, - createdAt: r.created_at, - updatedAt: r.updated_at, - }; - } -} diff --git a/packages/plugins/plugin-webhooks/src/sys-webhook-delivery.object.ts b/packages/plugins/plugin-webhooks/src/sys-webhook-delivery.object.ts deleted file mode 100644 index fa34fa307..000000000 --- a/packages/plugins/plugin-webhooks/src/sys-webhook-delivery.object.ts +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. - -import { Field, ObjectSchema } from '@objectstack/spec/data'; - -/** - * sys_webhook_delivery — Durable outbox row for one HTTP attempt. - * - * Schema is owned by `@objectstack/plugin-webhooks`. Add it to your stack - * via: - * - * import { SysWebhookDelivery } from '@objectstack/plugin-webhooks/schema'; - * defineStack({ objects: [SysWebhookDelivery, ...], plugins: [...] }); - * - * Designed for the SqlWebhookOutbox claim algorithm: - * - * 1. Producers INSERT pending rows (dedup'd by (event_id, webhook_id)). - * 2. The dispatcher's per-partition lock-holder runs: - * SELECT id WHERE status='pending' AND partition_key=? AND (next_retry_at <= now OR null) - * UPDATE SET status='in_flight' WHERE id IN (...) AND status='pending' ← atomic claim - * POST to target URL - * UPDATE SET status=success/pending/dead, attempts=attempts+1, ... - * - * `partition_key` is precomputed on enqueue (hash(webhook_id) mod N) so the - * dispatcher can filter cheaply without DB-side hash functions. - * - * Indexes are tuned for the hot path: `(status, partition_key, next_retry_at)` - * is the claim query; `(event_id, webhook_id)` is the dedup uniqueness. - * - * @namespace sys - */ -export const SysWebhookDelivery = ObjectSchema.create({ - name: 'sys_webhook_delivery', - label: 'Webhook Delivery', - pluralLabel: 'Webhook Deliveries', - icon: 'package', - isSystem: true, - managedBy: 'config', - userActions: { create: false, edit: false, delete: false, import: false }, - description: - 'Durable outbox row for one webhook attempt. Managed by @objectstack/plugin-webhooks; do not write directly.', - displayNameField: 'id', - titleFormat: '{event_type} → {url}', - compactLayout: ['event_type', 'url', 'status', 'attempts', 'next_retry_at'], - - actions: [ - { - name: 'redeliver', - label: 'Redeliver', - icon: 'refresh-cw', - variant: 'secondary', - locations: ['list_item', 'record_header'], - type: 'api', - target: '/api/v1/webhooks/redeliver', - method: 'POST', - recordIdParam: 'deliveryId', - confirmText: - 'Replay this delivery? The receiver will get the original payload again — they must be idempotent on the X-Objectstack-Delivery header.', - successMessage: 'Queued for redelivery', - refreshAfter: true, - // Only terminal rows are safe to replay. Pending / in_flight rows - // are either already queued or actively being sent — replaying - // would double-deliver. - disabled: "!(status in ['success', 'failed', 'dead'])", - }, - ], - - listViews: { - recent: { - type: 'grid', - name: 'recent', - label: 'Recent', - data: { provider: 'object', object: 'sys_webhook_delivery' }, - columns: ['event_type', 'url', 'status', 'attempts', 'response_code', 'updated_at'], - sort: [{ field: 'updated_at', order: 'desc' }], - pagination: { pageSize: 50 }, - }, - failures: { - type: 'grid', - name: 'failures', - label: 'Failures', - data: { provider: 'object', object: 'sys_webhook_delivery' }, - columns: ['event_type', 'url', 'status', 'attempts', 'response_code', 'error', 'updated_at'], - filter: [{ field: 'status', operator: 'in', value: ['failed', 'dead'] }], - sort: [{ field: 'updated_at', order: 'desc' }], - pagination: { pageSize: 50 }, - }, - in_flight: { - type: 'grid', - name: 'in_flight', - label: 'In Flight', - data: { provider: 'object', object: 'sys_webhook_delivery' }, - columns: ['event_type', 'url', 'attempts', 'claimed_by', 'claimed_at'], - filter: [{ field: 'status', operator: 'equals', value: 'in_flight' }], - sort: [{ field: 'claimed_at', order: 'desc' }], - pagination: { pageSize: 50 }, - }, - pending: { - type: 'grid', - name: 'pending', - label: 'Pending', - data: { provider: 'object', object: 'sys_webhook_delivery' }, - columns: ['event_type', 'url', 'attempts', 'next_retry_at', 'updated_at'], - filter: [{ field: 'status', operator: 'equals', value: 'pending' }], - sort: [{ field: 'next_retry_at', order: 'asc' }], - pagination: { pageSize: 50 }, - }, - by_status: { - type: 'grid', - name: 'by_status', - label: 'By Status', - data: { provider: 'object', object: 'sys_webhook_delivery' }, - columns: ['status', 'event_type', 'url', 'attempts', 'updated_at'], - sort: [{ field: 'status', order: 'asc' }, { field: 'updated_at', order: 'desc' }], - grouping: { fields: [{ field: 'status', order: 'asc', collapsed: false }] }, - pagination: { pageSize: 100 }, - }, - by_webhook: { - type: 'grid', - name: 'by_webhook', - label: 'By Webhook', - data: { provider: 'object', object: 'sys_webhook_delivery' }, - columns: ['webhook_id', 'event_type', 'status', 'attempts', 'updated_at'], - sort: [{ field: 'webhook_id', order: 'asc' }, { field: 'updated_at', order: 'desc' }], - grouping: { fields: [{ field: 'webhook_id', order: 'asc', collapsed: true }] }, - pagination: { pageSize: 100 }, - }, - all_deliveries: { - type: 'grid', - name: 'all_deliveries', - label: 'All', - data: { provider: 'object', object: 'sys_webhook_delivery' }, - columns: ['event_type', 'url', 'status', 'attempts', 'response_code', 'updated_at'], - sort: [{ field: 'updated_at', order: 'desc' }], - pagination: { pageSize: 100 }, - }, - }, - - fields: { - id: Field.text({ - label: 'Delivery ID', - required: true, - maxLength: 64, - description: 'UUID — also doubles as the receiver-side idempotency key', - }), - - webhook_id: Field.text({ - label: 'Webhook ID', - required: true, - maxLength: 64, - description: 'FK to sys_webhook.id (loosely coupled — denormalised URL/secret on row)', - }), - - event_id: Field.text({ - label: 'Event ID', - required: true, - maxLength: 128, - description: 'Source event id; UNIQUE(event_id, webhook_id) for dedup', - }), - - event_type: Field.text({ - label: 'Event Type', - required: true, - maxLength: 128, - description: 'e.g. data.record.created', - }), - - url: Field.text({ - label: 'Target URL', - required: true, - maxLength: 2048, - description: 'Snapshotted at enqueue so config edits do not rewrite live rows', - }), - - method: Field.text({ label: 'Method', required: false, maxLength: 10 }), - headers_json: Field.textarea({ label: 'Headers JSON', required: false }), - secret: Field.text({ label: 'HMAC Secret', required: false, maxLength: 256 }), - timeout_ms: Field.number({ label: 'Timeout (ms)', required: false }), - payload_json: Field.textarea({ label: 'Payload JSON', required: true }), - - partition_key: Field.number({ - label: 'Partition', - required: true, - description: 'hash(webhook_id) mod partitionCount — precomputed for cheap WHERE', - }), - - status: Field.text({ - label: 'Status', - required: true, - defaultValue: 'pending', - maxLength: 16, - description: 'pending | in_flight | success | failed | dead', - }), - - attempts: Field.number({ - label: 'Attempts', - required: true, - defaultValue: 0, - description: 'Number of POST attempts made so far', - }), - - claimed_by: Field.text({ label: 'Claimed By', required: false, maxLength: 128 }), - claimed_at: Field.number({ label: 'Claimed At (ms)', required: false }), - next_retry_at: Field.number({ label: 'Next Retry At (ms)', required: false }), - last_attempted_at: Field.number({ label: 'Last Attempted At (ms)', required: false }), - response_code: Field.number({ label: 'HTTP Status', required: false }), - response_body: Field.textarea({ label: 'Response Body (capped)', required: false }), - error: Field.textarea({ label: 'Error', required: false }), - - created_at: Field.number({ label: 'Created At (ms)', required: true }), - updated_at: Field.number({ label: 'Updated At (ms)', required: true }), - }, - - indexes: [ - { fields: ['event_id', 'webhook_id'], unique: true }, - // Hot path: claim query - { fields: ['status', 'partition_key', 'next_retry_at'] }, - // Reaper: scan stale in_flight rows by claimed_at - { fields: ['status', 'claimed_at'] }, - { fields: ['webhook_id'] }, - ], -}); - -/** Canonical object name — exported so SqlWebhookOutbox callers can override if needed. */ -export const SYS_WEBHOOK_DELIVERY = 'sys_webhook_delivery' as const; diff --git a/packages/plugins/plugin-webhooks/src/webhook-outbox-plugin.ts b/packages/plugins/plugin-webhooks/src/webhook-outbox-plugin.ts index 4a710a59e..ff5e04d30 100644 --- a/packages/plugins/plugin-webhooks/src/webhook-outbox-plugin.ts +++ b/packages/plugins/plugin-webhooks/src/webhook-outbox-plugin.ts @@ -1,129 +1,74 @@ // Copyright (c) 2026 ObjectStack. Licensed under the Apache-2.0 license. import type { Plugin, PluginContext } from '@objectstack/core'; -import { readEnvWithDeprecation } from '@objectstack/types'; -import type { - IClusterService, - IDataEngine, - IRealtimeService, -} from '@objectstack/spec/contracts'; +import type { IDataEngine, IRealtimeService } from '@objectstack/spec/contracts'; +import type { EnqueueHttpInput } from '@objectstack/service-messaging'; import { AutoEnqueuer, type AutoEnqueuerOptions } from './auto-enqueuer.js'; -import { WebhookDispatcher, type DispatcherOptions } from './dispatcher.js'; -import { MemoryWebhookOutbox } from './memory-outbox.js'; -import type { IWebhookOutbox } from './outbox.js'; -import { - DeliveryRetentionSweeper, - type DeliveryRetentionOptions, -} from './retention.js'; -import { SqlWebhookOutbox } from './sql-outbox.js'; import { SysWebhook } from './sys-webhook.object.js'; -import { SysWebhookDelivery } from './sys-webhook-delivery.object.js'; -export interface WebhookOutboxPluginOptions - extends Partial> { - /** - * Override the outbox backend. If omitted a fresh `MemoryWebhookOutbox` - * is used — fine for local development, **not for production**: each - * node will see only its own rows. - * - * Pass a factory if you need the kernel-resolved `IDataEngine`: - * - * ```ts - * outbox: (ctx) => new SqlWebhookOutbox( - * ctx.getService('objectql'), { partitionCount: 8 }, - * ), - * ``` - */ - outbox?: IWebhookOutbox | ((ctx: PluginContext) => IWebhookOutbox); - - /** - * Stable node id. If omitted, uses `process.env.OS_NODE_ID` - * (legacy `OBJECTSTACK_NODE_ID` still honoured with deprecation warning) - * or a random UUID generated at plugin init. - */ - nodeId?: string; - - /** - * If `false`, the plugin registers the outbox/dispatcher services but - * does NOT auto-start the loop — useful for tests that want to step - * the dispatcher manually via `dispatcher.tick()`. - * - * Default: true. - */ - autoStart?: boolean; +/** + * Structural view of `@objectstack/service-messaging`'s HTTP-outbox surface + * (ADR-0018 M3) — declared locally so this plugin doesn't take a hard runtime + * import on the service. Webhook deliveries are enqueued onto the shared + * `sys_http_delivery` outbox and drained by the messaging `HttpDispatcher`. + */ +interface MessagingHttpSurface { + isHttpDeliveryReady(): boolean; + enqueueHttp(input: EnqueueHttpInput): Promise; + redeliverHttp(id: string): Promise<{ id: string; status: string }>; +} +export interface WebhookOutboxPluginOptions { /** - * Auto-enqueue config. When enabled (default `true` if the realtime - * + data engine services are available), the plugin subscribes to - * `data.record.*` events emitted by the engine and automatically - * enqueues a delivery row for every matching `sys_webhook` row. + * Auto-enqueue config. When enabled (default `true` if the realtime + data + * engine services are available), the plugin subscribes to `data.record.*` + * events and enqueues a delivery onto the shared messaging HTTP outbox for + * every matching `sys_webhook` row. * - * Set `false` to disable and only use the imperative - * `outbox.enqueue()` API. + * Set `false` to disable and enqueue webhooks imperatively elsewhere. */ autoEnqueue?: boolean | AutoEnqueuerOptions; - - /** - * Retention sweep config. When enabled (default `true` if a SQL - * outbox is in use), a periodic timer prunes old `success` and - * `dead` rows from `sys_webhook_delivery`. - * - * Set `false` to disable (e.g. when using `MemoryWebhookOutbox`). - */ - retention?: boolean | DeliveryRetentionOptions; } /** - * Wires a persistent, cluster-aware webhook outbox into the kernel. + * Wires webhook fan-out on top of the shared outbound-HTTP delivery substrate + * (ADR-0018 M3). * - * Registered services: - * - `webhook.outbox` → `IWebhookOutbox` (enqueue / claim / ack / list) - * - `webhook.dispatcher` → `WebhookDispatcher` (manual `tick()` if needed) - * - `webhook.autoEnqueuer` → `AutoEnqueuer` when auto-enqueue is on - * - `webhook.retention` → `DeliveryRetentionSweeper` when retention is on + * Webhooks are no longer their own delivery engine: the durable outbox, the + * cluster-coordinated dispatcher, the retry/backoff/dead-letter schedule, and + * the retention sweep all live in `@objectstack/service-messaging` + * (`sys_http_delivery` + `HttpDispatcher`). This plugin owns only the + * webhook-specific concerns: + * - the `sys_webhook` configuration object, + * - the {@link AutoEnqueuer} that turns `data.record.*` events into outbox + * rows (`source: 'webhook'`), and + * - the redeliver admin endpoint. * - * End-to-end flow once auto-enqueue is enabled: + * End-to-end flow: * * engine.insert('contact', {...}) * → engine publishes data.record.created via IRealtimeService * → AutoEnqueuer matches active sys_webhook rows in O(1) - * → outbox.enqueue() runs fire-and-forget (not on the write path) - * → dispatcher claims and POSTs (cluster-coordinated) + * → messaging.enqueueHttp() runs fire-and-forget (off the write path) + * → messaging HttpDispatcher claims and POSTs (cluster-coordinated, retried) * - * **Cluster requirement** — this plugin depends on the cluster service - * (`ClusterServicePlugin`). With the default `memory` driver the - * dispatcher works correctly inside a single process; with a real driver - * (`@objectstack/service-cluster-redis`) it correctly coordinates work - * across nodes. + * **Requires** `MessagingServicePlugin` (`@objectstack/service-messaging`), + * which is a foundational, always-on capability. */ export class WebhookOutboxPlugin implements Plugin { name = 'com.objectstack.plugin-webhook-outbox'; - version = '1.1.0'; + version = '2.0.0'; type = 'standard' as const; - dependencies = ['com.objectstack.service.cluster']; + dependencies = ['com.objectstack.service.messaging']; - private dispatcher: WebhookDispatcher | undefined; private autoEnqueuer: AutoEnqueuer | undefined; - private retention: DeliveryRetentionSweeper | undefined; - private outboxInstance: IWebhookOutbox | undefined; constructor(private readonly options: WebhookOutboxPluginOptions = {}) {} async init(ctx: PluginContext): Promise { - const cluster = ctx.getService('cluster'); - if (!cluster) { - throw new Error( - 'WebhookOutboxPlugin: required service "cluster" not found — register ClusterServicePlugin first', - ); - } - - // Register the schemas this plugin owns at runtime (ADR-0029 K2.a). - // Both `sys_webhook` (config) and `sys_webhook_delivery` (telemetry) - // are now defined and owned here — the webhook plugin ships its data - // model and behavior as one unit instead of importing `sys_webhook` - // from the @objectstack/platform-objects monolith. Registering them - // here means a stack just needs `plugins: [new WebhookOutboxPlugin(...)]` - // and both objects auto-appear in REST/Studio/Setup nav. + // Register the webhook config object (ADR-0029 K2.a). The delivery + // telemetry now lives in messaging's `sys_http_delivery`, so the nav's + // "Deliveries" entry points there (filtered to source=webhook in views). const manifest = ctx.getService<{ register(m: any): void }>('manifest'); if (manifest && typeof manifest.register === 'function') { manifest.register({ @@ -132,14 +77,9 @@ export class WebhookOutboxPlugin implements Plugin { version: this.version, type: 'plugin', scope: 'system', - name: 'Webhook Outbox Schemas', - description: - 'Registers sys_webhook (configuration) and sys_webhook_delivery (durable outbox telemetry).', - objects: [SysWebhook, SysWebhookDelivery], - // ADR-0029 D7 — contribute the Webhooks entries into the - // Setup app's `group_integrations` slot. The plugin owns these - // objects (K2.a), so it ships their menu too; when the plugin - // isn't installed the slot stays empty. + name: 'Webhook Schemas', + description: 'Registers sys_webhook (configuration). Deliveries use messaging\'s sys_http_delivery outbox.', + objects: [SysWebhook], navigationContributions: [ { app: 'setup', @@ -147,19 +87,18 @@ export class WebhookOutboxPlugin implements Plugin { priority: 100, items: [ { id: 'nav_webhooks', type: 'object', label: 'Webhooks', objectName: 'sys_webhook', icon: 'webhook', requiresObject: 'sys_webhook' }, - { id: 'nav_webhook_deliveries', type: 'object', label: 'Webhook Deliveries', objectName: 'sys_webhook_delivery', icon: 'send', requiresObject: 'sys_webhook_delivery' }, + { id: 'nav_http_deliveries', type: 'object', label: 'HTTP Deliveries', objectName: 'sys_http_delivery', icon: 'send', requiresObject: 'sys_http_delivery' }, ], }, ], }); } else { ctx.logger.warn?.( - '[webhook-outbox] manifest service unavailable — sys_webhook / sys_webhook_delivery will NOT appear in REST or Studio nav. Register MetadataService before WebhookOutboxPlugin.', + '[webhook-outbox] manifest service unavailable — sys_webhook will NOT appear in REST or Studio nav. Register MetadataService before WebhookOutboxPlugin.', ); } - // ADR-0029 D8 — contribute this plugin's object translations to the - // i18n service on kernel:ready (the i18n plugin may register later). + // ADR-0029 D8 — contribute object translations once i18n is up. if (typeof (ctx as any).hook === 'function') { (ctx as any).hook('kernel:ready', async () => { try { @@ -174,117 +113,27 @@ export class WebhookOutboxPlugin implements Plugin { }); } - const outbox = this.resolveOutbox(ctx); - this.outboxInstance = outbox; - const nodeId = - this.options.nodeId ?? - readEnvWithDeprecation('OS_NODE_ID', 'OBJECTSTACK_NODE_ID') ?? - `node-${Math.random().toString(36).slice(2, 10)}`; - - const dispatcher = new WebhookDispatcher({ - nodeId, - cluster, - outbox, - partitionCount: this.options.partitionCount, - batchSize: this.options.batchSize, - intervalMs: this.options.intervalMs, - lockTtlMs: this.options.lockTtlMs, - claimTtlMs: this.options.claimTtlMs, - fetchImpl: this.options.fetchImpl, - onAttempt: this.options.onAttempt, - rng: this.options.rng, - logger: ctx.logger, - }); - this.dispatcher = dispatcher; - - ctx.registerService('webhook.outbox', outbox); - ctx.registerService('webhook.dispatcher', dispatcher); - - if (this.options.autoStart !== false) { - dispatcher.start(); - } - - // Loud warning when running with the in-memory outbox in production — - // it loses data on restart and never shares rows across nodes. With - // the auto-pick logic above this only fires when no IDataEngine is - // available, but flag it loudly anyway. - const usingMemoryOutbox = outbox instanceof MemoryWebhookOutbox; - if (usingMemoryOutbox && process.env.NODE_ENV === 'production') { - ctx.logger.warn?.( - '[webhook-outbox] MemoryWebhookOutbox in production — webhook deliveries WILL be lost on process exit. Ensure ObjectQL is registered before WebhookOutboxPlugin so the SQL outbox can auto-select.', - ); - } - - // Auto-enqueue + retention need the kernel to be fully ready - // before ObjectQL / Realtime services are resolvable. const autoEnqueueOpt = this.options.autoEnqueue ?? true; - const retentionOpt = this.options.retention ?? true; - const needsReadyHook = autoEnqueueOpt !== false || retentionOpt !== false; - if (needsReadyHook && typeof (ctx as any).hook === 'function') { + if (typeof (ctx as any).hook === 'function') { (ctx as any).hook('kernel:ready', async () => { await this.bootAutoEnqueue(ctx, autoEnqueueOpt); - this.bootRetention(ctx, retentionOpt); - }); - } - - // Admin REST endpoint — POST /api/v1/webhooks/redeliver { deliveryId }. - // Wired in `kernel:ready` so the auth + http services are guaranteed - // resolvable. Gated on a session cookie so anonymous callers cannot - // replay deliveries; finer-grained RBAC (e.g. "only admins") can be - // layered on later — for now any signed-in user with access to the - // Setup app can redeliver. The action is also `disabled`-gated by - // status on the Studio side so the button only lights up on - // success / failed / dead rows. - if (typeof (ctx as any).hook === 'function') { - (ctx as any).hook('kernel:ready', () => { this.registerAdminRoutes(ctx); }); } - ctx.logger.info?.('[webhook-outbox] initialised', { - nodeId, - partitions: this.options.partitionCount ?? 8, - interval: this.options.intervalMs ?? 250, + ctx.logger.info?.('[webhook-outbox] initialised (delivery via shared messaging HTTP outbox)', { autoEnqueue: autoEnqueueOpt !== false, - retention: retentionOpt !== false, }); } async dispose(): Promise { await this.autoEnqueuer?.stop(); - this.retention?.stop(); - await this.dispatcher?.stop(); } - private resolveOutbox(ctx: PluginContext): IWebhookOutbox { - const opt = this.options.outbox; - if (opt) { - return typeof opt === 'function' - ? (opt as (c: PluginContext) => IWebhookOutbox)(ctx) - : opt; - } - // No explicit override — auto-pick the right backend for the host. - // SqlWebhookOutbox needs an `IDataEngine`; if one is resolvable - // (the usual case in CLI-served stacks), use it so durable rows - // in `sys_webhook_delivery` actually round-trip through the - // dispatcher and the redeliver REST endpoint. Memory is only a - // last-resort fallback for tests / edge environments without an - // engine. - const engine = this.tryGetService(ctx, ['objectql', 'data']); - if (engine) { - const partitionCount = this.options.partitionCount ?? 8; - const sql = new SqlWebhookOutbox(engine, { partitionCount }); - ctx.logger.info?.( - '[webhook-outbox] auto-selected SqlWebhookOutbox (objectql engine detected)', - { partitionCount }, - ); - return sql; - } - ctx.logger.warn?.( - '[webhook-outbox] no IDataEngine available — falling back to MemoryWebhookOutbox. Deliveries will NOT survive process restart and the redeliver REST endpoint will not see rows written through ObjectQL.', - ); - return new MemoryWebhookOutbox(); + private getMessaging(ctx: PluginContext): MessagingHttpSurface | undefined { + const svc = this.tryGetService(ctx, ['messaging']); + return svc && typeof svc.enqueueHttp === 'function' ? svc : undefined; } private async bootAutoEnqueue( @@ -294,49 +143,30 @@ export class WebhookOutboxPlugin implements Plugin { if (opt === false) return; const engine = this.tryGetService(ctx, ['objectql', 'data']); const realtime = this.tryGetService(ctx, ['realtime']); - if (!engine || !realtime) { + const messaging = this.getMessaging(ctx); + if (!engine || !realtime || !messaging) { ctx.logger.warn?.( - '[webhook-auto-enqueuer] disabled — ObjectQL or Realtime service not available', - { hasEngine: !!engine, hasRealtime: !!realtime }, + '[webhook-auto-enqueuer] disabled — ObjectQL, Realtime, or Messaging service not available', + { hasEngine: !!engine, hasRealtime: !!realtime, hasMessaging: !!messaging }, ); return; } - if (!this.outboxInstance) return; + if (!messaging.isHttpDeliveryReady()) { + ctx.logger.warn?.( + '[webhook-auto-enqueuer] messaging HTTP outbox not ready (no data engine / reliableDelivery off) — webhook deliveries will not be durable', + ); + } const enqOpts = (typeof opt === 'object' ? opt : {}) as AutoEnqueuerOptions; this.autoEnqueuer = new AutoEnqueuer( engine, realtime, - this.outboxInstance, + (input) => messaging.enqueueHttp(input), { ...enqOpts, logger: ctx.logger }, ); await this.autoEnqueuer.start(); ctx.registerService('webhook.autoEnqueuer', this.autoEnqueuer); - ctx.logger.info?.('[webhook-auto-enqueuer] started'); - } - - private bootRetention( - ctx: PluginContext, - opt: boolean | DeliveryRetentionOptions, - ): void { - if (opt === false) return; - // Only meaningful for SQL outbox — Memory has its own (process-lifetime) GC. - if (this.outboxInstance instanceof MemoryWebhookOutbox) return; - const engine = this.tryGetService(ctx, ['objectql', 'data']); - if (!engine) { - ctx.logger.warn?.( - '[webhook-retention] disabled — ObjectQL service not available', - ); - return; - } - const retOpts = (typeof opt === 'object' ? opt : {}) as DeliveryRetentionOptions; - this.retention = new DeliveryRetentionSweeper(engine, { - ...retOpts, - logger: ctx.logger, - }); - this.retention.start(); - ctx.registerService('webhook.retention', this.retention); - ctx.logger.info?.('[webhook-retention] sweeper started'); + ctx.logger.info?.('[webhook-auto-enqueuer] started (enqueues source=webhook onto sys_http_delivery)'); } private tryGetService(ctx: PluginContext, names: string[]): T | undefined { @@ -352,33 +182,25 @@ export class WebhookOutboxPlugin implements Plugin { } /** - * Mount POST /api/v1/webhooks/redeliver on the host Hono app, if one - * is available. Silently no-ops in environments without an HTTP - * server (MSW, edge tests, pure library use). Auth is delegated to - * the better-auth session cookie — every authenticated user counts. + * Mount POST /api/v1/webhooks/redeliver on the host Hono app, if one is + * available. Delegates to `messaging.redeliverHttp(deliveryId)`. Auth is the + * better-auth session cookie — every authenticated user counts. */ private registerAdminRoutes(ctx: PluginContext): void { const http = this.tryGetService(ctx, ['http-server']); if (!http || typeof http.getRawApp !== 'function') { - ctx.logger.debug?.( - '[webhook-outbox] HTTP server not available; redeliver REST endpoint not mounted', - ); + ctx.logger.debug?.('[webhook-outbox] HTTP server not available; redeliver endpoint not mounted'); return; } const rawApp = http.getRawApp(); - const outbox = this.outboxInstance; - if (!rawApp || !outbox) return; + const messaging = this.getMessaging(ctx); + if (!rawApp || !messaging) return; rawApp.post('/api/v1/webhooks/redeliver', async (c: any) => { - // Auth gate — require a signed-in session. const userId = await this.resolveSessionUserId(ctx, c); if (!userId) { return c.json( - { - success: false, - error: 'unauthenticated', - message: 'Sign in to redeliver webhook deliveries.', - }, + { success: false, error: 'unauthenticated', message: 'Sign in to redeliver webhook deliveries.' }, 401, ); } @@ -386,77 +208,39 @@ export class WebhookOutboxPlugin implements Plugin { try { body = await c.req.json(); } catch { - return c.json( - { - success: false, - error: 'invalid_body', - message: 'Request body must be JSON.', - }, - 400, - ); + return c.json({ success: false, error: 'invalid_body', message: 'Request body must be JSON.' }, 400); } - const deliveryId = - typeof body?.deliveryId === 'string' ? body.deliveryId.trim() : ''; + const deliveryId = typeof body?.deliveryId === 'string' ? body.deliveryId.trim() : ''; if (!deliveryId) { return c.json( - { - success: false, - error: 'missing_delivery_id', - message: 'Body must include `deliveryId: string`.', - }, + { success: false, error: 'missing_delivery_id', message: 'Body must include `deliveryId: string`.' }, 400, ); } try { - const row = await outbox.redeliver(deliveryId); - ctx.logger.info?.('[webhook-outbox] redelivered', { - deliveryId, - requestedBy: userId, - }); + const row = await messaging.redeliverHttp(deliveryId); + ctx.logger.info?.('[webhook-outbox] redelivered', { deliveryId, requestedBy: userId }); return c.json({ success: true, data: { id: row.id, status: row.status } }); } catch (err: any) { const code = err?.code; if (code === 'not_found') { - return c.json( - { success: false, error: 'not_found', message: err.message }, - 404, - ); + return c.json({ success: false, error: 'not_found', message: err.message }, 404); } if (code === 'not_eligible') { - return c.json( - { success: false, error: 'not_eligible', message: err.message }, - 409, - ); + return c.json({ success: false, error: 'not_eligible', message: err.message }, 409); } - ctx.logger.error?.( - '[webhook-outbox] redeliver failed', - err as Error, - ); + ctx.logger.error?.('[webhook-outbox] redeliver failed', err as Error); return c.json( - { - success: false, - error: 'internal_error', - message: err?.message ?? String(err), - }, + { success: false, error: 'internal_error', message: err?.message ?? String(err) }, 500, ); } }); - ctx.logger.info?.( - '[webhook-outbox] redeliver endpoint mounted at POST /api/v1/webhooks/redeliver', - ); + ctx.logger.info?.('[webhook-outbox] redeliver endpoint mounted at POST /api/v1/webhooks/redeliver'); } - /** - * Resolve the requesting user's id from a better-auth session cookie. - * Returns `undefined` for anonymous callers — the caller decides - * whether that's a 401. - */ - private async resolveSessionUserId( - ctx: PluginContext, - c: any, - ): Promise { + private async resolveSessionUserId(ctx: PluginContext, c: any): Promise { try { const authService: any = this.tryGetService(ctx, ['auth']); if (!authService) return undefined; diff --git a/packages/plugins/plugin-webhooks/tsup.config.ts b/packages/plugins/plugin-webhooks/tsup.config.ts index 483da5a28..983062dbe 100644 --- a/packages/plugins/plugin-webhooks/tsup.config.ts +++ b/packages/plugins/plugin-webhooks/tsup.config.ts @@ -3,7 +3,7 @@ import { defineConfig } from 'tsup'; export default defineConfig({ - entry: ['src/index.ts', 'src/sql-outbox.ts', 'src/schema.ts'], + entry: ['src/index.ts', 'src/schema.ts'], splitting: true, sourcemap: true, clean: true, diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index 65c083116..4539bc0c4 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -9,7 +9,7 @@ import type { import { RecipientResolver } from './recipient-resolver.js'; import { PreferenceResolver, type PreferenceTarget } from './preference-resolver.js'; import type { INotificationOutbox } from './outbox.js'; -import type { EnqueueHttpInput, IHttpOutbox } from './http-outbox.js'; +import type { EnqueueHttpInput, HttpDelivery, HttpDeliveryStatus, IHttpOutbox } from './http-outbox.js'; /** The L2 event object every `emit()` writes one row to (ADR-0030). */ export const NOTIFICATION_EVENT_OBJECT = 'sys_notification'; @@ -181,6 +181,25 @@ export class MessagingService { return this.httpOutbox.enqueue(input); } + /** + * Reset a terminal HTTP delivery row back to `pending` so the dispatcher + * re-sends it (ADR-0018 M3). Backs the webhook redeliver admin endpoint. + * Throws if no HTTP outbox is wired, or `HttpRedeliverError` for a missing / + * non-terminal row. + */ + async redeliverHttp(id: string): Promise { + if (!this.httpOutbox) { + throw new Error('messaging: HTTP delivery outbox not configured'); + } + return this.httpOutbox.redeliver(id); + } + + /** List HTTP delivery rows (admin/tests). Empty when no outbox is wired. */ + async listHttp(filter?: { status?: HttpDeliveryStatus; source?: string }): Promise { + if (!this.httpOutbox) return []; + return this.httpOutbox.list(filter); + } + /** Register a channel implementation. A duplicate id warns and replaces. */ registerChannel(channel: MessagingChannel): void { if (this.channels.has(channel.id)) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 689b50f99..35da31f89 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1550,18 +1550,12 @@ importers: '@objectstack/core': specifier: workspace:* version: link:../../core - '@objectstack/platform-objects': - specifier: workspace:* - version: link:../../platform-objects - '@objectstack/service-cluster': + '@objectstack/service-messaging': specifier: workspace:* - version: link:../../services/service-cluster + version: link:../../services/service-messaging '@objectstack/spec': specifier: workspace:* version: link:../../spec - '@objectstack/types': - specifier: workspace:* - version: link:../../types devDependencies: '@types/node': specifier: ^25.9.1