Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/common/constants/paths.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ import { join } from "path";
const LEGACY_MUX_DIR_NAME = ".cmux";
const MUX_DIR_NAME = ".mux";

/**
 * Session-dir file holding the active chat history epoch (latest compaction
 * boundary onward). Example: ~/.mux/sessions/<workspace>/chat.jsonl
 *
 * Rows here are newer than anything in {@link CHAT_ARCHIVE_FILE_NAME}.
 */
export const CHAT_FILE_NAME = "chat.jsonl";

/**
 * Session-dir file holding sealed pre-boundary chat history. HistoryService
 * rotates everything before the latest durable context boundary out of
 * chat.jsonl into this append-only archive so per-turn reads/rewrites stay
 * O(active epoch) instead of O(lifetime history).
 *
 * Consumers that need the full history (transcript viewing, analytics) read
 * this file first and then chat.jsonl; either file may be absent.
 */
export const CHAT_ARCHIVE_FILE_NAME = "chat-archive.jsonl";

/**
* Migrate from the legacy ~/.cmux directory into ~/.mux for rebranded installs.
* Called on startup to preserve data created by earlier releases.
Expand Down
27 changes: 24 additions & 3 deletions src/node/orpc/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ import {
type SubagentTranscriptArtifactIndexEntry,
} from "@/node/services/subagentTranscriptArtifacts";
import { getErrorMessage } from "@/common/utils/errors";
import { CHAT_FILE_NAME, CHAT_ARCHIVE_FILE_NAME } from "@/common/constants/paths";
import { WorkflowActionRegistry } from "@/node/services/workflows/WorkflowActionRegistry";
import {
shouldDisableHostWorkflowActions,
Expand Down Expand Up @@ -4332,6 +4333,8 @@ export const router = (authToken?: string) => {
const readTranscriptFromPaths = async (params: {
workspaceId: string;
chatPath?: string;
/** Sealed pre-boundary history (chat-archive.jsonl) for live sessions. */
chatArchivePath?: string;
partialPath?: string;
logLabel: string;
}): Promise<MuxMessage[]> => {
Expand All @@ -4341,13 +4344,27 @@ export const router = (authToken?: string) => {
if (params.chatPath && !isPathInsideDir(workspaceSessionDir, params.chatPath)) {
throw new Error("Refusing to read transcript outside workspace session dir");
}
if (
params.chatArchivePath &&
!isPathInsideDir(workspaceSessionDir, params.chatArchivePath)
) {
throw new Error("Refusing to read transcript archive outside workspace session dir");
}
if (params.partialPath && !isPathInsideDir(workspaceSessionDir, params.partialPath)) {
throw new Error("Refusing to read partial outside workspace session dir");
}

const partial = params.partialPath
? await readPartialJsonBestEffort(params.partialPath)
: null;
// Live sessions may have rotated sealed history into chat-archive.jsonl
// (older rows), which precedes chat.jsonl (active epoch).
const archivedMessages = params.chatArchivePath
? await readChatJsonlAllowMissing({
chatPath: params.chatArchivePath,
logLabel: `${params.logLabel} (archive)`,
})
: null;
const messages = params.chatPath
? await readChatJsonlAllowMissing({
chatPath: params.chatPath,
Expand All @@ -4356,11 +4373,14 @@ export const router = (authToken?: string) => {
: null;

// If we only archived partial.json (e.g. interrupted stream), still allow viewing.
if (!messages && !partial) {
if (!messages && !archivedMessages && !partial) {
throw new Error(`Transcript not found (missing ${params.logLabel})`);
}

return mergePartialIntoHistory(messages ?? [], partial);
return mergePartialIntoHistory(
[...(archivedMessages ?? []), ...(messages ?? [])],
partial
);
};

let resolved: {
Expand Down Expand Up @@ -4391,7 +4411,8 @@ export const router = (authToken?: string) => {
const taskSessionDir = context.config.getSessionDir(taskId);
const messages = await readTranscriptFromPaths({
workspaceId: taskId,
chatPath: path.join(taskSessionDir, "chat.jsonl"),
chatPath: path.join(taskSessionDir, CHAT_FILE_NAME),
chatArchivePath: path.join(taskSessionDir, CHAT_ARCHIVE_FILE_NAME),
partialPath: path.join(taskSessionDir, "partial.json"),
logLabel: `${taskId}/chat.jsonl`,
});
Expand Down
114 changes: 114 additions & 0 deletions src/node/services/analytics/etl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,120 @@ describe("ingestWorkspace", () => {
expect(refreshedHeadRows[0].tool_name).toBe("bash");
expect(Number(refreshedHeadRows[0].total_cost_usd)).toBeCloseTo(originalHeadTotalCostUsd, 12);
});

test("ingests sealed pre-boundary rows from chat-archive.jsonl", async () => {
  const conn = await createTestConn();
  const sessionDir = await createTempSessionDir();
  const workspaceId = "ws-with-archive";

  // Rotation leaves the older epoch in chat-archive.jsonl and the newer one in
  // chat.jsonl; ingestion has to pick up usage rows from both files.
  const archiveFilePath = path.join(sessionDir, "chat-archive.jsonl");
  const sealedLines = [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })];
  await fs.writeFile(archiveFilePath, sealedLines.join("\n") + "\n");

  const activeLines = [makeUserLine(), makeAssistantLine({ sequence: 3, inputTokens: 33 })];
  await writeChatJsonl(sessionDir, activeLines);

  await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" });

  // One assistant event per file.
  const eventCount = await queryEventCount(conn, workspaceId);
  expect(eventCount).toBe(2);

  const tokenRows = await queryRows(
    conn,
    "SELECT input_tokens FROM events WHERE workspace_id = ? ORDER BY input_tokens",
    [workspaceId]
  );
  const inputTokens = tokenRows.map((row) => Number(row.input_tokens));
  expect(inputTokens).toEqual([11, 33]);
});

test("reingests when the active file disappears leaving an older archive", async () => {
  const conn = await createTestConn();
  const sessionDir = await createTempSessionDir();
  const workspaceId = "ws-mtime-regression";
  const runIngest = () =>
    ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" });

  const archivePath = path.join(sessionDir, "chat-archive.jsonl");
  const sealedLines = [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })];
  await fs.writeFile(archivePath, sealedLines.join("\n") + "\n");

  // Backdate the archive so chat.jsonl (written next) is strictly newer and
  // therefore determines the recorded watermark.
  const olderTime = new Date(Date.now() - 60_000);
  await fs.utimes(archivePath, olderTime, olderTime);

  const activeLines = [makeUserLine(), makeAssistantLine({ sequence: 3, inputTokens: 33 })];
  await writeChatJsonl(sessionDir, activeLines);

  await runIngest();
  expect(await queryEventCount(conn, workspaceId)).toBe(2);

  // With chat.jsonl gone, the newest remaining mtime is the older archive's.
  // A second ingest must still run and discard the deleted epoch's rows.
  const activePath = path.join(sessionDir, CHAT_FILE_NAME);
  await fs.rm(activePath);
  await runIngest();

  const tokenRows = await queryRows(
    conn,
    "SELECT input_tokens FROM events WHERE workspace_id = ? ORDER BY input_tokens",
    [workspaceId]
  );
  const inputTokens = tokenRows.map((row) => Number(row.input_tokens));
  expect(inputTokens).toEqual([11]);
});

test("reingests when chat.jsonl disappears even if the archive mtime matches the stored max", async () => {
  const conn = await createTestConn();
  const sessionDir = await createTempSessionDir();
  const workspaceId = "ws-same-tick-deletion";
  const runIngest = () =>
    ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" });

  const archivePath = path.join(sessionDir, "chat-archive.jsonl");
  const chatPath = path.join(sessionDir, CHAT_FILE_NAME);

  const sealedLines = [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })];
  await fs.writeFile(archivePath, sealedLines.join("\n") + "\n");

  const activeLines = [makeUserLine(), makeAssistantLine({ sequence: 3, inputTokens: 33 })];
  await writeChatJsonl(sessionDir, activeLines);

  // Simulate a same-tick rotation by stamping both files with one mtime; the
  // max mtime then stays the same after chat.jsonl is removed, so it cannot be
  // the only staleness signal.
  const sharedTime = new Date(Date.now() - 60_000);
  for (const filePath of [archivePath, chatPath]) {
    await fs.utimes(filePath, sharedTime, sharedTime);
  }

  await runIngest();
  expect(await queryEventCount(conn, workspaceId)).toBe(2);

  await fs.rm(chatPath);
  await runIngest();

  const tokenRows = await queryRows(
    conn,
    "SELECT input_tokens FROM events WHERE workspace_id = ? ORDER BY input_tokens",
    [workspaceId]
  );
  const inputTokens = tokenRows.map((row) => Number(row.input_tokens));
  expect(inputTokens).toEqual([11]);
});

test("keeps analytics for archive-only sessions (missing chat.jsonl)", async () => {
  const conn = await createTestConn();
  const sessionDir = await createTempSessionDir();
  const workspaceId = "ws-archive-only";

  // Only the archive exists here. That is still real history, so ingestion
  // must record it instead of clearing the workspace as if it were removed.
  const archiveFilePath = path.join(sessionDir, "chat-archive.jsonl");
  const sealedLines = [makeUserLine(), makeAssistantLine({ sequence: 1, inputTokens: 11 })];
  await fs.writeFile(archiveFilePath, sealedLines.join("\n") + "\n");

  await ingestWorkspace(conn, workspaceId, sessionDir, { projectPath: "/proj" });

  const eventCount = await queryEventCount(conn, workspaceId);
  expect(eventCount).toBe(1);
});
});

describe("readPersistedWorkspaceHeadSignature", () => {
Expand Down
109 changes: 78 additions & 31 deletions src/node/services/analytics/etl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,64 @@ import { getErrorMessage } from "@/common/utils/errors";
import { createDisplayUsage } from "@/common/utils/tokens/displayUsage";
import { log } from "@/node/services/log";
import { toUtcDateString } from "@/node/services/analytics/dateUtils";
import { CHAT_ARCHIVE_FILE_NAME } from "@/common/constants/paths";

export const CHAT_FILE_NAME = "chat.jsonl";

/**
 * Stat both chat-history files of a session (chat.jsonl and the sealed
 * chat-archive.jsonl) and summarize them for analytics staleness checks.
 *
 * A missing file (ENOENT) is tolerated; any other stat error is rethrown.
 *
 * @param sessionDir Directory containing the session's history files.
 * @returns `null` when neither file exists. Otherwise:
 *   - `mtimeMs`: the newest modification time across the files that exist,
 *     intended for "which copy is newer" comparisons.
 *   - `changeSignal`: the sum of `mtimeMs + size` for every existing file,
 *     minus 1 for every missing file. Because size and presence are folded in,
 *     the value changes when a file disappears even if the surviving file's
 *     mtime equals the previous maximum.
 *
 * NOTE(review): `changeSignal` is a single additive number, so offsetting
 * changes (e.g. an mtime that grows by k ms while a size shrinks by k bytes)
 * would produce the same value — presumably an accepted trade-off; confirm.
 */
async function statSessionChatHistory(
  sessionDir: string
): Promise<{ mtimeMs: number; changeSignal: number } | null> {
  // Resolves to the file's stats, or null when the file does not exist.
  const statIfPresent = async (fileName: string) => {
    try {
      return await fs.stat(path.join(sessionDir, fileName));
    } catch (error) {
      if (isRecord(error) && error.code === "ENOENT") {
        return null;
      }
      throw error;
    }
  };

  let newestMtimeMs: number | null = null;
  let signal = 0;

  // Files are visited one at a time, active file first, so the signal is
  // accumulated in a fixed order.
  for (const fileName of [CHAT_FILE_NAME, CHAT_ARCHIVE_FILE_NAME]) {
    const fileStat = await statIfPresent(fileName);
    if (fileStat === null) {
      signal -= 1; // presence marker for a missing file
      continue;
    }

    newestMtimeMs = Math.max(newestMtimeMs ?? fileStat.mtimeMs, fileStat.mtimeMs);
    signal += fileStat.mtimeMs + fileStat.size;
  }

  if (newestMtimeMs === null) {
    return null;
  }
  return { mtimeMs: newestMtimeMs, changeSignal: signal };
}

/**
 * Read the full workspace history as one JSONL string: the sealed archive
 * (older rows) followed by chat.jsonl (active epoch).
 *
 * Either file may be missing (uncompacted or archive-only sessions); a missing
 * file contributes nothing. Any read error other than ENOENT is rethrown.
 *
 * A line break is inserted between the two files when the earlier content does
 * not already end with one. Without it, an archive that lacks a trailing
 * newline (e.g. a torn final append) would fuse its last row with the first
 * row of chat.jsonl, making both unparseable once the result is split on "\n".
 *
 * @param sessionDir Directory containing the session's history files.
 * @returns The concatenated file contents, or "" when neither file exists.
 */
async function readSessionChatHistoryContents(sessionDir: string): Promise<string> {
  let contents = "";
  for (const fileName of [CHAT_ARCHIVE_FILE_NAME, CHAT_FILE_NAME]) {
    let fileContents: string;
    try {
      fileContents = await fs.readFile(path.join(sessionDir, fileName), "utf-8");
    } catch (error) {
      if (!(isRecord(error) && error.code === "ENOENT")) {
        throw error;
      }
      continue;
    }

    // Keep rows from different files on separate lines.
    if (contents.length > 0 && fileContents.length > 0 && !contents.endsWith("\n")) {
      contents += "\n";
    }
    contents += fileContents;
  }

  return contents;
}
const METADATA_FILE_NAME = "metadata.json";
const SUBAGENT_TRANSCRIPTS_DIR_NAME = "subagent-transcripts";
const SESSION_USAGE_FILE_NAME = "session-usage.json";
Expand Down Expand Up @@ -269,7 +325,7 @@ interface ParsedWorkspaceData {
workspaceId: string;
sessionDir: string;
events: IngestEvent[];
stat: { mtimeMs: number };
stat: { mtimeMs: number; changeSignal: number };
workspaceMeta: WorkspaceMeta;
delegationRollupRaw: DelegationRollupRaw;
archivedTranscripts: ParsedWorkspaceData[];
Expand Down Expand Up @@ -1009,19 +1065,11 @@ export async function ingestWorkspace(
assert(workspaceId.trim().length > 0, "ingestWorkspace: workspaceId is required");
assert(sessionDir.trim().length > 0, "ingestWorkspace: sessionDir is required");

const chatPath = path.join(sessionDir, CHAT_FILE_NAME);

let stat: Awaited<ReturnType<typeof fs.stat>>;
try {
stat = await fs.stat(chatPath);
} catch (error) {
if (isRecord(error) && error.code === "ENOENT") {
// Remove stale analytics state when the workspace history file no longer exists.
await clearWorkspaceAnalyticsState(conn, workspaceId);
return;
}

throw error;
const stat = await statSessionChatHistory(sessionDir);
if (stat === null) {
// Remove stale analytics state when the workspace history file no longer exists.
await clearWorkspaceAnalyticsState(conn, workspaceId);
return;
}

const watermark = await readWatermark(conn, workspaceId);
Expand All @@ -1031,11 +1079,15 @@ export async function ingestWorkspace(
// Keep delegation rollups fresh even when chat.jsonl is unchanged.
await ingestDelegationRollups(conn, workspaceId, sessionDir, workspaceMeta);

if (stat.mtimeMs <= watermark.lastModified) {
// Skip only when the combined change signal (mtimes + sizes + presence) is
// unchanged. Any append/rewrite/rotation/file-deletion changes the signal, so
// ingestion re-runs and the rebuild path can drop stale rows — even when the
// surviving file's mtime equals the previously stored value.
if (stat.changeSignal === watermark.lastModified) {
return;
}

const chatContents = await fs.readFile(chatPath, "utf-8");
const chatContents = await readSessionChatHistoryContents(sessionDir);
const lines = chatContents.split("\n").filter((line) => line.trim().length > 0);

let responseIndex = 0;
Expand Down Expand Up @@ -1104,7 +1156,7 @@ export async function ingestWorkspace(

await writeWatermark(conn, workspaceId, {
lastSequence: parsedMaxSequence ?? -1,
lastModified: stat.mtimeMs,
lastModified: stat.changeSignal,
});
} else {
let maxSequence = watermark.lastSequence;
Expand All @@ -1125,7 +1177,7 @@ export async function ingestWorkspace(

await writeWatermark(conn, workspaceId, {
lastSequence: maxSequence,
lastModified: stat.mtimeMs,
lastModified: stat.changeSignal,
});
}

Expand Down Expand Up @@ -1450,22 +1502,15 @@ export async function parseWorkspaceFromDisk(
assert(workspaceId.trim().length > 0, "parseWorkspaceFromDisk: workspaceId is required");
assert(sessionDir.trim().length > 0, "parseWorkspaceFromDisk: sessionDir is required");

const chatPath = path.join(sessionDir, CHAT_FILE_NAME);

let stat: Awaited<ReturnType<typeof fs.stat>>;
try {
stat = await fs.stat(chatPath);
} catch (error) {
if (isRecord(error) && error.code === "ENOENT") {
return null;
}
throw error;
const stat = await statSessionChatHistory(sessionDir);
if (stat === null) {
return null;
}

const persistedMeta = await readWorkspaceMetaFromDisk(sessionDir);
const workspaceMeta = mergeWorkspaceMeta(persistedMeta, suppliedMeta);

const chatContents = await fs.readFile(chatPath, "utf-8");
const chatContents = await readSessionChatHistoryContents(sessionDir);
const lines = chatContents.split("\n").filter((line) => line.trim().length > 0);

let responseIndex = 0;
Expand Down Expand Up @@ -1504,7 +1549,7 @@ export async function parseWorkspaceFromDisk(
workspaceId,
sessionDir,
events,
stat: { mtimeMs: stat.mtimeMs },
stat: { mtimeMs: stat.mtimeMs, changeSignal: stat.changeSignal },
workspaceMeta,
delegationRollupRaw,
archivedTranscripts,
Expand Down Expand Up @@ -1650,7 +1695,9 @@ export async function rebuildAll(
const maxSequence = getMaxSequence(workspace.events) ?? -1;
await writeWatermark(conn, workspace.workspaceId, {
lastSequence: maxSequence,
lastModified: workspace.stat.mtimeMs,
// Watermark staleness compares against the combined change signal,
// not the raw mtime (which is only used for dedup-winner recency).
lastModified: workspace.stat.changeSignal,
});

await writeDelegationRollupsFromParsed(
Expand Down
Loading
Loading