From 232e098e5790d3f79f708a95efda0b923cf80037 Mon Sep 17 00:00:00 2001 From: Jack Zhuang <277994282+os-zhuang@users.noreply.github.com> Date: Tue, 2 Jun 2026 02:51:41 +0800 Subject: [PATCH] feat(messaging,runtime): implement /api/v1/notifications REST surface (ADR-0030) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The notification REST routes (GET /notifications, POST /notifications/read, /read/all) were declared in the spec but never had a server-side handler — no plugin registered the `notification` core service, so the routes were never advertised and client.notifications.* 404'd. (The Console bell works only because it bypasses these endpoints via the generic data API.) - MessagingService: add listInbox/markRead/markAllRead reading sys_inbox_message + sys_notification_receipt; mark-read upserts the receipt keyed (notification_id, user_id, channel:'inbox'), updating the existing delivered receipt in place and inserting only when absent. - MessagingServicePlugin: also register the service under the `notification` core slot so the dispatcher resolves + advertises the routes. - HttpDispatcher: add handleNotification + /notifications dispatch branch; takes the user from the execution context; responses match the spec schemas ({ notifications, unreadCount }, { success, readCount }). - Tests: 6 new MessagingService inbox-API tests; client notification response shapes reconciled to the spec schemas. Co-Authored-By: Claude Opus 4.8 --- .changeset/notification-rest-inbox-surface.md | 36 ++++ packages/client/src/client.test.ts | 10 +- packages/runtime/src/http-dispatcher.ts | 56 +++++++ .../src/messaging-service-plugin.ts | 9 + .../src/messaging-service.test.ts | 137 ++++++++++++++++ .../src/messaging-service.ts | 154 ++++++++++++++++++ 6 files changed, 397 insertions(+), 5 deletions(-) create mode 100644 .changeset/notification-rest-inbox-surface.md diff --git a/.changeset/notification-rest-inbox-surface.md b/.changeset/notification-rest-inbox-surface.md new file mode 100644 index 000000000..b3ce168f5 --- /dev/null +++ b/.changeset/notification-rest-inbox-surface.md @@ -0,0 +1,36 @@ +--- +'@objectstack/service-messaging': minor +'@objectstack/runtime': minor +--- + +Implement the `/api/v1/notifications` REST surface (ADR-0030) + +The notification REST routes (`GET /notifications`, `POST /notifications/read`, +`POST /notifications/read/all`) were declared in the spec but never had a +server-side handler — no plugin registered the `notification` core service, so +the routes were never advertised in discovery and `client.notifications.*` +calls 404'd. (The Console bell works today only because it bypasses these +endpoints and reads the inbox via the generic data API.) + +This wires the surface end-to-end against the ADR-0030 L5 model: + +- **`MessagingService`** gains an inbox read API: `listInbox(userId, opts)` + reads `sys_inbox_message` joined with `sys_notification_receipt` for + read-state (a message is unread until its event has a `read`/`clicked`/ + `dismissed` receipt); `markRead(userId, ids)` and `markAllRead(userId)` + upsert the receipt to `read`, keyed `(notification_id, user_id, + channel:'inbox')` — updating the existing `delivered` receipt in place, + inserting only when absent. No reliance on the re-modeled `sys_notification` + L2 event (which carries no recipient/read columns). +- **`MessagingServicePlugin`** now also registers the messaging service under + the `notification` core service slot, so the dispatcher resolves + advertises + the routes. The legacy `INotificationService.send()` abstraction is unused and + unconsumed. +- **`HttpDispatcher`** gains `handleNotification` + a `/notifications` dispatch + branch: it takes the authenticated user from the execution context and maps + list / mark-read / mark-all-read to the service. Responses match the spec + schemas (`{ notifications, unreadCount }`, `{ success, readCount }`). + +Pairs with the objectui SDK consumer repoint (`useClientNotifications` → +`markRead`/`registerDevice` signatures). Device registration and preference +endpoints remain out of scope (unimplemented as before). diff --git a/packages/client/src/client.test.ts b/packages/client/src/client.test.ts index a839e03fc..45ee3d231 100644 --- a/packages/client/src/client.test.ts +++ b/packages/client/src/client.test.ts @@ -440,7 +440,7 @@ describe('Notifications namespace', () => { it('should list notifications with filters', async () => { const { client, fetchMock } = createMockClient({ success: true, - data: { notifications: [], total: 0 } + data: { notifications: [], unreadCount: 0 } }); await client.notifications.list({ read: false, limit: 10 }); const url = fetchMock.mock.calls[0][0] as string; @@ -452,10 +452,10 @@ describe('Notifications namespace', () => { it('should mark notifications as read', async () => { const { client, fetchMock } = createMockClient({ success: true, - data: { updated: 2 } + data: { success: true, readCount: 2 } }); const result = await client.notifications.markRead(['n1', 'n2']); - expect(result.updated).toBe(2); + expect(result.readCount).toBe(2); const body = JSON.parse(fetchMock.mock.calls[0][1].body); expect(body.ids).toEqual(['n1', 'n2']); }); @@ -463,10 +463,10 @@ describe('Notifications namespace', () => { it('should mark all notifications as read', async () => { const { client, fetchMock } = createMockClient({ success: true, - data: { updated: 5 } + data: { success: true, readCount: 5 } }); const result = await client.notifications.markAllRead(); - expect(result.updated).toBe(5); + expect(result.readCount).toBe(5); const [url, opts] = fetchMock.mock.calls[0]; expect(url).toContain('/api/v1/notifications/read/all'); expect(opts.method).toBe('POST'); diff --git a/packages/runtime/src/http-dispatcher.ts b/packages/runtime/src/http-dispatcher.ts index a74098923..8e78033ed 100644 --- a/packages/runtime/src/http-dispatcher.ts +++ b/packages/runtime/src/http-dispatcher.ts @@ -1212,6 +1212,56 @@ export class HttpDispatcher { return { handled: false }; } + /** + * Handles in-app notification requests (ADR-0030) — the + * `/api/v1/notifications` surface backed by the messaging service's inbox + * read API. Reads the L5 `sys_inbox_message` + `sys_notification_receipt` + * join; mark-read upserts the receipt keyed `(notification_id, user_id, + * channel:'inbox')`. The routes are `auth: true`, so an authenticated user + * is required. + * + * Routes (path is the sub-path after `/notifications`): + * GET '' → listInbox (query: read, type, limit) + * POST /read → markRead (body: { ids: string[] }) + * POST /read/all → markAllRead + */ + async handleNotification(path: string, method: string, body: any, query: any, context: HttpProtocolContext): Promise { + const service = await this.resolveService(CoreServiceName.enum.notification, context.environmentId) as any; + if (!service || typeof service.listInbox !== 'function') return { handled: false }; + + const userId: string | undefined = context.executionContext?.userId; + if (!userId) { + return { handled: true, response: this.error('Authentication required', 401) }; + } + + const m = method.toUpperCase(); + const subPath = path.replace(/^\/+/, '').replace(/\/+$/, ''); + + // GET /notifications — list the user's inbox joined with read-state. + if (subPath === '' && m === 'GET') { + const read = query?.read === undefined ? undefined : String(query.read) === 'true'; + const limit = query?.limit ? Number(query.limit) : undefined; + const type = query?.type ? String(query.type) : undefined; + const result = await service.listInbox(userId, { read, type, limit }); + return { handled: true, response: this.success(result) }; + } + + // POST /notifications/read — mark specific notifications read. + if (subPath === 'read' && m === 'POST') { + const ids: string[] = Array.isArray(body?.ids) ? body.ids.map((x: unknown) => String(x)) : []; + const result = await service.markRead(userId, ids); + return { handled: true, response: this.success(result) }; + } + + // POST /notifications/read/all — mark all of the user's inbox read. + if (subPath === 'read/all' && m === 'POST') { + const result = await service.markAllRead(userId); + return { handled: true, response: this.success(result) }; + } + + return { handled: false }; + } + /** * Handles i18n requests * path: sub-path after /i18n/ @@ -2357,6 +2407,12 @@ export class HttpDispatcher { return this.handleAnalytics(cleanPath.substring(10), method, body, context); } + // In-app notifications (ADR-0030) — inbox list + receipt mark-read, + // backed by the messaging service registered under the `notification` slot. + if (cleanPath.startsWith('/notifications')) { + return this.handleNotification(cleanPath.substring(14), method, body, query, context); + } + if (cleanPath.startsWith('/packages')) { return this.handlePackages(cleanPath.substring(9), method, body, query, context); } diff --git a/packages/services/service-messaging/src/messaging-service-plugin.ts b/packages/services/service-messaging/src/messaging-service-plugin.ts index 11461060d..a0746025b 100644 --- a/packages/services/service-messaging/src/messaging-service-plugin.ts +++ b/packages/services/service-messaging/src/messaging-service-plugin.ts @@ -129,6 +129,15 @@ export class MessagingServicePlugin implements Plugin { ctx.registerService('messaging', service); + // ADR-0030: the messaging service also backs the `notification` core + // service slot — it owns the in-app inbox + receipts, so it answers the + // `/api/v1/notifications` REST surface (list / mark-read / mark-all-read) + // via its inbox read API. Registering it here makes the dispatcher + // resolve + advertise those routes (`hasNotification`). The legacy + // INotificationService `send()` abstraction is unused; nothing consumes + // the slot expecting it. + ctx.registerService('notification', service); + // Register the messaging objects so their rows can be written. The // preference/subscription objects (ADR-0030 P2) are Studio-configurable, // so contribute them to the Setup app's Configuration slot (ADR-0029 D7) diff --git a/packages/services/service-messaging/src/messaging-service.test.ts b/packages/services/service-messaging/src/messaging-service.test.ts index f23b166bd..78052de83 100644 --- a/packages/services/service-messaging/src/messaging-service.test.ts +++ b/packages/services/service-messaging/src/messaging-service.test.ts @@ -425,3 +425,140 @@ describe('MessagingService', () => { }); }); }); + +/** + * A stateful in-memory engine for the inbox read API (ADR-0030). Supports the + * flat-equality `where` filters listInbox/markRead/markAllRead issue, plus + * `update(..., { where: { id } })` mutation and `insert`. + */ +function inboxEngine(seed: { inbox?: any[]; receipts?: any[] } = {}) { + const store: Record = { + sys_inbox_message: [...(seed.inbox ?? [])], + sys_notification_receipt: [...(seed.receipts ?? [])], + }; + let seq = 0; + const matches = (row: any, where: any = {}) => + Object.entries(where).every(([k, v]) => String(row[k]) === String(v)); + const engine = { + store, + async find(object: string, query: any = {}) { + let rows = (store[object] ?? []).filter((r) => matches(r, query.where)); + const ob = Array.isArray(query.orderBy) ? query.orderBy : []; + if (ob.some((o: any) => o.field === 'created_at' && o.order === 'desc')) { + rows = [...rows].sort((a, b) => String(b.created_at).localeCompare(String(a.created_at))); + } + return typeof query.limit === 'number' ? rows.slice(0, query.limit) : rows; + }, + async findOne(object: string, query: any = {}) { + return (store[object] ?? []).find((r) => matches(r, query.where)) ?? null; + }, + async insert(object: string, row: any) { + const created = { id: `row_${++seq}`, ...row }; + (store[object] ??= []).push(created); + return created; + }, + async update(object: string, data: any, options: any = {}) { + for (const r of store[object] ?? []) { + if (matches(r, options.where)) Object.assign(r, data); + } + return {}; + }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + }; + return engine as any; +} + +describe('MessagingService — inbox read API (ADR-0030)', () => { + const logger = silentLogger(); + + it('lists inbox rows joined with receipt read-state and counts unread', async () => { + const engine = inboxEngine({ + inbox: [ + { id: 'm1', user_id: 'u1', notification_id: 'n1', topic: 'collab.mention', title: 'A', body_md: 'a', action_url: '/x', created_at: '2026-01-01T00:00:01Z' }, + { id: 'm2', user_id: 'u1', notification_id: 'n2', topic: 'task.assigned', title: 'B', body_md: 'b', created_at: '2026-01-01T00:00:02Z' }, + { id: 'm3', user_id: 'u2', notification_id: 'n3', topic: 'x', title: 'C', created_at: '2026-01-01T00:00:03Z' }, + ], + receipts: [ + { id: 'r1', notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'read' }, + { id: 'r2', notification_id: 'n2', user_id: 'u1', channel: 'inbox', state: 'delivered' }, + ], + }); + const svc = new MessagingService({ logger, getData: () => engine }); + + const res = await svc.listInbox('u1'); + // Only u1's rows; newest first; n2 unread, n1 read. + expect(res.notifications.map((n) => n.id)).toEqual(['n2', 'n1']); + expect(res.unreadCount).toBe(1); + const n1 = res.notifications.find((n) => n.id === 'n1')!; + expect(n1).toMatchObject({ type: 'collab.mention', title: 'A', body: 'a', read: true, actionUrl: '/x' }); + expect(res.notifications.find((n) => n.id === 'n2')!.read).toBe(false); + }); + + it('filters by read state when requested', async () => { + const engine = inboxEngine({ + inbox: [ + { id: 'm1', user_id: 'u1', notification_id: 'n1', title: 'A', created_at: '1' }, + { id: 'm2', user_id: 'u1', notification_id: 'n2', title: 'B', created_at: '2' }, + ], + receipts: [{ id: 'r1', notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'read' }], + }); + const svc = new MessagingService({ logger, getData: () => engine }); + + expect((await svc.listInbox('u1', { read: false })).notifications.map((n) => n.id)).toEqual(['n2']); + expect((await svc.listInbox('u1', { read: true })).notifications.map((n) => n.id)).toEqual(['n1']); + }); + + it('markRead updates the existing delivered receipt in place (no duplicate)', async () => { + const engine = inboxEngine({ + inbox: [{ id: 'm1', user_id: 'u1', notification_id: 'n1', title: 'A', created_at: '1' }], + receipts: [{ id: 'r1', notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'delivered' }], + }); + const svc = new MessagingService({ logger, getData: () => engine }); + + const res = await svc.markRead('u1', ['n1']); + expect(res).toEqual({ success: true, readCount: 1 }); + const receipts = engine.store.sys_notification_receipt; + expect(receipts).toHaveLength(1); // updated in place, not duplicated + expect(receipts[0]).toMatchObject({ id: 'r1', state: 'read' }); + expect(receipts[0].at).toBeTruthy(); + }); + + it('markRead inserts a read receipt when none exists yet', async () => { + const engine = inboxEngine({ + inbox: [{ id: 'm1', user_id: 'u1', notification_id: 'n1', title: 'A', created_at: '1' }], + }); + const svc = new MessagingService({ logger, getData: () => engine }); + + const res = await svc.markRead('u1', ['n1']); + expect(res.readCount).toBe(1); + const receipts = engine.store.sys_notification_receipt; + expect(receipts).toHaveLength(1); + expect(receipts[0]).toMatchObject({ notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'read' }); + }); + + it('markAllRead flips every unread message and leaves already-read ones', async () => { + const engine = inboxEngine({ + inbox: [ + { id: 'm1', user_id: 'u1', notification_id: 'n1', title: 'A', created_at: '1' }, + { id: 'm2', user_id: 'u1', notification_id: 'n2', title: 'B', created_at: '2' }, + ], + receipts: [{ id: 'r1', notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'read' }], + }); + const svc = new MessagingService({ logger, getData: () => engine }); + + const res = await svc.markAllRead('u1'); + expect(res.readCount).toBe(1); // only n2 was unread + expect((await svc.listInbox('u1')).unreadCount).toBe(0); + }); + + it('degrades to empty without a data engine or user id', async () => { + const noData = new MessagingService({ logger }); + expect(await noData.listInbox('u1')).toEqual({ notifications: [], unreadCount: 0 }); + expect(await noData.markRead('u1', ['n1'])).toEqual({ success: true, readCount: 0 }); + + const svc = new MessagingService({ logger, getData: () => inboxEngine() }); + expect(await svc.listInbox('')).toEqual({ notifications: [], unreadCount: 0 }); + }); +}); diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index 4539bc0c4..5f3639248 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -10,10 +10,30 @@ import { RecipientResolver } from './recipient-resolver.js'; import { PreferenceResolver, type PreferenceTarget } from './preference-resolver.js'; import type { INotificationOutbox } from './outbox.js'; import type { EnqueueHttpInput, HttpDelivery, HttpDeliveryStatus, IHttpOutbox } from './http-outbox.js'; +import { INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js'; /** The L2 event object every `emit()` writes one row to (ADR-0030). */ export const NOTIFICATION_EVENT_OBJECT = 'sys_notification'; +/** Receipt states that count as "read" for the inbox unread badge (ADR-0030). */ +const READ_RECEIPT_STATES = new Set(['read', 'clicked', 'dismissed']); + +/** + * One row of the inbox list REST response — the `Notification` shape in the API + * spec (`NotificationSchema`). `id` is the notification's event id + * (`notification_id`), which keys its read-state receipt, falling back to the + * inbox row id for synthetic/legacy rows that carry no event id. + */ +export interface InboxNotificationView { + id: string; + type: string; + title: string; + body: string; + read: boolean; + actionUrl?: string; + createdAt: string; +} + /** * Audience selector for {@link EmitInput}. P0 resolves explicit user ids and * email-shaped recipients inline (email→id is finished at the inbox channel); @@ -224,6 +244,140 @@ export class MessagingService { return [...this.channels.keys()]; } + /* ------------------------------------------------------------------ */ + /* Inbox read API (ADR-0030) — backs /api/v1/notifications */ + /* */ + /* The REST notification surface reads the L5 `sys_inbox_message` */ + /* materialization joined with the `sys_notification_receipt` */ + /* read-state spine — NOT the re-modeled `sys_notification` L2 event */ + /* (which carries no recipient/read columns). Mark-read upserts the */ + /* receipt, keyed `(notification_id, user_id, channel:'inbox')`. */ + /* ------------------------------------------------------------------ */ + + /** + * List the signed-in user's in-app inbox, joined with read-state. + * + * A message is unread until its event has a `read`/`clicked`/`dismissed` + * receipt; the `read` filter (when given) is applied in-memory after the + * join. `unreadCount` is computed over the fetched window (bounded by + * `limit`, like the Console bell's poll). Returns the REST contract shape + * (`ListNotificationsResponseSchema`): `{ notifications, unreadCount }`. + */ + async listInbox( + userId: string, + opts: { read?: boolean; type?: string; limit?: number } = {}, + ): Promise<{ notifications: InboxNotificationView[]; unreadCount: number }> { + const data = this.ctx.getData?.(); + if (!data || !userId) return { notifications: [], unreadCount: 0 }; + + const limit = Math.min(Math.max(opts.limit ?? 50, 1), 200); + const where: Record = { user_id: userId }; + if (opts.type) where.topic = opts.type; + + const [rows, receipts] = await Promise.all([ + data.find(INBOX_OBJECT, { where, orderBy: [{ field: 'created_at', order: 'desc' }], limit }) as Promise>>, + // Read-state spine. Best-effort: if receipts are unavailable the + // inbox still lists (everything reads as unread) rather than erroring. + (data.find(RECEIPT_OBJECT, { where: { user_id: userId, channel: 'inbox' } }) as Promise>>) + .catch(() => [] as Array>), + ]); + + // notification_id → most-advanced receipt state (read/clicked/dismissed + // wins over a plain delivered one). + const stateByNotif = new Map(); + for (const r of receipts) { + const nid = r?.notification_id != null ? String(r.notification_id) : ''; + if (!nid) continue; + const state = String(r.state ?? 'delivered'); + const prev = stateByNotif.get(nid); + if (!prev || (!READ_RECEIPT_STATES.has(prev) && READ_RECEIPT_STATES.has(state))) { + stateByNotif.set(nid, state); + } + } + + let unreadCount = 0; + const all: InboxNotificationView[] = rows.map((m) => { + const nid = m?.notification_id != null ? String(m.notification_id) : null; + const state = nid ? stateByNotif.get(nid) : undefined; + const read = state ? READ_RECEIPT_STATES.has(state) : false; + if (!read) unreadCount += 1; + return { + id: nid ?? String(m.id), + type: (m.topic as string) ?? 'notification', + title: (m.title as string) ?? '', + body: (m.body_md as string) ?? '', + read, + actionUrl: (m.action_url as string) ?? undefined, + createdAt: (m.created_at as string) ?? this.now(), + }; + }); + + const notifications = opts.read === undefined ? all : all.filter((n) => n.read === opts.read); + return { notifications, unreadCount }; + } + + /** + * Mark specific notifications read by upserting their inbox receipts to + * `read`. Updates the existing `delivered` receipt in place (keyed + * `(notification_id, user_id, channel:'inbox')`); inserts one only when + * absent. `ids` are notification (event) ids. Returns the REST contract + * shape (`MarkNotificationsReadResponseSchema`): `{ success, readCount }`. + */ + async markRead(userId: string, ids: readonly string[]): Promise<{ success: boolean; readCount: number }> { + const data = this.ctx.getData?.(); + if (!data || !userId || !ids?.length) return { success: true, readCount: 0 }; + const at = this.now(); + let readCount = 0; + for (const id of ids) { + const nid = String(id ?? ''); + if (!nid) continue; + try { + readCount += await this.upsertReadReceipt(data, userId, nid, at); + } catch (err) { + this.ctx.logger.warn(`[messaging] markRead failed for '${nid}': ${(err as Error).message}`); + } + } + return { success: true, readCount }; + } + + /** + * Mark every currently-unread inbox message for the user as read. Returns + * `{ success, readCount }` (`MarkAllNotificationsReadResponseSchema`). + */ + async markAllRead(userId: string): Promise<{ success: boolean; readCount: number }> { + const data = this.ctx.getData?.(); + if (!data || !userId) return { success: true, readCount: 0 }; + const { notifications } = await this.listInbox(userId, { read: false, limit: 200 }); + return this.markRead(userId, notifications.map((n) => n.id)); + } + + /** Upsert a `read` receipt for one notification; returns 1 when it persisted. */ + private async upsertReadReceipt( + data: IDataEngine, + userId: string, + notificationId: string, + at: string, + ): Promise { + const existing = await data.findOne(RECEIPT_OBJECT, { + where: { notification_id: notificationId, user_id: userId, channel: 'inbox' }, + fields: ['id'], + }); + if (existing?.id) { + await data.update(RECEIPT_OBJECT, { state: 'read', at }, { where: { id: existing.id } } as never); + } else { + await data.insert(RECEIPT_OBJECT, { + notification_id: notificationId, + delivery_id: null, + user_id: userId, + channel: 'inbox', + state: 'read', + at, + created_at: at, + }); + } + return 1; + } + /** * The single notification ingress. Writes the L2 event, resolves the * audience, and fans the result out to its channels. An unregistered