Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/notification-receipt-unique-race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
'@objectstack/service-messaging': patch
---

fix(messaging): converge mark-read receipt on unique-index race

`markRead`'s `upsertReadReceipt` did `findOne`-then-`insert` (check-then-act), so
a concurrent mark-read — or the best-effort `delivered` receipt write still in
flight — could win the `UNIQUE(notification_id, user_id, channel)` index between
the read and the write. Clicking a notification then threw
`UNIQUE constraint failed: sys_notification_receipt...`. The insert now catches a
unique violation and falls back to flipping the now-present row to `read`, with a
cross-driver `isUniqueViolation` helper (SQLite / Postgres `23505` /
MySQL `ER_DUP_ENTRY`).
30 changes: 30 additions & 0 deletions packages/services/service-messaging/src/messaging-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,36 @@ describe('MessagingService — inbox read API (ADR-0030)', () => {
expect(receipts[0]).toMatchObject({ notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'read' });
});

it('markRead survives a unique-index race on the receipt insert', async () => {
// No receipt seeded, so the fast-path findOne misses. The insert then
// hits the UNIQUE(notification_id, user_id, channel) index because a
// concurrent mark-read (or the best-effort `delivered` write) created
// the row first. We must catch it and converge to the existing row.
const engine = inboxEngine({
inbox: [{ id: 'm1', user_id: 'u1', notification_id: 'n1', title: 'A', created_at: '1' }],
});
const realInsert = engine.insert.bind(engine);
let raced = false;
engine.insert = async (object: string, row: any) => {
if (object === 'sys_notification_receipt' && !raced) {
raced = true;
// A concurrent writer wins the index, then our insert collides.
engine.store.sys_notification_receipt.push({
id: 'r_concurrent', notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'delivered',
});
throw new Error('UNIQUE constraint failed: sys_notification_receipt.notification_id, sys_notification_receipt.user_id, sys_notification_receipt.channel');
}
return realInsert(object, row);
};
const svc = new MessagingService({ logger, getData: () => engine });

const res = await svc.markRead('u1', ['n1']);
expect(res).toEqual({ success: true, readCount: 1 });
const receipts = engine.store.sys_notification_receipt;
expect(receipts).toHaveLength(1); // converged, not duplicated
expect(receipts[0]).toMatchObject({ id: 'r_concurrent', state: 'read' });
});

it('markAllRead flips every unread message and leaves already-read ones', async () => {
const engine = inboxEngine({
inbox: [
Expand Down
44 changes: 37 additions & 7 deletions packages/services/service-messaging/src/messaging-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@ export const NOTIFICATION_EVENT_OBJECT = 'sys_notification';
/** Receipt states that count as "read" for the inbox unread badge (ADR-0030). */
const READ_RECEIPT_STATES = new Set(['read', 'clicked', 'dismissed']);

/**
* Whether a driver error is a unique/primary-key constraint violation. Spans the
* SQL drivers we ship: SQLite (`UNIQUE constraint failed`), Postgres (`23505` /
* `duplicate key`), and MySQL (`ER_DUP_ENTRY` / `Duplicate entry`). Used to turn
* a lost check-then-act race on a unique index into a fallback update.
*/
function isUniqueViolation(err: unknown): boolean {
const e = err as { code?: string | number; message?: string } | undefined;
if (!e) return false;
if (e.code === '23505' || e.code === 'ER_DUP_ENTRY' || e.code === 'SQLITE_CONSTRAINT_UNIQUE') return true;
const msg = String(e.message ?? '').toLowerCase();
return (
msg.includes('unique constraint failed') ||
msg.includes('duplicate key') ||
msg.includes('duplicate entry')
);
}

/**
* One row of the inbox list REST response — the `Notification` shape in the API
* spec (`NotificationSchema`). `id` is the notification's event id
Expand Down Expand Up @@ -358,13 +376,22 @@ export class MessagingService {
notificationId: string,
at: string,
): Promise<number> {
const existing = await data.findOne(RECEIPT_OBJECT, {
where: { notification_id: notificationId, user_id: userId, channel: 'inbox' },
fields: ['id'],
});
if (existing?.id) {
const where = { notification_id: notificationId, user_id: userId, channel: 'inbox' };
const flipToRead = async (): Promise<boolean> => {
const existing = await data.findOne(RECEIPT_OBJECT, { where, fields: ['id'] });
if (!existing?.id) return false;
await data.update(RECEIPT_OBJECT, { state: 'read', at }, { where: { id: existing.id } } as never);
} else {
return true;
};

if (await flipToRead()) return 1;

// No receipt yet — insert one. findOne→insert is check-then-act, so a
// concurrent mark-read (or the best-effort `delivered` write still in
// flight) can win the (notification_id, user_id, channel) unique index
// between our read and write. Treat that collision as "someone else
// created it" and flip the now-present row to `read` instead of failing.
try {
await data.insert(RECEIPT_OBJECT, {
notification_id: notificationId,
delivery_id: null,
Expand All @@ -374,8 +401,11 @@ export class MessagingService {
at,
created_at: at,
});
return 1;
} catch (err) {
if (isUniqueViolation(err) && (await flipToRead())) return 1;
throw err;
}
return 1;
}

/**
Expand Down