diff --git a/.changeset/notification-receipt-unique-race.md b/.changeset/notification-receipt-unique-race.md new file mode 100644 index 000000000..dba1cb50d --- /dev/null +++ b/.changeset/notification-receipt-unique-race.md @@ -0,0 +1,14 @@ +--- +'@objectstack/service-messaging': patch +--- + +fix(messaging): converge mark-read receipt on unique-index race + +`markRead`'s `upsertReadReceipt` did `findOne`-then-`insert` (check-then-act), so +a concurrent mark-read — or the best-effort `delivered` receipt write still in +flight — could win the `UNIQUE(notification_id, user_id, channel)` index between +the read and the write. Clicking a notification then threw +`UNIQUE constraint failed: sys_notification_receipt...`. The insert now catches a +unique violation and falls back to flipping the now-present row to `read`, with a +cross-driver `isUniqueViolation` helper (SQLite / Postgres `23505` / +MySQL `ER_DUP_ENTRY`). diff --git a/packages/services/service-messaging/src/messaging-service.test.ts b/packages/services/service-messaging/src/messaging-service.test.ts index 78052de83..ee82e3a2e 100644 --- a/packages/services/service-messaging/src/messaging-service.test.ts +++ b/packages/services/service-messaging/src/messaging-service.test.ts @@ -538,6 +538,36 @@ describe('MessagingService — inbox read API (ADR-0030)', () => { expect(receipts[0]).toMatchObject({ notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'read' }); }); + it('markRead survives a unique-index race on the receipt insert', async () => { + // No receipt seeded, so the fast-path findOne misses. The insert then + // hits the UNIQUE(notification_id, user_id, channel) index because a + // concurrent mark-read (or the best-effort `delivered` write) created + // the row first. We must catch it and converge to the existing row. + const engine = inboxEngine({ + inbox: [{ id: 'm1', user_id: 'u1', notification_id: 'n1', title: 'A', created_at: '1' }], + }); + const realInsert = engine.insert.bind(engine); + let raced = false; + engine.insert = async (object: string, row: any) => { + if (object === 'sys_notification_receipt' && !raced) { + raced = true; + // A concurrent writer wins the index, then our insert collides. + engine.store.sys_notification_receipt.push({ + id: 'r_concurrent', notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'delivered', + }); + throw new Error('UNIQUE constraint failed: sys_notification_receipt.notification_id, sys_notification_receipt.user_id, sys_notification_receipt.channel'); + } + return realInsert(object, row); + }; + const svc = new MessagingService({ logger, getData: () => engine }); + + const res = await svc.markRead('u1', ['n1']); + expect(res).toEqual({ success: true, readCount: 1 }); + const receipts = engine.store.sys_notification_receipt; + expect(receipts).toHaveLength(1); // converged, not duplicated + expect(receipts[0]).toMatchObject({ id: 'r_concurrent', state: 'read' }); + }); + it('markAllRead flips every unread message and leaves already-read ones', async () => { const engine = inboxEngine({ inbox: [ diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index 27cdd9fa4..e361a5da8 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -18,6 +18,24 @@ export const NOTIFICATION_EVENT_OBJECT = 'sys_notification'; /** Receipt states that count as "read" for the inbox unread badge (ADR-0030). */ const READ_RECEIPT_STATES = new Set(['read', 'clicked', 'dismissed']); +/** + * Whether a driver error is a unique/primary-key constraint violation. Spans the + * SQL drivers we ship: SQLite (`UNIQUE constraint failed`), Postgres (`23505` / + * `duplicate key`), and MySQL (`ER_DUP_ENTRY` / `Duplicate entry`). Used to turn + * a lost check-then-act race on a unique index into a fallback update. + */ +function isUniqueViolation(err: unknown): boolean { + const e = err as { code?: string | number; message?: string } | undefined; + if (!e) return false; + if (e.code === '23505' || e.code === 'ER_DUP_ENTRY' || e.code === 'SQLITE_CONSTRAINT_UNIQUE') return true; + const msg = String(e.message ?? '').toLowerCase(); + return ( + msg.includes('unique constraint failed') || + msg.includes('duplicate key') || + msg.includes('duplicate entry') + ); +} + /** * One row of the inbox list REST response — the `Notification` shape in the API * spec (`NotificationSchema`). `id` is the notification's event id @@ -358,13 +376,22 @@ export class MessagingService { notificationId: string, at: string, ): Promise { - const existing = await data.findOne(RECEIPT_OBJECT, { - where: { notification_id: notificationId, user_id: userId, channel: 'inbox' }, - fields: ['id'], - }); - if (existing?.id) { + const where = { notification_id: notificationId, user_id: userId, channel: 'inbox' }; + const flipToRead = async (): Promise => { + const existing = await data.findOne(RECEIPT_OBJECT, { where, fields: ['id'] }); + if (!existing?.id) return false; await data.update(RECEIPT_OBJECT, { state: 'read', at }, { where: { id: existing.id } } as never); - } else { + return true; + }; + + if (await flipToRead()) return 1; + + // No receipt yet — insert one. findOne→insert is check-then-act, so a + // concurrent mark-read (or the best-effort `delivered` write still in + // flight) can win the (notification_id, user_id, channel) unique index + // between our read and write. Treat that collision as "someone else + // created it" and flip the now-present row to `read` instead of failing. + try { await data.insert(RECEIPT_OBJECT, { notification_id: notificationId, delivery_id: null, @@ -374,8 +401,11 @@ export class MessagingService { at, created_at: at, }); + return 1; + } catch (err) { + if (isUniqueViolation(err) && (await flipToRead())) return 1; + throw err; } - return 1; } /**