From 67c1cf9037d57479824d101e2bacf82901f9d26f Mon Sep 17 00:00:00 2001 From: Jack Zhuang <277994282+os-zhuang@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:43:38 +0800 Subject: [PATCH] feat(messaging): race-safe dedup + opt-in retention (ADR-0030 hardening) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two correctness/ops gaps in the notification pipeline. Race-safe dedup: - `sys_notification.dedup_key` is now a UNIQUE index (was plain). SQL treats NULLs as distinct, so the common no-dedup_key events are unconstrained. - `emit()` converges on a unique-key conflict: the pre-insert check is a fast-path; if a concurrent emit inserted first, our insert hits the violation and we converge to the winner's event (a dedup hit) rather than throwing or double-emitting. Mirrors the delivery outbox's enqueue convergence; stops a record-change storm from producing duplicate bell notifications. Enforcement is per-driver (the catch is simply never taken where the index isn't materialized — the same situation the delivery/receipt unique indexes are in today; tracked separately). Opt-in retention: - New `NotificationRetention` sweeper + plugin `retentionDays`/`retentionSweepMs`. Every emit writes an event (+ delivery/materialization/receipt), so a high-frequency periodic flow grows the tables unbounded. When `retentionDays > 0`, a low-frequency sweep (default hourly, timer unref'd) bulk-deletes rows older than the cutoff across all four objects — a notification ages out wholesale (no dangling notification_id), the bell (recent-only) is unaffected. Delivery's epoch-ms created_at vs the others' ISO created_at is handled per target. Default OFF — no data deleted without explicit operator policy. Targets are isolated; sweep runs under a system (cross-tenant) context. Tests: +7 service-messaging cases — 102 passing. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...notification-dedup-unique-and-retention.md | 37 +++++ .../src/audit/sys-notification.object.ts | 6 +- .../services/service-messaging/src/index.ts | 2 + .../src/messaging-service-plugin.ts | 43 +++++- .../src/messaging-service.test.ts | 58 ++++++++ .../src/messaging-service.ts | 24 ++- .../service-messaging/src/retention.test.ts | 108 ++++++++++++++ .../service-messaging/src/retention.ts | 139 ++++++++++++++++++ 8 files changed, 414 insertions(+), 3 deletions(-) create mode 100644 .changeset/notification-dedup-unique-and-retention.md create mode 100644 packages/services/service-messaging/src/retention.test.ts create mode 100644 packages/services/service-messaging/src/retention.ts diff --git a/.changeset/notification-dedup-unique-and-retention.md b/.changeset/notification-dedup-unique-and-retention.md new file mode 100644 index 000000000..8da6b2a2b --- /dev/null +++ b/.changeset/notification-dedup-unique-and-retention.md @@ -0,0 +1,37 @@ +--- +"@objectstack/service-messaging": minor +"@objectstack/platform-objects": patch +--- + +Harden the notification pipeline: race-safe dedup + opt-in retention (ADR-0030). + +**Race-safe dedup.** `sys_notification.dedup_key` is now declared a **UNIQUE** +index (was a plain index), and `emit()` **converges on a unique-key conflict**: +the pre-insert `dedup_key` check is a fast-path, but if a concurrent `emit` +raced past it and inserted first, our insert hits the violation — we catch it +and converge to the winner's event (a dedup hit) instead of throwing or +double-emitting. This mirrors the delivery outbox's enqueue convergence and +stops a record-change storm from producing duplicate bell notifications. SQL +treats NULLs as distinct, so the common events with no `dedup_key` are +unconstrained. (Enforcement is per-driver: where declared indexes are +materialized the conflict path activates; drivers that don't materialize them +fall back to the best-effort fast-path — the catch is simply never taken. Note +the SQL driver currently doesn't sync declared object indexes, which already +affects the delivery/receipt unique indexes — tracked separately.) + +**Opt-in retention.** New `NotificationRetention` sweeper + plugin options +`retentionDays` / `retentionSweepMs`. Every `emit()` writes a `sys_notification` +event (plus delivery/materialization/receipt rows), so a high-frequency +periodic flow grows the tables unbounded. When `retentionDays > 0`, a +low-frequency sweep (default hourly, timer `unref`'d) bulk-deletes events, +deliveries, inbox messages and receipts older than the cutoff — a notification +ages out wholesale, keeping the model consistent (no dangling `notification_id`) +and the bell (recent-only) unaffected. The delivery row's epoch-ms `created_at` +vs the others' ISO `created_at` is handled per target. **Default off** — no +notification data is deleted without explicit operator policy. Each target is +isolated (one object's failure doesn't abort the sweep), and the sweep runs +under a system context (retention is a cross-tenant operator policy). + +Tests: +7 `service-messaging` cases (converge-on-conflict, non-conflict +rethrow, retention cutoff-formatting per target, no-engine / non-positive +no-ops, failure isolation, missing-count) — 102 passing. diff --git a/packages/platform-objects/src/audit/sys-notification.object.ts b/packages/platform-objects/src/audit/sys-notification.object.ts index 8495d45ba..17fff6122 100644 --- a/packages/platform-objects/src/audit/sys-notification.object.ts +++ b/packages/platform-objects/src/audit/sys-notification.object.ts @@ -137,7 +137,11 @@ export const SysNotification = ObjectSchema.create({ indexes: [ { fields: ['topic', 'created_at'] }, - { fields: ['dedup_key'] }, + // Idempotency spine (ADR-0030). UNIQUE so `emit()` dedup is race-safe: a + // concurrent emit with the same dedup_key loses the insert and converges to + // the winner (mirrors the delivery outbox). SQL treats NULLs as distinct, so + // the (common) events with no dedup_key are unconstrained. + { fields: ['dedup_key'], unique: true }, { fields: ['source_object', 'source_id'] }, ], }); diff --git a/packages/services/service-messaging/src/index.ts b/packages/services/service-messaging/src/index.ts index 0eb2e35fc..af6e2203a 100644 --- a/packages/services/service-messaging/src/index.ts +++ b/packages/services/service-messaging/src/index.ts @@ -102,6 +102,8 @@ export type { DispatchCluster, DispatchLockHandle, } from './dispatcher.js'; +export { NotificationRetention, DEFAULT_RETENTION_TARGETS } from './retention.js'; +export type { NotificationRetentionOptions, RetentionTarget, PruneOutcome } from './retention.js'; // Objects (metadata definitions) export { diff --git a/packages/services/service-messaging/src/messaging-service-plugin.ts b/packages/services/service-messaging/src/messaging-service-plugin.ts index 118d4f41c..f26f464e9 100644 --- a/packages/services/service-messaging/src/messaging-service-plugin.ts +++ b/packages/services/service-messaging/src/messaging-service-plugin.ts @@ -7,6 +7,7 @@ import { MessagingService } from './messaging-service.js'; import { createInboxChannel } from './inbox-channel.js'; import { SqlNotificationOutbox } from './sql-outbox.js'; import { NotificationDispatcher, type DispatchCluster } from './dispatcher.js'; +import { NotificationRetention } from './retention.js'; import { createEmailChannel } from './email-channel.js'; import { NotificationTemplateStore } from './template-renderer.js'; import { @@ -40,6 +41,17 @@ export interface MessagingServicePluginOptions { * `prefix.` entry for a prefix match (default none). */ mandatoryTopics?: readonly string[]; + /** + * Retention window in days for the notification pipeline (ADR-0030 + * hardening). When set (> 0), a periodic sweep prunes events, deliveries, + * inbox materializations and receipts older than this — bounding the + * event-log growth from high-frequency periodic flows. **Opt-in**: unset (or + * ≤ 0) disables retention (default), so no notification data is ever deleted + * without explicit operator policy. + */ + retentionDays?: number; + /** Retention sweep interval in ms (default 1 hour). Only used when `retentionDays` is set. */ + retentionSweepMs?: number; } /** @@ -73,6 +85,7 @@ export class MessagingServicePlugin implements Plugin { private readonly options: Required; private dispatcher?: NotificationDispatcher; + private retentionTimer?: ReturnType; constructor(options: MessagingServicePluginOptions = {}) { this.options = { @@ -81,6 +94,8 @@ export class MessagingServicePlugin implements Plugin { partitionCount: 8, dispatchIntervalMs: 500, mandatoryTopics: [], + retentionDays: 0, + retentionSweepMs: 3_600_000, ...options, }; } @@ -200,14 +215,40 @@ export class MessagingServicePlugin implements Plugin { }); } + // Retention sweep (ADR-0030 hardening): opt-in pruning of the + // notification pipeline so the event log can't grow unbounded. Runs once + // at ready then on a low-frequency interval; the timer is unref'd so it + // never keeps the process alive. + if (this.options.retentionDays > 0 && typeof ctx.hook === 'function') { + ctx.hook('kernel:ready', async () => { + const retention = new NotificationRetention({ getData, logger: ctx.logger }); + const days = this.options.retentionDays; + const sweep = () => { + void retention.prune(days).catch((err) => + ctx.logger.warn(`[messaging] retention sweep failed: ${(err as Error)?.message ?? err}`), + ); + }; + sweep(); + this.retentionTimer = setInterval(sweep, this.options.retentionSweepMs); + this.retentionTimer.unref?.(); + ctx.logger.info( + `[messaging] retention on (prune > ${days}d every ${Math.round(this.options.retentionSweepMs / 1000)}s)`, + ); + }); + } + ctx.logger.info( `[messaging] service registered with channels: ${service.getRegisteredChannels().join(', ') || '(none)'}`, ); } - /** Stop the dispatcher loop on shutdown. */ + /** Stop the dispatcher loop + retention sweep on shutdown. */ async stop(): Promise { await this.dispatcher?.stop(); this.dispatcher = undefined; + if (this.retentionTimer) { + clearInterval(this.retentionTimer); + this.retentionTimer = undefined; + } } } diff --git a/packages/services/service-messaging/src/messaging-service.test.ts b/packages/services/service-messaging/src/messaging-service.test.ts index a9f4b3d26..f23b166bd 100644 --- a/packages/services/service-messaging/src/messaging-service.test.ts +++ b/packages/services/service-messaging/src/messaging-service.test.ts @@ -365,5 +365,63 @@ describe('MessagingService', () => { expect(inbox.seen).toHaveLength(0); // no re-fan expect(data.inserts.some((i) => i.object === 'sys_notification')).toBe(false); }); + + it('converges to the winner when a concurrent emit wins the dedup_key unique index', async () => { + // Simulate the race: the fast-path findOne misses (no prior event), + // but the event insert hits the UNIQUE(dedup_key) violation because a + // concurrent emit inserted first. We must catch it and converge to + // that winner rather than throwing or double-emitting. + let firstLookup = true; + const engine = { + async insert(object: string) { + if (object === 'sys_notification') throw new Error('UNIQUE constraint failed: sys_notification.dedup_key'); + return { id: 'row' }; + }, + async find() { return []; }, + async findOne(object: string) { + if (object !== 'sys_notification') return null; + // First call = the fast-path miss; second = post-conflict lookup finds the winner. + if (firstLookup) { firstLookup = false; return null; } + return { id: 'evt_winner' }; + }, + async update() { return {}; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any; + service = new MessagingService({ logger: silentLogger(), getData: () => engine }); + const inbox = recordingChannel('inbox'); + service.registerChannel(inbox.channel); + + const result = await service.emit({ + topic: 'task.assigned', + audience: ['user_1'], + dedupKey: 'task.assigned:t_7:user_1', + payload: { title: 'Assigned' }, + }); + + expect(result.deduped).toBe(true); + expect(result.notificationId).toBe('evt_winner'); + expect(inbox.seen).toHaveLength(0); // loser does not re-fan + }); + + it('rethrows an event insert error that is not a dedup conflict', async () => { + // No dedupKey ⇒ no convergence path ⇒ a genuine write failure surfaces. + const engine = { + async insert() { throw new Error('disk full'); }, + async find() { return []; }, + async findOne() { return null; }, + async update() { return {}; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any; + service = new MessagingService({ logger: silentLogger(), getData: () => engine }); + service.registerChannel(recordingChannel('inbox').channel); + + await expect( + service.emit({ topic: 't', audience: ['user_1'], payload: { title: 'x' } }), + ).rejects.toThrow('disk full'); + }); }); }); diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index 3fc78b98c..8054a9a38 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -197,7 +197,29 @@ export class MessagingService { } // 2) Write the L2 event (or synthesize an id when there is no data layer). - const notificationId = await this.writeEvent(data, input); + // The check at (1) is a fast-path. Where the driver materializes the + // UNIQUE(dedup_key) index, it is the real guard: a concurrent emit + // that raced past (1) and inserted first makes our insert hit the + // unique violation — we catch it and converge to the winner's event + // (treated as a dedup hit), so a record-change storm can't produce + // duplicate notifications. Mirrors the delivery outbox's enqueue + // convergence. (Drivers that don't enforce the index fall back to the + // best-effort fast-path — the catch is then simply never taken.) + let notificationId: string; + try { + notificationId = await this.writeEvent(data, input); + } catch (err) { + if (input.dedupKey && data) { + const winner = await this.findEventByDedupKey(data, input.dedupKey); + if (winner) { + this.ctx.logger.info( + `[messaging] emit: dedupKey '${input.dedupKey}' raced; converged to ${winner}`, + ); + return { notificationId: winner, deduped: true, deliveries: [], delivered: 0, failed: 0 }; + } + } + throw err; + } // 3) Resolve the audience to recipient user ids (RecipientResolver owns // role:/team:/owner_of:/email→id expansion). diff --git a/packages/services/service-messaging/src/retention.test.ts b/packages/services/service-messaging/src/retention.test.ts new file mode 100644 index 000000000..864c9207f --- /dev/null +++ b/packages/services/service-messaging/src/retention.test.ts @@ -0,0 +1,108 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { NotificationRetention, DEFAULT_RETENTION_TARGETS } from './retention.js'; + +function silentLogger() { + return { info: () => {}, warn: () => {} }; +} + +/** A fake engine capturing every `delete(object, options)` call. */ +function captureEngine(deleteImpl?: (object: string, opts: any) => any) { + const deletes: Array<{ object: string; where: any; multi: any; context: any }> = []; + return { + deletes, + engine: { + async find() { return []; }, + async findOne() { return null; }, + async insert() { return {}; }, + async update() { return {}; }, + async delete(object: string, opts: any) { + deletes.push({ object, where: opts?.where, multi: opts?.multi, context: opts?.context }); + return deleteImpl ? deleteImpl(object, opts) : { deletedCount: 1 }; + }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any, + }; +} + +const FIXED_NOW = 1_700_000_000_000; // fixed clock for deterministic cutoffs + +describe('NotificationRetention', () => { + it('prunes every target older than the cutoff, formatting the cutoff per target', async () => { + const { engine, deletes } = captureEngine(); + const retention = new NotificationRetention({ + getData: () => engine, + logger: silentLogger(), + now: () => FIXED_NOW, + }); + + const outcomes = await retention.prune(30); + + // One bulk delete per default target (receipt, inbox, delivery, event). + expect(deletes.map((d) => d.object)).toEqual(DEFAULT_RETENTION_TARGETS.map((t) => t.object)); + + const cutoffMs = FIXED_NOW - 30 * 86_400_000; + const cutoffIso = new Date(cutoffMs).toISOString(); + for (const d of deletes) { + expect(d.multi).toBe(true); + // Cross-tenant system context — retention is an operator policy. + expect(d.context).toEqual({ isSystem: true }); + } + // The delivery row stores epoch-ms; everything else stores ISO strings. + const byObject = Object.fromEntries(deletes.map((d) => [d.object, d.where])); + expect(byObject['sys_notification_delivery']).toEqual({ created_at: { $lt: cutoffMs } }); + expect(byObject['sys_notification']).toEqual({ created_at: { $lt: cutoffIso } }); + expect(byObject['sys_inbox_message']).toEqual({ created_at: { $lt: cutoffIso } }); + expect(byObject['sys_notification_receipt']).toEqual({ created_at: { $lt: cutoffIso } }); + + expect(outcomes.every((o) => o.deleted === 1 && !o.error)).toBe(true); + }); + + it('no-ops when there is no data engine', async () => { + const retention = new NotificationRetention({ getData: () => undefined, logger: silentLogger() }); + expect(await retention.prune(30)).toEqual([]); + }); + + it('no-ops for a non-positive retention window', async () => { + const { engine, deletes } = captureEngine(); + const retention = new NotificationRetention({ getData: () => engine, logger: silentLogger() }); + expect(await retention.prune(0)).toEqual([]); + expect(await retention.prune(-5)).toEqual([]); + expect(deletes).toHaveLength(0); + }); + + it('isolates a failing target — the rest of the sweep still runs', async () => { + const { engine, deletes } = captureEngine((object) => { + if (object === 'sys_inbox_message') throw new Error('boom'); + return { deletedCount: 2 }; + }); + const retention = new NotificationRetention({ + getData: () => engine, + logger: silentLogger(), + now: () => FIXED_NOW, + }); + + const outcomes = await retention.prune(7); + + // All four were attempted despite the inbox failure. + expect(deletes).toHaveLength(4); + const failed = outcomes.find((o) => o.object === 'sys_inbox_message'); + expect(failed?.error).toContain('boom'); + const ok = outcomes.filter((o) => o.object !== 'sys_inbox_message'); + expect(ok.every((o) => o.deleted === 2)).toBe(true); + }); + + it('reports an undefined count when the driver returns no count', async () => { + const { engine } = captureEngine(() => ({})); + const retention = new NotificationRetention({ + getData: () => engine, + logger: silentLogger(), + now: () => FIXED_NOW, + targets: [{ object: 'sys_notification', tsField: 'created_at', format: 'iso' }], + }); + const outcomes = await retention.prune(1); + expect(outcomes).toEqual([{ object: 'sys_notification', deleted: undefined }]); + }); +}); diff --git a/packages/services/service-messaging/src/retention.ts b/packages/services/service-messaging/src/retention.ts new file mode 100644 index 000000000..c63abcde9 --- /dev/null +++ b/packages/services/service-messaging/src/retention.ts @@ -0,0 +1,139 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { IDataEngine } from '@objectstack/spec/contracts'; +import { NOTIFICATION_EVENT_OBJECT } from './messaging-service.js'; +import { DELIVERY_OBJECT } from './sql-outbox.js'; +import { INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js'; + +/** Minimal logger surface (matches the channel/service context). */ +interface RetentionLogger { + info(msg: string): void; + warn(msg: string): void; +} + +/** + * One object the sweeper prunes, plus how to read its age. The pipeline stores + * timestamps two ways: the event / inbox / receipt rows carry ISO-8601 strings + * (`created_at`), while the delivery outbox rows carry epoch-ms numbers — so the + * cutoff value is formatted per target. `$lt` works for both (ISO-8601 sorts + * lexicographically; epoch-ms numerically). + */ +export interface RetentionTarget { + readonly object: string; + readonly tsField: string; + readonly format: 'iso' | 'epoch'; +} + +/** + * Default sweep set, ordered leaf-first (materializations/receipts/deliveries + * before the event) so the log reads top-down even though there are no enforced + * FKs. A notification ages out **wholesale** — event + delivery + materialization + * + receipt past the cutoff are all removed — keeping the model consistent (no + * dangling `notification_id`) and the bell (which only shows recent rows) + * unaffected. + */ +export const DEFAULT_RETENTION_TARGETS: readonly RetentionTarget[] = [ + { object: RECEIPT_OBJECT, tsField: 'created_at', format: 'iso' }, + { object: INBOX_OBJECT, tsField: 'created_at', format: 'iso' }, + { object: DELIVERY_OBJECT, tsField: 'created_at', format: 'epoch' }, + { object: NOTIFICATION_EVENT_OBJECT, tsField: 'created_at', format: 'iso' }, +]; + +export interface NotificationRetentionOptions { + /** Resolve the data engine; `undefined` ⇒ prune is a no-op. */ + getData(): IDataEngine | undefined; + logger: RetentionLogger; + /** Override the swept objects (tests / custom deployments). */ + targets?: readonly RetentionTarget[]; + /** Clock injection for deterministic tests. Defaults to `Date.now()`. */ + now?(): number; +} + +/** Per-object prune outcome. `deleted` is `undefined` when the driver doesn't report a count. */ +export interface PruneOutcome { + object: string; + deleted?: number; + error?: string; +} + +/** + * Retention sweeper for the notification pipeline (ADR-0030 hardening). + * + * Every `emit()` writes a `sys_notification` event row (plus delivery / + * materialization / receipt rows), so a high-frequency periodic flow grows the + * tables unbounded. {@link prune} deletes everything older than a cutoff across + * all {@link RetentionTarget}s in one bulk `delete` per object, under a system + * context (cross-tenant: retention is an operator policy). Each target is + * isolated — one object's failure is logged and the sweep continues. + * + * Retention is **opt-in**: the plugin runs this only when `retentionDays` is + * configured. The mechanism lives here so it's testable in isolation. + */ +export class NotificationRetention { + private readonly now: () => number; + private readonly targets: readonly RetentionTarget[]; + + constructor(private readonly opts: NotificationRetentionOptions) { + this.now = opts.now ?? (() => Date.now()); + this.targets = opts.targets ?? DEFAULT_RETENTION_TARGETS; + } + + /** + * Delete pipeline rows older than `retentionDays`. Returns one outcome per + * swept object. No-op (empty result) when no data engine is available or + * `retentionDays` is not a positive number. + */ + async prune(retentionDays: number): Promise { + const data = this.opts.getData(); + if (!data) { + this.opts.logger.warn('[messaging] retention: no data engine; prune skipped'); + return []; + } + if (!(retentionDays > 0)) { + this.opts.logger.warn(`[messaging] retention: invalid retentionDays=${retentionDays}; prune skipped`); + return []; + } + + const cutoffMs = this.now() - retentionDays * 86_400_000; + const cutoffIso = new Date(cutoffMs).toISOString(); + const outcomes: PruneOutcome[] = []; + + for (const t of this.targets) { + const cutoff = t.format === 'epoch' ? cutoffMs : cutoffIso; + try { + const res = await data.delete(t.object, { + where: { [t.tsField]: { $lt: cutoff } }, + multi: true, + // System context: retention is an operator policy that spans + // tenants, so it must not be scoped by the caller's RLS. + context: { isSystem: true }, + } as any); + const deleted = countDeleted(res); + outcomes.push({ object: t.object, deleted }); + if (deleted === undefined || deleted > 0) { + this.opts.logger.info( + `[messaging] retention: pruned ${deleted ?? '?'} ${t.object} rows older than ${cutoffIso}`, + ); + } + } catch (err) { + const msg = (err as Error)?.message ?? String(err); + this.opts.logger.warn(`[messaging] retention: prune of ${t.object} failed (${msg}); continuing`); + outcomes.push({ object: t.object, error: msg }); + } + } + return outcomes; + } +} + +/** Best-effort row-count extraction from a driver's delete result. */ +function countDeleted(res: unknown): number | undefined { + if (typeof res === 'number') return res; + if (Array.isArray(res)) return res.length; + if (res && typeof res === 'object') { + const r = res as Record; + for (const k of ['deletedCount', 'deleted', 'count', 'affected', 'affectedRows']) { + if (typeof r[k] === 'number') return r[k] as number; + } + } + return undefined; +}