From 90d9fafda45b84eb862871c39ca0a9c4ec3e958c Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 10:38:11 -0500 Subject: [PATCH 1/9] feat: rotate sealed chat history into chat-archive.jsonl at compaction boundaries --- src/common/constants/paths.ts | 14 + src/node/services/historyService.test.ts | 243 +++++++++ src/node/services/historyService.ts | 661 ++++++++++++++++++----- src/node/services/workspaceService.ts | 59 +- 4 files changed, 826 insertions(+), 151 deletions(-) diff --git a/src/common/constants/paths.ts b/src/common/constants/paths.ts index 2f0ac9e50c..7dfa96d86c 100644 --- a/src/common/constants/paths.ts +++ b/src/common/constants/paths.ts @@ -5,6 +5,20 @@ import { join } from "path"; const LEGACY_MUX_DIR_NAME = ".cmux"; const MUX_DIR_NAME = ".mux"; +/** + * Session-dir file holding the active chat history epoch (latest compaction + * boundary onward). Example: ~/.mux/sessions/<workspaceId>/chat.jsonl + */ +export const CHAT_FILE_NAME = "chat.jsonl"; + +/** + * Session-dir file holding sealed pre-boundary chat history. HistoryService + * rotates everything before the latest durable context boundary out of + * chat.jsonl into this append-only archive so per-turn reads/rewrites stay + * O(active epoch) instead of O(lifetime history). + */ +export const CHAT_ARCHIVE_FILE_NAME = "chat-archive.jsonl"; + /** * Migrate from the legacy ~/.cmux directory into ~/.mux for rebranded installs. * Called on startup to preserve data created by earlier releases. 
diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 9b02b6261c..5abec1b899 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -1375,4 +1375,247 @@ describe("HistoryService", () => { expect(collected[0].id).toBe("valid-1"); }); }); + + describe("sealed history rotation", () => { + const wsId = "ws-rotation"; + + function boundaryMessage(id: string, epoch: number): MuxMessage { + return createMuxMessage(id, "assistant", `Summary ${epoch}`, { + compactionBoundary: true, + compacted: "user", + compactionEpoch: epoch, + }); + } + + async function readJsonlFile(filePath: string): Promise { + const data = await fs.readFile(filePath, "utf-8"); + return data + .split("\n") + .filter((line) => line.trim()) + .map((line) => JSON.parse(line) as MuxMessage); + } + + function chatPath(workspaceId: string): string { + return path.join(config.getSessionDir(workspaceId), "chat.jsonl"); + } + + function archivePath(workspaceId: string): string { + return path.join(config.getSessionDir(workspaceId), "chat-archive.jsonl"); + } + + it("rotates the sealed prefix into the archive when a boundary is appended", async () => { + await appendNumberedMessages(service, wsId, 3); // seq 0..2 + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + await service.appendToHistory(wsId, createMuxMessage("post-0", "user", "after")); // seq 4 + + // Active file holds only the latest epoch; sealed rows moved to the archive. + const chatRows = await readJsonlFile(chatPath(wsId)); + expect(chatRows.map((m) => m.id)).toEqual(["boundary-1", "post-0"]); + const archiveRows = await readJsonlFile(archivePath(wsId)); + expect(archiveRows.map((m) => m.id)).toEqual(["msg-0", "msg-1", "msg-2"]); + + // Hot-path read returns the active epoch. 
+ const latest = await service.getHistoryFromLatestBoundary(wsId); + expect(latest.success).toBe(true); + if (latest.success) { + expect(latest.data.map((m) => m.id)).toEqual(["boundary-1", "post-0"]); + } + + // Full iteration still sees everything in order. + const full = await collectFullHistory(service, wsId); + expect(full.map((m) => m.id)).toEqual(["msg-0", "msg-1", "msg-2", "boundary-1", "post-0"]); + + // Paging into the sealed window still works. + const window = await service.getHistoryBoundaryWindow(wsId, 3); + expect(window.success).toBe(true); + if (window.success) { + expect(window.data.messages.map((m) => m.id)).toEqual(["msg-0", "msg-1", "msg-2"]); + expect(window.data.hasOlder).toBe(false); + } + }); + + it("lazily rotates legacy files with a mid-file boundary on first read", async () => { + const lines = [ + messageLine(wsId, createMuxMessage("old-0", "user", "old", { historySequence: 0 })), + messageLine(wsId, { + ...boundaryMessage("boundary-1", 1), + metadata: { ...boundaryMessage("boundary-1", 1).metadata, historySequence: 1 }, + }), + messageLine(wsId, createMuxMessage("post-0", "user", "after", { historySequence: 2 })), + ]; + await writeHistoryLines(config, wsId, lines); + + const latest = await service.getHistoryFromLatestBoundary(wsId); + expect(latest.success).toBe(true); + if (latest.success) { + expect(latest.data.map((m) => m.id)).toEqual(["boundary-1", "post-0"]); + } + + // The read migrated the sealed prefix out of chat.jsonl. 
+ const chatRows = await readJsonlFile(chatPath(wsId)); + expect(chatRows.map((m) => m.id)).toEqual(["boundary-1", "post-0"]); + const archiveRows = await readJsonlFile(archivePath(wsId)); + expect(archiveRows.map((m) => m.id)).toEqual(["old-0"]); + }); + + it("reads boundary windows across the archive seam (skip + paging)", async () => { + await service.appendToHistory(wsId, createMuxMessage("e1-user", "user", "msg")); // seq 0 + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 1 + await service.appendToHistory(wsId, createMuxMessage("e2-user", "user", "msg")); // seq 2 + await service.appendToHistory(wsId, boundaryMessage("boundary-2", 2)); // seq 3 + await service.appendToHistory(wsId, createMuxMessage("post", "user", "after")); // seq 4 + + // Both sealed epochs live in the archive now. + const archiveRows = await readJsonlFile(archivePath(wsId)); + expect(archiveRows.map((m) => m.id)).toEqual(["e1-user", "boundary-1", "e2-user"]); + + // skip=1 spans archive tail + entire active file. + const penultimate = await service.getHistoryFromLatestBoundary(wsId, 1); + expect(penultimate.success).toBe(true); + if (penultimate.success) { + expect(penultimate.data.map((m) => m.id)).toEqual([ + "boundary-1", + "e2-user", + "boundary-2", + "post", + ]); + } + + // Page one: the boundary-1 window from the archive. + const page1 = await service.getHistoryBoundaryWindow(wsId, 3); + expect(page1.success).toBe(true); + if (page1.success) { + expect(page1.data.messages.map((m) => m.id)).toEqual(["boundary-1", "e2-user"]); + expect(page1.data.hasOlder).toBe(true); + } + + // Page two: pre-boundary rows, no older history. 
+ const page2 = await service.getHistoryBoundaryWindow(wsId, 1); + expect(page2.success).toBe(true); + if (page2.success) { + expect(page2.data.messages.map((m) => m.id)).toEqual(["e1-user"]); + expect(page2.data.hasOlder).toBe(false); + } + }); + + it("initializes the sequence counter from the archive when chat.jsonl is missing", async () => { + await appendNumberedMessages(service, wsId, 3); // seq 0..2 + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + + // Simulate hand-deletion of the active file; archived sequences must not be reused. + await fs.rm(chatPath(wsId)); + + const restarted = new HistoryService(config); + const msg = createMuxMessage("new-msg", "user", "fresh"); + const appendResult = await restarted.appendToHistory(wsId, msg); + expect(appendResult.success).toBe(true); + expect(msg.metadata?.historySequence).toBe(3); + }); + + it("deduplicates rows when a crash replays the sealed prefix", async () => { + await appendNumberedMessages(service, wsId, 3); // seq 0..2 → archived after boundary + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + await service.appendToHistory(wsId, createMuxMessage("post-0", "user", "after")); // seq 4 + + // Simulate a crash between the archive append and the chat.jsonl rewrite: + // the sealed prefix reappears at the head of chat.jsonl while the archive + // already contains it. + const archived = await fs.readFile(archivePath(wsId), "utf-8"); + const active = await fs.readFile(chatPath(wsId), "utf-8"); + await fs.writeFile(chatPath(wsId), archived + active); + + // A fresh process triggers the lazy rotation check on first read. 
+ const restarted = new HistoryService(config); + const latest = await restarted.getHistoryFromLatestBoundary(wsId); + expect(latest.success).toBe(true); + + const archiveRows = await readJsonlFile(archivePath(wsId)); + expect(archiveRows.map((m) => m.id)).toEqual(["msg-0", "msg-1", "msg-2"]); + + const full = await collectFullHistory(restarted, wsId); + expect(full.map((m) => m.id)).toEqual(["msg-0", "msg-1", "msg-2", "boundary-1", "post-0"]); + }); + + it("returns the tail across the archive seam from getLastMessages", async () => { + await appendNumberedMessages(service, wsId, 3); // seq 0..2 + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + await service.appendToHistory(wsId, createMuxMessage("post-0", "user", "after")); // seq 4 + + const result = await service.getLastMessages(wsId, 4); + expect(result.success).toBe(true); + if (result.success) { + // chat.jsonl only has 2 rows; the older two must come from the archive. + expect(result.data.map((m) => m.id)).toEqual(["msg-1", "msg-2", "boundary-1", "post-0"]); + } + }); + + it("truncates after an archived message and collapses the archive", async () => { + await appendNumberedMessages(service, wsId, 3); // msg-0..2, seq 0..2 + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + await service.appendToHistory(wsId, createMuxMessage("post-0", "user", "after")); // seq 4 + + const truncateResult = await service.truncateAfterMessage(wsId, "msg-1", { + keepTargetMessage: true, + }); + expect(truncateResult.success).toBe(true); + + const full = await collectFullHistory(service, wsId); + expect(full.map((m) => m.id)).toEqual(["msg-0", "msg-1"]); + + // The archive was collapsed back into chat.jsonl. + expect( + await fs.stat(archivePath(wsId)).then( + () => true, + () => false + ) + ).toBe(false); + + // The sequence counter continues from the cut point. 
+ const msg = createMuxMessage("new-msg", "user", "fresh"); + await service.appendToHistory(wsId, msg); + expect(msg.metadata?.historySequence).toBe(2); + }); + + it("deletes archived rows via deleteMessage", async () => { + await appendNumberedMessages(service, wsId, 3); + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); + + const deleteResult = await service.deleteMessage(wsId, "msg-1"); + expect(deleteResult.success).toBe(true); + + const archiveRows = await readJsonlFile(archivePath(wsId)); + expect(archiveRows.map((m) => m.id)).toEqual(["msg-0", "msg-2"]); + + const full = await collectFullHistory(service, wsId); + expect(full.map((m) => m.id)).toEqual(["msg-0", "msg-2", "boundary-1"]); + }); + + it("clearHistory removes the archive too", async () => { + await appendNumberedMessages(service, wsId, 3); + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); + + const clearResult = await service.clearHistory(wsId); + expect(clearResult.success).toBe(true); + if (clearResult.success) { + // All rows (archived + active) are reported as deleted. 
+ expect(clearResult.data).toEqual([0, 1, 2, 3]); + } + + expect(await service.hasHistory(wsId)).toBe(false); + expect( + await fs.stat(archivePath(wsId)).then( + () => true, + () => false + ) + ).toBe(false); + }); + + it("hasHistory sees archive-only workspaces", async () => { + await appendNumberedMessages(service, wsId, 1); + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); + await fs.rm(chatPath(wsId)); + + expect(await service.hasHistory(wsId)).toBe(true); + }); + }); }); diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index e3b9cfab66..33e5e2fd55 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -22,6 +22,7 @@ import { isDurableCompactedMarker, isDurableContextBoundaryMarker, } from "@/common/utils/messages/compactionBoundary"; +import { CHAT_FILE_NAME, CHAT_ARCHIVE_FILE_NAME } from "@/common/constants/paths"; import { isRefusalFinishReason } from "@/common/utils/messages/refusalFinishReason"; import { getErrorMessage } from "@/common/utils/errors"; import { isNonNegativeInteger, isPositiveInteger } from "@/common/utils/numbers"; @@ -100,12 +101,29 @@ function getCompactionMetadataToPreserve( * - Read/write partial message staging state (partial.json) * - Assign sequence numbers to messages (single source of truth) * - Track next sequence number per workspace + * + * On-disk layout (per session dir): + * - chat.jsonl — the ACTIVE epoch: latest durable context boundary onward. + * - chat-archive.jsonl — sealed pre-boundary history, append-only, oldest→newest. + * - partial.json — in-flight assistant message staging. + * + * Invariant: full history = chat-archive.jsonl ++ chat.jsonl, and every + * historySequence in the archive is older than every sequence in chat.jsonl. 
+ * Rotation (see rotateSealedHistoryUnlocked) moves the sealed prefix of + * chat.jsonl into the archive whenever a durable boundary lands, so hot-path + * reads and full-file rewrites (updateHistory on every stream end) scale with + * the active epoch instead of lifetime history. */ export class HistoryService { - private readonly CHAT_FILE = "chat.jsonl"; + private readonly CHAT_FILE = CHAT_FILE_NAME; + private readonly CHAT_ARCHIVE_FILE = CHAT_ARCHIVE_FILE_NAME; private readonly PARTIAL_FILE = "partial.json"; // Track next sequence number per workspace in memory private sequenceCounters = new Map(); + // Workspaces whose chat.jsonl was already checked for a sealed (pre-boundary) + // prefix this process. Guards the lazy one-time migration of legacy files; + // new boundaries rotate eagerly at write time. + private sealedRotationChecked = new Set(); // Shared file operation lock across all workspace file services // This prevents deadlocks when operations compose while touching the same workspace files. private readonly fileLocks = workspaceFileLocks; @@ -119,15 +137,21 @@ export class HistoryService { return path.join(this.config.getSessionDir(workspaceId), this.CHAT_FILE); } + private getChatArchivePath(workspaceId: string): string { + return path.join(this.config.getSessionDir(workspaceId), this.CHAT_ARCHIVE_FILE); + } + private getPartialPath(workspaceId: string): string { return path.join(this.config.getSessionDir(workspaceId), this.PARTIAL_FILE); } // ── Reverse-read infrastructure ───────────────────────────────────────────── - // Reads chat.jsonl from the tail to avoid O(total-history) parsing on hot paths. - // \n (0x0A) never appears inside multi-byte UTF-8 sequences, so chunked reverse - // reading is byte-safe. JSON.stringify escapes prevent false positives for the - // needle inside user-content strings. + // Reads a history JSONL file from the tail to avoid O(total-history) parsing on + // hot paths. 
\n (0x0A) never appears inside multi-byte UTF-8 sequences, so + // chunked reverse reading is byte-safe. JSON.stringify escapes prevent false + // positives for the needle inside user-content strings. + // These helpers take a file path so they work on both chat.jsonl and + // chat-archive.jsonl. /** Size of each chunk when scanning the file in reverse (256KB covers typical post-compaction content). */ private static readonly REVERSE_READ_CHUNK_SIZE = 256 * 1024; @@ -138,7 +162,7 @@ export class HistoryService { ] as const; /** - * Scan chat.jsonl in reverse to find the byte offset of a durable compaction boundary. + * Scan a history file in reverse to find the byte offset of a durable compaction boundary. * Returns `null` when no (matching) boundary exists. * * @param skip How many boundaries to skip before returning. 0 = last boundary, @@ -148,9 +172,7 @@ export class HistoryService { * lengths) so that chunk boundaries splitting multi-byte UTF-8 sequences don't corrupt * the returned offset. */ - private async findLastBoundaryByteOffset(workspaceId: string, skip = 0): Promise { - const filePath = this.getChatHistoryPath(workspaceId); - + private async findLastBoundaryByteOffset(filePath: string, skip = 0): Promise { let fileSize: number; try { const stat = await fs.stat(filePath); @@ -254,14 +276,10 @@ export class HistoryService { } /** - * Read and parse messages from a byte offset to the end of chat.jsonl. + * Read and parse messages from a byte offset to the end of a history file. * Self-healing: skips malformed JSON lines the same way readChatHistory does. 
*/ - private async readHistoryFromOffset( - workspaceId: string, - byteOffset: number - ): Promise { - const filePath = this.getChatHistoryPath(workspaceId); + private async readHistoryFromOffset(filePath: string, byteOffset: number): Promise { const stat = await fs.stat(filePath); const tailSize = stat.size - byteOffset; if (tailSize <= 0) return []; @@ -289,15 +307,13 @@ export class HistoryService { } /** - * Read the last N messages from chat.jsonl by scanning the file in reverse. + * Read the last N messages from a history file by scanning it in reverse. * Much cheaper than a full read when only the tail is needed. * * Uses raw byte scanning for \n positions (same approach as findLastBoundaryByteOffset) * so that chunk boundaries splitting multi-byte UTF-8 sequences don't corrupt lines. */ - private async readLastMessages(workspaceId: string, n: number): Promise { - const filePath = this.getChatHistoryPath(workspaceId); - + private async readLastMessagesFromFile(filePath: string, n: number): Promise { let fileSize: number; try { const stat = await fs.stat(filePath); @@ -377,20 +393,13 @@ export class HistoryService { } /** - * Read raw messages from chat.jsonl (does not include partial.json) - * Returns empty array if file doesn't exist - * Skips malformed JSON lines to prevent data loss from corruption + * Read raw messages from a history JSONL file. + * Returns empty array if the file doesn't exist. + * Skips malformed JSON lines to prevent data loss from corruption. 
*/ - private async readChatHistory(workspaceId: string): Promise { + private async readMessagesFromFile(filePath: string, logLabel: string): Promise { try { - const chatHistoryPath = this.getChatHistoryPath(workspaceId); - const data = await fs.readFile(chatHistoryPath, "utf-8"); - if (data.length > 5 * 1024 * 1024) { - log.warn("chat.jsonl exceeds 5MB — full read may be slow, consider compaction", { - workspaceId, - sizeBytes: data.length, - }); - } + const data = await fs.readFile(filePath, "utf-8"); const lines = data.split("\n").filter((line) => line.trim()); const messages: MuxMessage[] = []; @@ -401,7 +410,7 @@ export class HistoryService { } catch (parseError) { // Skip malformed lines but log error for debugging log.warn( - `Skipping malformed JSON at line ${i + 1} in ${workspaceId}/chat.jsonl:`, + `Skipping malformed JSON at line ${i + 1} in ${logLabel}:`, getErrorMessage(parseError), "\nLine content:", lines[i].substring(0, 100) + (lines[i].length > 100 ? "..." : "") @@ -418,33 +427,55 @@ export class HistoryService { } } + /** + * Read raw messages from the active chat.jsonl (does not include partial.json + * or the sealed archive). + */ + private async readChatHistory(workspaceId: string): Promise { + return this.readMessagesFromFile( + this.getChatHistoryPath(workspaceId), + `${workspaceId}/${this.CHAT_FILE}` + ); + } + + /** + * Read raw messages from the sealed chat-archive.jsonl (pre-boundary history). + */ + private async readArchivedHistory(workspaceId: string): Promise { + return this.readMessagesFromFile( + this.getChatArchivePath(workspaceId), + `${workspaceId}/${this.CHAT_ARCHIVE_FILE}` + ); + } + // ── Forward/backward iteration infrastructure ──────────────────────────── - // Chunked iteration over chat.jsonl that yields messages to a visitor callback. - // Supports early exit (return false) and reduces memory pressure vs. loading - // the entire file into an array. 
+ // Chunked iteration over a history JSONL file that yields messages to a + // visitor callback. Supports early exit (return false) and reduces memory + // pressure vs. loading the entire file into an array. /** - * Read chat.jsonl from start to end in chunks, calling visitor with each + * Read a history file from start to end in chunks, calling visitor with each * batch of parsed messages. Uses raw byte scanning for \n to handle * multi-byte UTF-8 safely at chunk boundaries. + * + * Returns false when the visitor stopped iteration early, true otherwise — + * so multi-file iteration (archive + chat.jsonl) can honor early exits. */ private async iterateForward( - workspaceId: string, + filePath: string, visitor: (messages: MuxMessage[]) => boolean | void | Promise - ): Promise { - const filePath = this.getChatHistoryPath(workspaceId); - + ): Promise { let fileSize: number; try { const stat = await fs.stat(filePath); fileSize = stat.size; } catch (error) { if (error && typeof error === "object" && "code" in error && error.code === "ENOENT") { - return; // No history + return true; // No history } throw error; } - if (fileSize === 0) return; + if (fileSize === 0) return true; const fh = await fs.open(filePath, "r"); try { @@ -496,7 +527,7 @@ export class HistoryService { if (messages.length > 0) { const shouldContinue = await visitor(messages); - if (shouldContinue === false) return; + if (shouldContinue === false) return false; } } @@ -506,39 +537,41 @@ export class HistoryService { if (line.length > 0) { try { const msg = normalizeLegacyMuxMetadata(JSON.parse(line) as MuxMessage); - await visitor([msg]); + const shouldContinue = await visitor([msg]); + if (shouldContinue === false) return false; } catch { // Skip malformed line } } } + return true; } finally { await fh.close(); } } /** - * Read chat.jsonl from end to start in chunks, calling visitor with each + * Read a history file from end to start in chunks, calling visitor with each * batch of parsed messages 
(newest first within each chunk). Uses the same * raw-byte \n scanning as findLastBoundaryByteOffset. + * + * Returns false when the visitor stopped iteration early, true otherwise. */ private async iterateBackward( - workspaceId: string, + filePath: string, visitor: (messages: MuxMessage[]) => boolean | void | Promise - ): Promise { - const filePath = this.getChatHistoryPath(workspaceId); - + ): Promise { let fileSize: number; try { const stat = await fs.stat(filePath); fileSize = stat.size; } catch (error) { if (error && typeof error === "object" && "code" in error && error.code === "ENOENT") { - return; // No history + return true; // No history } throw error; } - if (fileSize === 0) return; + if (fileSize === 0) return true; const fh = await fs.open(filePath, "r"); try { @@ -588,7 +621,7 @@ export class HistoryService { if (messages.length > 0) { const shouldContinue = await visitor(messages); - if (shouldContinue === false) return; + if (shouldContinue === false) return false; } readEnd = readStart; @@ -600,19 +633,22 @@ export class HistoryService { if (line.length > 0) { try { const msg = normalizeLegacyMuxMetadata(JSON.parse(line) as MuxMessage); - await visitor([msg]); + const shouldContinue = await visitor([msg]); + if (shouldContinue === false) return false; } catch { // Skip malformed line } } } + return true; } finally { await fh.close(); } } /** - * Iterate over ALL messages in chat.jsonl — O(file-size) I/O + parse. + * Iterate over ALL messages in history (sealed archive + active chat.jsonl) — + * O(total-history) I/O + parse. 
* * ⚠️ Prefer targeted alternatives for hot paths: * - getHistoryFromLatestBoundary() — for provider-request assembly @@ -630,11 +666,20 @@ export class HistoryService { direction: "forward" | "backward", visitor: (messages: MuxMessage[]) => boolean | void | Promise ): Promise> { + const chatPath = this.getChatHistoryPath(workspaceId); + const archivePath = this.getChatArchivePath(workspaceId); try { if (direction === "forward") { - await this.iterateForward(workspaceId, visitor); + // Archived rows are strictly older than active rows. + const completed = await this.iterateForward(archivePath, visitor); + if (completed) { + await this.iterateForward(chatPath, visitor); + } } else { - await this.iterateBackward(workspaceId, visitor); + const completed = await this.iterateBackward(chatPath, visitor); + if (completed) { + await this.iterateBackward(archivePath, visitor); + } } return Ok(undefined); } catch (error) { @@ -680,14 +725,28 @@ export class HistoryService { private async getMaxHistorySequence(workspaceId: string): Promise { let maxSequence = -1; - await this.iterateForward(workspaceId, (messages) => { + // Full scan of the active file (cheap post-rotation; see getNextHistorySequence + // for why we don't trust the tail alone). + await this.iterateForward(this.getChatHistoryPath(workspaceId), (messages) => { const newest = this.getNewestHistorySequence(messages); if (newest !== undefined && newest > maxSequence) { maxSequence = newest; } }); - return maxSequence; + // The archive holds strictly-older sequences than chat.jsonl, so it only + // decides the counter when chat.jsonl is missing/hand-edited. Scan its tail + // until a sequenced row is found instead of parsing the whole archive. 
+ let archiveMax = -1; + await this.iterateBackward(this.getChatArchivePath(workspaceId), (messages) => { + const newest = this.getNewestHistorySequence(messages); + if (newest !== undefined && newest > archiveMax) { + archiveMax = newest; + } + return archiveMax === -1; // keep scanning until any sequence is found + }); + + return Math.max(maxSequence, archiveMax); } async hasHistoryBeforeSequence( @@ -704,7 +763,7 @@ export class HistoryService { ); let hasOlder = false; - await this.iterateBackward(workspaceId, (messages) => { + const visitor = (messages: MuxMessage[]): boolean | void => { for (const message of messages) { const sequence = message.metadata?.historySequence; if (!isNonNegativeInteger(sequence)) { @@ -716,7 +775,14 @@ export class HistoryService { return false; } } - }); + }; + + // Newest rows live in chat.jsonl; continue into the sealed archive only + // when the active file has no older rows. + const completed = await this.iterateBackward(this.getChatHistoryPath(workspaceId), visitor); + if (completed && !hasOlder) { + await this.iterateBackward(this.getChatArchivePath(workspaceId), visitor); + } return hasOlder; } @@ -741,35 +807,45 @@ export class HistoryService { ); try { - // Scan boundaries newest→oldest and pick the first window that has rows older than the cursor. - for (let skip = 0; ; skip++) { - const boundaryOffset = await this.findLastBoundaryByteOffset(workspaceId, skip); - if (boundaryOffset === null) { - break; - } + // Scan boundaries newest→oldest and pick the first window that has rows older + // than the cursor. Boundaries newer than the rotation point live in chat.jsonl; + // older ones live in the sealed archive. 
+ for (const filePath of [ + this.getChatHistoryPath(workspaceId), + this.getChatArchivePath(workspaceId), + ]) { + for (let skip = 0; ; skip++) { + const boundaryOffset = await this.findLastBoundaryByteOffset(filePath, skip); + if (boundaryOffset === null) { + break; + } - const tailMessages = await this.readHistoryFromOffset(workspaceId, boundaryOffset); - const windowMessages = tailMessages.filter((message) => { - const sequence = message.metadata?.historySequence; - return isNonNegativeInteger(sequence) && sequence < beforeHistorySequence; - }); + const tailMessages = await this.readHistoryFromOffset(filePath, boundaryOffset); + const windowMessages = tailMessages.filter((message) => { + const sequence = message.metadata?.historySequence; + return isNonNegativeInteger(sequence) && sequence < beforeHistorySequence; + }); - if (windowMessages.length === 0) { - continue; - } + if (windowMessages.length === 0) { + continue; + } - const oldestWindowSequence = this.getOldestHistorySequence(windowMessages); - assert( - oldestWindowSequence !== undefined, - "window messages filtered by historySequence must include a sequence" - ); + const oldestWindowSequence = this.getOldestHistorySequence(windowMessages); + assert( + oldestWindowSequence !== undefined, + "window messages filtered by historySequence must include a sequence" + ); - const hasOlder = await this.hasHistoryBeforeSequence(workspaceId, oldestWindowSequence); - return Ok({ messages: windowMessages, hasOlder }); + const hasOlder = await this.hasHistoryBeforeSequence(workspaceId, oldestWindowSequence); + return Ok({ messages: windowMessages, hasOlder }); + } } // No older boundary window found. Fall back to pre-boundary rows (or empty on uncompacted history). 
- const allMessages = await this.readChatHistory(workspaceId); + const allMessages = [ + ...(await this.readArchivedHistory(workspaceId)), + ...(await this.readChatHistory(workspaceId)), + ]; const preBoundaryMessages = allMessages.filter((message) => { const sequence = message.metadata?.historySequence; return isNonNegativeInteger(sequence) && sequence < beforeHistorySequence; @@ -797,9 +873,10 @@ export class HistoryService { * Read messages from a compaction boundary onward. * Falls back to full history if no boundary exists (new/uncompacted workspace). * - * @param skip How many boundaries to skip (counting from the latest). 0 = read - * from the latest boundary, 1 = from the penultimate, etc. When the - * requested boundary doesn't exist, falls back to the next-available + * @param skip How many boundaries to skip (counting from the latest, across + * chat.jsonl and the sealed archive). 0 = read from the latest + * boundary, 1 = from the penultimate, etc. When the requested + * boundary doesn't exist, falls back to the next-available * boundary, then to full history. * * Prefer this over iterateFullHistory() for provider-request assembly and any path @@ -807,31 +884,180 @@ export class HistoryService { */ async getHistoryFromLatestBoundary(workspaceId: string, skip = 0): Promise> { try { - // Try the requested boundary, falling back to less-skipped boundaries + // One-time lazy migration: seal any pre-boundary prefix left in chat.jsonl + // by older builds so this read (and every later one) stays O(active epoch). + await this.ensureSealedHistoryRotated(workspaceId); + + const chatPath = this.getChatHistoryPath(workspaceId); + const archivePath = this.getChatArchivePath(workspaceId); + + // Try the requested boundary in chat.jsonl, falling back to less-skipped boundaries. 
+ let chatBoundaryCount = 0; + let chatFallbackOffset: number | null = null; for (let s = skip; s >= 0; s--) { - const offset = await this.findLastBoundaryByteOffset(workspaceId, s); + const offset = await this.findLastBoundaryByteOffset(chatPath, s); if (offset !== null) { - const messages = await this.readHistoryFromOffset(workspaceId, offset); - return Ok(messages); + if (s === skip) { + return Ok(await this.readHistoryFromOffset(chatPath, offset)); + } + // chat.jsonl has fewer boundaries than requested; remember its oldest + // boundary as a fallback and keep counting into the archive. + chatBoundaryCount = s + 1; + chatFallbackOffset = offset; + break; } } + // Boundaries older than chat.jsonl live in the sealed archive. A window that + // starts at an archive boundary spans the archive tail plus all of chat.jsonl. + for (let s = skip - chatBoundaryCount; s >= 0; s--) { + const offset = await this.findLastBoundaryByteOffset(archivePath, s); + if (offset !== null) { + const archived = await this.readHistoryFromOffset(archivePath, offset); + const active = await this.readChatHistory(workspaceId); + return Ok([...archived, ...active]); + } + } + + if (chatFallbackOffset !== null) { + return Ok(await this.readHistoryFromOffset(chatPath, chatFallbackOffset)); + } + // No boundaries at all — workspace is uncompacted, full read is the only option - const messages = await this.readChatHistory(workspaceId); - return Ok(messages); + const archived = await this.readArchivedHistory(workspaceId); + const active = await this.readChatHistory(workspaceId); + return Ok([...archived, ...active]); } catch (error) { const message = getErrorMessage(error); return Err(`Failed to read history from boundary: ${message}`); } } + // ── Sealed-history rotation ───────────────────────────────────────────── + // Compaction (and /clear --soft) appends a durable context boundary but, by + // itself, never shrinks chat.jsonl. 
Rotation moves the sealed prefix — + // everything before the latest durable boundary — into chat-archive.jsonl so + // hot-path reads and the per-turn updateHistory rewrite stay O(active epoch). + // Pre-boundary history remains fully accessible (Load More, exports, usage + // rebuilds) through the archive-aware read paths above. + + /** + * One-time-per-process check that seals any pre-boundary prefix left in + * chat.jsonl. Newly written boundaries rotate eagerly at write time; this + * lazily migrates files produced before rotation existed (or by crashes + * between boundary write and rotation). + */ + private async ensureSealedHistoryRotated(workspaceId: string): Promise { + if (this.sealedRotationChecked.has(workspaceId)) { + return; + } + this.sealedRotationChecked.add(workspaceId); + + try { + // Cheap unlocked probe first so the common no-op case takes no lock. + const offset = await this.findLastBoundaryByteOffset(this.getChatHistoryPath(workspaceId)); + if (offset === null || offset === 0) { + return; + } + await this.fileLocks.withLock(workspaceId, () => + this.rotateSealedHistoryUnlocked(workspaceId) + ); + } catch (error) { + // Rotation is an optimization — reads remain correct on unrotated files. + log.warn("Failed to rotate sealed chat history", { + workspaceId, + error: getErrorMessage(error), + }); + } + } + + /** + * Move the sealed prefix of chat.jsonl (everything before the latest durable + * context boundary) into chat-archive.jsonl. Must be called while holding the + * workspace file lock. + * + * Crash safety: archived lines are fsynced before chat.jsonl is rewritten, so + * a crash in between leaves duplicated rows in archive + chat.jsonl. The next + * rotation deduplicates by skipping prefix rows whose historySequence is + * already covered by the archive. 
+ */ + private async rotateSealedHistoryUnlocked(workspaceId: string): Promise { + const chatPath = this.getChatHistoryPath(workspaceId); + const archivePath = this.getChatArchivePath(workspaceId); + + const boundaryOffset = await this.findLastBoundaryByteOffset(chatPath); + if (boundaryOffset === null || boundaryOffset === 0) { + return; // Nothing sealed — boundary already starts the file (or no boundary). + } + + const fileBuffer = await fs.readFile(chatPath); + const sealedPrefix = fileBuffer.subarray(0, boundaryOffset).toString("utf-8"); + const activeTail = fileBuffer.subarray(boundaryOffset); + + // Crash-replay dedupe: find the newest sequence already archived. + let archivedMaxSequence = -1; + await this.iterateBackward(archivePath, (messages) => { + const newest = this.getNewestHistorySequence(messages); + if (newest !== undefined && newest > archivedMaxSequence) { + archivedMaxSequence = newest; + } + return archivedMaxSequence === -1; // scan until a sequenced row is found + }); + + const linesToArchive: string[] = []; + for (const line of sealedPrefix.split("\n")) { + const trimmed = line.trim(); + if (trimmed.length === 0) { + continue; + } + try { + const message = JSON.parse(trimmed) as MuxMessage; + const sequence = message.metadata?.historySequence; + if (isNonNegativeInteger(sequence) && sequence <= archivedMaxSequence) { + continue; // Already archived by a rotation that crashed before the chat rewrite. + } + } catch { + // Malformed line — preserve it in the archive (read paths skip it anyway). + } + linesToArchive.push(trimmed); + } + + if (linesToArchive.length > 0) { + // Append + fsync BEFORE rewriting chat.jsonl: a crash must never lose + // sealed rows, only (at worst) duplicate them, which the dedupe above heals. 
+ const fh = await fs.open(archivePath, "a"); + try { + await fh.writeFile(linesToArchive.join("\n") + "\n"); + await fh.sync(); + } finally { + await fh.close(); + } + } + + await writeFileAtomic(chatPath, activeTail); + + log.debug("Rotated sealed chat history into archive", { + workspaceId, + sealedBytes: boundaryOffset, + archivedLines: linesToArchive.length, + }); + } + /** - * Read the last N messages from chat.jsonl by reading the file in reverse. + * Read the last N messages from history by reading files in reverse. * Much cheaper than iterateFullHistory() when only the tail is needed. + * Continues into the sealed archive when the active epoch has fewer than N rows. */ async getLastMessages(workspaceId: string, n: number): Promise> { try { - const messages = await this.readLastMessages(workspaceId, n); + const messages = await this.readLastMessagesFromFile(this.getChatHistoryPath(workspaceId), n); + if (messages.length < n) { + const archived = await this.readLastMessagesFromFile( + this.getChatArchivePath(workspaceId), + n - messages.length + ); + return Ok([...archived, ...messages]); + } return Ok(messages); } catch (error) { const message = getErrorMessage(error); @@ -840,17 +1066,24 @@ export class HistoryService { } /** - * Check if a workspace has any chat history without parsing the file. + * Check if a workspace has any chat history without parsing the files. * Much cheaper than iterateFullHistory() when only an emptiness check is needed. */ async hasHistory(workspaceId: string): Promise { - const filePath = this.getChatHistoryPath(workspaceId); - try { - const stat = await fs.stat(filePath); - return stat.size > 0; - } catch { - return false; + for (const filePath of [ + this.getChatHistoryPath(workspaceId), + this.getChatArchivePath(workspaceId), + ]) { + try { + const stat = await fs.stat(filePath); + if (stat.size > 0) { + return true; + } + } catch { + // Missing file — keep checking. 
+ } } + return false; } /** @@ -1182,22 +1415,60 @@ export class HistoryService { } } + /** Serialize messages as JSONL rows tagged with workspace context. */ + private serializeHistoryEntries(messages: readonly MuxMessage[], workspaceId: string): string { + return messages.map((msg) => JSON.stringify({ ...msg, workspaceId }) + "\n").join(""); + } + + /** + * Best-effort rotation after a durable boundary lands via append/update. + * Failures are non-fatal: reads remain correct on unrotated files and the + * lazy per-process check retries later. + */ + private async rotateAfterBoundaryWriteUnlocked( + workspaceId: string, + message: MuxMessage + ): Promise { + if (!isDurableContextBoundaryMarker(message)) { + return; + } + try { + await this.rotateSealedHistoryUnlocked(workspaceId); + } catch (error) { + log.warn("Failed to rotate sealed chat history after boundary write", { + workspaceId, + messageId: message.id, + error: getErrorMessage(error), + }); + } + } + async appendToHistory(workspaceId: string, message: MuxMessage): Promise> { return this.fileLocks.withLock(workspaceId, async () => { - return this._appendToHistoryUnlocked(workspaceId, message); + const result = await this._appendToHistoryUnlocked(workspaceId, message); + if (result.success) { + // A new durable boundary seals the previous epoch — rotate it out of + // chat.jsonl so subsequent reads/rewrites stay O(active epoch). + await this.rotateAfterBoundaryWriteUnlocked(workspaceId, message); + } + return result; }); } /** * Update an existing message in history by historySequence - * Reads entire history, replaces the matching message, and rewrites the file + * Reads the active chat.jsonl, replaces the matching message, and rewrites the file. + * + * This runs on every stream end, so it must stay O(active epoch): targets are + * always in the active epoch (stream placeholders, compaction summaries), + * never in the sealed archive. 
*/ async updateHistory(workspaceId: string, message: MuxMessage): Promise> { return this.fileLocks.withLock(workspaceId, async () => { try { const historyPath = this.getChatHistoryPath(workspaceId); - // Read all messages — structural rewrite requires full file content + // Read the active epoch — structural rewrite requires full file content const messages = await this.readChatHistory(workspaceId); const targetSequence = message.metadata?.historySequence; @@ -1212,6 +1483,7 @@ export class HistoryService { // Find and replace the message with matching historySequence let found = false; + let persistedMessage: MuxMessage | undefined; for (let i = 0; i < messages.length; i++) { if (messages[i].metadata?.historySequence === targetSequence) { const existingMessage = messages[i]; @@ -1235,22 +1507,27 @@ export class HistoryService { historySequence: targetSequence, }, }; + persistedMessage = messages[i]; found = true; break; } } - if (!found) { + if (!found || !persistedMessage) { return Err(`No message found with historySequence ${targetSequence}`); } // Rewrite entire file - const historyEntries = messages - .map((msg) => JSON.stringify({ ...msg, workspaceId }) + "\n") - .join(""); + const historyEntries = this.serializeHistoryEntries(messages, workspaceId); // Atomic write prevents corruption if app crashes mid-write await writeFileAtomic(historyPath, historyEntries); + + // Compaction updates the streamed summary row in-place with boundary + // metadata — seal the previous epoch once that lands. Check the persisted + // row (not the incoming message) so preserved boundary metadata counts. 
+ await this.rotateAfterBoundaryWriteUnlocked(workspaceId, persistedMessage); + return Ok(undefined); } catch (error) { const message = getErrorMessage(error); @@ -1273,13 +1550,25 @@ export class HistoryService { const filteredMessages = messages.filter((msg) => msg.id !== messageId); if (filteredMessages.length === messages.length) { - return Err(`Message with ID ${messageId} not found in history`); + // Not in the active epoch — the row may live in the sealed archive + // (rare: cleanup paths almost always target recent rows). + const archiveMessages = await this.readArchivedHistory(workspaceId); + const filteredArchive = archiveMessages.filter((msg) => msg.id !== messageId); + if (filteredArchive.length === archiveMessages.length) { + return Err(`Message with ID ${messageId} not found in history`); + } + + // Archived rows are strictly older than active rows, so deleting one + // can never affect the sequence counter. + await writeFileAtomic( + this.getChatArchivePath(workspaceId), + this.serializeHistoryEntries(filteredArchive, workspaceId) + ); + return Ok(undefined); } const historyPath = this.getChatHistoryPath(workspaceId); - const historyEntries = filteredMessages - .map((msg) => JSON.stringify({ ...msg, workspaceId }) + "\n") - .join(""); + const historyEntries = this.serializeHistoryEntries(filteredMessages, workspaceId); // Atomic write prevents corruption if app crashes mid-write await writeFileAtomic(historyPath, historyEntries); @@ -1341,11 +1630,20 @@ export class HistoryService { const messages = await this.readChatHistory(workspaceId); const messageIndex = messages.findIndex((msg) => msg.id === messageId); + const keepTargetMessage = options?.keepTargetMessage === true; + if (messageIndex === -1) { - return Err(`Message with ID ${messageId} not found in history`); + // Editing/forking from a pre-boundary message: the target lives in the + // sealed archive. 
Everything after the cut (the archive tail AND the + // entire active epoch) is discarded, so collapse the remainder back + // into chat.jsonl and drop the archive. + return this.truncateAfterArchivedMessageUnlocked( + workspaceId, + messageId, + keepTargetMessage + ); } - const keepTargetMessage = options?.keepTargetMessage === true; // Response-level forks branch from the selected assistant turn, so they retain the target // message while discarding anything that came after it. const truncatedMessages = messages.slice( @@ -1355,9 +1653,7 @@ export class HistoryService { // Rewrite the history file with truncated messages const historyPath = this.getChatHistoryPath(workspaceId); - const historyEntries = truncatedMessages - .map((msg) => JSON.stringify({ ...msg, workspaceId }) + "\n") - .join(""); + const historyEntries = this.serializeHistoryEntries(truncatedMessages, workspaceId); // Atomic write prevents corruption if app crashes mid-write await writeFileAtomic(historyPath, historyEntries); @@ -1399,6 +1695,74 @@ export class HistoryService { }); } + /** + * Truncation branch for targets in the sealed archive. The truncated remainder + * becomes the new chat.jsonl (it may contain old boundaries; a later boundary + * write re-seals it) and the archive is removed. Must be called while holding + * the workspace file lock. + */ + private async truncateAfterArchivedMessageUnlocked( + workspaceId: string, + messageId: string, + keepTargetMessage: boolean + ): Promise> { + try { + const archiveMessages = await this.readArchivedHistory(workspaceId); + const messageIndex = archiveMessages.findIndex((msg) => msg.id === messageId); + + if (messageIndex === -1) { + return Err(`Message with ID ${messageId} not found in history`); + } + + const truncatedMessages = archiveMessages.slice( + 0, + keepTargetMessage ? 
messageIndex + 1 : messageIndex + ); + + await writeFileAtomic( + this.getChatHistoryPath(workspaceId), + this.serializeHistoryEntries(truncatedMessages, workspaceId) + ); + await fs.rm(this.getChatArchivePath(workspaceId), { force: true }); + // chat.jsonl may contain sealed epochs again — allow the lazy check to re-run. + this.sealedRotationChecked.delete(workspaceId); + + // Update sequence counter to continue from where we truncated. + // Self-healing read path: skip malformed persisted historySequence values. + const maxTruncatedSeq = truncatedMessages.reduce((max, msg) => { + const seq = msg.metadata?.historySequence; + if (seq === undefined) { + return max; + } + + if (!isNonNegativeInteger(seq)) { + log.warn( + "Ignoring malformed persisted historySequence while updating sequence counter after archived truncation", + { + workspaceId, + messageId: msg.id, + historySequence: seq, + } + ); + return max; + } + + return seq > max ? seq : max; + }, -1); + const nextSeq = maxTruncatedSeq + 1; + assert( + isNonNegativeInteger(nextSeq), + "next history sequence counter after archived truncation must be a non-negative integer" + ); + this.sequenceCounters.set(workspaceId, nextSeq); + + return Ok(undefined); + } catch (error) { + const message = getErrorMessage(error); + return Err(`Failed to truncate history: ${message}`); + } + } + /** * Truncate history by removing approximately the given percentage of tokens from the beginning * @param workspaceId The workspace ID @@ -1412,33 +1776,34 @@ export class HistoryService { return this.fileLocks.withLock(workspaceId, async () => { try { const historyPath = this.getChatHistoryPath(workspaceId); + const archivePath = this.getChatArchivePath(workspaceId); - // Fast path: 100% truncation = delete entire file + // Fast path: 100% truncation = delete entire history (active + sealed archive) if (percentage >= 1.0) { // Need sequence numbers for return value before deleting - const messages = await 
this.readChatHistory(workspaceId); + const messages = [ + ...(await this.readArchivedHistory(workspaceId)), + ...(await this.readChatHistory(workspaceId)), + ]; const deletedSequences = messages .map((msg) => msg.metadata?.historySequence) .filter((s): s is number => isNonNegativeInteger(s)); - try { - await fs.unlink(historyPath); - } catch (error) { - // Ignore ENOENT - file already deleted - if ( - !(error && typeof error === "object" && "code" in error && error.code === "ENOENT") - ) { - throw error; - } - } + await fs.rm(historyPath, { force: true }); + await fs.rm(archivePath, { force: true }); // Reset sequence counter when clearing history this.sequenceCounters.set(workspaceId, 0); return Ok(deletedSequences); } - // Structural rewrite requires full file content - const messages = await this.readChatHistory(workspaceId); + // Structural rewrite requires full history content (oldest rows live in + // the sealed archive). Percentage truncation is a rare recovery path + // (compaction-failure retry), so the O(total-history) read is acceptable. 
+ const messages = [ + ...(await this.readArchivedHistory(workspaceId)), + ...(await this.readChatHistory(workspaceId)), + ]; if (messages.length === 0) { return Ok([]); // Nothing to truncate } @@ -1472,16 +1837,8 @@ export class HistoryService { // If we're removing all messages, use fast path if (removeCount >= messages.length) { - try { - await fs.unlink(historyPath); - } catch (error) { - // Ignore ENOENT - if ( - !(error && typeof error === "object" && "code" in error && error.code === "ENOENT") - ) { - throw error; - } - } + await fs.rm(historyPath, { force: true }); + await fs.rm(archivePath, { force: true }); this.sequenceCounters.set(workspaceId, 0); const deletedSequences = messages .map((msg) => msg.metadata?.historySequence) @@ -1496,13 +1853,15 @@ export class HistoryService { .map((msg) => msg.metadata?.historySequence) .filter((s): s is number => isNonNegativeInteger(s)); - // Rewrite the history file with remaining messages - const historyEntries = remainingMessages - .map((msg) => JSON.stringify({ ...msg, workspaceId }) + "\n") - .join(""); + // Collapse the remainder into chat.jsonl and drop the archive (the cut + // may fall anywhere inside it). It may contain old boundaries; a later + // boundary write re-seals it. + const historyEntries = this.serializeHistoryEntries(remainingMessages, workspaceId); // Atomic write prevents corruption if app crashes mid-write await writeFileAtomic(historyPath, historyEntries); + await fs.rm(archivePath, { force: true }); + this.sealedRotationChecked.delete(workspaceId); // Update sequence counter to continue from where we are. // Self-healing read path: skip malformed persisted historySequence values. 
@@ -1557,6 +1916,16 @@ export class HistoryService { async migrateWorkspaceId(oldWorkspaceId: string, newWorkspaceId: string): Promise> { return this.fileLocks.withLock(newWorkspaceId, async () => { try { + // Migrate the sealed archive first so a crash mid-migration never leaves + // the active file pointing at a stale-ID archive. + const archiveMessages = await this.readArchivedHistory(newWorkspaceId); + if (archiveMessages.length > 0) { + await writeFileAtomic( + this.getChatArchivePath(newWorkspaceId), + this.serializeHistoryEntries(archiveMessages, newWorkspaceId) + ); + } + // Read messages from the NEW workspace location (directory was already renamed). // Structural rewrite requires full file content. const messages = await this.readChatHistory(newWorkspaceId); @@ -1570,9 +1939,7 @@ export class HistoryService { // Rewrite all messages with new workspace ID const newHistoryPath = this.getChatHistoryPath(newWorkspaceId); - const historyEntries = messages - .map((msg) => JSON.stringify({ ...msg, workspaceId: newWorkspaceId }) + "\n") - .join(""); + const historyEntries = this.serializeHistoryEntries(messages, newWorkspaceId); // Atomic write prevents corruption if app crashes mid-write await writeFileAtomic(newHistoryPath, historyEntries); diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 514ab2255e..fc3342da84 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -45,6 +45,7 @@ import { import { getWorkspacePathHintForProject } from "@/node/services/workspaceProjectRepos"; import { validateWorkspaceName } from "@/common/utils/validation/workspaceValidation"; import { ensurePrivateDir, isErrnoWithCode } from "@/node/utils/fs"; +import { CHAT_FILE_NAME, CHAT_ARCHIVE_FILE_NAME } from "@/common/constants/paths"; import { stripTrailingSlashes } from "@/node/utils/pathUtils"; import { getProjects, isMultiProject } from "@/common/utils/multiProject"; import { 
generateGitStatusScript, parseGitStatusScriptOutput } from "@/common/utils/git/gitStatus"; @@ -893,6 +894,48 @@ async function copyFileBestEffort(params: { } } +/** + * Concatenate source files (skipping missing ones) into destPath. + * Returns false when no source exists or on write failure. + */ +async function concatFilesBestEffort(params: { + srcPaths: string[]; + destPath: string; + logContext: Record; +}): Promise { + const chunks: Buffer[] = []; + for (const srcPath of params.srcPaths) { + try { + chunks.push(await fsPromises.readFile(srcPath)); + } catch (error: unknown) { + if (!isErrnoWithCode(error, "ENOENT")) { + log.error("Failed to read session artifact file for concatenation", { + ...params.logContext, + srcPath, + error: getErrorMessage(error), + }); + } + } + } + + if (chunks.length === 0) { + return false; + } + + try { + await fsPromises.mkdir(path.dirname(params.destPath), { recursive: true }); + await fsPromises.writeFile(params.destPath, Buffer.concat(chunks)); + return true; + } catch (error: unknown) { + log.error("Failed to write concatenated session artifact file", { + ...params.logContext, + destPath: params.destPath, + error: getErrorMessage(error), + }); + return false; + } +} + async function copyDirIfMissingBestEffort(params: { srcDir: string; destDir: string; @@ -982,7 +1025,8 @@ async function archiveChildSessionArtifactsIntoParentSessionDir(params: { // 1) Archive the child session transcript (chat.jsonl + partial.json) into the parent session dir // BEFORE deleting ~/.mux/sessions/. 
try { - const childChatPath = path.join(params.childSessionDir, "chat.jsonl"); + const childChatPath = path.join(params.childSessionDir, CHAT_FILE_NAME); + const childChatArchivePath = path.join(params.childSessionDir, CHAT_ARCHIVE_FILE_NAME); const childPartialPath = path.join(params.childSessionDir, "partial.json"); const archivedChatPath = getSubagentTranscriptChatPath( @@ -1003,8 +1047,12 @@ async function archiveChildSessionArtifactsIntoParentSessionDir(params: { archivedChatPath, }); } else { - const didCopyChat = await copyFileBestEffort({ - srcPath: childChatPath, + // Sub-agent sessions can auto-compact, which rotates sealed history into + // chat-archive.jsonl. The archived transcript is a one-time snapshot of a + // dead workspace, so concatenate archive + active file into a single + // chat.jsonl for the transcript reader. + const didCopyChat = await concatFilesBestEffort({ + srcPaths: [childChatArchivePath, childChatPath], destPath: archivedChatPath, logContext: { parentWorkspaceId: params.parentWorkspaceId, @@ -5794,7 +5842,10 @@ export class WorkspaceService extends EventEmitter { await ensurePrivateDir(newSessionDir); const sessionFiles = [ - "chat.jsonl", + CHAT_FILE_NAME, + // Sealed pre-boundary history must travel with chat.jsonl so the fork + // keeps full Load More/paging access to older epochs. 
+ CHAT_ARCHIVE_FILE_NAME, "session-timing.json", ADDITIONAL_SYSTEM_CONTEXT_FILENAME, // Preserve the enabled/disabled toggle when forking so the fork From 7cdcb121ce784c9c2a4910437446d4b56e79105b Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 10:45:52 -0500 Subject: [PATCH 2/9] tests: update planCommands boundary test for sealed-history rotation --- tests/ipc/agents/planCommands.test.ts | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/ipc/agents/planCommands.test.ts b/tests/ipc/agents/planCommands.test.ts index 3a1847d486..b020da945a 100644 --- a/tests/ipc/agents/planCommands.test.ts +++ b/tests/ipc/agents/planCommands.test.ts @@ -320,7 +320,21 @@ describeIntegration("Plan Commands Integration", () => { expect(appendResult.success).toBe(true); - const data = await fs.readFile(chatHistoryPath, "utf-8"); + // Appending a durable boundary rotates the sealed prefix (legacy summary + + // malformed boundary) into chat-archive.jsonl; full history spans both files. + const archivePath = path.join(env.config.getSessionDir(workspaceId), "chat-archive.jsonl"); + const archiveData = await fs.readFile(archivePath, "utf-8"); + const activeData = await fs.readFile(chatHistoryPath, "utf-8"); + + // The active file holds only the new durable boundary onward. 
+ const activeEntries = activeData + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.length > 0) + .map((line) => JSON.parse(line) as { id: string }); + expect(activeEntries.map((entry) => entry.id)).toEqual([appendSummary.id]); + + const data = archiveData + activeData; const entries = data .split("\n") .map((line) => line.trim()) From c0aabb3aa25fe805e8ce57d5ff8155337c692df3 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 10:53:53 -0500 Subject: [PATCH 3/9] =?UTF-8?q?fix:=20address=20Codex=20review=20=E2=80=94?= =?UTF-8?q?=20archive-aware=20analytics=20ETL=20+=20truncation=20sequence?= =?UTF-8?q?=20floor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/node/services/analytics/etl.test.ts | 27 ++++++++ src/node/services/analytics/etl.ts | 83 +++++++++++++++++------- src/node/services/historyService.test.ts | 15 +++++ src/node/services/historyService.ts | 33 ++++++---- 4 files changed, 120 insertions(+), 38 deletions(-) diff --git a/src/node/services/analytics/etl.test.ts b/src/node/services/analytics/etl.test.ts index 73c033fc3f..60d3299fb6 100644 --- a/src/node/services/analytics/etl.test.ts +++ b/src/node/services/analytics/etl.test.ts @@ -867,6 +867,33 @@ describe("ingestWorkspace", () => { expect(refreshedHeadRows[0].tool_name).toBe("bash"); expect(Number(refreshedHeadRows[0].total_cost_usd)).toBeCloseTo(originalHeadTotalCostUsd, 12); }); + + test("ingests sealed pre-boundary rows from chat-archive.jsonl", async () => { + const conn = await createTestConn(); + const sessionDir = await createTempSessionDir(); + const workspaceId = "ws-with-archive"; + + // HistoryService rotation moves pre-boundary rows into chat-archive.jsonl; + // analytics must read both files or pre-compaction usage disappears. 
+ await fs.writeFile( + path.join(sessionDir, "chat-archive.jsonl"), + [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })].join("\n") + "\n" + ); + await writeChatJsonl(sessionDir, [ + makeUserLine(), + makeAssistantLine({ sequence: 3, inputTokens: 33 }), + ]); + + await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" }); + + expect(await queryEventCount(conn, workspaceId)).toBe(2); + const rows = await queryRows( + conn, + "SELECT input_tokens FROM events WHERE workspace_id = ? ORDER BY input_tokens", + [workspaceId] + ); + expect(rows.map((row) => Number(row.input_tokens))).toEqual([11, 33]); + }); }); describe("readPersistedWorkspaceHeadSignature", () => { diff --git a/src/node/services/analytics/etl.ts b/src/node/services/analytics/etl.ts index 5587fbc9fc..f8b2180b59 100644 --- a/src/node/services/analytics/etl.ts +++ b/src/node/services/analytics/etl.ts @@ -9,8 +9,56 @@ import { getErrorMessage } from "@/common/utils/errors"; import { createDisplayUsage } from "@/common/utils/tokens/displayUsage"; import { log } from "@/node/services/log"; import { toUtcDateString } from "@/node/services/analytics/dateUtils"; +import { CHAT_ARCHIVE_FILE_NAME } from "@/common/constants/paths"; export const CHAT_FILE_NAME = "chat.jsonl"; + +/** + * Sealed pre-boundary history rotates from chat.jsonl into chat-archive.jsonl + * (see HistoryService). Analytics must consider both files or pre-compaction + * usage/events would silently disappear after the first rotation. + * + * Returns null when chat.jsonl does not exist (no workspace history). The + * reported mtime is the max across both files so watermark staleness checks + * see archive-only mutations too. 
+ */ +async function statSessionChatHistory(sessionDir: string): Promise<{ mtimeMs: number } | null> { + let mtimeMs: number; + try { + mtimeMs = (await fs.stat(path.join(sessionDir, CHAT_FILE_NAME))).mtimeMs; + } catch (error) { + if (isRecord(error) && error.code === "ENOENT") { + return null; + } + throw error; + } + + try { + const archiveStat = await fs.stat(path.join(sessionDir, CHAT_ARCHIVE_FILE_NAME)); + mtimeMs = Math.max(mtimeMs, archiveStat.mtimeMs); + } catch (error) { + if (!(isRecord(error) && error.code === "ENOENT")) { + throw error; + } + } + + return { mtimeMs }; +} + +/** Read full workspace history: sealed archive (older) followed by chat.jsonl. */ +async function readSessionChatHistoryContents(sessionDir: string): Promise { + let archived = ""; + try { + archived = await fs.readFile(path.join(sessionDir, CHAT_ARCHIVE_FILE_NAME), "utf-8"); + } catch (error) { + if (!(isRecord(error) && error.code === "ENOENT")) { + throw error; + } + } + + const active = await fs.readFile(path.join(sessionDir, CHAT_FILE_NAME), "utf-8"); + return archived + active; +} const METADATA_FILE_NAME = "metadata.json"; const SUBAGENT_TRANSCRIPTS_DIR_NAME = "subagent-transcripts"; const SESSION_USAGE_FILE_NAME = "session-usage.json"; @@ -1009,19 +1057,11 @@ export async function ingestWorkspace( assert(workspaceId.trim().length > 0, "ingestWorkspace: workspaceId is required"); assert(sessionDir.trim().length > 0, "ingestWorkspace: sessionDir is required"); - const chatPath = path.join(sessionDir, CHAT_FILE_NAME); - - let stat: Awaited>; - try { - stat = await fs.stat(chatPath); - } catch (error) { - if (isRecord(error) && error.code === "ENOENT") { - // Remove stale analytics state when the workspace history file no longer exists. 
- await clearWorkspaceAnalyticsState(conn, workspaceId); - return; - } - - throw error; + const stat = await statSessionChatHistory(sessionDir); + if (stat === null) { + // Remove stale analytics state when the workspace history file no longer exists. + await clearWorkspaceAnalyticsState(conn, workspaceId); + return; } const watermark = await readWatermark(conn, workspaceId); @@ -1035,7 +1075,7 @@ export async function ingestWorkspace( return; } - const chatContents = await fs.readFile(chatPath, "utf-8"); + const chatContents = await readSessionChatHistoryContents(sessionDir); const lines = chatContents.split("\n").filter((line) => line.trim().length > 0); let responseIndex = 0; @@ -1450,22 +1490,15 @@ export async function parseWorkspaceFromDisk( assert(workspaceId.trim().length > 0, "parseWorkspaceFromDisk: workspaceId is required"); assert(sessionDir.trim().length > 0, "parseWorkspaceFromDisk: sessionDir is required"); - const chatPath = path.join(sessionDir, CHAT_FILE_NAME); - - let stat: Awaited>; - try { - stat = await fs.stat(chatPath); - } catch (error) { - if (isRecord(error) && error.code === "ENOENT") { - return null; - } - throw error; + const stat = await statSessionChatHistory(sessionDir); + if (stat === null) { + return null; } const persistedMeta = await readWorkspaceMetaFromDisk(sessionDir); const workspaceMeta = mergeWorkspaceMeta(persistedMeta, suppliedMeta); - const chatContents = await fs.readFile(chatPath, "utf-8"); + const chatContents = await readSessionChatHistoryContents(sessionDir); const lines = chatContents.split("\n").filter((line) => line.trim().length > 0); let responseIndex = 0; diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 5abec1b899..5afe6750d7 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -1576,6 +1576,21 @@ describe("HistoryService", () => { expect(msg.metadata?.historySequence).toBe(2); }); + it("never reuses 
archived sequences after truncating the whole active epoch", async () => { + await appendNumberedMessages(service, wsId, 3); // msg-0..2, seq 0..2 → archived + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + await service.appendToHistory(wsId, createMuxMessage("post-0", "user", "after")); // seq 4 + + // Truncate at the boundary itself (without keeping it) — the active file + // becomes empty while the archive still holds seq 0..2. + const truncateResult = await service.truncateAfterMessage(wsId, "boundary-1"); + expect(truncateResult.success).toBe(true); + + const msg = createMuxMessage("new-msg", "user", "fresh"); + await service.appendToHistory(wsId, msg); + expect(msg.metadata?.historySequence).toBe(3); + }); + it("deletes archived rows via deleteMessage", async () => { await appendNumberedMessages(service, wsId, 3); await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index 33e5e2fd55..9f1156e277 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -735,8 +735,18 @@ export class HistoryService { }); // The archive holds strictly-older sequences than chat.jsonl, so it only - // decides the counter when chat.jsonl is missing/hand-edited. Scan its tail - // until a sequenced row is found instead of parsing the whole archive. + // decides the counter when chat.jsonl is missing/hand-edited. + const archiveMax = await this.getArchiveTailMaxSequence(workspaceId); + + return Math.max(maxSequence, archiveMax); + } + + /** + * Newest sequenced row in the sealed archive, or -1 when none. Scans the + * archive tail until a sequenced row is found instead of parsing the whole + * file (archived appends are sequence-ordered). 
+ */ + private async getArchiveTailMaxSequence(workspaceId: string): Promise<number> { let archiveMax = -1; await this.iterateBackward(this.getChatArchivePath(workspaceId), (messages) => { const newest = this.getNewestHistorySequence(messages); @@ -745,8 +755,7 @@ export class HistoryService { } return archiveMax === -1; // keep scanning until any sequence is found }); - - return Math.max(maxSequence, archiveMax); + return archiveMax; } async hasHistoryBeforeSequence( @@ -995,14 +1004,7 @@ export class HistoryService { const activeTail = fileBuffer.subarray(boundaryOffset); // Crash-replay dedupe: find the newest sequence already archived. - let archivedMaxSequence = -1; - await this.iterateBackward(archivePath, (messages) => { - const newest = this.getNewestHistorySequence(messages); - if (newest !== undefined && newest > archivedMaxSequence) { - archivedMaxSequence = newest; - } - return archivedMaxSequence === -1; // scan until a sequenced row is found - }); + const archivedMaxSequence = await this.getArchiveTailMaxSequence(workspaceId); const linesToArchive: string[] = []; for (const line of sealedPrefix.split("\n")) { @@ -1680,7 +1682,12 @@ export class HistoryService { return seq > max ? seq : max; }, -1); - const nextSeq = maxTruncatedSeq + 1; + // Sealed archive rows keep their sequences across an active-epoch + // truncation. When the truncation empties the active file, floor the + // counter with the archive max so new appends can never reuse archived + // sequence numbers. 
+ const archiveMaxSeq = await this.getArchiveTailMaxSequence(workspaceId); + const nextSeq = Math.max(maxTruncatedSeq, archiveMaxSeq) + 1; assert( isNonNegativeInteger(nextSeq), "next history sequence counter after truncation must be a non-negative integer" From cad54083fbf60df6218f2554a0a1a87d65dd70bb Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 11:00:02 -0500 Subject: [PATCH 4/9] fix: no-op percentage truncation must not collapse the archive --- src/node/services/historyService.test.ts | 18 ++++++++++++++++++ src/node/services/historyService.ts | 7 +++++++ 2 files changed, 25 insertions(+) diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 5afe6750d7..87f40d712d 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -1625,6 +1625,24 @@ describe("HistoryService", () => { ).toBe(false); }); + it("keeps the archive intact on a no-op percentage truncation", async () => { + await appendNumberedMessages(service, wsId, 3); + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); + + const chatBefore = await fs.readFile(chatPath(wsId), "utf-8"); + const archiveBefore = await fs.readFile(archivePath(wsId), "utf-8"); + + const truncateResult = await service.truncateHistory(wsId, 0); + expect(truncateResult.success).toBe(true); + if (truncateResult.success) { + expect(truncateResult.data).toEqual([]); + } + + // No-op truncation must not collapse the archive back into chat.jsonl. 
+ expect(await fs.readFile(chatPath(wsId), "utf-8")).toBe(chatBefore); + expect(await fs.readFile(archivePath(wsId), "utf-8")).toBe(archiveBefore); + }); + it("hasHistory sees archive-only workspaces", async () => { await appendNumberedMessages(service, wsId, 1); await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index 9f1156e277..5527ae3782 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -1842,6 +1842,13 @@ export class HistoryService { removeCount++; } + // No-op truncation (percentage 0 or rounding to zero tokens) must not + // rewrite anything — collapsing the archive back into chat.jsonl would + // undo rotation and put lifetime history back on the hot path. + if (removeCount === 0) { + return Ok([]); + } + // If we're removing all messages, use fast path if (removeCount >= messages.length) { await fs.rm(historyPath, { force: true }); From 496521e7dbe32e50e7cea0a7a42242b363f9f10a Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 11:08:01 -0500 Subject: [PATCH 5/9] fix: archive floor for deleteMessage counter; archive-aware live subagent transcripts --- src/node/orpc/router.ts | 27 +++++++++++++++++++++--- src/node/services/historyService.test.ts | 15 +++++++++++++ src/node/services/historyService.ts | 7 +++++- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index ae8cfe3720..39a59e9e09 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -106,6 +106,7 @@ import { type SubagentTranscriptArtifactIndexEntry, } from "@/node/services/subagentTranscriptArtifacts"; import { getErrorMessage } from "@/common/utils/errors"; +import { CHAT_FILE_NAME, CHAT_ARCHIVE_FILE_NAME } from "@/common/constants/paths"; import { WorkflowActionRegistry } from "@/node/services/workflows/WorkflowActionRegistry"; import { 
shouldDisableHostWorkflowActions, @@ -4332,6 +4333,8 @@ export const router = (authToken?: string) => { const readTranscriptFromPaths = async (params: { workspaceId: string; chatPath?: string; + /** Sealed pre-boundary history (chat-archive.jsonl) for live sessions. */ + chatArchivePath?: string; partialPath?: string; logLabel: string; }): Promise => { @@ -4341,6 +4344,12 @@ export const router = (authToken?: string) => { if (params.chatPath && !isPathInsideDir(workspaceSessionDir, params.chatPath)) { throw new Error("Refusing to read transcript outside workspace session dir"); } + if ( + params.chatArchivePath && + !isPathInsideDir(workspaceSessionDir, params.chatArchivePath) + ) { + throw new Error("Refusing to read transcript archive outside workspace session dir"); + } if (params.partialPath && !isPathInsideDir(workspaceSessionDir, params.partialPath)) { throw new Error("Refusing to read partial outside workspace session dir"); } @@ -4348,6 +4357,14 @@ export const router = (authToken?: string) => { const partial = params.partialPath ? await readPartialJsonBestEffort(params.partialPath) : null; + // Live sessions may have rotated sealed history into chat-archive.jsonl + // (older rows), which precedes chat.jsonl (active epoch). + const archivedMessages = params.chatArchivePath + ? await readChatJsonlAllowMissing({ + chatPath: params.chatArchivePath, + logLabel: `${params.logLabel} (archive)`, + }) + : null; const messages = params.chatPath ? await readChatJsonlAllowMissing({ chatPath: params.chatPath, @@ -4356,11 +4373,14 @@ export const router = (authToken?: string) => { : null; // If we only archived partial.json (e.g. interrupted stream), still allow viewing. - if (!messages && !partial) { + if (!messages && !archivedMessages && !partial) { throw new Error(`Transcript not found (missing ${params.logLabel})`); } - return mergePartialIntoHistory(messages ?? [], partial); + return mergePartialIntoHistory( + [...(archivedMessages ?? []), ...(messages ?? 
[])], + partial + ); }; let resolved: { @@ -4391,7 +4411,8 @@ export const router = (authToken?: string) => { const taskSessionDir = context.config.getSessionDir(taskId); const messages = await readTranscriptFromPaths({ workspaceId: taskId, - chatPath: path.join(taskSessionDir, "chat.jsonl"), + chatPath: path.join(taskSessionDir, CHAT_FILE_NAME), + chatArchivePath: path.join(taskSessionDir, CHAT_ARCHIVE_FILE_NAME), partialPath: path.join(taskSessionDir, "partial.json"), logLabel: `${taskId}/chat.jsonl`, }); diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 87f40d712d..08d8828254 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -1625,6 +1625,21 @@ describe("HistoryService", () => { ).toBe(false); }); + it("never reuses archived sequences after deleting the whole active epoch in a fresh process", async () => { + await appendNumberedMessages(service, wsId, 3); // seq 0..2 → archived + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + + // Fresh process: no cached sequence counter. Deleting the lone active row + // must not cache a counter below the archived rows. 
+ const restarted = new HistoryService(config); + const deleteResult = await restarted.deleteMessage(wsId, "boundary-1"); + expect(deleteResult.success).toBe(true); + + const msg = createMuxMessage("new-msg", "user", "fresh"); + await restarted.appendToHistory(wsId, msg); + expect(msg.metadata?.historySequence).toBe(3); + }); + it("keeps the archive intact on a no-op percentage truncation", async () => { await appendNumberedMessages(service, wsId, 3); await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index 5527ae3782..7f24fa51ab 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -1597,7 +1597,12 @@ export class HistoryService { return seq > max ? seq : max; }, -1); - const nextSeq = maxSeq + 1; + // Sealed archive rows keep their sequences across active-file deletes. + // Without this floor, deleting the last sequenced active row in a fresh + // process would cache a counter below archived rows and reuse their + // historySequence values on the next append. 
+ const archiveMaxSeq = await this.getArchiveTailMaxSequence(workspaceId); + const nextSeq = Math.max(maxSeq, archiveMaxSeq) + 1; assert( isNonNegativeInteger(nextSeq), "next history sequence counter after delete must be a non-negative integer" From 9efdc20b1ba8135d37e599cb02dc1320732b5822 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 11:14:17 -0500 Subject: [PATCH 6/9] fix: archive floor when renaming archive-only sessions --- src/node/services/historyService.test.ts | 19 +++++++++++++++++++ src/node/services/historyService.ts | 8 ++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 08d8828254..1416ea1be2 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -1640,6 +1640,25 @@ describe("HistoryService", () => { expect(msg.metadata?.historySequence).toBe(3); }); + it("seeds the counter from the archive when renaming an archive-only session", async () => { + await appendNumberedMessages(service, wsId, 3); // seq 0..2 → archived + await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); // seq 3 + // Archive-only session: the active file is gone but sealed rows remain. + await fs.rm(chatPath(wsId)); + + const newWsId = "ws-rotation-renamed"; + await fs.rename(config.getSessionDir(wsId), config.getSessionDir(newWsId)); + + // Fresh process: no cached counter for either workspace ID. 
+ const restarted = new HistoryService(config); + const migrateResult = await restarted.migrateWorkspaceId(wsId, newWsId); + expect(migrateResult.success).toBe(true); + + const msg = createMuxMessage("new-msg", "user", "fresh"); + await restarted.appendToHistory(newWsId, msg); + expect(msg.metadata?.historySequence).toBe(3); + }); + it("keeps the archive intact on a no-op percentage truncation", async () => { await appendNumberedMessages(service, wsId, 3); await service.appendToHistory(wsId, boundaryMessage("boundary-1", 1)); diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index 7f24fa51ab..17b9c8e2d9 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -1949,9 +1949,13 @@ export class HistoryService { // Structural rewrite requires full file content. const messages = await this.readChatHistory(newWorkspaceId); if (messages.length === 0) { - // No messages to migrate, just transfer sequence counter + // No active messages to migrate, just transfer the sequence counter. + // Floor it with the archive max: an archive-only session (active file + // deleted/truncated) renamed in a fresh process has no cached counter, + // and seeding 0 would reuse archived historySequence values. const oldCounter = this.sequenceCounters.get(oldWorkspaceId) ?? 
0; - this.sequenceCounters.set(newWorkspaceId, oldCounter); + const archiveFloor = (await this.getArchiveTailMaxSequence(newWorkspaceId)) + 1; + this.sequenceCounters.set(newWorkspaceId, Math.max(oldCounter, archiveFloor)); this.sequenceCounters.delete(oldWorkspaceId); return Ok(undefined); } From d3ef1e5be868fdded67f08981f5c82f906630a72 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 11:20:00 -0500 Subject: [PATCH 7/9] fix: keep analytics for archive-only sessions --- src/node/services/analytics/etl.test.ts | 17 ++++++++ src/node/services/analytics/etl.ts | 56 ++++++++++++------------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/src/node/services/analytics/etl.test.ts b/src/node/services/analytics/etl.test.ts index 60d3299fb6..19cc51f7ef 100644 --- a/src/node/services/analytics/etl.test.ts +++ b/src/node/services/analytics/etl.test.ts @@ -894,6 +894,23 @@ describe("ingestWorkspace", () => { ); expect(rows.map((row) => Number(row.input_tokens))).toEqual([11, 33]); }); + + test("keeps analytics for archive-only sessions (missing chat.jsonl)", async () => { + const conn = await createTestConn(); + const sessionDir = await createTempSessionDir(); + const workspaceId = "ws-archive-only"; + + // An archive-only session (active file deleted/truncated) still has history; + // it must be ingested rather than treated as a removed workspace. 
+ await fs.writeFile( + path.join(sessionDir, "chat-archive.jsonl"), + [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })].join("\n") + "\n" + ); + + await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" }); + + expect(await queryEventCount(conn, workspaceId)).toBe(1); + }); }); describe("readPersistedWorkspaceHeadSignature", () => { diff --git a/src/node/services/analytics/etl.ts b/src/node/services/analytics/etl.ts index f8b2180b59..58266c18c8 100644 --- a/src/node/services/analytics/etl.ts +++ b/src/node/services/analytics/etl.ts @@ -18,46 +18,44 @@ export const CHAT_FILE_NAME = "chat.jsonl"; * (see HistoryService). Analytics must consider both files or pre-compaction * usage/events would silently disappear after the first rotation. * - * Returns null when chat.jsonl does not exist (no workspace history). The - * reported mtime is the max across both files so watermark staleness checks - * see archive-only mutations too. + * Returns null only when NEITHER file exists (no workspace history) — an + * archive-only session must keep its analytics state. The reported mtime is + * the max across both files so watermark staleness checks see archive-only + * mutations too. */ async function statSessionChatHistory(sessionDir: string): Promise<{ mtimeMs: number } | null> { - let mtimeMs: number; - try { - mtimeMs = (await fs.stat(path.join(sessionDir, CHAT_FILE_NAME))).mtimeMs; - } catch (error) { - if (isRecord(error) && error.code === "ENOENT") { - return null; - } - throw error; - } - - try { - const archiveStat = await fs.stat(path.join(sessionDir, CHAT_ARCHIVE_FILE_NAME)); - mtimeMs = Math.max(mtimeMs, archiveStat.mtimeMs); - } catch (error) { - if (!(isRecord(error) && error.code === "ENOENT")) { - throw error; + let mtimeMs: number | null = null; + for (const fileName of [CHAT_FILE_NAME, CHAT_ARCHIVE_FILE_NAME]) { + try { + const stat = await fs.stat(path.join(sessionDir, fileName)); + mtimeMs = mtimeMs === null ? 
stat.mtimeMs : Math.max(mtimeMs, stat.mtimeMs); + } catch (error) { + if (!(isRecord(error) && error.code === "ENOENT")) { + throw error; + } } } - return { mtimeMs }; + return mtimeMs === null ? null : { mtimeMs }; } -/** Read full workspace history: sealed archive (older) followed by chat.jsonl. */ +/** + * Read full workspace history: sealed archive (older) followed by chat.jsonl. + * Either file may be missing (uncompacted or archive-only sessions). + */ async function readSessionChatHistoryContents(sessionDir: string): Promise<string> { - let archived = ""; - try { - archived = await fs.readFile(path.join(sessionDir, CHAT_ARCHIVE_FILE_NAME), "utf-8"); - } catch (error) { - if (!(isRecord(error) && error.code === "ENOENT")) { - throw error; + let contents = ""; + for (const fileName of [CHAT_ARCHIVE_FILE_NAME, CHAT_FILE_NAME]) { + try { + contents += await fs.readFile(path.join(sessionDir, fileName), "utf-8"); + } catch (error) { + if (!(isRecord(error) && error.code === "ENOENT")) { + throw error; + } } } - const active = await fs.readFile(path.join(sessionDir, CHAT_FILE_NAME), "utf-8"); - return archived + active; + return contents; } const METADATA_FILE_NAME = "metadata.json"; const SUBAGENT_TRANSCRIPTS_DIR_NAME = "subagent-transcripts"; From b47c557e25c07bac9d12b6e1dcc53443c8655eb6 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 11:26:21 -0500 Subject: [PATCH 8/9] fix: reingest analytics on combined-mtime regression (active file removed) --- src/node/services/analytics/etl.test.ts | 35 +++++++++++++++++++++++++ src/node/services/analytics/etl.ts | 5 +++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/node/services/analytics/etl.test.ts b/src/node/services/analytics/etl.test.ts index 19cc51f7ef..da034db5eb 100644 --- a/src/node/services/analytics/etl.test.ts +++ b/src/node/services/analytics/etl.test.ts @@ -895,6 +895,41 @@ describe("ingestWorkspace", () => { expect(rows.map((row) => Number(row.input_tokens))).toEqual([11, 33]); }); + 
test("reingests when the active file disappears leaving an older archive", async () => { + const conn = await createTestConn(); + const sessionDir = await createTempSessionDir(); + const workspaceId = "ws-mtime-regression"; + + const archivePath = path.join(sessionDir, "chat-archive.jsonl"); + await fs.writeFile( + archivePath, + [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })].join("\n") + "\n" + ); + // Make the archive strictly older than chat.jsonl so the watermark is based + // on the active file's mtime. + const olderTime = new Date(Date.now() - 60_000); + await fs.utimes(archivePath, olderTime, olderTime); + await writeChatJsonl(sessionDir, [ + makeUserLine(), + makeAssistantLine({ sequence: 3, inputTokens: 33 }), + ]); + + await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" }); + expect(await queryEventCount(conn, workspaceId)).toBe(2); + + // Deleting chat.jsonl regresses the combined mtime to the older archive's. + // Ingestion must still re-run and drop the removed active epoch's rows. + await fs.rm(path.join(sessionDir, CHAT_FILE_NAME)); + await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" }); + + const rows = await queryRows( + conn, + "SELECT input_tokens FROM events WHERE workspace_id = ? ORDER BY input_tokens", + [workspaceId] + ); + expect(rows.map((row) => Number(row.input_tokens))).toEqual([11]); + }); + test("keeps analytics for archive-only sessions (missing chat.jsonl)", async () => { const conn = await createTestConn(); const sessionDir = await createTempSessionDir(); diff --git a/src/node/services/analytics/etl.ts b/src/node/services/analytics/etl.ts index 58266c18c8..a822cab060 100644 --- a/src/node/services/analytics/etl.ts +++ b/src/node/services/analytics/etl.ts @@ -1069,7 +1069,10 @@ export async function ingestWorkspace( // Keep delegation rollups fresh even when chat.jsonl is unchanged. 
await ingestDelegationRollups(conn, workspaceId, sessionDir, workspaceMeta); - if (stat.mtimeMs <= watermark.lastModified) { + // Skip only when the combined mtime is unchanged. An mtime *regression* means + // a file disappeared (e.g. chat.jsonl deleted leaving the older archive), and + // ingestion must re-run so the rebuild path can drop stale rows. + if (stat.mtimeMs === watermark.lastModified) { return; } From f28d78122c994c20dc3830821c75dcc088e6c129 Mon Sep 17 00:00:00 2001 From: Ammar Date: Fri, 12 Jun 2026 11:34:47 -0500 Subject: [PATCH 9/9] fix: presence-aware analytics watermark change signal --- src/node/services/analytics/etl.test.ts | 35 +++++++++++++++++++++ src/node/services/analytics/etl.ts | 41 ++++++++++++++++--------- 2 files changed, 62 insertions(+), 14 deletions(-) diff --git a/src/node/services/analytics/etl.test.ts b/src/node/services/analytics/etl.test.ts index da034db5eb..269d40ec27 100644 --- a/src/node/services/analytics/etl.test.ts +++ b/src/node/services/analytics/etl.test.ts @@ -930,6 +930,41 @@ describe("ingestWorkspace", () => { expect(rows.map((row) => Number(row.input_tokens))).toEqual([11]); }); + test("reingests when chat.jsonl disappears even if the archive mtime matches the stored max", async () => { + const conn = await createTestConn(); + const sessionDir = await createTempSessionDir(); + const workspaceId = "ws-same-tick-deletion"; + + const archivePath = path.join(sessionDir, "chat-archive.jsonl"); + const chatPath = path.join(sessionDir, CHAT_FILE_NAME); + await fs.writeFile( + archivePath, + [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })].join("\n") + "\n" + ); + await writeChatJsonl(sessionDir, [ + makeUserLine(), + makeAssistantLine({ sequence: 3, inputTokens: 33 }), + ]); + // Same-tick rotation: both files share an identical mtime, so the max mtime + // alone cannot detect the active file's later disappearance. 
+ const sharedTime = new Date(Date.now() - 60_000); + await fs.utimes(archivePath, sharedTime, sharedTime); + await fs.utimes(chatPath, sharedTime, sharedTime); + + await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" }); + expect(await queryEventCount(conn, workspaceId)).toBe(2); + + await fs.rm(chatPath); + await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" }); + + const rows = await queryRows( + conn, + "SELECT input_tokens FROM events WHERE workspace_id = ? ORDER BY input_tokens", + [workspaceId] + ); + expect(rows.map((row) => Number(row.input_tokens))).toEqual([11]); + }); + test("keeps analytics for archive-only sessions (missing chat.jsonl)", async () => { const conn = await createTestConn(); const sessionDir = await createTempSessionDir(); diff --git a/src/node/services/analytics/etl.ts b/src/node/services/analytics/etl.ts index a822cab060..41adc6815f 100644 --- a/src/node/services/analytics/etl.ts +++ b/src/node/services/analytics/etl.ts @@ -19,24 +19,34 @@ export const CHAT_FILE_NAME = "chat.jsonl"; * usage/events would silently disappear after the first rotation. * * Returns null only when NEITHER file exists (no workspace history) — an - * archive-only session must keep its analytics state. The reported mtime is - * the max across both files so watermark staleness checks see archive-only - * mutations too. + * archive-only session must keep its analytics state. + * + * - `mtimeMs` is the max across both files; use it for "which copy is newer" + * recency comparisons (rebuild dedup winners). + * - `changeSignal` additionally folds in each file's size and presence, so the + * watermark staleness check still fires when a file disappears even if the + * surviving file's mtime equals the previously stored max (e.g. same-tick + * writes followed by deleting chat.jsonl). 
*/ -async function statSessionChatHistory(sessionDir: string): Promise<{ mtimeMs: number } | null> { +async function statSessionChatHistory( + sessionDir: string +): Promise<{ mtimeMs: number; changeSignal: number } | null> { let mtimeMs: number | null = null; + let changeSignal = 0; for (const fileName of [CHAT_FILE_NAME, CHAT_ARCHIVE_FILE_NAME]) { try { const stat = await fs.stat(path.join(sessionDir, fileName)); mtimeMs = mtimeMs === null ? stat.mtimeMs : Math.max(mtimeMs, stat.mtimeMs); + changeSignal += stat.mtimeMs + stat.size; } catch (error) { if (!(isRecord(error) && error.code === "ENOENT")) { throw error; } + changeSignal -= 1; // presence marker: distinguishes a missing file } } - return mtimeMs === null ? null : { mtimeMs }; + return mtimeMs === null ? null : { mtimeMs, changeSignal }; } /** @@ -315,7 +325,7 @@ interface ParsedWorkspaceData { workspaceId: string; sessionDir: string; events: IngestEvent[]; - stat: { mtimeMs: number }; + stat: { mtimeMs: number; changeSignal: number }; workspaceMeta: WorkspaceMeta; delegationRollupRaw: DelegationRollupRaw; archivedTranscripts: ParsedWorkspaceData[]; @@ -1069,10 +1079,11 @@ export async function ingestWorkspace( // Keep delegation rollups fresh even when chat.jsonl is unchanged. await ingestDelegationRollups(conn, workspaceId, sessionDir, workspaceMeta); - // Skip only when the combined mtime is unchanged. An mtime *regression* means - // a file disappeared (e.g. chat.jsonl deleted leaving the older archive), and - // ingestion must re-run so the rebuild path can drop stale rows. - if (stat.mtimeMs === watermark.lastModified) { + // Skip only when the combined change signal (mtimes + sizes + presence) is + // unchanged. An append/rewrite/rotation/file-deletion normally changes the signal (it is a plain sum, so an exact mtime/size cancellation is possible but unlikely), so + // ingestion re-runs and the rebuild path can drop stale rows — even when the + // surviving file's mtime equals the previously stored value. 
+ if (stat.changeSignal === watermark.lastModified) { return; } @@ -1145,7 +1156,7 @@ export async function ingestWorkspace( await writeWatermark(conn, workspaceId, { lastSequence: parsedMaxSequence ?? -1, - lastModified: stat.mtimeMs, + lastModified: stat.changeSignal, }); } else { let maxSequence = watermark.lastSequence; @@ -1166,7 +1177,7 @@ export async function ingestWorkspace( await writeWatermark(conn, workspaceId, { lastSequence: maxSequence, - lastModified: stat.mtimeMs, + lastModified: stat.changeSignal, }); } @@ -1538,7 +1549,7 @@ export async function parseWorkspaceFromDisk( workspaceId, sessionDir, events, - stat: { mtimeMs: stat.mtimeMs }, + stat: { mtimeMs: stat.mtimeMs, changeSignal: stat.changeSignal }, workspaceMeta, delegationRollupRaw, archivedTranscripts, @@ -1684,7 +1695,9 @@ export async function rebuildAll( const maxSequence = getMaxSequence(workspace.events) ?? -1; await writeWatermark(conn, workspace.workspaceId, { lastSequence: maxSequence, - lastModified: workspace.stat.mtimeMs, + // Watermark staleness compares against the combined change signal, + // not the raw mtime (which is only used for dedup-winner recency). + lastModified: workspace.stat.changeSignal, }); await writeDelegationRollupsFromParsed(