Merged
37 changes: 37 additions & 0 deletions .changeset/notification-dedup-unique-and-retention.md
@@ -0,0 +1,37 @@
---
"@objectstack/service-messaging": minor
"@objectstack/platform-objects": patch
---

Harden the notification pipeline: race-safe dedup + opt-in retention (ADR-0030).

**Race-safe dedup.** `sys_notification.dedup_key` is now declared a **UNIQUE**
index (was a plain index), and `emit()` **converges on a unique-key conflict**:
the pre-insert `dedup_key` check is a fast-path, but if a concurrent `emit`
raced past it and inserted first, our insert hits the violation — we catch it
and converge to the winner's event (a dedup hit) instead of throwing or
double-emitting. This mirrors the delivery outbox's enqueue convergence and
stops a record-change storm from producing duplicate bell notifications. A SQL
unique index treats NULLs as distinct, so the common case of events with no
`dedup_key` stays unconstrained. (Enforcement is per-driver: where declared
indexes are materialized, the conflict path activates; drivers that don't
materialize them fall back to the best-effort fast-path, and the catch is simply
never taken. Note that the SQL driver currently doesn't sync declared object
indexes, which already affects the delivery/receipt unique indexes; tracked separately.)
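
A minimal sketch of the converge-on-conflict flow, using a hypothetical in-memory table in place of a real driver. `EventTable`, `UniqueViolation`, `emitOnce` and the `skipFastPath` flag are illustrative names, not part of `@objectstack/service-messaging`; the flag exists only to simulate a concurrent emit that raced past the fast-path check.

```typescript
// Hypothetical stand-in for a table with a UNIQUE(dedup_key) index.
class UniqueViolation extends Error {}

interface EventRow {
  id: string;
  dedupKey: string | null;
}

class EventTable {
  private rows: EventRow[] = [];
  private seq = 0;

  findByDedupKey(key: string): EventRow | undefined {
    return this.rows.find((r) => r.dedupKey === key);
  }

  insert(dedupKey: string | null): EventRow {
    // NULL keys are never compared, mirroring "NULLs are distinct".
    if (dedupKey !== null && this.findByDedupKey(dedupKey)) {
      throw new UniqueViolation(`UNIQUE constraint failed: dedup_key=${dedupKey}`);
    }
    const row: EventRow = { id: `evt_${++this.seq}`, dedupKey };
    this.rows.push(row);
    return row;
  }
}

interface EmitResult {
  notificationId: string;
  deduped: boolean;
}

function emitOnce(table: EventTable, dedupKey: string | null, skipFastPath = false): EmitResult {
  // 1) Fast path: a prior event with the same key is a dedup hit.
  if (dedupKey !== null && !skipFastPath) {
    const prior = table.findByDedupKey(dedupKey);
    if (prior) return { notificationId: prior.id, deduped: true };
  }
  // 2) Insert; if it fails and a winner exists for this key, converge to it.
  try {
    return { notificationId: table.insert(dedupKey).id, deduped: false };
  } catch (err) {
    if (dedupKey !== null) {
      const winner = table.findByDedupKey(dedupKey);
      if (winner) return { notificationId: winner.id, deduped: true };
    }
    throw err; // not a dedup conflict: surface the original failure
  }
}

const table = new EventTable();
const first = emitOnce(table, 'task.assigned:t_7:user_1');
const raced = emitOnce(table, 'task.assigned:t_7:user_1', true);
console.log(first, raced);
```

In this sketch the second call skips the fast path, hits the simulated unique violation, and should come back with the first call's `notificationId` and `deduped: true`. Two calls with a `null` key insert two separate rows.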

**Opt-in retention.** New `NotificationRetention` sweeper + plugin options
`retentionDays` / `retentionSweepMs`. Every `emit()` writes a `sys_notification`
event (plus delivery/materialization/receipt rows), so a high-frequency
periodic flow grows the tables without bound. When `retentionDays > 0`, a
low-frequency sweep (default hourly, timer `unref`'d) bulk-deletes events,
deliveries, inbox messages and receipts older than the cutoff. A notification
ages out wholesale, keeping the model consistent (no dangling `notification_id`)
and the bell (recent-only) unaffected. Delivery rows store `created_at` as
epoch-ms while the other objects store ISO strings, so the cutoff is formatted
per target. **Default off**: no notification data is deleted without explicit
operator policy. Each target is isolated (one object's failure doesn't abort
the sweep), and the sweep runs under a system context (retention is a
cross-tenant operator policy).
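
The per-target cutoff handling can be sketched roughly as follows. The `object` / `tsField` / `format: 'iso'` fields and the `{ $lt: ... }` filter shape are taken from the tests in this PR; the `'epoch_ms'` format name and the `cutoffWhere` helper are assumptions made for illustration, since `retention.ts` itself is not shown here.

```typescript
// 'iso' appears in the PR's tests; 'epoch_ms' is an assumed name for the
// numeric timestamp format used by delivery rows.
type TimestampFormat = 'iso' | 'epoch_ms';

interface RetentionTarget {
  object: string;
  tsField: string;
  format: TimestampFormat;
}

const DAY_MS = 86_400_000;

// Hypothetical helper: build the delete filter for one target.
function cutoffWhere(
  target: RetentionTarget,
  retentionDays: number,
  nowMs: number,
): Record<string, { $lt: string | number }> {
  const cutoffMs = nowMs - retentionDays * DAY_MS;
  // ISO targets compare against an ISO string, epoch targets against a number.
  const cutoff = target.format === 'iso' ? new Date(cutoffMs).toISOString() : cutoffMs;
  return { [target.tsField]: { $lt: cutoff } };
}

const now = 1_700_000_000_000;
const eventWhere = cutoffWhere(
  { object: 'sys_notification', tsField: 'created_at', format: 'iso' },
  30,
  now,
);
const deliveryWhere = cutoffWhere(
  { object: 'sys_notification_delivery', tsField: 'created_at', format: 'epoch_ms' },
  30,
  now,
);
console.log(eventWhere, deliveryWhere);
```

Both filters describe the same instant; only the representation differs, which is why a single shared cutoff value would not work across all four objects.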

Tests: +7 `service-messaging` cases (converge-on-conflict, non-conflict
rethrow, retention cutoff-formatting per target, no-engine / non-positive
no-ops, failure isolation, missing-count) — 102 passing.
@@ -137,7 +137,11 @@ export const SysNotification = ObjectSchema.create({

indexes: [
{ fields: ['topic', 'created_at'] },
{ fields: ['dedup_key'] },
// Idempotency spine (ADR-0030). UNIQUE so `emit()` dedup is race-safe: a
// concurrent emit with the same dedup_key loses the insert and converges to
// the winner (mirrors the delivery outbox). SQL treats NULLs as distinct, so
// the (common) events with no dedup_key are unconstrained.
{ fields: ['dedup_key'], unique: true },
{ fields: ['source_object', 'source_id'] },
],
});
2 changes: 2 additions & 0 deletions packages/services/service-messaging/src/index.ts
@@ -102,6 +102,8 @@ export type {
DispatchCluster,
DispatchLockHandle,
} from './dispatcher.js';
export { NotificationRetention, DEFAULT_RETENTION_TARGETS } from './retention.js';
export type { NotificationRetentionOptions, RetentionTarget, PruneOutcome } from './retention.js';

// Objects (metadata definitions)
export {
@@ -7,6 +7,7 @@ import { MessagingService } from './messaging-service.js';
import { createInboxChannel } from './inbox-channel.js';
import { SqlNotificationOutbox } from './sql-outbox.js';
import { NotificationDispatcher, type DispatchCluster } from './dispatcher.js';
import { NotificationRetention } from './retention.js';
import { createEmailChannel } from './email-channel.js';
import { NotificationTemplateStore } from './template-renderer.js';
import {
@@ -40,6 +41,17 @@ export interface MessagingServicePluginOptions {
* `prefix.` entry for a prefix match (default none).
*/
mandatoryTopics?: readonly string[];
/**
* Retention window in days for the notification pipeline (ADR-0030
* hardening). When set (> 0), a periodic sweep prunes events, deliveries,
* inbox materializations and receipts older than this — bounding the
* event-log growth from high-frequency periodic flows. **Opt-in**: unset (or
* ≤ 0) disables retention (default), so no notification data is ever deleted
* without explicit operator policy.
*/
retentionDays?: number;
/** Retention sweep interval in ms (default 1 hour). Only used when `retentionDays` is set. */
retentionSweepMs?: number;
}

/**
@@ -73,6 +85,7 @@ export class MessagingServicePlugin implements Plugin {

private readonly options: Required<MessagingServicePluginOptions>;
private dispatcher?: NotificationDispatcher;
private retentionTimer?: ReturnType<typeof setInterval>;

constructor(options: MessagingServicePluginOptions = {}) {
this.options = {
@@ -81,6 +94,8 @@ export class MessagingServicePlugin implements Plugin {
partitionCount: 8,
dispatchIntervalMs: 500,
mandatoryTopics: [],
retentionDays: 0,
retentionSweepMs: 3_600_000,
...options,
};
}
@@ -200,14 +215,40 @@ export class MessagingServicePlugin implements Plugin {
});
}

// Retention sweep (ADR-0030 hardening): opt-in pruning of the
// notification pipeline so the event log can't grow unbounded. Runs once
// at ready then on a low-frequency interval; the timer is unref'd so it
// never keeps the process alive.
if (this.options.retentionDays > 0 && typeof ctx.hook === 'function') {
ctx.hook('kernel:ready', async () => {
const retention = new NotificationRetention({ getData, logger: ctx.logger });
const days = this.options.retentionDays;
const sweep = () => {
void retention.prune(days).catch((err) =>
ctx.logger.warn(`[messaging] retention sweep failed: ${(err as Error)?.message ?? err}`),
);
};
sweep();
this.retentionTimer = setInterval(sweep, this.options.retentionSweepMs);
this.retentionTimer.unref?.();
ctx.logger.info(
`[messaging] retention on (prune > ${days}d every ${Math.round(this.options.retentionSweepMs / 1000)}s)`,
);
});
}

ctx.logger.info(
`[messaging] service registered with channels: ${service.getRegisteredChannels().join(', ') || '(none)'}`,
);
}

/** Stop the dispatcher loop on shutdown. */
/** Stop the dispatcher loop + retention sweep on shutdown. */
async stop(): Promise<void> {
await this.dispatcher?.stop();
this.dispatcher = undefined;
if (this.retentionTimer) {
clearInterval(this.retentionTimer);
this.retentionTimer = undefined;
}
}
}
58 changes: 58 additions & 0 deletions packages/services/service-messaging/src/messaging-service.test.ts
@@ -365,5 +365,63 @@ describe('MessagingService', () => {
expect(inbox.seen).toHaveLength(0); // no re-fan
expect(data.inserts.some((i) => i.object === 'sys_notification')).toBe(false);
});

it('converges to the winner when a concurrent emit wins the dedup_key unique index', async () => {
// Simulate the race: the fast-path findOne misses (no prior event),
// but the event insert hits the UNIQUE(dedup_key) violation because a
// concurrent emit inserted first. We must catch it and converge to
// that winner rather than throwing or double-emitting.
let firstLookup = true;
const engine = {
async insert(object: string) {
if (object === 'sys_notification') throw new Error('UNIQUE constraint failed: sys_notification.dedup_key');
return { id: 'row' };
},
async find() { return []; },
async findOne(object: string) {
if (object !== 'sys_notification') return null;
// First call = the fast-path miss; second = post-conflict lookup finds the winner.
if (firstLookup) { firstLookup = false; return null; }
return { id: 'evt_winner' };
},
async update() { return {}; },
async delete() { return {}; },
async count() { return 0; },
async aggregate() { return []; },
} as any;
service = new MessagingService({ logger: silentLogger(), getData: () => engine });
const inbox = recordingChannel('inbox');
service.registerChannel(inbox.channel);

const result = await service.emit({
topic: 'task.assigned',
audience: ['user_1'],
dedupKey: 'task.assigned:t_7:user_1',
payload: { title: 'Assigned' },
});

expect(result.deduped).toBe(true);
expect(result.notificationId).toBe('evt_winner');
expect(inbox.seen).toHaveLength(0); // loser does not re-fan
});

it('rethrows an event insert error that is not a dedup conflict', async () => {
// No dedupKey ⇒ no convergence path ⇒ a genuine write failure surfaces.
const engine = {
async insert() { throw new Error('disk full'); },
async find() { return []; },
async findOne() { return null; },
async update() { return {}; },
async delete() { return {}; },
async count() { return 0; },
async aggregate() { return []; },
} as any;
service = new MessagingService({ logger: silentLogger(), getData: () => engine });
service.registerChannel(recordingChannel('inbox').channel);

await expect(
service.emit({ topic: 't', audience: ['user_1'], payload: { title: 'x' } }),
).rejects.toThrow('disk full');
});
});
});
24 changes: 23 additions & 1 deletion packages/services/service-messaging/src/messaging-service.ts
@@ -197,7 +197,29 @@ export class MessagingService {
}

// 2) Write the L2 event (or synthesize an id when there is no data layer).
const notificationId = await this.writeEvent(data, input);
// The check at (1) is a fast-path. Where the driver materializes the
// UNIQUE(dedup_key) index, it is the real guard: a concurrent emit
// that raced past (1) and inserted first makes our insert hit the
// unique violation — we catch it and converge to the winner's event
// (treated as a dedup hit), so a record-change storm can't produce
// duplicate notifications. Mirrors the delivery outbox's enqueue
// convergence. (Drivers that don't enforce the index fall back to the
// best-effort fast-path — the catch is then simply never taken.)
let notificationId: string;
try {
notificationId = await this.writeEvent(data, input);
} catch (err) {
if (input.dedupKey && data) {
const winner = await this.findEventByDedupKey(data, input.dedupKey);
if (winner) {
this.ctx.logger.info(
`[messaging] emit: dedupKey '${input.dedupKey}' raced; converged to ${winner}`,
);
return { notificationId: winner, deduped: true, deliveries: [], delivered: 0, failed: 0 };
}
}
throw err;
}

// 3) Resolve the audience to recipient user ids (RecipientResolver owns
// role:/team:/owner_of:/email→id expansion).
108 changes: 108 additions & 0 deletions packages/services/service-messaging/src/retention.test.ts
@@ -0,0 +1,108 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect } from 'vitest';
import { NotificationRetention, DEFAULT_RETENTION_TARGETS } from './retention.js';

function silentLogger() {
return { info: () => {}, warn: () => {} };
}

/** A fake engine capturing every `delete(object, options)` call. */
function captureEngine(deleteImpl?: (object: string, opts: any) => any) {
const deletes: Array<{ object: string; where: any; multi: any; context: any }> = [];
return {
deletes,
engine: {
async find() { return []; },
async findOne() { return null; },
async insert() { return {}; },
async update() { return {}; },
async delete(object: string, opts: any) {
deletes.push({ object, where: opts?.where, multi: opts?.multi, context: opts?.context });
return deleteImpl ? deleteImpl(object, opts) : { deletedCount: 1 };
},
async count() { return 0; },
async aggregate() { return []; },
} as any,
};
}

const FIXED_NOW = 1_700_000_000_000; // fixed clock for deterministic cutoffs

describe('NotificationRetention', () => {
it('prunes every target older than the cutoff, formatting the cutoff per target', async () => {
const { engine, deletes } = captureEngine();
const retention = new NotificationRetention({
getData: () => engine,
logger: silentLogger(),
now: () => FIXED_NOW,
});

const outcomes = await retention.prune(30);

// One bulk delete per default target (receipt, inbox, delivery, event).
expect(deletes.map((d) => d.object)).toEqual(DEFAULT_RETENTION_TARGETS.map((t) => t.object));

const cutoffMs = FIXED_NOW - 30 * 86_400_000;
const cutoffIso = new Date(cutoffMs).toISOString();
for (const d of deletes) {
expect(d.multi).toBe(true);
// Cross-tenant system context — retention is an operator policy.
expect(d.context).toEqual({ isSystem: true });
}
// The delivery row stores epoch-ms; everything else stores ISO strings.
const byObject = Object.fromEntries(deletes.map((d) => [d.object, d.where]));
expect(byObject['sys_notification_delivery']).toEqual({ created_at: { $lt: cutoffMs } });
expect(byObject['sys_notification']).toEqual({ created_at: { $lt: cutoffIso } });
expect(byObject['sys_inbox_message']).toEqual({ created_at: { $lt: cutoffIso } });
expect(byObject['sys_notification_receipt']).toEqual({ created_at: { $lt: cutoffIso } });

expect(outcomes.every((o) => o.deleted === 1 && !o.error)).toBe(true);
});

it('no-ops when there is no data engine', async () => {
const retention = new NotificationRetention({ getData: () => undefined, logger: silentLogger() });
expect(await retention.prune(30)).toEqual([]);
});

it('no-ops for a non-positive retention window', async () => {
const { engine, deletes } = captureEngine();
const retention = new NotificationRetention({ getData: () => engine, logger: silentLogger() });
expect(await retention.prune(0)).toEqual([]);
expect(await retention.prune(-5)).toEqual([]);
expect(deletes).toHaveLength(0);
});

it('isolates a failing target — the rest of the sweep still runs', async () => {
const { engine, deletes } = captureEngine((object) => {
if (object === 'sys_inbox_message') throw new Error('boom');
return { deletedCount: 2 };
});
const retention = new NotificationRetention({
getData: () => engine,
logger: silentLogger(),
now: () => FIXED_NOW,
});

const outcomes = await retention.prune(7);

// All four were attempted despite the inbox failure.
expect(deletes).toHaveLength(4);
const failed = outcomes.find((o) => o.object === 'sys_inbox_message');
expect(failed?.error).toContain('boom');
const ok = outcomes.filter((o) => o.object !== 'sys_inbox_message');
expect(ok.every((o) => o.deleted === 2)).toBe(true);
});

it('reports an undefined count when the driver returns no count', async () => {
const { engine } = captureEngine(() => ({}));
const retention = new NotificationRetention({
getData: () => engine,
logger: silentLogger(),
now: () => FIXED_NOW,
targets: [{ object: 'sys_notification', tsField: 'created_at', format: 'iso' }],
});
const outcomes = await retention.prune(1);
expect(outcomes).toEqual([{ object: 'sys_notification', deleted: undefined }]);
});
});
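
The failure-isolation behaviour these tests pin down can be sketched as a loop that records one outcome per target and never lets a single failing delete abort the rest. This is an illustration only: the `PruneOutcome` fields (`object`, `deleted`, `error`) are inferred from the assertions above, and `pruneTargets` and `DeleteFn` are hypothetical names, not the actual `retention.ts` implementation.

```typescript
// Shape inferred from the test assertions; the real exported type may differ.
interface PruneOutcome {
  object: string;
  deleted?: number;
  error?: string;
}

// Hypothetical delete callback standing in for the data engine's bulk delete.
type DeleteFn = (object: string) => Promise<{ deletedCount?: number }>;

async function pruneTargets(objects: readonly string[], del: DeleteFn): Promise<PruneOutcome[]> {
  const outcomes: PruneOutcome[] = [];
  for (const object of objects) {
    try {
      const res = await del(object);
      // A driver that returns no count yields `deleted: undefined`.
      outcomes.push({ object, deleted: res.deletedCount });
    } catch (err) {
      // Record the failure and keep going with the remaining targets.
      outcomes.push({ object, error: err instanceof Error ? err.message : String(err) });
    }
  }
  return outcomes;
}

// Usage: the inbox delete fails, the other three still run.
const order = [
  'sys_notification_receipt',
  'sys_inbox_message',
  'sys_notification_delivery',
  'sys_notification',
];
void pruneTargets(order, async (object) => {
  if (object === 'sys_inbox_message') throw new Error('boom');
  return { deletedCount: 2 };
}).then((outcomes) => console.log(outcomes));
```

Catching per target rather than around the whole loop is what lets the sweep report a partial result instead of stopping at the first error.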