Skip to content

Commit a1bbb5e

Browse files
waleedlatif1 and claude committed
feat(copilot): add seq ordinal to copilot_messages for order-preserving reads
copilot_messages had no column preserving message order: created_at (set from each message's timestamp) ties at millisecond granularity in 58% of chats, and some chats have out-of-order timestamps within their array. The only other tiebreaker, id, is a random UUID, so ORDER BY created_at, id can render same-timestamp user/assistant pairs swapped. This blocks the R+1 read cutover.

Add an integer seq = the message's 0-based index within the chat's JSONB array (the ground-truth order), backfilled inline in migration 0219 (so there is no separate backfill script for self-hosters or us to run). Reads will use ORDER BY seq NULLS LAST, created_at, id at cutover; reads still come from JSONB after this PR.

Design:
- seq is a tiebreaker, not the sole sort key (concurrent-append/NULL safety).
- Nullable now; defer NOT NULL so rolling-deploy old pods don't fail inserts.
- replace (update-messages snapshot) overwrites seq = array index (re-densifies after a mid-conversation delete); append preserves existing seq via COALESCE and assigns base+idx from a single MAX(seq) read (never MAX+i in SQL — multi-row batches would collide).
- Dedupe message ids before insert (87 prod chats carry dup ids; a repeated id in one INSERT...ON CONFLICT would otherwise throw).
- Backfill picks the first occurrence per (chat, id), gap-free via ROW_NUMBER; validated on staging data (0-based, contiguous, 0 bad ranges).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 503432c commit a1bbb5e

7 files changed

Lines changed: 17649 additions & 20 deletions

File tree

apps/sim/lib/copilot/chat/messages-dual-write.test.ts

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ const assistantMsg: PersistedMessage = {
2626
timestamp: '2026-01-01T00:00:01.000Z',
2727
}
2828

29+
/**
 * The first arg passed to the most recent `.values(...)` call, cast to an
 * array of loosely-typed row records.
 *
 * Reads `dbChainMockFns.values.mock.calls`. The last-call lookup is unchecked,
 * so if `.values(...)` has not been called yet the indexing below throws
 * rather than returning an empty array.
 */
30+
function lastValuesRows() {
31+
const calls = dbChainMockFns.values.mock.calls
// Last recorded call, first positional argument (the rows array).
32+
return calls[calls.length - 1][0] as Array<Record<string, unknown>>
33+
}
34+
2935
describe('messages-dual-write', () => {
3036
beforeEach(() => {
3137
vi.clearAllMocks()
@@ -43,7 +49,7 @@ describe('messages-dual-write', () => {
4349

4450
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
4551
expect(dbChainMockFns.values).toHaveBeenCalledTimes(1)
46-
const rows = dbChainMockFns.values.mock.calls[0][0]
52+
const rows = lastValuesRows()
4753
expect(rows).toHaveLength(2)
4854

4955
expect(rows[0]).toMatchObject({
@@ -54,22 +60,33 @@ describe('messages-dual-write', () => {
5460
model: null,
5561
streamId: null,
5662
})
57-
expect(rows[0].createdAt).toEqual(new Date(userMsg.timestamp))
58-
expect(rows[0].updatedAt).toEqual(new Date(userMsg.timestamp))
63+
expect(rows[0].createdAt as Date).toEqual(new Date(userMsg.timestamp))
64+
expect(rows[0].updatedAt as Date).toEqual(new Date(userMsg.timestamp))
5965

6066
expect(rows[1]).toMatchObject({
6167
chatId: 'chat-1',
6268
messageId: 'msg-asst-1',
6369
role: 'assistant',
6470
content: assistantMsg,
6571
})
66-
expect(rows[1].createdAt).toEqual(new Date(assistantMsg.timestamp))
72+
expect(rows[1].createdAt as Date).toEqual(new Date(assistantMsg.timestamp))
73+
})
74+
75+
it('assigns seq as 0-based array index when the chat has no prior rows', async () => {
76+
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
77+
const rows = lastValuesRows()
78+
expect(rows[0].seq).toBe(0)
79+
expect(rows[1].seq).toBe(1)
6780
})
6881

69-
it('preserves per-message ordering via timestamp', async () => {
82+
it('continues seq from MAX(seq)+1 when the chat already has rows', async () => {
83+
// MAX(seq) read resolves through the select().from().where() chain.
84+
dbChainMockFns.where.mockResolvedValueOnce([{ maxSeq: 4 }])
85+
7086
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
71-
const rows = dbChainMockFns.values.mock.calls[0][0]
72-
expect(rows[0].createdAt.getTime()).toBeLessThan(rows[1].createdAt.getTime())
87+
const rows = lastValuesRows()
88+
expect(rows[0].seq).toBe(5)
89+
expect(rows[1].seq).toBe(6)
7390
})
7491

7592
it('passes chatModel and streamId options to every row', async () => {
@@ -78,14 +95,14 @@ describe('messages-dual-write', () => {
7895
streamId: 'stream-xyz',
7996
})
8097

81-
const rows = dbChainMockFns.values.mock.calls[0][0]
98+
const rows = lastValuesRows()
8299
expect(rows[0].model).toBe('claude-sonnet-4-5')
83100
expect(rows[0].streamId).toBe('stream-xyz')
84101
expect(rows[1].model).toBe('claude-sonnet-4-5')
85102
expect(rows[1].streamId).toBe('stream-xyz')
86103
})
87104

88-
it('uses ON CONFLICT DO UPDATE with chat_id + message_id target', async () => {
105+
it('uses ON CONFLICT DO UPDATE that PRESERVES existing seq', async () => {
89106
await appendCopilotChatMessages('chat-1', [userMsg])
90107

91108
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
@@ -96,6 +113,15 @@ describe('messages-dual-write', () => {
96113
expect(conflictArg.set).toHaveProperty('model')
97114
expect(conflictArg.set).toHaveProperty('streamId')
98115
expect(conflictArg.set).toHaveProperty('updatedAt')
116+
// append must not renumber existing rows -> COALESCE(existing, excluded)
117+
expect(conflictArg.set.seq.strings.join('')).toContain('COALESCE(')
118+
})
119+
120+
it('collapses duplicate message ids to a single row', async () => {
121+
await appendCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
122+
const rows = lastValuesRows()
123+
expect(rows).toHaveLength(1)
124+
expect(rows[0].messageId).toBe('msg-user-1')
99125
})
100126

101127
it('swallows DB errors so the legacy JSONB write stays canonical', async () => {
@@ -120,25 +146,43 @@ describe('messages-dual-write', () => {
120146
expect(dbChainMockFns.delete).toHaveBeenCalledTimes(1)
121147
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
122148

123-
const rows = dbChainMockFns.values.mock.calls[0][0]
149+
const rows = lastValuesRows()
124150
expect(rows).toHaveLength(2)
125-
expect(rows.map((r: { messageId: string }) => r.messageId)).toEqual([
126-
'msg-user-1',
127-
'msg-asst-1',
128-
])
151+
expect(rows.map((r) => r.messageId)).toEqual(['msg-user-1', 'msg-asst-1'])
129152

130153
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
131154
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
132155
expect(conflictArg.set).toHaveProperty('streamId')
133156
expect(conflictArg.set).toHaveProperty('model')
134157
})
135158

159+
it('assigns seq as the snapshot array index (0-based)', async () => {
160+
await replaceCopilotChatMessages('chat-1', [userMsg, assistantMsg])
161+
const rows = lastValuesRows()
162+
expect(rows[0].seq).toBe(0)
163+
expect(rows[1].seq).toBe(1)
164+
})
165+
166+
it('OVERWRITES seq on conflict so positions re-densify after a delete', async () => {
167+
await replaceCopilotChatMessages('chat-1', [userMsg])
168+
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
169+
// authoritative snapshot -> seq = excluded.seq (no COALESCE preserve)
170+
expect(conflictArg.set.seq.strings.join('')).toBe('excluded.seq')
171+
})
172+
173+
it('collapses duplicate message ids to a single row', async () => {
174+
await replaceCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
175+
const rows = lastValuesRows()
176+
expect(rows).toHaveLength(1)
177+
expect(rows[0].seq).toBe(0)
178+
})
179+
136180
it('passes chatModel to every row in the snapshot', async () => {
137181
await replaceCopilotChatMessages('chat-1', [userMsg], {
138182
chatModel: 'gpt-4o-mini',
139183
})
140184

141-
const rows = dbChainMockFns.values.mock.calls[0][0]
185+
const rows = lastValuesRows()
142186
expect(rows[0].model).toBe('gpt-4o-mini')
143187
})
144188

apps/sim/lib/copilot/chat/messages-dual-write.ts

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,27 @@ import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
77

88
const logger = createLogger('CopilotMessagesDualWrite')
99

10+
/**
11+
* Collapse duplicate message ids (keeping the first occurrence), preserving
12+
* order. A single `INSERT ... ON CONFLICT` cannot touch the same conflict
13+
* target twice, so a snapshot/array carrying a repeated id would otherwise
14+
* throw "ON CONFLICT DO UPDATE command cannot affect row a second time".
*
* @param messages - Messages in their original order; the array is only read.
* @returns A new array holding the first message seen for each distinct `id`.
15+
*/
16+
function dedupeById(messages: PersistedMessage[]): PersistedMessage[] {
// Ids that have already been emitted to `out`.
17+
const seen = new Set<string>()
18+
const out: PersistedMessage[] = []
19+
for (const m of messages) {
// First occurrence wins: a later message reusing an id is dropped.
20+
if (seen.has(m.id)) continue
21+
seen.add(m.id)
22+
out.push(m)
23+
}
24+
return out
25+
}
26+
1027
function toRow(
1128
chatId: string,
1229
message: PersistedMessage,
30+
seq: number,
1331
options?: { chatModel?: string | null; streamId?: string | null }
1432
): typeof copilotMessages.$inferInsert {
1533
const ts = new Date(message.timestamp)
@@ -18,6 +36,7 @@ function toRow(
1836
messageId: message.id,
1937
role: message.role,
2038
content: message,
39+
seq,
2140
model: options?.chatModel ?? null,
2241
streamId: options?.streamId ?? null,
2342
createdAt: ts,
@@ -37,16 +56,28 @@ export async function appendCopilotChatMessages(
3756
): Promise<void> {
3857
if (messages.length === 0) return
3958
try {
59+
const deduped = dedupeById(messages)
60+
// Assign seq as base + array index. Base is computed in JS from a single
61+
// MAX(seq) read — never `MAX(seq)+i` in SQL, since a multi-row INSERT
62+
// evaluates every row's subquery against the same pre-insert state and the
63+
// rows would collide. Existing rows keep their seq on conflict (COALESCE);
64+
// the authoritative renumber happens via replaceCopilotChatMessages.
65+
const [maxRow] = await db
66+
.select({ maxSeq: sql<number | null>`max(${copilotMessages.seq})` })
67+
.from(copilotMessages)
68+
.where(eq(copilotMessages.chatId, chatId))
69+
const base = (maxRow?.maxSeq ?? -1) + 1
4070
await db
4171
.insert(copilotMessages)
42-
.values(messages.map((m) => toRow(chatId, m, options)))
72+
.values(deduped.map((m, i) => toRow(chatId, m, base + i, options)))
4373
.onConflictDoUpdate({
4474
target: [copilotMessages.chatId, copilotMessages.messageId],
4575
set: {
4676
content: sql`excluded.content`,
4777
role: sql`excluded.role`,
4878
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
4979
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
80+
seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`,
5081
updatedAt: sql`now()`,
5182
},
5283
})
@@ -69,7 +100,8 @@ export async function replaceCopilotChatMessages(
69100
options?: { chatModel?: string | null }
70101
): Promise<void> {
71102
try {
72-
const newMessageIds = messages.map((m) => m.id)
103+
const deduped = dedupeById(messages)
104+
const newMessageIds = deduped.map((m) => m.id)
73105
await db.transaction(async (tx) => {
74106
// Drop rows for messages not in the new snapshot.
75107
await tx
@@ -82,19 +114,22 @@ export async function replaceCopilotChatMessages(
82114
)
83115
: eq(copilotMessages.chatId, chatId)
84116
)
85-
if (messages.length === 0) return
86-
// Upsert remaining rows. ON CONFLICT preserves existing stream_id / model
117+
if (deduped.length === 0) return
118+
// Upsert remaining rows. The snapshot is authoritative on order, so seq is
119+
// overwritten with the array index — this re-densifies positions after a
120+
// mid-conversation delete. ON CONFLICT preserves existing stream_id / model
87121
// so a snapshot save doesn't clobber metadata set during streaming.
88122
await tx
89123
.insert(copilotMessages)
90-
.values(messages.map((m) => toRow(chatId, m, options)))
124+
.values(deduped.map((m, i) => toRow(chatId, m, i, options)))
91125
.onConflictDoUpdate({
92126
target: [copilotMessages.chatId, copilotMessages.messageId],
93127
set: {
94128
content: sql`excluded.content`,
95129
role: sql`excluded.role`,
96130
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
97131
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
132+
seq: sql`excluded.seq`,
98133
updatedAt: sql`now()`,
99134
},
100135
})
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- Step 1: add the ordering column. It is nullable (no NOT NULL, no default),
-- so rows that the backfill below does not match keep seq = NULL.
ALTER TABLE "copilot_messages" ADD COLUMN "seq" integer;--> statement-breakpoint
-- Step 2: backfill seq from each chat's JSONB "messages" array.
-- `ordered` expands every array element with its position; WITH ORDINALITY
-- numbers elements starting at 1.
-- NOTE(review): the jsonb_typeof guard is meant to keep non-array values away
-- from jsonb_array_elements; confirm the planner applies it before the
-- LATERAL call on the target data.
2+
WITH ordered AS (
3+
SELECT c."id" AS chat_id, elem.value->>'id' AS message_id, elem.ord AS ord
4+
FROM "copilot_chats" c
5+
CROSS JOIN LATERAL jsonb_array_elements(c."messages") WITH ORDINALITY AS elem(value, ord)
6+
WHERE jsonb_typeof(c."messages") = 'array' AND jsonb_array_length(c."messages") > 0
7+
),
-- `first_occurrence` collapses repeated ids within a chat to their earliest
-- position, so a duplicated id is ranked once.
-- NOTE(review): elements with no "id" key yield message_id = NULL; they are
-- grouped together here and take one rank slot below without matching any
-- row in the UPDATE, which would leave a gap in that chat's seq values.
-- Confirm no such elements exist, or filter them out.
8+
first_occurrence AS (
9+
SELECT chat_id, message_id, MIN(ord) AS first_ord FROM ordered GROUP BY chat_id, message_id
10+
),
-- `ranked` renumbers the surviving ids per chat; ROW_NUMBER() starts at 1, so
-- subtracting 1 gives a 0-based, contiguous sequence.
11+
ranked AS (
12+
SELECT chat_id, message_id,
13+
(ROW_NUMBER() OVER (PARTITION BY chat_id ORDER BY first_ord) - 1) AS seq
14+
FROM first_occurrence
15+
)
-- Write the computed rank onto the matching (chat_id, message_id) rows only.
16+
UPDATE "copilot_messages" m SET "seq" = r.seq
17+
FROM ranked r
18+
WHERE m."chat_id" = r.chat_id AND m."message_id" = r.message_id;--> statement-breakpoint
-- Step 3: partial btree index on (chat_id, seq), limited to rows whose
-- deleted_at IS NULL.
19+
CREATE INDEX "copilot_messages_chat_seq_idx" ON "copilot_messages" USING btree ("chat_id","seq") WHERE "copilot_messages"."deleted_at" IS NULL;

0 commit comments

Comments
 (0)