Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .changeset/notification-digest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
'@objectstack/service-messaging': minor
---

feat(messaging): digest batching for notifications (ADR-0030 P3b-2)

Recipients can now batch a topic into a `daily` / `weekly` **digest** instead of
receiving every notification immediately. Builds on P3b-1's deferral seam:

- `PreferenceResolver` consumes the `digest` preference field and `digestDeferral()`
defers a batched recipient to the next window (local midnight / Monday 00:00),
tagging the target with a stable `window`. Digest takes precedence over
quiet-hours; `critical` and mandatory topics bypass it.
- `sys_notification_delivery` gains a `digest_key` (`recipient|channel|window`).
Batched rows partition by that key so a window's rows co-locate, and the normal
outbox `claim()` skips them while the new `claimDigest()` drains a window whole.
- The dispatcher's digest pass collapses each `(recipient, channel, window)` group
into one `renderDigest()` message under the existing per-partition cluster lock,
then acks every row in the group with that single outcome.

Additive: non-digest notifications are unchanged. Timezone-from-`sys_user`,
configurable send-hour, and MJML digest emails are deferred follow-ups.
15 changes: 8 additions & 7 deletions docs/adr/0030-notification-platform-convergence.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# ADR-0030 — Notification Platform Convergence (single ingress, layered pipeline)

**Status**: Accepted (2026-06-01) — **P0–P3b1 shipped**; P3b-2 (digest) + cross-repo objectui cut-over remain. See [§ Implementation status & remaining work](#implementation-status--remaining-work).
**Status**: Accepted (2026-06-01) — **P0–P3b2 shipped** (P3b-2 digest collapse landed); cross-repo objectui cut-over remains. See [§ Implementation status & remaining work](#implementation-status--remaining-work).
**Supersedes / refines**: [ADR-0012 — Notification Platform](./0012-notification-platform.md) (Draft)
**Related**: [ADR-0019 — Approval as a Flow Node](./0019-approval-as-flow-node.md), [ADR-0022 — Connectors vs Messaging Channels](./0022-connectors-vs-messaging-channels.md)
**Build spec**: [docs/design/notification-platform-convergence.md](../design/notification-platform-convergence.md)
Expand Down Expand Up @@ -82,16 +82,17 @@ The detailed cut-over runbook and per-item notes live in
| **P2 — Subscription + preference** | `sys_notification_preference` (user×topic×channel, admin-global `*` defaults + per-user override, wildcards) + `sys_notification_subscription`; `PreferenceResolver` wired into `emit()` (most-specific-wins, mandatory bypass, fail-open). | #1444 |
| **P3a — email channel + templates** | `createEmailChannel` (delegates transport to the `email` service per ADR-0022); `sys_notification_template` (topic×channel×locale) + declarative `{{ payload.x }}` renderer with `payload.title`/`body` fallback. | #1449 |
| **P3b-1 — quiet-hours** | Deferred dispatch on the outbox (`EnqueueDeliveryInput.notBefore` → `nextAttemptAt`); `quietHoursDeferral()` (tz/HH:MM, overnight-aware); `critical` bypass. | #1453 |
| **P3b-2 — digest** | `PreferenceResolver` consumes the `digest` field (`daily`/`weekly`) → `digestDeferral()` defers to the next window and tags the target; `digest_key` on `sys_notification_delivery` (partition-keyed so a window's rows co-locate); `INotificationOutbox.claimDigest()` drains batched rows whole while normal `claim()` skips them; the dispatcher's digest pass collapses each `(recipient, channel, window)` group into one `renderDigest()` message under the partition lock. `critical`/mandatory bypass. | this PR |
| Startup | `messaging` is foundational: in `ALWAYS_ON_CAPABILITIES` (CLI) and auto-loaded when `audit` is required (cloud capability-loader). | (in #1434) |

### Remaining work (handed off to a follow-up agent)

**1. P3b-2 — Digest (completes the build spec).** Build on P3b-1's deferral:
enqueue digest items deferred to the next window, then a **collapse** step merges
same-`(user, channel, window)` deliveries into one materialization at window time.
Needs: a `digest_key` on `sys_notification_delivery`; a digest assembler (in/beside
the dispatcher); a digest render template. Consumes P2's `digest` field;
`critical`/mandatory bypass.
**1. ~~P3b-2 — Digest~~ — done (this PR).** `PreferenceResolver` batches digest
recipients to the next window; deliveries carry `digest_key`; the dispatcher
collapses same-`(recipient, channel, window)` rows into one rendered message.
Deferred sub-items not in this cut: timezone fallback to a `sys_user` field
(digest windows currently use `quiet_hours.tz` → UTC), MJML digest emails, and a
configurable daily send-hour (windows flush at local midnight / Monday 00:00).

**2. Cross-repo objectui cut-over (the user-facing delivery — separate `objectui` repo).**
- Repoint the Console bell (`AppHeader`/`InboxPopover`/record views) from
Expand Down
46 changes: 46 additions & 0 deletions packages/services/service-messaging/src/digest-render.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { NotificationDeliveryRecord } from './outbox.js';

/**
* One collapsed digest message assembled from a window's batched deliveries
* (ADR-0030 P3b-2). `items` preserves the individual notifications so a
* structured channel (inbox) can render a list, while `title`/`body` give a
* flat rendering for plain channels (email text).
*/
export interface DigestRenderResult {
title: string;
body: string;
severity: 'info';
count: number;
items: Array<{
notificationId: string;
title: string;
body?: string;
topic?: string;
actionUrl?: string;
}>;
}

/**
* Collapse same-`(recipient, channel, window)` deliveries into a single message.
* The caller guarantees `rows` is non-empty and shares a recipient + channel
* (the digest group). Only non-`critical` notifications are ever batched, so the
* digest severity is always `info`.
*/
export function renderDigest(rows: NotificationDeliveryRecord[]): DigestRenderResult {
const items = rows.map((r) => {
const p = r.payload ?? {};
return {
notificationId: r.notificationId,
title: typeof p.title === 'string' && p.title ? p.title : (r.topic ?? 'Notification'),
body: typeof p.body === 'string' && p.body ? p.body : undefined,
topic: r.topic,
actionUrl: typeof p.actionUrl === 'string' && p.actionUrl ? p.actionUrl : undefined,
};
});
const count = items.length;
const title = count === 1 ? items[0].title : `You have ${count} notifications`;
const body = items.map((it) => `• ${it.title}`).join('\n');
return { title, body, severity: 'info', count, items };
}
139 changes: 139 additions & 0 deletions packages/services/service-messaging/src/digest.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect } from 'vitest';
import { MemoryNotificationOutbox } from './memory-outbox.js';
import { NotificationDispatcher } from './dispatcher.js';
import { renderDigest } from './digest-render.js';
import type { NotificationDeliveryRecord } from './outbox.js';
import type { MessagingChannel, Notification } from './channel.js';

function row(over: Partial<NotificationDeliveryRecord>): NotificationDeliveryRecord {
return {
id: 'd', notificationId: 'n', recipientId: 'u1', channel: 'inbox',
payload: {}, partitionKey: 0, status: 'pending', attempts: 0,
createdAt: 0, updatedAt: 0, ...over,
};
}

describe('renderDigest (P3b-2)', () => {
it('collapses a group into one message with a per-item list', () => {
const out = renderDigest([
row({ notificationId: 'a', payload: { title: 'Task A assigned', body: 'do A' }, topic: 'task.assigned' }),
row({ notificationId: 'b', payload: { title: 'Task B assigned' }, topic: 'task.assigned' }),
row({ notificationId: 'c', payload: { title: 'Mentioned in C' }, topic: 'mention' }),
]);
expect(out.count).toBe(3);
expect(out.title).toBe('You have 3 notifications');
expect(out.body).toBe('• Task A assigned\n• Task B assigned\n• Mentioned in C');
expect(out.items.map((i) => i.notificationId)).toEqual(['a', 'b', 'c']);
expect(out.severity).toBe('info');
});

it('uses the single item’s title when the window holds one notification', () => {
const out = renderDigest([row({ payload: { title: 'Just one' } })]);
expect(out.count).toBe(1);
expect(out.title).toBe('Just one');
});

it('falls back to the topic when an item has no title', () => {
const out = renderDigest([row({ payload: {}, topic: 'system.alert' })]);
expect(out.items[0].title).toBe('system.alert');
});
});

describe('MemoryNotificationOutbox — digest claim separation', () => {
it('claim() skips batched rows; claimDigest() returns only them', async () => {
const outbox = new MemoryNotificationOutbox(1, () => 1000);
await outbox.enqueue({ notificationId: 'imm', recipientId: 'u1', channel: 'inbox', payload: {} });
await outbox.enqueue({ notificationId: 'g1', recipientId: 'u1', channel: 'inbox', payload: {}, digestKey: 'u1|inbox|w', notBefore: 500 });

const normal = await outbox.claim({ nodeId: 'n', limit: 10, claimTtlMs: 1000 });
expect(normal.map((r) => r.notificationId)).toEqual(['imm']);

const digest = await outbox.claimDigest({ nodeId: 'n', limit: 10, claimTtlMs: 1000 });
expect(digest.map((r) => r.notificationId)).toEqual(['g1']);
});

it('claimDigest() defers a batched row until its window opens', async () => {
let now = 100;
const outbox = new MemoryNotificationOutbox(1, () => now);
await outbox.enqueue({ notificationId: 'g', recipientId: 'u1', channel: 'inbox', payload: {}, digestKey: 'u1|inbox|w', notBefore: 1000 });
expect(await outbox.claimDigest({ nodeId: 'n', limit: 10, claimTtlMs: 1000 })).toHaveLength(0);
now = 1000;
expect(await outbox.claimDigest({ nodeId: 'n', limit: 10, claimTtlMs: 1000 })).toHaveLength(1);
});
});

/** A channel that records every notification it is asked to send. */
function recordingChannel(id: string): { channel: MessagingChannel; sent: Notification[] } {
const sent: Notification[] = [];
const channel: MessagingChannel = {
id,
async send(_ctx, req) { sent.push(req.notification); return { ok: true }; },
classifyError: () => 'retryable',
};
return { channel, sent };
}

function dispatcher(outbox: MemoryNotificationOutbox, channels: MessagingChannel[], now: () => number) {
return new NotificationDispatcher({
nodeId: 'node-test',
outbox,
channels: { getChannel: (cid: string) => channels.find((c) => c.id === cid) },
channelContext: { logger: { info() {}, warn() {}, error() {} } },
rng: () => 0.5,
now,
partitionCount: 1,
intervalMs: 10_000,
});
}

describe('NotificationDispatcher — digest collapse (P3b-2)', () => {
it('collapses a window into one message at window time and sends normal rows immediately', async () => {
let now = Date.UTC(2026, 0, 1, 9, 0);
const windowAt = Date.UTC(2026, 0, 2, 0, 0);
const outbox = new MemoryNotificationOutbox(1, () => now);
const key = 'u1|inbox|2026-01-01';
for (let i = 0; i < 3; i++) {
await outbox.enqueue({ notificationId: `n${i}`, recipientId: 'u1', channel: 'inbox', payload: { title: `Item ${i}` }, digestKey: key, notBefore: windowAt });
}
await outbox.enqueue({ notificationId: 'imm', recipientId: 'u1', channel: 'inbox', payload: { title: 'Immediate' } });

const rec = recordingChannel('inbox');
const d = dispatcher(outbox, [rec.channel], () => now);

// Before the window: only the immediate row sends; the batch waits.
await d.tick();
expect(rec.sent.map((n) => n.title)).toEqual(['Immediate']);

// At the window: the 3 batched rows collapse into one message.
now = windowAt;
await d.tick();
expect(rec.sent).toHaveLength(2);
const digest = rec.sent[1];
expect(digest.title).toBe('You have 3 notifications');
expect(digest.payload?.digest).toBe(true);
expect(digest.payload?.count).toBe(3);

// All three batched rows are acked success by the single send.
const success = await outbox.list({ status: 'success' });
expect(success.filter((r) => r.digestKey === key)).toHaveLength(3);
});

it('re-defers the whole group when the digest send fails', async () => {
let now = 1000;
const outbox = new MemoryNotificationOutbox(1, () => now);
const key = 'u1|inbox|w';
for (let i = 0; i < 2; i++) {
await outbox.enqueue({ notificationId: `n${i}`, recipientId: 'u1', channel: 'inbox', payload: { title: `Item ${i}` }, digestKey: key, notBefore: 1000 });
}
const failing: MessagingChannel = { id: 'inbox', async send() { return { ok: false, error: 'boom' }; }, classifyError: () => 'retryable' };
const d = dispatcher(outbox, [failing], () => now);

await d.tick();
// Both rows go back to pending with a future next_attempt_at (retry), not success.
const pending = await outbox.list({ status: 'pending' });
expect(pending.filter((r) => r.digestKey === key)).toHaveLength(2);
expect(pending.every((r) => (r.nextAttemptAt ?? 0) > now)).toBe(true);
});
});
87 changes: 82 additions & 5 deletions packages/services/service-messaging/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import type { MessagingChannel, MessagingChannelContext, Notification, SendResult } from './channel.js';
import type { INotificationOutbox, NotificationDeliveryRecord } from './outbox.js';
import { classifyDeliveryAttempt } from './backoff.js';
import { renderDigest } from './digest-render.js';

/** Minimal channel-registry surface the dispatcher needs (MessagingService satisfies it). */
export interface ChannelRegistry {
Expand Down Expand Up @@ -166,17 +167,81 @@ export class NotificationDispatcher {
partition: { index, count: this.opts.partitionCount },
claimTtlMs: this.opts.claimTtlMs,
});
if (claimed.length === 0) return;
await handle.renew?.(this.opts.lockTtlMs);
for (const row of claimed) {
if (handle.isHeld && !handle.isHeld()) break;
await this.processRow(row);
if (claimed.length > 0) {
await handle.renew?.(this.opts.lockTtlMs);
for (const row of claimed) {
if (handle.isHeld && !handle.isHeld()) break;
await this.processRow(row);
}
}

// P3b-2 digest pass: collapse due batched rows by group. Runs under
// the same partition lock — a window's rows share a partition (keyed
// on digest_key), so exactly one node assembles each digest.
const digestRows = await this.opts.outbox.claimDigest({
nodeId: this.opts.nodeId,
limit: this.opts.batchSize,
partition: { index, count: this.opts.partitionCount },
claimTtlMs: this.opts.claimTtlMs,
});
if (digestRows.length > 0) {
await handle.renew?.(this.opts.lockTtlMs);
for (const group of groupByDigestKey(digestRows)) {
if (handle.isHeld && !handle.isHeld()) break;
await this.processDigestGroup(group);
}
}
} finally {
await handle.release();
}
}

/**
* Send one collapsed message for a `(recipient, channel, window)` group and
* ack every row in it with that one outcome. On failure the whole group
* re-defers together (each row keeps its own backoff via its `attempts`).
*/
private async processDigestGroup(rows: NotificationDeliveryRecord[]): Promise<void> {
const channelName = rows[0].channel;
const recipient = rows[0].recipientId;
const channel = this.opts.channels.getChannel(channelName);
if (!channel) {
for (const row of rows) {
await this.opts.outbox.ack(row.id, { success: false, error: `channel '${channelName}' not registered`, dead: true });
this.opts.onAttempt?.(row, false);
}
return;
}

const digest = renderDigest(rows);
const notification: Notification = {
notificationId: rows[0].notificationId, // representative event id
organizationId: rows[0].organizationId,
topic: rows[0].topic,
title: digest.title,
body: digest.body,
severity: 'info',
recipients: [recipient],
channels: [channelName],
payload: { digest: true, count: digest.count, items: digest.items },
};

let result: SendResult;
try {
result = await channel.send(this.opts.channelContext, { notification, channel: channelName, recipient });
} catch (err) {
result = { ok: false, error: (err as Error)?.message ?? String(err) };
}

const errorClass = !result.ok && channel.classifyError ? channel.classifyError(result.error) : undefined;
const now = this.opts.now?.() ?? Date.now();
for (const row of rows) {
const ack = classifyDeliveryAttempt(result, errorClass, row.attempts, now, this.opts.rng);
await this.opts.outbox.ack(row.id, ack);
this.opts.onAttempt?.(row, result.ok);
}
}

private async processRow(row: NotificationDeliveryRecord): Promise<void> {
const channel = this.opts.channels.getChannel(row.channel);
if (!channel) {
Expand Down Expand Up @@ -223,6 +288,18 @@ export class NotificationDispatcher {
}
}

/** Group claimed digest rows by their `digestKey` (insertion order preserved). */
function groupByDigestKey(rows: NotificationDeliveryRecord[]): NotificationDeliveryRecord[][] {
const groups = new Map<string, NotificationDeliveryRecord[]>();
for (const r of rows) {
const key = r.digestKey ?? r.id; // defensive — claimDigest only returns keyed rows
let g = groups.get(key);
if (!g) { g = []; groups.set(key, g); }
g.push(r);
}
return [...groups.values()];
}

/** Spread the starting partition per node so contention rotates fairly. */
function stableNodeOffset(nodeId: string, partitionCount: number): number {
let h = 0;
Expand Down
Loading