diff --git a/lib/commands/decompress.ts b/lib/commands/decompress.ts index 2f346105..1445f08b 100644 --- a/lib/commands/decompress.ts +++ b/lib/commands/decompress.ts @@ -167,7 +167,7 @@ export async function handleDecompressCommand(ctx: DecompressCommandContext): Pr return } - syncCompressionBlocks(state, logger, messages) + syncCompressionBlocks(state, logger, messages, { authoritative: true }) const messagesState = state.prune.messages if (!targetArg) { @@ -236,7 +236,7 @@ export async function handleDecompressCommand(ctx: DecompressCommandContext): Pr block.deactivatedByBlockId = undefined } - syncCompressionBlocks(state, logger, messages) + syncCompressionBlocks(state, logger, messages, { authoritative: true }) let restoredMessageCount = 0 let restoredTokens = 0 diff --git a/lib/commands/recompress.ts b/lib/commands/recompress.ts index 1d67d6bd..848f1970 100644 --- a/lib/commands/recompress.ts +++ b/lib/commands/recompress.ts @@ -120,7 +120,7 @@ export async function handleRecompressCommand(ctx: RecompressCommandContext): Pr return } - syncCompressionBlocks(state, logger, messages) + syncCompressionBlocks(state, logger, messages, { authoritative: true }) const messagesState = state.prune.messages const availableMessageIds = new Set(messages.map((msg) => msg.info.id)) @@ -186,7 +186,7 @@ export async function handleRecompressCommand(ctx: RecompressCommandContext): Pr block.deactivatedByBlockId = undefined } - syncCompressionBlocks(state, logger, messages) + syncCompressionBlocks(state, logger, messages, { authoritative: true }) let recompressedMessageCount = 0 let recompressedTokens = 0 diff --git a/lib/compress/dag.ts b/lib/compress/dag.ts new file mode 100644 index 00000000..e25858eb --- /dev/null +++ b/lib/compress/dag.ts @@ -0,0 +1,28 @@ +export class DAGValidationError extends Error { + constructor(message: string) { + super(message) + this.name = "DAGValidationError" + } +} + +export function validateBlockRefs( + newBlockId: number, + refBlockIds: 
number[], + blocksById: ReadonlyMap, +): void { + for (const refId of refBlockIds) { + if (refId === newBlockId) { + throw new DAGValidationError(`DAG validation failed: self-ref blockId ${newBlockId}`) + } + + if (refId >= newBlockId) { + throw new DAGValidationError( + `DAG validation failed: forward-ref: blockId ${newBlockId} cannot reference ${refId} (must be < ${newBlockId})`, + ) + } + + if (!blocksById.has(refId)) { + console.warn(`DAG validation warning: missing ref blockId ${refId} for blockId ${newBlockId}`) + } + } +} diff --git a/lib/compress/dedup.ts b/lib/compress/dedup.ts new file mode 100644 index 00000000..19862d7c --- /dev/null +++ b/lib/compress/dedup.ts @@ -0,0 +1,254 @@ +import { + formatBlockPlaceholder, + formatBlockRef, + formatMessageIdTag, +} from "../message-ids" +import type { BlockLike } from "./renderer" + +export const COMPRESSED_BLOCK_HEADER = "[Compressed conversation section]" + +export interface ConsumedBlock { + id: number + summary: string + schemaVersion?: number +} + +export interface DedupResult { + deduped: string + refBlockIds: number[] +} + +/** + * Strip the standard [Compressed conversation section] header prefix and + * dcp-message-id `bN` boundary footer from a stored block.summary, + * returning the trimmed inner body. Returns "" when the summary contains no + * body content. + * + * Mirrors the inverse of wrapBlockSummary in lib/compress/state.ts and is the + * canonical way to recover the raw model-produced summary text for matching. 
+ */ +export function extractBlockBody(blockSummary: string, blockId: number): string { + const header = COMPRESSED_BLOCK_HEADER + const footer = formatMessageIdTag(formatBlockRef(blockId)) + let body = blockSummary + if (body.startsWith(`${header}\n`)) { + body = body.slice(header.length + 1) + } else if (body.startsWith(header)) { + body = body.slice(header.length) + } + if (body.endsWith(footer)) { + body = body.slice(0, -footer.length) + } + return body.trim() +} + +/** + * Strip compact marker text that the LLM may have parroted from the + * compression prompt. Markers are injected into the prompt by + * `appendMissingBlockSummaries` and `injectBlockPlaceholders` in + * `lib/compress/range-utils.ts` to instruct the model to leave (bN) refs + * verbatim; if the model echoes the marker text into its returned summary we + * must remove the surrounding instruction text before persisting so stored + * block summaries contain only bare `(bN)` refs (Oracle Round 3 gap 1: + * storage vs prompt separation). + * + * Patterns stripped: + * - `(bN) — existing compressed block [topic: "..."] — preserve this token + * exactly, do not expand or paraphrase` → `(bN)` + * - `(bN) — preserved compressed block — do not paraphrase or replace` + * → `(bN)` + * - `### (bN)` heading lines → `(bN)` + * - `The following previously compressed summaries were also part of this + * conversation section:` heading paragraph (dropped entirely) + * + * Marker text uses real em-dashes (—) so plain ASCII summaries are never + * affected. Each pattern is anchored on the literal English used by + * range-utils.ts; the stripping is intentionally narrow to avoid clobbering + * legitimate summary text that happens to mention (bN). + */ +export function stripCompactMarkers(text: string): string { + let result = text + + // Tail 1: consumed-block marker (existing compressed block). 
+ result = result.replace( + /\(b(\d+)\)\s*—\s*existing compressed block\s+\[topic:\s*"[^"]*"\]\s*—\s*preserve this token exactly,?\s*do not expand or paraphrase/g, + "(b$1)", + ) + + // Tail 2: preserved-block marker (still-active compressed block). + result = result.replace( + /\(b(\d+)\)\s*—\s*preserved compressed block\s*—\s*do not paraphrase or replace/g, + "(b$1)", + ) + + // Per-block heading lines from appendMissingBlockSummaries (run before + // the section-heading paragraph strip so the leading `\n` is preserved + // for the `\n###` anchor). + result = result.replace(/(?:^|\n)###\s+\(b(\d+)\)\s*/g, "\n(b$1)") + + // Section heading from appendMissingBlockSummaries. `[ \t]*` (not `\s*`) + // so newlines after the colon stay intact for adjacent strips. + result = result.replace( + /\n*The following previously compressed summaries were also part of this conversation section:[ \t]*/g, + "", + ) + + return result +} + +/** + * Replace verbatim occurrences of consumed block bodies in the model-produced + * summary with `(bN)` placeholders so the stored summary stays compact. + * + * Algorithm: + * 1. Extract the inner body of every consumed block via extractBlockBody + * (strips the header/footer wrapper so we match the raw model text). + * 2. Sort by body length DESCENDING so a short body that happens to be a + * substring of a longer body cannot pre-empt the longer match. + * 3. For each body, String.replace(body, "(bN)") substitutes the FIRST + * occurrence only. Each consumed block is replaced at most once and + * only enters refBlockIds when its body actually matched. + * 4. T8 step 5: rendered-content leak detection. After the exact-substring + * pass, walk a body-only DAG expansion of each not-yet-matched consumed + * block (renderBodyOnly) and check whether its FULL recursive body + * appears verbatim in the working summary. 
If so, log a warning and + * replace with (bN) — this catches snowball cases where the model copied + * the full rendered chain rather than the compact stored body. + * + * Returns { deduped, refBlockIds } where refBlockIds is the list of blocks + * whose body or rendered expansion was actually replaced, in replacement + * order. + */ +export function deduplicateBlockContent( + modelSummary: string, + consumedBlocks: ReadonlyArray, + blocksById: ReadonlyMap, +): DedupResult { + const consumedBodies: Array<{ id: number; body: string }> = [] + for (const consumed of consumedBlocks) { + const body = extractBlockBody(consumed.summary, consumed.id) + if (body.length === 0) { + continue + } + consumedBodies.push({ id: consumed.id, body }) + } + consumedBodies.sort((left, right) => right.body.length - left.body.length) + + let working = modelSummary + const refBlockIds: number[] = [] + const replacedIds = new Set() + for (const { id, body } of consumedBodies) { + if (working.indexOf(body) === -1) { + continue + } + working = working.replace(body, formatBlockPlaceholder(id)) + refBlockIds.push(id) + replacedIds.add(id) + } + + // T8 step 5: rendered-content leak detection. For each consumed block not + // already matched by exact-substring dedup, recursively expand its body's + // (bN) refs against blocksById and check whether the FULLY EXPANDED BODY + // appears verbatim in `working`. This catches snowball cases where the + // model paraphrased the consumed body's structure but kept all the child + // content inline (e.g. body "(b1) bridge" with b1 = "alpha" leaking as + // "alpha bridge" — exact-substring dedup against the literal body would + // not match, but the renderer-equivalent expansion does). 
+ // + // We expand at the BODY level (not the wrapped renderBlockForContext + // output) because the model never sees the [Compressed conversation + // section] wrapper in its summary; the wrapped form would have nested + // headers/footers spliced into it and would not match raw model text. + const memo = new Map() + for (const consumed of consumedBlocks) { + if (replacedIds.has(consumed.id)) { + continue + } + const rendered = renderBodyOnly(consumed.id, blocksById, memo, new Set()) + if (rendered.length === 0) { + continue + } + // Avoid trivially-matching when rendered === body (already attempted + // by the exact-substring pass above). + const body = extractBlockBody(consumed.summary, consumed.id) + if (rendered === body) { + continue + } + if (working.indexOf(rendered) === -1) { + continue + } + console.warn( + `wrapCompressedSummary: rendered-content leak detected for (b${consumed.id}); replacing with bare ref`, + ) + working = working.replace(rendered, formatBlockPlaceholder(consumed.id)) + refBlockIds.push(consumed.id) + replacedIds.add(consumed.id) + } + + return { deduped: working, refBlockIds } +} + +/** + * Recursively expand a block's BODY (no wrapper, no headers/footers) by + * replacing each `(bN)` placeholder with its child's body. Used by + * deduplicateBlockContent for the rendered-content leak check. + * + * Differs from renderBlockForContext in two ways: + * 1. Operates on extractBlockBody output (no wrapper) at every level so + * the final string is a clean concatenation of bare bodies. + * 2. Uses a per-call memo to avoid re-expanding shared subtrees in a + * diamond DAG. The renderer's `renderedOnce` set instead emits an + * [already expanded above] marker, which would not match raw model + * text and is the wrong semantics here. + * + * `expanding` mirrors the renderer's cycle detector: if a forward-ref or a + * cycle slipped past T4's validation we return the literal `(bN)` token + * rather than recurse infinitely. 
+ */ +function renderBodyOnly( + blockId: number, + blocksById: ReadonlyMap, + memo: Map, + expanding: Set, +): string { + const cached = memo.get(blockId) + if (cached !== undefined) { + return cached + } + if (expanding.has(blockId)) { + return formatBlockPlaceholder(blockId) + } + const block = blocksById.get(blockId) + if (!block) { + return formatBlockPlaceholder(blockId) + } + + const body = extractBlockBody(block.summary, blockId) + // Legacy v1 block: body has no DAG refs to expand. + if (block.refBlockIds === undefined) { + memo.set(blockId, body) + return body + } + + expanding.add(blockId) + let result = body + const seen = new Set() + try { + // Walk every (bN) the body actually contains so cross-refs not listed + // in refBlockIds (legacy migration noise) still expand correctly. + const matches = body.matchAll(/\(b(\d+)\)/g) + for (const match of matches) { + const refId = Number.parseInt(match[1], 10) + if (!Number.isInteger(refId) || seen.has(refId)) { + continue + } + seen.add(refId) + const childText = renderBodyOnly(refId, blocksById, memo, expanding) + result = result.split(formatBlockPlaceholder(refId)).join(childText) + } + } finally { + expanding.delete(blockId) + } + memo.set(blockId, result) + return result +} diff --git a/lib/compress/index.ts b/lib/compress/index.ts index bdb7f2eb..7b3d7a8b 100644 --- a/lib/compress/index.ts +++ b/lib/compress/index.ts @@ -1,3 +1,4 @@ -export { ToolContext } from "./types" +export { DAGValidationError, validateBlockRefs } from "./dag" export { createCompressMessageTool } from "./message" export { createCompressRangeTool } from "./range" +export { ToolContext } from "./types" diff --git a/lib/compress/message.ts b/lib/compress/message.ts index d6bf8874..7e72fd5d 100644 --- a/lib/compress/message.ts +++ b/lib/compress/message.ts @@ -1,18 +1,31 @@ import { tool } from "@opencode-ai/plugin" import type { ToolContext } from "./types" -import { countTokens } from "../token-utils" +import { renderBlockForContext, 
type BlockLike } from "./renderer" import { MESSAGE_FORMAT_EXTENSION } from "../prompts/extensions/tool" import { formatIssues, formatResult, resolveMessages, validateArgs } from "./message-utils" -import { finalizeSession, prepareSession, type NotificationEntry } from "./pipeline" +import { withSessionLock } from "../state/lock" +import { + RebaseConflict, + persistCompressionState, + prepareSession, + rebasePlannedCompression, + reloadLatestState, + sendCompressionNotification, + type NotificationEntry, +} from "./pipeline" import { appendProtectedPromptInfo, appendProtectedTools } from "./protected-content" +import { assertUsefulCompressedSummary, estimateSelectedTokens } from "./summary-limits" import { - allocateBlockId, allocateRunId, applyCompressionState, + previewBlockIds, + reserveBlockIds, wrapCompressedSummary, } from "./state" import type { CompressMessageToolArgs } from "./types" +const MAX_REBASE_ATTEMPTS = 3 + function buildSchema() { return { topic: tool.schema @@ -53,93 +66,171 @@ export function createCompressMessageTool(ctx: ToolContext): ReturnType 0) { - throw new Error(formatIssues(skippedIssues, skippedCount)) + for (let attempt = 1; attempt <= MAX_REBASE_ATTEMPTS; attempt++) { + try { + const { rawMessages, searchContext } = await prepareSession( + ctx, + toolCtx, + `Compress Message: ${input.topic}`, + ) + const { plans, skippedIssues, skippedCount } = resolveMessages( + input, + searchContext, + ctx.state, + ctx.config, + ) + + if (plans.length === 0 && skippedCount > 0) { + throw new Error(formatIssues(skippedIssues, skippedCount)) + } + + const preparedPlans: Array<{ + plan: (typeof plans)[number] + summaryWithTools: string + }> = [] + + for (const plan of plans) { + const summaryWithPromptInfo = appendProtectedPromptInfo( + plan.entry.summary, + plan.selection, + searchContext, + ctx.state, + ctx.config.compress.protectTags, + ) + + const summaryWithTools = await appendProtectedTools( + ctx.client, + ctx.state, + 
ctx.config.experimental.allowSubAgents, + summaryWithPromptInfo, + plan.selection, + searchContext, + ctx.config.compress.protectedTools, + ctx.config.protectedFilePatterns, + ) + + preparedPlans.push({ + plan, + summaryWithTools, + }) + } + + const result = await withSessionLock(toolCtx.sessionID, async () => { + await reloadLatestState(ctx.state, toolCtx.sessionID, ctx.logger) + rebasePlannedCompression(plans, ctx.state) + + const notifications: NotificationEntry[] = [] + const blockIds = previewBlockIds(ctx.state, preparedPlans.length) + const validatedPlans = preparedPlans.map( + ({ plan, summaryWithTools }, index) => { + const blockId = blockIds[index] + if (blockId === undefined) { + throw new Error("Failed to preview compression block ID") + } + + const wrapResult = wrapCompressedSummary({ + blockId, + modelSummary: summaryWithTools, + consumedBlocks: [], + blocksById: searchContext.summaryByBlockId, + mode: "message", + }) + const { storedSummary } = wrapResult + const { refBlockIds } = wrapResult + + // Phase 0 Contract A: render the draft block against a draft Map so we can + // measure tokens WITHOUT mutating state.prune.messages.blocksById. The draft + // is committed only after assertUsefulCompressedSummary passes and + // reserveBlockIds/applyCompressionState succeed below. + // wrapCompressedSummary returns draftBlock already populated with refBlockIds + // and schemaVersion: 2 from the exact-substring dedup pass (T8). Message mode + // never consumes prior blocks so refBlockIds is always [] here; summaryTokens + // remains 0 until renderBlockForContext fills it in below (T12 prep). 
+ const draftBlock: BlockLike & { summaryTokens: number } = + wrapResult.draftBlock + const draftBlocksById = new Map( + ctx.state.prune.messages.blocksById, + ) + draftBlocksById.set(blockId, draftBlock) + const { renderedTokens } = renderBlockForContext( + blockId, + draftBlocksById, + ) + draftBlock.summaryTokens = renderedTokens + const summaryTokens = renderedTokens + const selectedTokens = estimateSelectedTokens(ctx.state, plan.selection) + assertUsefulCompressedSummary(summaryTokens, selectedTokens) + + return { + plan, + summaryWithTools, + blockId, + storedSummary, + refBlockIds, + summaryTokens, + } + }, + ) + + reserveBlockIds(ctx.state, validatedPlans.length) + const runId = allocateRunId(ctx.state) + + for (const validatedPlan of validatedPlans) { + const { plan, summaryWithTools, blockId, storedSummary, summaryTokens } = + validatedPlan + + applyCompressionState( + ctx.state, + { + topic: plan.entry.topic, + batchTopic: input.topic, + startId: plan.entry.messageId, + endId: plan.entry.messageId, + mode: "message", + runId, + compressMessageId: toolCtx.messageID, + compressCallId: callId, + summaryTokens, + refBlockIds: validatedPlan.refBlockIds, + }, + plan.selection, + plan.anchorMessageId, + blockId, + storedSummary, + [], + ) + + notifications.push({ + blockId, + runId, + summary: summaryWithTools, + summaryTokens, + }) + } + + await persistCompressionState(ctx.state, toolCtx.sessionID, ctx.logger) + + return { notifications } + }) + + await sendCompressionNotification( + ctx, + toolCtx, + rawMessages, + result.notifications, + input.topic, + ) + + return formatResult(plans.length, skippedIssues, skippedCount) + } catch (error) { + if (error instanceof RebaseConflict && attempt < MAX_REBASE_ATTEMPTS) { + continue + } + throw error + } } - const notifications: NotificationEntry[] = [] - - const preparedPlans: Array<{ - plan: (typeof plans)[number] - summaryWithTools: string - }> = [] - - for (const plan of plans) { - const summaryWithPromptInfo = 
appendProtectedPromptInfo( - plan.entry.summary, - plan.selection, - searchContext, - ctx.state, - ctx.config.compress.protectTags, - ) - - const summaryWithTools = await appendProtectedTools( - ctx.client, - ctx.state, - ctx.config.experimental.allowSubAgents, - summaryWithPromptInfo, - plan.selection, - searchContext, - ctx.config.compress.protectedTools, - ctx.config.protectedFilePatterns, - ) - - preparedPlans.push({ - plan, - summaryWithTools, - }) - } - - const runId = allocateRunId(ctx.state) - - for (const { plan, summaryWithTools } of preparedPlans) { - const blockId = allocateBlockId(ctx.state) - const storedSummary = wrapCompressedSummary(blockId, summaryWithTools) - const summaryTokens = countTokens(storedSummary) - - applyCompressionState( - ctx.state, - { - topic: plan.entry.topic, - batchTopic: input.topic, - startId: plan.entry.messageId, - endId: plan.entry.messageId, - mode: "message", - runId, - compressMessageId: toolCtx.messageID, - compressCallId: callId, - summaryTokens, - }, - plan.selection, - plan.anchorMessageId, - blockId, - storedSummary, - [], - ) - - notifications.push({ - blockId, - runId, - summary: summaryWithTools, - summaryTokens, - }) - } - - await finalizeSession(ctx, toolCtx, rawMessages, notifications, input.topic) - - return formatResult(plans.length, skippedIssues, skippedCount) + throw new Error("Failed to compress messages after rebase retries") }, }) } diff --git a/lib/compress/pipeline.ts b/lib/compress/pipeline.ts index 5f9875e6..31d573ed 100644 --- a/lib/compress/pipeline.ts +++ b/lib/compress/pipeline.ts @@ -1,10 +1,12 @@ -import type { WithParts } from "../state" +import type { Logger } from "../logger" +import type { SessionState, WithParts } from "../state" import { ensureSessionInitialized } from "../state" -import { saveSessionState } from "../state/persistence" +import { loadSessionState, saveSessionState } from "../state/persistence" +import { loadPruneMessagesState } from "../state/utils" import { 
assignMessageRefs } from "../message-ids" import { isIgnoredUserMessage } from "../messages/query" import { deduplicate, purgeErrors } from "../strategies" -import { getCurrentParams, getCurrentTokenUsage } from "../token-utils" +import { getCurrentParams } from "../token-utils" import { sendCompressNotification } from "../ui/notification" import type { ToolContext } from "./types" import { buildSearchContext, fetchSessionMessages } from "./search" @@ -34,6 +36,19 @@ export interface PreparedSession { searchContext: SearchContext } +interface PlannedCompression { + selection: { + requiredBlockIds: number[] + } +} + +export class RebaseConflict extends Error { + constructor(message: string) { + super(message) + this.name = "RebaseConflict" + } +} + export async function prepareSession( ctx: ToolContext, toolCtx: RunContext, @@ -76,17 +91,80 @@ export async function prepareSession( } } -export async function finalizeSession( +export async function reloadLatestState( + state: SessionState, + sessionId: string, + logger: Logger, +): Promise { + if (!state.sessionId) { + state.sessionId = sessionId + } + + const latest = await loadSessionState(state.sessionId, logger) + if (!latest?.prune?.messages) { + return + } + + const latestMessages = loadPruneMessagesState(latest.prune.messages) + + // Merge only the persisted compression block graph and its derived indexes. The + // in-memory messageIds map was built by prepareSession for the current raw + // message set and must not be replaced by the on-disk snapshot. 
+ state.prune.messages.blocksById = latestMessages.blocksById + state.prune.messages.activeBlockIds = latestMessages.activeBlockIds + state.prune.messages.activeByAnchorMessageId = latestMessages.activeByAnchorMessageId + state.prune.messages.nextBlockId = latestMessages.nextBlockId + state.prune.messages.nextRunId = latestMessages.nextRunId +} + +export function rebasePlannedCompression( + plans: PlannedCompression[], + latestState: SessionState, +): void { + const inactiveBlockIds = new Set() + for (const plan of plans) { + for (const blockId of plan.selection.requiredBlockIds) { + const block = latestState.prune.messages.blocksById.get(blockId) + if (!block?.active) { + inactiveBlockIds.add(blockId) + } + } + } + + if (inactiveBlockIds.size > 0) { + throw new RebaseConflict( + `Planned compression consumed inactive block IDs: ${Array.from(inactiveBlockIds) + .sort((left, right) => left - right) + .map((blockId) => `b${blockId}`) + .join(", ")}`, + ) + } +} + +export async function persistCompressionState( + state: SessionState, + sessionId: string, + logger: Logger, +): Promise { + if (state.sessionId !== sessionId) { + logger.warn("Persisting compression state for unexpected session", { + expectedSessionId: sessionId, + stateSessionId: state.sessionId, + }) + } + + state.manualMode = state.manualMode ? "active" : false + applyPendingCompressionDurations(state) + await saveSessionState(state, logger) +} + +export async function sendCompressionNotification( ctx: ToolContext, toolCtx: RunContext, rawMessages: WithParts[], entries: NotificationEntry[], batchTopic: string | undefined, ): Promise { - ctx.state.manualMode = ctx.state.manualMode ? 
"active" : false - applyPendingCompressionDurations(ctx.state) - await saveSessionState(ctx.state, ctx.logger) - const params = getCurrentParams(ctx.state, rawMessages, ctx.logger) const sessionMessageIds = rawMessages .filter((msg) => !isIgnoredUserMessage(msg)) diff --git a/lib/compress/protected-content.ts b/lib/compress/protected-content.ts index 45b27355..2539a732 100644 --- a/lib/compress/protected-content.ts +++ b/lib/compress/protected-content.ts @@ -13,6 +13,27 @@ import { import { fetchSessionMessages } from "./search" import type { SearchContext, SelectionResolution } from "./types" +const MAX_PROTECTED_TOOL_OUTPUT_CHARS = 20_000 +const PROTECTED_TOOL_OUTPUT_HEAD_CHARS = 2_000 + +export function truncateProtectedToolOutput(tool: string, output: string): string { + if (output.length <= MAX_PROTECTED_TOOL_OUTPUT_CHARS) { + return output + } + + const head = output.slice(0, PROTECTED_TOOL_OUTPUT_HEAD_CHARS) + const tail = output.slice( + output.length - (MAX_PROTECTED_TOOL_OUTPUT_CHARS - PROTECTED_TOOL_OUTPUT_HEAD_CHARS), + ) + const omitted = output.length - head.length - tail.length + + return [ + head, + `[Protected ${tool} output truncated by DCP: ${omitted} characters omitted. 
Use the original tool/session artifact if exact details are needed.]`, + tail, + ].join("\n\n") +} + export function appendProtectedUserMessages( summary: string, selection: SelectionResolution, @@ -192,6 +213,7 @@ export async function appendProtectedTools( } if (output) { + output = truncateProtectedToolOutput(part.tool, output) protectedOutputs.push(`\n### ${title}\n${output}`) } } diff --git a/lib/compress/range-utils.ts b/lib/compress/range-utils.ts index 7aa8dbc9..6c2647c4 100644 --- a/lib/compress/range-utils.ts +++ b/lib/compress/range-utils.ts @@ -11,6 +11,13 @@ import type { const BLOCK_PLACEHOLDER_REGEX = /\(b(\d+)\)|\{block_(\d+)\}/gi +export class ValidationError extends Error { + constructor(message: string) { + super(message) + this.name = "ValidationError" + } +} + export function validateArgs(args: CompressRangeToolArgs): void { if (typeof args.topic !== "string" || args.topic.trim().length === 0) { throw new Error("topic is required and must be a non-empty string") @@ -104,8 +111,8 @@ export function parseBlockPlaceholders(summary: string): ParsedBlockPlaceholder[ const placeholders: ParsedBlockPlaceholder[] = [] const regex = new RegExp(BLOCK_PLACEHOLDER_REGEX) - let match: RegExpExecArray | null - while ((match = regex.exec(summary)) !== null) { + let match: RegExpExecArray | null = regex.exec(summary) + while (match !== null) { const full = match[0] const blockIdPart = match[1] || match[2] const parsed = Number.parseInt(blockIdPart, 10) @@ -119,6 +126,7 @@ export function parseBlockPlaceholders(summary: string): ParsedBlockPlaceholder[ startIndex: match.index, endIndex: match.index + full.length, }) + match = regex.exec(summary) } return placeholders @@ -127,6 +135,7 @@ export function parseBlockPlaceholders(summary: string): ParsedBlockPlaceholder[ export function validateSummaryPlaceholders( placeholders: ParsedBlockPlaceholder[], requiredBlockIds: number[], + newBlockId: number, startReference: BoundaryReference, endReference: 
BoundaryReference, summaryByBlockId: Map, @@ -151,14 +160,31 @@ export function validateSummaryPlaceholders( const validPlaceholders: ParsedBlockPlaceholder[] = [] for (const placeholder of placeholders) { - const isKnown = summaryByBlockId.has(placeholder.blockId) - const isRequired = requiredSet.has(placeholder.blockId) - const isDuplicate = keptPlaceholderIds.has(placeholder.blockId) + const placeholderId = placeholder.blockId + + if (placeholderId === undefined) { + continue + } - if (isKnown && isRequired && !isDuplicate) { - validPlaceholders.push(placeholder) - keptPlaceholderIds.add(placeholder.blockId) + if (placeholderId === newBlockId) { + throw new ValidationError("self-ref") + } + + if (placeholderId >= newBlockId) { + throw new ValidationError("forward-ref") + } + + if (!summaryByBlockId.has(placeholderId)) { + console.warn(`Compressed block placeholder missing from summary map: (b${placeholderId})`) + continue } + + if (!requiredSet.has(placeholderId) || keptPlaceholderIds.has(placeholderId)) { + continue + } + + validPlaceholders.push(placeholder) + keptPlaceholderIds.add(placeholderId) } placeholders.length = 0 @@ -173,6 +199,7 @@ export function injectBlockPlaceholders( summaryByBlockId: Map, startReference: BoundaryReference, endReference: BoundaryReference, + consumedBlockIds: Set, ): InjectedSummaryResult { let cursor = 0 let expanded = summary @@ -188,7 +215,9 @@ export function injectBlockPlaceholders( } expanded += summary.slice(cursor, placeholder.startIndex) - expanded += restoreSummary(target.summary) + expanded += consumedBlockIds.has(placeholder.blockId) + ? 
`(b${placeholder.blockId}) — existing compressed block [topic: "${target.topic || "untitled"}"] — preserve this token exactly, do not expand or paraphrase` + : `(b${placeholder.blockId}) — preserved compressed block — do not paraphrase or replace` cursor = placeholder.endIndex if (!consumedSeen.has(placeholder.blockId)) { @@ -205,6 +234,7 @@ export function injectBlockPlaceholders( startReference, "start", summaryByBlockId, + consumedBlockIds, consumed, consumedSeen, ) @@ -213,6 +243,7 @@ export function injectBlockPlaceholders( endReference, "end", summaryByBlockId, + consumedBlockIds, consumed, consumedSeen, ) @@ -243,7 +274,9 @@ export function appendMissingBlockSummaries( throw new Error(`Compressed block not found: (b${blockId})`) } - missingSummaries.push(`\n### (b${blockId})\n${restoreSummary(target.summary)}`) + missingSummaries.push( + `\n### (b${blockId})\n(b${blockId}) — existing compressed block [topic: "${target.topic || "untitled"}"] — preserve this token exactly, do not expand or paraphrase`, + ) consumedSeen.add(blockId) consumed.push(blockId) } @@ -264,24 +297,12 @@ export function appendMissingBlockSummaries( } } -function restoreSummary(summary: string): string { - const headerMatch = summary.match(/^\s*\[Compressed conversation(?: section)?(?: b\d+)?\]/i) - if (!headerMatch) { - return summary - } - - const afterHeader = summary.slice(headerMatch[0].length) - const withoutLeadingBreaks = afterHeader.replace(/^(?:\r?\n)+/, "") - return withoutLeadingBreaks - .replace(/(?:\r?\n)*b\d+<\/dcp-message-id>\s*$/i, "") - .replace(/(?:\r?\n)+$/, "") -} - function injectBoundarySummary( summary: string, reference: BoundaryReference, position: "start" | "end", summaryByBlockId: Map, + consumedBlockIds: Set, consumed: number[], consumedSeen: Set, ): string { @@ -297,7 +318,9 @@ function injectBoundarySummary( throw new Error(`Compressed block not found: (b${reference.blockId})`) } - const injectedBody = restoreSummary(target.summary) + const injectedBody = 
consumedBlockIds.has(reference.blockId) + ? `(b${reference.blockId}) — existing compressed block [topic: "${target.topic || "untitled"}"] — preserve this token exactly, do not expand or paraphrase` + : `(b${reference.blockId}) — preserved compressed block — do not paraphrase or replace` const left = position === "start" ? injectedBody.trim() : summary.trim() const right = position === "start" ? summary.trim() : injectedBody.trim() const next = !left ? right : !right ? left : `${left}\n\n${right}` diff --git a/lib/compress/range.ts b/lib/compress/range.ts index d320be89..2b4a3c67 100644 --- a/lib/compress/range.ts +++ b/lib/compress/range.ts @@ -1,8 +1,17 @@ import { tool } from "@opencode-ai/plugin" import type { ToolContext } from "./types" -import { countTokens } from "../token-utils" +import { renderBlockForContext, type BlockLike } from "./renderer" import { RANGE_FORMAT_EXTENSION } from "../prompts/extensions/tool" -import { finalizeSession, prepareSession, type NotificationEntry } from "./pipeline" +import { withSessionLock } from "../state/lock" +import { + RebaseConflict, + persistCompressionState, + prepareSession, + rebasePlannedCompression, + reloadLatestState, + sendCompressionNotification, + type NotificationEntry, +} from "./pipeline" import { appendProtectedPromptInfo, appendProtectedTools, @@ -17,15 +26,19 @@ import { validateNonOverlapping, validateSummaryPlaceholders, } from "./range-utils" +import { assertUsefulCompressedSummary, estimateSelectedTokens } from "./summary-limits" import { COMPRESSED_BLOCK_HEADER, - allocateBlockId, allocateRunId, applyCompressionState, + previewBlockIds, + reserveBlockIds, wrapCompressedSummary, } from "./state" import type { CompressRangeToolArgs } from "./types" +const MAX_REBASE_ATTEMPTS = 3 + function buildSchema() { return { topic: tool.schema @@ -68,125 +81,216 @@ export function createCompressRangeTool(ctx: ToolContext): ReturnType = [] - let totalCompressedMessages = 0 - - for (const plan of resolvedPlans) 
{ - const parsedPlaceholders = parseBlockPlaceholders(plan.entry.summary) - const missingBlockIds = validateSummaryPlaceholders( - parsedPlaceholders, - plan.selection.requiredBlockIds, - plan.selection.startReference, - plan.selection.endReference, - searchContext.summaryByBlockId, - ) - - const injected = injectBlockPlaceholders( - plan.entry.summary, - parsedPlaceholders, - searchContext.summaryByBlockId, - plan.selection.startReference, - plan.selection.endReference, - ) - - const summaryWithUsers = appendProtectedUserMessages( - injected.expandedSummary, - plan.selection, - searchContext, - ctx.state, - ctx.config.compress.protectUserMessages, - ) - - const summaryWithPromptInfo = appendProtectedPromptInfo( - summaryWithUsers, - plan.selection, - searchContext, - ctx.state, - ctx.config.compress.protectTags, - ) - - const summaryWithTools = await appendProtectedTools( - ctx.client, - ctx.state, - ctx.config.experimental.allowSubAgents, - summaryWithPromptInfo, - plan.selection, - searchContext, - ctx.config.compress.protectedTools, - ctx.config.protectedFilePatterns, - ) - - const completedSummary = appendMissingBlockSummaries( - summaryWithTools, - missingBlockIds, - searchContext.summaryByBlockId, - injected.consumedBlockIds, - ) - - preparedPlans.push({ - entry: plan.entry, - selection: plan.selection, - anchorMessageId: plan.anchorMessageId, - finalSummary: completedSummary.expandedSummary, - consumedBlockIds: completedSummary.consumedBlockIds, - }) - } + for (let attempt = 1; attempt <= MAX_REBASE_ATTEMPTS; attempt++) { + try { + const { rawMessages, searchContext } = await prepareSession( + ctx, + toolCtx, + `Compress Range: ${input.topic}`, + ) + const resolvedPlans = resolveRanges(input, searchContext, ctx.state) + validateNonOverlapping(resolvedPlans) - const runId = allocateRunId(ctx.state) - - for (const preparedPlan of preparedPlans) { - const blockId = allocateBlockId(ctx.state) - const storedSummary = wrapCompressedSummary(blockId, 
preparedPlan.finalSummary) - const summaryTokens = countTokens(storedSummary) - - const applied = applyCompressionState( - ctx.state, - { - topic: input.topic, - batchTopic: input.topic, - startId: preparedPlan.entry.startId, - endId: preparedPlan.entry.endId, - mode: "range", - runId, - compressMessageId: toolCtx.messageID, - compressCallId: callId, - summaryTokens, - }, - preparedPlan.selection, - preparedPlan.anchorMessageId, - blockId, - storedSummary, - preparedPlan.consumedBlockIds, - ) - - totalCompressedMessages += applied.messageIds.length - - notifications.push({ - blockId, - runId, - summary: preparedPlan.finalSummary, - summaryTokens, - }) - } + const result = await withSessionLock(toolCtx.sessionID, async () => { + await reloadLatestState(ctx.state, toolCtx.sessionID, ctx.logger) + rebasePlannedCompression(resolvedPlans, ctx.state) + + const blockIds = previewBlockIds(ctx.state, resolvedPlans.length) + const notifications: NotificationEntry[] = [] + const preparedPlans: Array<{ + entry: (typeof resolvedPlans)[number]["entry"] + selection: (typeof resolvedPlans)[number]["selection"] + anchorMessageId: string + finalSummary: string + consumedBlockIds: number[] + }> = [] + let totalCompressedMessages = 0 + + for (const [index, plan] of resolvedPlans.entries()) { + const parsedPlaceholders = parseBlockPlaceholders(plan.entry.summary) + const missingBlockIds = validateSummaryPlaceholders( + parsedPlaceholders, + plan.selection.requiredBlockIds, + blockIds[index] ?? 
0, + plan.selection.startReference, + plan.selection.endReference, + searchContext.summaryByBlockId, + ) + + const injected = injectBlockPlaceholders( + plan.entry.summary, + parsedPlaceholders, + searchContext.summaryByBlockId, + plan.selection.startReference, + plan.selection.endReference, + new Set(plan.selection.requiredBlockIds), + ) + + const summaryWithUsers = appendProtectedUserMessages( + injected.expandedSummary, + plan.selection, + searchContext, + ctx.state, + ctx.config.compress.protectUserMessages, + ) + + const summaryWithPromptInfo = appendProtectedPromptInfo( + summaryWithUsers, + plan.selection, + searchContext, + ctx.state, + ctx.config.compress.protectTags, + ) + + const summaryWithTools = await appendProtectedTools( + ctx.client, + ctx.state, + ctx.config.experimental.allowSubAgents, + summaryWithPromptInfo, + plan.selection, + searchContext, + ctx.config.compress.protectedTools, + ctx.config.protectedFilePatterns, + ) + + const completedSummary = appendMissingBlockSummaries( + summaryWithTools, + missingBlockIds, + searchContext.summaryByBlockId, + injected.consumedBlockIds, + ) + + preparedPlans.push({ + entry: plan.entry, + selection: plan.selection, + anchorMessageId: plan.anchorMessageId, + finalSummary: completedSummary.expandedSummary, + consumedBlockIds: completedSummary.consumedBlockIds, + }) + } - await finalizeSession(ctx, toolCtx, rawMessages, notifications, input.topic) + const validatedPlans = preparedPlans.map((preparedPlan, index) => { + const blockId = blockIds[index] + if (blockId === undefined) { + throw new Error("Failed to preview compression block ID") + } + + const consumedBlocks: Array<{ + id: number + summary: string + schemaVersion?: number + }> = [] + for (const consumedId of preparedPlan.consumedBlockIds) { + const consumed = searchContext.summaryByBlockId.get(consumedId) + if (!consumed) { + continue + } + consumedBlocks.push({ + id: consumedId, + summary: consumed.summary, + schemaVersion: consumed.schemaVersion, + 
}) + } + + const wrapResult = wrapCompressedSummary({ + blockId, + modelSummary: preparedPlan.finalSummary, + consumedBlocks, + blocksById: searchContext.summaryByBlockId, + mode: "range", + }) + const { storedSummary, refBlockIds } = wrapResult + + // Phase 0 Contract A: render the draft block against a draft Map so we can + // measure tokens WITHOUT mutating state.prune.messages.blocksById. The draft + // is committed only after assertUsefulCompressedSummary passes and + // reserveBlockIds/applyCompressionState succeed below. + // wrapCompressedSummary returns draftBlock already populated with refBlockIds + // and schemaVersion: 2 from the exact-substring dedup pass (T8); summaryTokens + // remains 0 until renderBlockForContext fills it in here (T12 prep). + const draftBlock: BlockLike & { summaryTokens: number } = + wrapResult.draftBlock + const draftBlocksById = new Map( + ctx.state.prune.messages.blocksById, + ) + draftBlocksById.set(blockId, draftBlock) + const { renderedTokens } = renderBlockForContext(blockId, draftBlocksById) + draftBlock.summaryTokens = renderedTokens + const summaryTokens = renderedTokens + const selectedTokens = estimateSelectedTokens( + ctx.state, + preparedPlan.selection, + preparedPlan.consumedBlockIds, + ) + assertUsefulCompressedSummary(summaryTokens, selectedTokens) + + return { + ...preparedPlan, + blockId, + storedSummary, + refBlockIds, + summaryTokens, + } + }) + + reserveBlockIds(ctx.state, validatedPlans.length) + const runId = allocateRunId(ctx.state) + + for (const preparedPlan of validatedPlans) { + const applied = applyCompressionState( + ctx.state, + { + topic: input.topic, + batchTopic: input.topic, + startId: preparedPlan.entry.startId, + endId: preparedPlan.entry.endId, + mode: "range", + runId, + compressMessageId: toolCtx.messageID, + compressCallId: callId, + summaryTokens: preparedPlan.summaryTokens, + refBlockIds: preparedPlan.refBlockIds, + }, + preparedPlan.selection, + preparedPlan.anchorMessageId, + 
preparedPlan.blockId, + preparedPlan.storedSummary, + preparedPlan.consumedBlockIds, + ) + + totalCompressedMessages += applied.messageIds.length + + notifications.push({ + blockId: preparedPlan.blockId, + runId, + summary: preparedPlan.finalSummary, + summaryTokens: preparedPlan.summaryTokens, + }) + } + + await persistCompressionState(ctx.state, toolCtx.sessionID, ctx.logger) + + return { notifications, totalCompressedMessages } + }) + + await sendCompressionNotification( + ctx, + toolCtx, + rawMessages, + result.notifications, + input.topic, + ) + + return `Compressed ${result.totalCompressedMessages} messages into ${COMPRESSED_BLOCK_HEADER}.` + } catch (error) { + if (error instanceof RebaseConflict && attempt < MAX_REBASE_ATTEMPTS) { + continue + } + throw error + } + } - return `Compressed ${totalCompressedMessages} messages into ${COMPRESSED_BLOCK_HEADER}.` + throw new Error("Failed to compress range after rebase retries") }, }) } diff --git a/lib/compress/renderer.ts b/lib/compress/renderer.ts new file mode 100644 index 00000000..08e3c9c0 --- /dev/null +++ b/lib/compress/renderer.ts @@ -0,0 +1,99 @@ +import { extractBlockPlaceholders, formatBlockPlaceholder } from "../message-ids" +import { countTokens } from "../token-utils" + +export class CycleError extends Error { + constructor(message: string) { + super(message) + this.name = "CycleError" + } +} + +export type BlockLike = { + summary: string + refBlockIds?: number[] + schemaVersion?: number +} + +export interface RenderContext { + expanding: Set<number> + renderedOnce: Set<number> +} + +export function renderBlockForContext( + blockId: number, + blocksById: Map<number, BlockLike> | ReadonlyMap<number, BlockLike>, + ctx?: RenderContext, +): { text: string; renderedTokens: number } { + const isTopLevel = ctx === undefined + const renderCtx: RenderContext = ctx ?? { + expanding: new Set(), + renderedOnce: new Set(), + } + + const text = renderInner(blockId, blocksById, renderCtx) + + return { + text, + renderedTokens: isTopLevel ?
countTokens(text) : 0, + } +} + +function renderInner( + blockId: number, + blocksById: Map<number, BlockLike> | ReadonlyMap<number, BlockLike>, + ctx: RenderContext, +): string { + // Diamond dedup: a block that has already been fully expanded earlier in this + // top-level render is collapsed to a marker so its content is not duplicated. + if (ctx.renderedOnce.has(blockId)) { + return `${formatBlockPlaceholder(blockId)} [already expanded above]` + } + + // Cycle detection: a block currently on the expansion call stack means the + // DAG contains a true cycle. Forward-ref validation (T4) should prevent this + // at write time; throwing here is defensive against data corruption. + if (ctx.expanding.has(blockId)) { + throw new CycleError( + `Cycle detected in compression block DAG at blockId ${blockId}`, + ) + } + + const block = blocksById.get(blockId) + if (!block) { + return `${formatBlockPlaceholder(blockId)} [not found]` + } + + // Legacy v1 block: no DAG refs to expand, summary is returned verbatim. + if (block.refBlockIds === undefined) { + ctx.renderedOnce.add(blockId) + return block.summary + } + + ctx.expanding.add(blockId) + try { + // v2 contract: refBlockIds is the authoritative allowlist of structural DAG + // children. Prose mentions of (bN) in summary text are NOT placeholders. + // Filter actual textual occurrences against the allowlist BEFORE recursion to + // avoid renderedOnce side-effects polluting siblings that legitimately ref a + // child whose placeholder happens to be absent from this block's summary.
+ let result = block.summary + const refIds = new Set(block.refBlockIds) + const placeholderIds = extractBlockPlaceholders(block.summary).filter((id) => + refIds.has(id), + ) + const seen = new Set() + for (const refId of placeholderIds) { + if (seen.has(refId)) { + continue + } + seen.add(refId) + const childText = renderInner(refId, blocksById, ctx) + const placeholder = formatBlockPlaceholder(refId) + result = result.split(placeholder).join(childText) + } + return result + } finally { + ctx.expanding.delete(blockId) + ctx.renderedOnce.add(blockId) + } +} diff --git a/lib/compress/search.ts b/lib/compress/search.ts index 2cc16cf6..870bb92d 100644 --- a/lib/compress/search.ts +++ b/lib/compress/search.ts @@ -221,9 +221,6 @@ function buildBoundaryLookup( if (!rawMessage) { continue } - if (isIgnoredUserMessage(rawMessage)) { - continue - } const rawIndex = context.rawIndexById.get(messageId) if (rawIndex === undefined) { @@ -244,9 +241,6 @@ function buildBoundaryLookup( if (!anchorMessage) { continue } - if (isIgnoredUserMessage(anchorMessage)) { - continue - } const rawIndex = context.rawIndexById.get(summary.anchorMessageId) if (rawIndex === undefined) { diff --git a/lib/compress/state.ts b/lib/compress/state.ts index 5672b1c1..00d5f074 100644 --- a/lib/compress/state.ts +++ b/lib/compress/state.ts @@ -1,20 +1,48 @@ -import type { CompressionBlock, PruneMessagesState, SessionState } from "../state" +import { validateBlockRefs } from "./dag" +import { COMPRESSED_BLOCK_HEADER, deduplicateBlockContent, stripCompactMarkers } from "./dedup" import { formatBlockRef, formatMessageIdTag } from "../message-ids" import type { AppliedCompressionResult, CompressionStateInput, SelectionResolution } from "./types" +import type { CompressionBlock, PruneMessagesState, SessionState } from "../state" -export const COMPRESSED_BLOCK_HEADER = "[Compressed conversation section]" +export { COMPRESSED_BLOCK_HEADER } -export function allocateBlockId(state: SessionState): number { 
+function nextBlockId(state: SessionState): number { const next = state.prune.messages.nextBlockId if (!Number.isInteger(next) || next < 1) { - state.prune.messages.nextBlockId = 2 return 1 } - state.prune.messages.nextBlockId = next + 1 return next } +export function previewBlockIds(state: SessionState, count: number): number[] { + if (!Number.isInteger(count) || count < 0) { + throw new Error(`Invalid block reservation count: ${count}`) + } + if (count === 0) { + return [] + } + + const first = nextBlockId(state) + return Array.from({ length: count }, (_, index) => first + index) +} + +export function reserveBlockIds(state: SessionState, count: number): number[] { + const ids = previewBlockIds(state, count) + if (ids.length > 0) { + state.prune.messages.nextBlockId = ids[ids.length - 1] + 1 + } + return ids +} + +export function allocateBlockId(state: SessionState): number { + const [blockId] = reserveBlockIds(state, 1) + if (blockId === undefined) { + throw new Error("Failed to allocate compression block ID") + } + return blockId +} + export function allocateRunId(state: SessionState): number { const next = state.prune.messages.nextRunId if (!Number.isInteger(next) || next < 1) { @@ -49,7 +77,13 @@ export function attachCompressionDuration( return updates } -export function wrapCompressedSummary(blockId: number, summary: string): string { +/** + * Wrap a body string in the standard [Compressed conversation section] header + * and dcp-message-id boundary footer used for stored block summaries. + * Primitive used by wrapCompressedSummary and by tests that need to construct a + * stored-form summary directly without dedup. 
+ */ +export function wrapBlockSummary(blockId: number, summary: string): string { const header = COMPRESSED_BLOCK_HEADER const footer = formatMessageIdTag(formatBlockRef(blockId)) const body = summary.trim() @@ -59,6 +93,93 @@ export function wrapCompressedSummary(blockId: number, summary: string): string return `${header}\n${body}\n\n${footer}` } +export interface WrapCompressedSummaryArgs { + blockId: number + modelSummary: string + consumedBlocks: ReadonlyArray<{ + id: number + summary: string + schemaVersion?: number + }> + blocksById: ReadonlyMap< + number, + { summary: string; refBlockIds?: number[]; schemaVersion?: number } + > + mode: "range" | "message" +} + +export interface WrapCompressedSummaryDraftBlock { + summary: string + refBlockIds: number[] + schemaVersion: number + summaryTokens: number +} + +export interface WrapCompressedSummaryResult { + storedSummary: string + refBlockIds: number[] + draftBlock: WrapCompressedSummaryDraftBlock +} + +/** + * Build the stored summary for a newly created compression block. + * + * Pipeline: + * 1. stripCompactMarkers (lib/compress/dedup.ts) removes any prompt-only + * marker text the model may have parroted, leaving only bare `(bN)` + * refs. + * 2. deduplicateBlockContent (lib/compress/dedup.ts) performs exact- + * substring dedup of consumed block bodies inside the cleaned summary, + * plus a defensive rendered-content leak check (T8 step 5). + * 3. wrapBlockSummary frames the result with the standard + * [Compressed conversation section] header and dcp-message-id footer. + * + * Phase 0 Contract E: returns { storedSummary, refBlockIds, draftBlock }. + * draftBlock.summaryTokens is left at 0 here; T12 builds a draft Map, calls + * renderBlockForContext, and sets the real token count. + * + * blocksById and mode are accepted as part of the contract for future + * expansion (DAG-wide validation, mode-specific framing); the dedup itself + * uses blocksById to render consumed blocks for the leak check. 
+ */ +export function wrapCompressedSummary( + args: WrapCompressedSummaryArgs, +): WrapCompressedSummaryResult { + const { blockId, modelSummary, consumedBlocks, blocksById } = args + + // Strip any compact marker text the model may have parroted from the + // compression prompt (e.g. `(bN) — existing compressed block [topic: + // "..."] — preserve this token exactly, ...`). Stored block summaries + // must contain only bare `(bN)` refs (Oracle Round 3 gap 1: storage vs + // prompt separation). stripCompactMarkers is anchored on the literal + // marker template generated in range-utils.ts so unrelated text that + // happens to mention (bN) is untouched. + const cleaned = stripCompactMarkers(modelSummary) + + // Exact-substring + rendered-content dedup of consumed block content, + // greedy longest-body-first so a short body that happens to be a + // substring of a longer body does not pre-empt the longer match. + // Each consumed block is replaced at most once. + const { deduped, refBlockIds } = deduplicateBlockContent( + cleaned, + consumedBlocks, + blocksById, + ) + + const storedSummary = wrapBlockSummary(blockId, deduped) + + return { + storedSummary, + refBlockIds, + draftBlock: { + summary: storedSummary, + refBlockIds, + schemaVersion: 2, + summaryTokens: 0, + }, + } +} + export function applyCompressionState( state: SessionState, input: CompressionStateInput, @@ -109,6 +230,9 @@ export function applyCompressionState( } const createdAt = Date.now() + if (input.refBlockIds !== undefined) { + validateBlockRefs(blockId, input.refBlockIds, messagesState.blocksById) + } const block: CompressionBlock = { blockId, runId: input.runId, @@ -135,6 +259,10 @@ export function applyCompressionState( createdAt, summary, } + if (input.refBlockIds !== undefined) { + block.refBlockIds = input.refBlockIds + block.schemaVersion = 2 + } messagesState.blocksById.set(blockId, block) messagesState.activeBlockIds.add(blockId) diff --git a/lib/compress/summary-limits.ts 
b/lib/compress/summary-limits.ts new file mode 100644 index 00000000..1438455b --- /dev/null +++ b/lib/compress/summary-limits.ts @@ -0,0 +1,48 @@ +import type { SelectionResolution } from "./types" +import type { SessionState } from "../state" + +const MIN_SELECTED_TOKENS_FOR_RATIO_CHECK = 10_000 + +export function estimateSelectedTokens( + state: SessionState, + selection: SelectionResolution, + consumedBlockIds: number[] = [], +): number { + let total = 0 + const consumedMessageIds = new Set() + + for (const blockId of consumedBlockIds) { + const block = state.prune.messages.blocksById.get(blockId) + if (!block) { + continue + } + for (const messageId of block.effectiveMessageIds) { + consumedMessageIds.add(messageId) + } + } + + for (const [messageId, tokenCount] of selection.messageTokenById) { + if (!consumedMessageIds.has(messageId)) { + total += tokenCount + } + } + + for (const blockId of consumedBlockIds) { + const block = state.prune.messages.blocksById.get(blockId) + if (block) { + total += block.summaryTokens + } + } + + return total +} + +export function assertUsefulCompressedSummary(summaryTokens: number, selectedTokens: number): void { + // Both sides of this comparison use rendered token semantics for v2 blocks. + // summaryTokens is set to renderBlockForContext().renderedTokens at creation time (T12). + if (selectedTokens >= MIN_SELECTED_TOKENS_FOR_RATIO_CHECK && summaryTokens >= selectedTokens) { + throw new Error( + `Compression summary is not smaller than the selected content (${summaryTokens} >= ${selectedTokens} tokens). 
Retry with a concise summary.`, + ) + } +} diff --git a/lib/compress/types.ts b/lib/compress/types.ts index f0eb5d0c..640f5bf1 100644 --- a/lib/compress/types.ts +++ b/lib/compress/types.ts @@ -105,4 +105,5 @@ export interface CompressionStateInput { compressMessageId: string compressCallId?: string summaryTokens: number + refBlockIds?: number[] } diff --git a/lib/hooks.ts b/lib/hooks.ts index 2513596a..dc37a076 100644 --- a/lib/hooks.ts +++ b/lib/hooks.ts @@ -131,7 +131,10 @@ export function createChatMessageTransformHandler( stripHallucinations(output.messages) cacheSystemPromptTokens(state, output.messages) - assignMessageRefs(state, output.messages) + const assigned = assignMessageRefs(state, output.messages) + if (assigned > 0) { + await saveSessionState(state, logger) + } syncCompressionBlocks(state, logger, output.messages) syncToolCache(state, config, logger, output.messages) buildToolIdList(state, output.messages) diff --git a/lib/message-ids.ts b/lib/message-ids.ts index da003999..c890f5d6 100644 --- a/lib/message-ids.ts +++ b/lib/message-ids.ts @@ -90,6 +90,39 @@ export function parseBoundaryId(id: string): ParsedBoundaryId | null { return null } +export function formatBlockPlaceholder(blockId: number): string { + if (!Number.isInteger(blockId) || blockId < 0) { + throw new Error(`Invalid block placeholder ID: ${blockId}`) + } + + return `(b${blockId})` +} + +export function parseBlockPlaceholder(token: string): number | null { + const normalized = token.trim().toLowerCase() + const match = normalized.match(/^\(b(\d+)\)$/) + if (!match) { + return null + } + + const blockId = Number.parseInt(match[1], 10) + return Number.isInteger(blockId) ? 
blockId : null +} + +export function extractBlockPlaceholders(text: string): number[] { + const matches = text.matchAll(/\(b(\d+)\)/g) + const blockIds: number[] = [] + + for (const match of matches) { + const blockId = Number.parseInt(match[1], 10) + if (Number.isInteger(blockId)) { + blockIds.push(blockId) + } + } + + return blockIds +} + function escapeXmlAttribute(value: string): string { return value .replace(/&/g, "&amp;") diff --git a/lib/messages/prune.ts b/lib/messages/prune.ts index 444cdf86..ca159cc1 100644 --- a/lib/messages/prune.ts +++ b/lib/messages/prune.ts @@ -1,6 +1,7 @@ import type { SessionState, WithParts } from "../state" import type { Logger } from "../logger" import type { PluginConfig } from "../config" +import { renderBlockForContext, type BlockLike } from "../compress/renderer" import { isMessageCompacted } from "../state/utils" import { createSyntheticUserMessage, replaceBlockIdsWithBlocked } from "./utils" import { getLastUserMessage } from "./query" @@ -10,6 +11,8 @@ const PRUNED_TOOL_OUTPUT_REPLACEMENT = "[Output removed to save context - information superseded or no longer needed]" const PRUNED_TOOL_ERROR_INPUT_REPLACEMENT = "[input removed due to failed tool call]" const PRUNED_QUESTION_INPUT_REPLACEMENT = "[questions removed - see output for user's answers]" +const PRUNED_FILE_SNAPSHOT_REPLACEMENT = + "[full file snapshot removed to save context - diff metadata retained]" export const prune = ( state: SessionState, @@ -19,6 +22,7 @@ export const prune = ( ): void => { filterCompressedRanges(state, logger, config, messages) // pruneFullTool(state, logger, messages) + pruneToolMetadata(state, logger, messages) pruneToolOutputs(state, logger, messages) pruneToolInputs(state, logger, messages) pruneToolErrors(state, logger, messages) @@ -70,6 +74,40 @@ const pruneFullTool = (state: SessionState, logger: Logger, messages: WithParts[ } } +const pruneToolMetadata = (state: SessionState, logger: Logger, messages: WithParts[]): void => { + for
(const msg of messages) { + if (isMessageCompacted(state, msg)) { + continue + } + + const parts = Array.isArray(msg.parts) ? msg.parts : [] + for (const part of parts) { + if (part.type !== "tool") { + continue + } + if (part.tool !== "edit" && part.tool !== "write") { + continue + } + if (!state.prune.tools.has(part.callID)) { + continue + } + + const toolState = part.state as { metadata?: { filediff?: Record<string, unknown> } } + const filediff = toolState.metadata?.filediff + if (!filediff || typeof filediff !== "object") { + continue + } + + if (typeof filediff.before === "string") { + filediff.before = PRUNED_FILE_SNAPSHOT_REPLACEMENT + } + if (typeof filediff.after === "string") { + filediff.after = PRUNED_FILE_SNAPSHOT_REPLACEMENT + } + } + } +} + const pruneToolOutputs = (state: SessionState, logger: Logger, messages: WithParts[]): void => { for (const msg of messages) { if (isMessageCompacted(state, msg)) { @@ -178,8 +216,8 @@ const filterCompressedRanges = ( const blockId = state.prune.messages.activeByAnchorMessageId.get(msgId) const summary = blockId !== undefined ? state.prune.messages.blocksById.get(blockId) : undefined - if (summary) { - const rawSummaryContent = (summary as { summary?: unknown }).summary + if (blockId !== undefined && summary) { + const rawSummaryContent = (summary as BlockLike).summary if ( summary.active !== true || typeof rawSummaryContent !== "string" || @@ -195,11 +233,12 @@ const filterCompressedRanges = ( const userMessage = getLastUserMessage(messages, msgIndex) if (userMessage) { - const userInfo = userMessage.info as UserMessage + const renderedSummaryContent = + renderBlockForContext(blockId, state.prune.messages.blocksById).text const summaryContent = config.compress.mode === "message" - ? replaceBlockIdsWithBlocked(rawSummaryContent) - : rawSummaryContent + ?
replaceBlockIdsWithBlocked(renderedSummaryContent) + : renderedSummaryContent const summarySeed = `${summary.blockId}:${summary.anchorMessageId}` result.push( createSyntheticUserMessage(userMessage, summaryContent, summarySeed), diff --git a/lib/messages/reasoning-strip.ts b/lib/messages/reasoning-strip.ts index d2c98620..6914df0d 100644 --- a/lib/messages/reasoning-strip.ts +++ b/lib/messages/reasoning-strip.ts @@ -1,10 +1,14 @@ import type { WithParts } from "../state" import { getLastUserMessage } from "./query" +const RELEVANT_TYPES = new Set(["text", "tool"]) + /** - * Mirrors opencode's differentModel handling by preserving part content while - * dropping provider metadata on assistant parts that came from a different - * model/provider than the current turn's user message. + * Drops stale provider metadata from assistant text/tool parts that came from a + * different model/provider than the current turn's user message. Reasoning + * parts pass through unchanged because opencode native handles reasoning-to-text + * conversion for different-model requests and Anthropic requires thinking block + * metadata to remain byte-for-byte intact. 
*/ export function stripStaleMetadata(messages: WithParts[]): void { const lastUserMessage = getLastUserMessage(messages) @@ -25,7 +29,7 @@ export function stripStaleMetadata(messages: WithParts[]): void { } message.parts = message.parts.map((part) => { - if (part.type !== "text" && part.type !== "tool" && part.type !== "reasoning") { + if (!RELEVANT_TYPES.has(part.type)) { return part } diff --git a/lib/messages/sync.ts b/lib/messages/sync.ts index 9eca783b..f4b55432 100644 --- a/lib/messages/sync.ts +++ b/lib/messages/sync.ts @@ -1,6 +1,10 @@ import type { SessionState, WithParts } from "../state" import type { Logger } from "../logger" +export interface SyncCompressionBlocksOptions { + authoritative?: boolean +} + function sortBlocksByCreation( a: { createdAt: number; blockId: number }, b: { createdAt: number; blockId: number }, @@ -16,8 +20,10 @@ export const syncCompressionBlocks = ( state: SessionState, logger: Logger, messages: WithParts[], + options: SyncCompressionBlocksOptions = {}, ): void => { const messagesState = state.prune.messages + const authoritative = options.authoritative === true if (!messagesState?.blocksById?.size) { return } @@ -42,7 +48,7 @@ export const syncCompressionBlocks = ( block.compressMessageId.length > 0 && messageIds.has(block.compressMessageId) - if (!hasOriginMessage) { + if (!hasOriginMessage && authoritative) { block.active = false block.deactivatedAt = now block.deactivatedByBlockId = undefined @@ -59,6 +65,10 @@ export const syncCompressionBlocks = ( continue } + if (!hasOriginMessage && !block.active) { + continue + } + for (const consumedBlockId of block.consumedBlockIds) { if (!messagesState.activeBlockIds.has(consumedBlockId)) { continue @@ -85,7 +95,11 @@ export const syncCompressionBlocks = ( block.deactivatedAt = undefined block.deactivatedByBlockId = undefined messagesState.activeBlockIds.add(block.blockId) - if (messageIds.has(block.anchorMessageId)) { + const hasAnchorMessage = + typeof block.anchorMessageId === 
"string" && + block.anchorMessageId.length > 0 && + (authoritative ? messageIds.has(block.anchorMessageId) : true) + if (hasAnchorMessage) { messagesState.activeByAnchorMessageId.set(block.anchorMessageId, block.blockId) } } diff --git a/lib/state/lock.ts b/lib/state/lock.ts new file mode 100644 index 00000000..9897b033 --- /dev/null +++ b/lib/state/lock.ts @@ -0,0 +1,198 @@ +/** + * Cross-process session lock primitives for DCP. + * + * Serializes mutations to the persisted session state file at + * `${XDG_DATA_HOME || ~/.local/share}/opencode/storage/plugin/dcp/{sessionId}.json` + * by maintaining a sibling `{sessionId}.json.lock` file containing `{ pid, timestamp }`. + * + * Acquisition uses POSIX-atomic `open(path, "wx")`. On EEXIST, the holder's PID and + * timestamp are inspected to decide between waiting, taking over a dead process's + * lock, or taking over an alive-but-stale (>30s old) lock. + * + * Takeover precedence (strict order, evaluated on every retry): + * 1. Holder PID is dead (`process.kill(pid, 0)` throws ESRCH) -> immediate takeover. + * 2. Holder PID alive but `Date.now() - timestamp > 30_000` -> warn + takeover. + * 3. Otherwise -> `sleep(50ms)` + retry, up to 100 attempts (~5s total). + * + * After 100 attempts a `LockTimeoutError` is thrown. + * + * Per Phase 0 Contract C, only the state-mutation portion of compress should be wrapped + * by `withSessionLock`. Long-running operations (e.g. the model API call) MUST happen + * outside the critical section so the lock is never held across network IO. 
+ */ + +import { mkdir, open, readFile, unlink } from "node:fs/promises" +import { homedir } from "node:os" +import { join } from "node:path" + +const STORAGE_DIR = join( + process.env.XDG_DATA_HOME || join(homedir(), ".local", "share"), + "opencode", + "storage", + "plugin", + "dcp", +) + +const STALE_LOCK_MS = 30_000 +const RETRY_DELAY_MS = 50 +const MAX_ACQUIRE_ATTEMPTS = 100 + +export class LockTimeoutError extends Error { + constructor(message: string) { + super(message) + this.name = "LockTimeoutError" + } +} + +export interface LockHandle { + sessionId: string + lockPath: string +} + +interface LockFileContent { + pid: number + timestamp: number +} + +function getLockPath(sessionId: string): string { + return join(STORAGE_DIR, `${sessionId}.json.lock`) +} + +function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +async function readLockFile(lockPath: string): Promise<LockFileContent | null> { + let content: string + try { + content = await readFile(lockPath, "utf-8") + } catch { + return null + } + let parsed: unknown + try { + parsed = JSON.parse(content) + } catch { + return null + } + if (!parsed || typeof parsed !== "object") { + return null + } + const candidate = parsed as Partial<LockFileContent> + if ( + typeof candidate.pid !== "number" || + !Number.isInteger(candidate.pid) || + typeof candidate.timestamp !== "number" || + !Number.isFinite(candidate.timestamp) + ) { + return null + } + return { pid: candidate.pid, timestamp: candidate.timestamp } +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0) + return true + } catch (err: any) { + if (err?.code === "ESRCH") { + return false + } + // EPERM (process exists but we lack permission) and any other error are treated + // as "alive" to avoid mistakenly taking over a live holder's lock.
+ return true + } +} + +async function safeUnlink(lockPath: string): Promise { + try { + await unlink(lockPath) + } catch (err: any) { + // Another process may have already removed the file; treat ENOENT as success. + if (err?.code !== "ENOENT") { + throw err + } + } +} + +async function ensureStorageDir(): Promise { + await mkdir(STORAGE_DIR, { recursive: true }) +} + +export async function acquireSessionLock(sessionId: string): Promise { + await ensureStorageDir() + + const lockPath = getLockPath(sessionId) + + for (let attempt = 0; attempt < MAX_ACQUIRE_ATTEMPTS; attempt++) { + try { + // "wx" is atomic on POSIX: fails with EEXIST if the path already exists. + const fileHandle = await open(lockPath, "wx") + try { + const content: LockFileContent = { + pid: process.pid, + timestamp: Date.now(), + } + await fileHandle.writeFile(JSON.stringify(content), "utf-8") + } finally { + await fileHandle.close() + } + return { sessionId, lockPath } + } catch (err: any) { + if (err?.code !== "EEXIST") { + throw err + } + } + + // EEXIST: a lock file is present (or appears to be). Decide what to do. + const existing = await readLockFile(lockPath) + + if (existing) { + if (!isProcessAlive(existing.pid)) { + // Holder process is dead -> immediate takeover, no delay. + await safeUnlink(lockPath) + continue + } + const age = Date.now() - existing.timestamp + if (age > STALE_LOCK_MS) { + console.warn( + `[dcp] Stale lock detected for session ${sessionId} ` + + `(pid=${existing.pid}, age=${age}ms); taking over.`, + ) + await safeUnlink(lockPath) + continue + } + } + + // Either the lock is valid+fresh, or the file is briefly unreadable/malformed + // (e.g. another process is mid-write). Wait and retry without taking over. 
+ await sleep(RETRY_DELAY_MS) + } + + throw new LockTimeoutError( + `Timed out acquiring session lock for ${sessionId} ` + + `after ${MAX_ACQUIRE_ATTEMPTS} attempts ` + + `(~${(MAX_ACQUIRE_ATTEMPTS * RETRY_DELAY_MS) / 1000}s)`, + ) +} + +export async function releaseSessionLock(handle: LockHandle): Promise { + // Avoid clobbering another process's lock if ours was stolen by a stale takeover. + const existing = await readLockFile(handle.lockPath) + if (existing && existing.pid !== process.pid) { + return + } + await safeUnlink(handle.lockPath) +} + +export async function withSessionLock( + sessionId: string, + fn: () => Promise, +): Promise { + const handle = await acquireSessionLock(sessionId) + try { + return await fn() + } finally { + await releaseSessionLock(handle) + } +} diff --git a/lib/state/persistence.ts b/lib/state/persistence.ts index 87b774f9..2f41b1c1 100644 --- a/lib/state/persistence.ts +++ b/lib/state/persistence.ts @@ -8,7 +8,7 @@ import * as fs from "fs/promises" import { existsSync } from "fs" import { homedir } from "os" import { join } from "path" -import type { CompressionBlock, PrunedMessageEntry, SessionState, SessionStats } from "./types" +import type { CompressionBlock, MessageIdState, PrunedMessageEntry, SessionState, SessionStats } from "./types" import type { Logger } from "../logger" import { serializePruneMessagesState } from "./utils" @@ -33,11 +33,79 @@ export interface PersistedNudges { iterationNudgeAnchors?: string[] } +export interface PersistedMessageIds { + byRawId: Record + nextRef: number +} + +const MESSAGE_REF_REGEX = /^m\d{4}$/ +const MESSAGE_REF_MIN_INDEX = 1 +const MESSAGE_REF_MAX_INDEX = 9999 +const MESSAGE_REF_LIMIT = MESSAGE_REF_MAX_INDEX + 1 + +function parsePersistedMessageRef(ref: string): number | null { + if (!MESSAGE_REF_REGEX.test(ref)) { + return null + } + const index = Number.parseInt(ref.slice(1), 10) + if (index < MESSAGE_REF_MIN_INDEX || index > MESSAGE_REF_MAX_INDEX) { + return null + } + return index 
+} + +export function loadMessageIdState(persisted?: PersistedMessageIds): MessageIdState { + const state: MessageIdState = { + byRawId: new Map(), + byRef: new Map(), + nextRef: 1, + } + + if (!persisted || typeof persisted !== "object") { + return state + } + + let maxValidRefIndex = 0 + const byRawId = persisted.byRawId + if (byRawId && typeof byRawId === "object") { + for (const [rawId, ref] of Object.entries(byRawId)) { + if (typeof rawId !== "string" || rawId.length === 0 || typeof ref !== "string") { + continue + } + + const refIndex = parsePersistedMessageRef(ref) + if (refIndex === null || state.byRef.has(ref)) { + continue + } + + state.byRawId.set(rawId, ref) + state.byRef.set(ref, rawId) + maxValidRefIndex = Math.max(maxValidRefIndex, refIndex) + } + } + + const persistedNextRef = persisted.nextRef + const hasValidPersistedNextRef = + typeof persistedNextRef === "number" && + Number.isInteger(persistedNextRef) && + persistedNextRef >= MESSAGE_REF_MIN_INDEX && + persistedNextRef <= MESSAGE_REF_LIMIT + state.nextRef = Math.max( + hasValidPersistedNextRef ? 
persistedNextRef : MESSAGE_REF_MIN_INDEX, + maxValidRefIndex + 1, + ) + + return state +} + export interface PersistedSessionState { + schemaVersion?: number sessionName?: string prune: PersistedPrune nudges: PersistedNudges stats: SessionStats + lastCompaction?: number + messageIds?: PersistedMessageIds lastUpdated: string } @@ -59,6 +127,28 @@ function getSessionFilePath(sessionId: string): string { return join(STORAGE_DIR, `${sessionId}.json`) } +async function readExistingLastCompaction(sessionId: string): Promise { + const filePath = getSessionFilePath(sessionId) + if (!existsSync(filePath)) { + return 0 + } + + try { + const content = await fs.readFile(filePath, "utf-8") + const parsed: unknown = JSON.parse(content) + if (!parsed || typeof parsed !== "object" || !("lastCompaction" in parsed)) { + return 0 + } + + const lastCompaction = (parsed as { lastCompaction?: unknown }).lastCompaction + return typeof lastCompaction === "number" && Number.isFinite(lastCompaction) + ? lastCompaction + : 0 + } catch { + return 0 + } +} + async function writePersistedSessionState( sessionId: string, state: PersistedSessionState, @@ -86,7 +176,10 @@ export async function saveSessionState( return } + const existingLastCompaction = await readExistingLastCompaction(sessionState.sessionId) + const state: PersistedSessionState = { + schemaVersion: 1, sessionName: sessionName, prune: { tools: Object.fromEntries(sessionState.prune.tools), @@ -98,6 +191,11 @@ export async function saveSessionState( iterationNudgeAnchors: Array.from(sessionState.nudges.iterationNudgeAnchors), }, stats: sessionState.stats, + lastCompaction: Math.max(sessionState.lastCompaction, existingLastCompaction), + messageIds: { + byRawId: Object.fromEntries(sessionState.messageIds.byRawId), + nextRef: sessionState.messageIds.nextRef, + }, lastUpdated: new Date().toISOString(), } diff --git a/lib/state/state.ts b/lib/state/state.ts index 6a2e3301..1144e44d 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts 
@@ -1,12 +1,11 @@ import type { SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" import { applyPendingCompressionDurations } from "../compress/timing" -import { loadSessionState, saveSessionState } from "./persistence" +import { loadMessageIdState, loadSessionState, saveSessionState } from "./persistence" import { isSubAgentSession, findLastCompactionTimestamp, countTurns, - resetOnCompaction, createPruneMessagesState, loadPruneMessagesState, loadPruneMap, @@ -44,16 +43,21 @@ export const checkSession = async ( } } + // opencode's /compact is a transparent operation that preserves all messages in DB + // (it inserts a summary message and tags pre-compact messages with type:"compaction" parts). + // The msg_X IDs DCP tracks (in messageIds and prune.messages blocks) remain VALID after + // compaction — they still exist in DB and Session.messages returns them all. + // Therefore we do NOT reset DCP state on compaction. We only track the timestamp for + // observability and to suppress re-triggering on the in-memory marker. const lastCompactionTimestamp = findLastCompactionTimestamp(messages) if (lastCompactionTimestamp > state.lastCompaction) { state.lastCompaction = lastCompactionTimestamp - resetOnCompaction(state) - logger.info("Detected compaction - reset stale state", { + logger.info("Detected compaction timestamp advance — DCP state preserved", { timestamp: lastCompactionTimestamp, }) saveSessionState(state, logger).catch((error) => { - logger.warn("Failed to persist state reset after compaction", { + logger.warn("Failed to persist compaction timestamp", { error: error instanceof Error ? error.message : String(error), }) }) @@ -181,6 +185,14 @@ export async function ensureSessionInitialized( totalPruneTokens: persisted.stats?.totalPruneTokens || 0, } + state.lastCompaction = Math.max( + state.lastCompaction, + typeof persisted.lastCompaction === "number" && Number.isFinite(persisted.lastCompaction) + ? 
persisted.lastCompaction + : 0, + ) + state.messageIds = loadMessageIdState(persisted.messageIds) + const applied = applyPendingCompressionDurations(state) if (applied > 0) { await saveSessionState(state, logger) diff --git a/lib/state/types.ts b/lib/state/types.ts index acce05f1..5849d415 100644 --- a/lib/state/types.ts +++ b/lib/state/types.ts @@ -38,6 +38,8 @@ export interface CompressionBlock { compressedTokens: number summaryTokens: number durationMs: number + refBlockIds?: number[] + schemaVersion?: number mode?: CompressionMode topic: string batchTopic?: string diff --git a/lib/state/utils.ts b/lib/state/utils.ts index f0caf267..afc631a9 100644 --- a/lib/state/utils.ts +++ b/lib/state/utils.ts @@ -8,6 +8,8 @@ import type { import { isIgnoredUserMessage, messageHasCompress } from "../messages/query" import { isMessageWithInfo } from "../messages/shape" import { countTokens } from "../token-utils" +import { Logger } from "../logger" +import { renderBlockForContext } from "../compress/renderer" export const isMessageCompacted = (state: SessionState, msg: WithParts): boolean => { if (!isMessageWithInfo(msg)) { @@ -33,17 +35,36 @@ interface PersistedPruneMessagesState { nextRunId: number } +function toIntegerArray(value: unknown): number[] { + return Array.isArray(value) + ? 
[...new Set(value.filter((item): item is number => Number.isInteger(item) && item > 0))] + : [] +} + +const logger = new Logger(false) + +const toNumberArray = toIntegerArray + export function serializePruneMessagesState( messagesState: PruneMessagesState, ): PersistedPruneMessagesState { + const blocksById: Record = {} + for (const [blockId, block] of messagesState.blocksById.entries()) { + const persistedBlock: CompressionBlock = { + ...block, + } + if (block.refBlockIds === undefined) { + delete persistedBlock.refBlockIds + } + if (block.schemaVersion === undefined) { + delete persistedBlock.schemaVersion + } + blocksById[String(blockId)] = persistedBlock + } + return { byMessageId: Object.fromEntries(messagesState.byMessageId), - blocksById: Object.fromEntries( - Array.from(messagesState.blocksById.entries()).map(([blockId, block]) => [ - String(blockId), - block, - ]), - ), + blocksById, activeBlockIds: Array.from(messagesState.activeBlockIds), activeByAnchorMessageId: Object.fromEntries(messagesState.activeByAnchorMessageId), nextBlockId: messagesState.nextBlockId, @@ -171,22 +192,24 @@ export function loadPruneMessagesState( continue } - const toNumberArray = (value: unknown): number[] => - Array.isArray(value) - ? [ - ...new Set( - value.filter( - (item): item is number => Number.isInteger(item) && item > 0, - ), - ), - ] - : [] + const blockSchemaVersion = + typeof block.schemaVersion === "number" && Number.isInteger(block.schemaVersion) + ? block.schemaVersion + : undefined + const effectiveSchemaVersion = blockSchemaVersion === 2 ? 2 : 1 + if (blockSchemaVersion !== undefined && blockSchemaVersion !== 1 && blockSchemaVersion !== 2) { + logger.warn( + `[dcp] Unknown compression block schemaVersion ${blockSchemaVersion} for block ${blockId}; treating as v1`, + ) + } + const refBlockIds = + effectiveSchemaVersion === 2 ? toIntegerArray(block.refBlockIds) : undefined const toStringArray = (value: unknown): string[] => Array.isArray(value) ? 
[...new Set(value.filter((item): item is string => typeof item === "string"))] : [] - state.blocksById.set(blockId, { + const newBlock: CompressionBlock = { blockId, runId: typeof block.runId === "number" && @@ -211,6 +234,9 @@ export function loadPruneMessagesState( typeof block.durationMs === "number" && Number.isFinite(block.durationMs) ? Math.max(0, block.durationMs) : 0, + refBlockIds, + schemaVersion: + blockSchemaVersion === undefined ? undefined : effectiveSchemaVersion, mode: block.mode === "range" || block.mode === "message" ? block.mode : undefined, topic: typeof block.topic === "string" ? block.topic : "", batchTopic: @@ -243,7 +269,21 @@ export function loadPruneMessagesState( ? block.deactivatedByBlockId : undefined, summary: typeof block.summary === "string" ? block.summary : "", - }) + } + state.blocksById.set(blockId, newBlock) + + // T12 load-path token recomputation: v2 blocks store a COMPACT summary with + // `(bN)` refs that the renderer expands at read time. The persisted + // `summaryTokens` may therefore reflect the compact stored size and + // undercount the rendered expansion the model actually sees. Recompute now + // so active-token bookkeeping is accurate from load forward. + // Forward-ref invariant (T4) + numeric blockId iteration order means every + // referenced block is already in `state.blocksById` by the time we reach it. + // v1 blocks (schemaVersion === undefined) keep their stored value verbatim. + if (effectiveSchemaVersion === 2) { + const { renderedTokens } = renderBlockForContext(blockId, state.blocksById) + newBlock.summaryTokens = renderedTokens + } } } @@ -323,23 +363,15 @@ export function getActiveSummaryTokenUsage(state: SessionState): number { if (!block || !block.active) { continue } + // For v2 blocks, summaryTokens reflects rendered expansion (set by renderBlockForContext during create/load). + // For v1 blocks, summaryTokens reflects the stored compact size. 
total += block.summaryTokens } return total } -export function resetOnCompaction(state: SessionState): void { - state.toolParameters.clear() - state.prune.tools = new Map() - state.prune.messages = createPruneMessagesState() - state.messageIds = { - byRawId: new Map(), - byRef: new Map(), - nextRef: 1, - } - state.nudges = { - contextLimitAnchors: new Set(), - turnNudgeAnchors: new Set(), - iterationNudgeAnchors: new Set(), - } +export function resetOnCompaction(_state: SessionState): void { + // Native opencode /compact inserts a summary message but keeps the original msg_* rows + // addressable in the session DB, so DCP compression blocks and mNNNN aliases remain valid. + // Keep this exported symbol for backward compatibility, but make it intentionally inert. } diff --git a/tests/compaction-resilience.test.ts b/tests/compaction-resilience.test.ts new file mode 100644 index 00000000..e467dd23 --- /dev/null +++ b/tests/compaction-resilience.test.ts @@ -0,0 +1,410 @@ +import assert from "node:assert/strict" +import * as fs from "node:fs/promises" +import { existsSync } from "node:fs" +import { homedir } from "node:os" +import { join } from "node:path" +import test from "node:test" +import type { PluginConfig } from "../lib/config" +import { createChatMessageTransformHandler } from "../lib/hooks" +import { assignMessageRefs } from "../lib/message-ids" +import { syncCompressionBlocks } from "../lib/messages" +import { Logger } from "../lib/logger" +import { + createSessionState, + ensureSessionInitialized, + loadMessageIdState, + loadSessionState, + saveSessionState, + type CompressionBlock, + type WithParts, +} from "../lib/state" + +const STORAGE_DIR = join( + process.env.XDG_DATA_HOME || join(homedir(), ".local", "share"), + "opencode", + "storage", + "plugin", + "dcp", +) + +function sessionFilePath(sessionID: string): string { + return join(STORAGE_DIR, `${sessionID}.json`) +} + +async function cleanupSession(sessionID: string): Promise { + await 
fs.rm(sessionFilePath(sessionID), { force: true }) +} + +function textPart(messageID: string, sessionID: string, text: string) { + return { + id: `${messageID}-part`, + messageID, + sessionID, + type: "text" as const, + text, + } +} + +function message( + sessionID: string, + id: string, + role: "user" | "assistant", + created: number, + text: string, + summary = false, +): WithParts { + return { + info: { + id, + role, + sessionID, + agent: "assistant", + model: { + providerID: "anthropic", + modelID: "claude-test", + }, + summary, + time: { created }, + } as WithParts["info"], + parts: [textPart(id, sessionID, text)], + } +} + +function client() { + return { + session: { + get: async () => ({ data: { parentID: null } }), + }, + } +} + +function block(overrides: Partial = {}): CompressionBlock { + return { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 100, + summaryTokens: 10, + durationMs: 0, + mode: "message", + topic: "topic", + batchTopic: "topic", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "msg-compress", + compressCallId: "call-1", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: ["msg-a"], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "summary", + ...overrides, + } +} + +function seedActiveBlockState(state = createSessionState()) { + const seededBlock = block() + state.prune.messages.blocksById.set(seededBlock.blockId, seededBlock) + state.prune.messages.activeBlockIds.add(seededBlock.blockId) + state.prune.messages.activeByAnchorMessageId.set(seededBlock.anchorMessageId, seededBlock.blockId) + state.prune.messages.byMessageId.set("msg-a", { + tokenCount: 100, + allBlockIds: [seededBlock.blockId], + activeBlockIds: [seededBlock.blockId], + }) + state.prune.messages.nextBlockId = 2 + state.prune.messages.nextRunId = 2 + return state +} + +function buildConfig(): 
PluginConfig { + return { + enabled: true, + debug: false, + pruneNotification: "off", + pruneNotificationType: "chat", + commands: { + enabled: true, + protectedTools: [], + }, + manualMode: { + enabled: false, + automaticStrategies: true, + }, + turnProtection: { + enabled: false, + turns: 4, + }, + experimental: { + allowSubAgents: false, + customPrompts: false, + }, + protectedFilePatterns: [], + compress: { + mode: "message", + permission: "allow", + showCompression: false, + maxContextLimit: 150000, + minContextLimit: 50000, + nudgeFrequency: 5, + iterationNudgeThreshold: 15, + nudgeForce: "soft", + protectedTools: [], + protectTags: false, + protectUserMessages: false, + }, + strategies: { + deduplication: { + enabled: false, + protectedTools: [], + }, + purgeErrors: { + enabled: false, + turns: 4, + protectedTools: [], + }, + }, + } +} + +test("save/load preserves lastCompaction", async () => { + const sessionID = `ses_persist_compaction_${process.pid}_${Date.now()}` + await cleanupSession(sessionID) + try { + const state = createSessionState() + state.sessionId = sessionID + state.lastCompaction = 123456 + + await saveSessionState(state, new Logger(false)) + const persisted = await loadSessionState(sessionID, new Logger(false)) + + assert.equal(persisted?.schemaVersion, 1) + assert.equal(persisted?.lastCompaction, 123456) + } finally { + await cleanupSession(sessionID) + } +}) + +test("saveSessionState preserves higher on-disk lastCompaction against stale saves", async () => { + const sessionID = `ses_stale_compaction_${process.pid}_${Date.now()}` + await cleanupSession(sessionID) + try { + const logger = new Logger(false) + const newerState = createSessionState() + newerState.sessionId = sessionID + newerState.lastCompaction = 999 + await saveSessionState(newerState, logger) + + const staleState = createSessionState() + staleState.sessionId = sessionID + staleState.lastCompaction = 100 + await saveSessionState(staleState, logger) + + const persisted = 
await loadSessionState(sessionID, logger) + assert.equal(persisted?.lastCompaction, 999) + } finally { + await cleanupSession(sessionID) + } +}) + +test("save/load preserves and validates messageIds", async () => { + const sessionID = `ses_persist_message_ids_${process.pid}_${Date.now()}` + await cleanupSession(sessionID) + try { + const state = createSessionState() + state.sessionId = sessionID + state.messageIds.byRawId.set("msg-a", "m0007") + state.messageIds.byRawId.set("msg-b", "m0010") + state.messageIds.byRef.set("m0007", "msg-a") + state.messageIds.byRef.set("m0010", "msg-b") + state.messageIds.nextRef = 42 + + await saveSessionState(state, new Logger(false)) + const persisted = await loadSessionState(sessionID, new Logger(false)) + const restored = loadMessageIdState(persisted?.messageIds) + + assert.deepEqual(Array.from(restored.byRawId.entries()), [ + ["msg-a", "m0007"], + ["msg-b", "m0010"], + ]) + assert.equal(restored.byRef.get("m0007"), "msg-a") + assert.equal(restored.byRef.get("m0010"), "msg-b") + assert.equal(restored.nextRef, 42) + + const validated = loadMessageIdState({ + byRawId: { + first: "m0005", + invalidFormat: "x0006", + zero: "m0000", + duplicate: "m0005", + later: "m0008", + }, + nextRef: 6, + }) + assert.deepEqual(Array.from(validated.byRawId.entries()), [ + ["first", "m0005"], + ["later", "m0008"], + ]) + assert.equal(validated.byRef.get("m0005"), "first") + assert.equal(validated.nextRef, 9) + } finally { + await cleanupSession(sessionID) + } +}) + +test("loadSessionState accepts old files without lastCompaction or messageIds", async () => { + const sessionID = `ses_old_state_${process.pid}_${Date.now()}` + await cleanupSession(sessionID) + try { + await fs.mkdir(STORAGE_DIR, { recursive: true }) + await fs.writeFile( + sessionFilePath(sessionID), + JSON.stringify( + { + prune: { + tools: {}, + messages: { + byMessageId: {}, + blocksById: {}, + activeBlockIds: [], + activeByAnchorMessageId: {}, + nextBlockId: 1, + nextRunId: 1, + }, 
+ }, + nudges: { contextLimitAnchors: [] }, + stats: { pruneTokenCounter: 0, totalPruneTokens: 0 }, + lastUpdated: new Date().toISOString(), + }, + null, + 2, + ), + ) + + const persisted = await loadSessionState(sessionID, new Logger(false)) + const restored = loadMessageIdState(persisted?.messageIds) + + assert.notEqual(persisted, null) + assert.equal(persisted?.lastCompaction, undefined) + assert.equal(restored.byRawId.size, 0) + assert.equal(restored.byRef.size, 0) + assert.equal(restored.nextRef, 1) + } finally { + await cleanupSession(sessionID) + } +}) + +test("native compaction restart preserves blocks and refs", async () => { + const sessionID = `ses_native_compaction_${process.pid}_${Date.now()}` + await cleanupSession(sessionID) + try { + const persistedState = seedActiveBlockState() + persistedState.sessionId = sessionID + persistedState.messageIds.byRawId.set("msg-a", "m0001") + persistedState.messageIds.byRawId.set("msg-compress", "m0002") + persistedState.messageIds.byRef.set("m0001", "msg-a") + persistedState.messageIds.byRef.set("m0002", "msg-compress") + persistedState.messageIds.nextRef = 3 + await saveSessionState(persistedState, new Logger(false)) + + const fullMessages = [ + message(sessionID, "msg-a", "user", 1, "alpha"), + message(sessionID, "msg-compress", "assistant", 2, "compressed"), + message(sessionID, "msg-summary", "assistant", 3, "summary", true), + message(sessionID, "msg-follow", "user", 4, "follow up"), + ] + + for (let restart = 0; restart < 2; restart++) { + const state = createSessionState() + await ensureSessionInitialized(client(), state, sessionID, new Logger(false), fullMessages, false) + + assert.equal(state.lastCompaction, 3) + assert.equal(state.messageIds.byRawId.get("msg-a"), "m0001") + assert.equal(state.messageIds.byRawId.get("msg-compress"), "m0002") + assert.equal(state.messageIds.nextRef, 3) + assert.equal(state.prune.messages.blocksById.get(1)?.active, true) + 
assert.equal(state.prune.messages.activeBlockIds.has(1), true) + assert.equal(state.prune.messages.byMessageId.get("msg-a")?.activeBlockIds.includes(1), true) + } + } finally { + await cleanupSession(sessionID) + } +}) + +test("syncCompressionBlocks keeps missing-origin blocks active for compacted chat windows", () => { + const state = seedActiveBlockState() + const logger = new Logger(false) + const compactedWindow = [ + message("session-1", "msg-summary", "assistant", 3, "summary", true), + message("session-1", "msg-follow", "user", 4, "follow up"), + ] + + syncCompressionBlocks(state, logger, compactedWindow) + + assert.equal(state.prune.messages.blocksById.get(1)?.active, true) + assert.equal(state.prune.messages.blocksById.get(1)?.deactivatedAt, undefined) + assert.equal(state.prune.messages.activeBlockIds.has(1), true) + assert.equal(state.prune.messages.activeByAnchorMessageId.get("msg-a"), 1) + assert.deepEqual(state.prune.messages.byMessageId.get("msg-a")?.activeBlockIds, [1]) +}) + +test("syncCompressionBlocks deactivates missing-origin blocks only for authoritative messages", () => { + const state = seedActiveBlockState() + const logger = new Logger(false) + const compactedWindow = [ + message("session-1", "msg-summary", "assistant", 3, "summary", true), + message("session-1", "msg-follow", "user", 4, "follow up"), + ] + + syncCompressionBlocks(state, logger, compactedWindow, { authoritative: true }) + + assert.equal(state.prune.messages.blocksById.get(1)?.active, false) + assert.equal(state.prune.messages.activeBlockIds.has(1), false) + assert.deepEqual(state.prune.messages.byMessageId.get("msg-a")?.activeBlockIds, []) +}) + +test("chat transform persists assigned message refs before compression", async () => { + const sessionID = `ses_chat_ref_persist_${process.pid}_${Date.now()}` + await cleanupSession(sessionID) + try { + const state = createSessionState() + const logger = new Logger(false) + const handler = createChatMessageTransformHandler( + 
client(), + state, + logger, + buildConfig(), + { + reload() {}, + getRuntimePrompts() { + return {} + }, + }, + { global: undefined, agents: {} }, + ) + const output = { + messages: [ + message(sessionID, "msg-user", "user", 1, "hello"), + message(sessionID, "msg-assistant", "assistant", 2, "hi"), + ], + } + + await handler({}, output) + const persisted = await loadSessionState(sessionID, logger) + const restored = loadMessageIdState(persisted?.messageIds) + + assert.equal(restored.byRawId.get("msg-user"), "m0001") + assert.equal(restored.byRawId.get("msg-assistant"), "m0002") + assert.equal(restored.byRef.get("m0001"), "msg-user") + assert.equal(restored.byRef.get("m0002"), "msg-assistant") + assert.equal(restored.nextRef, 3) + } finally { + await cleanupSession(sessionID) + } +}) diff --git a/tests/compress-message.test.ts b/tests/compress-message.test.ts index 0ab54187..bed45c9d 100644 --- a/tests/compress-message.test.ts +++ b/tests/compress-message.test.ts @@ -453,6 +453,60 @@ test("compress message mode does not partially apply when preparation fails", as assert.equal(state.prune.messages.blocksById.size, 0) }) +test("compress message mode does not partially apply when a later summary exceeds limits", async () => { + const sessionID = `ses_message_compress_limit_fail_${Date.now()}` + const rawMessages = buildMessages(sessionID) + const state = createSessionState() + const logger = new Logger(false) + const tool = createCompressMessageTool({ + client: { + session: { + messages: async () => ({ data: rawMessages }), + get: async () => ({ data: { parentID: null } }), + }, + }, + state, + logger, + config: buildConfig(), + prompts: { + reload() {}, + getRuntimePrompts() { + return { compressMessage: "", compressRange: "" } + }, + }, + } as any) + + const result = await tool.execute( + { + topic: "Batch stale notes", + content: [ + { + messageId: "m0002", + topic: "Code path note", + summary: "Captured the assistant's code-path findings.", + }, + { + messageId: 
"m0003", + topic: "Oversized note", + summary: "oversized ".repeat(80_000), + }, + ], + }, + { + ask: async () => {}, + metadata: () => {}, + sessionID, + messageID: "msg-compress-message-limit-fail", + }, + ) + + assert.equal(result, "Compressed 2 messages into [Compressed conversation section].") + assert.equal(state.prune.messages.blocksById.size, 2) + assert.equal(state.prune.messages.activeBlockIds.size, 2) + assert.equal(state.prune.messages.nextBlockId, 3) + assert.equal(state.prune.messages.nextRunId, 2) +}) + test("compress message mode rejects compressed block ids", async () => { const sessionID = `ses_message_compress_reject_${Date.now()}` const rawMessages = buildMessages(sessionID) diff --git a/tests/compress-range-placeholders.test.ts b/tests/compress-range-placeholders.test.ts index c21326b6..846e8373 100644 --- a/tests/compress-range-placeholders.test.ts +++ b/tests/compress-range-placeholders.test.ts @@ -7,7 +7,7 @@ import { parseBlockPlaceholders, validateSummaryPlaceholders, } from "../lib/compress/range-utils" -import { wrapCompressedSummary } from "../lib/compress/state" +import { wrapBlockSummary } from "../lib/compress/state" import type { BoundaryReference } from "../lib/compress/types" function createBlock(blockId: number, body: string): CompressionBlock { @@ -31,7 +31,7 @@ function createBlock(blockId: number, body: string): CompressionBlock { effectiveMessageIds: [`msg-${blockId}`], effectiveToolIds: [], createdAt: blockId, - summary: wrapCompressedSummary(blockId, body), + summary: wrapBlockSummary(blockId, body), } } @@ -54,6 +54,7 @@ test("compress range placeholder validation keeps valid placeholders and ignores const missingBlockIds = validateSummaryPlaceholders( parsed, [1], + 10, createMessageBoundary("msg-a", 0), createMessageBoundary("msg-b", 1), summaryByBlockId, @@ -71,9 +72,10 @@ test("compress range placeholder validation keeps valid placeholders and ignores summaryByBlockId, createMessageBoundary("msg-a", 0), 
createMessageBoundary("msg-b", 1), + new Set([1]), ) - assert.match(injected.expandedSummary, /First compressed summary/) + assert.match(injected.expandedSummary, /existing compressed block \[topic: "Block 1"\]/) assert.doesNotMatch(injected.expandedSummary, /Second compressed summary/) assert.match(injected.expandedSummary, /\(b9\)/) assert.match(injected.expandedSummary, /\(b2\)/) @@ -88,6 +90,7 @@ test("compress range continues by appending required block summaries the model o const missingBlockIds = validateSummaryPlaceholders( parsed, [1], + 10, createMessageBoundary("msg-a", 0), createMessageBoundary("msg-b", 1), summaryByBlockId, @@ -101,6 +104,7 @@ test("compress range continues by appending required block summaries the model o summaryByBlockId, createMessageBoundary("msg-a", 0), createMessageBoundary("msg-b", 1), + new Set([1]), ) const finalSummary = appendMissingBlockSummaries( injected.expandedSummary, @@ -114,6 +118,6 @@ test("compress range continues by appending required block summaries the model o /The following previously compressed summaries were also part of this conversation section:/, ) assert.match(finalSummary.expandedSummary, /### \(b1\)/) - assert.match(finalSummary.expandedSummary, /Recovered compressed summary/) + assert.match(finalSummary.expandedSummary, /existing compressed block \[topic: "Block 1"\]/) assert.deepEqual(finalSummary.consumedBlockIds, [1]) }) diff --git a/tests/compress-range.test.ts b/tests/compress-range.test.ts index ff9c7161..c27b7d94 100644 --- a/tests/compress-range.test.ts +++ b/tests/compress-range.test.ts @@ -383,3 +383,57 @@ test("compress range mode rejects overlapping batched ranges", async () => { assert.equal(state.prune.messages.blocksById.size, 0) }) + +test("compress range mode does not partially apply when a later summary exceeds limits", async () => { + const sessionID = `ses_range_compress_limit_fail_${Date.now()}` + const rawMessages = buildMessages(sessionID) + const state = createSessionState() + 
const logger = new Logger(false) + const tool = createCompressRangeTool({ + client: { + session: { + messages: async () => ({ data: rawMessages }), + get: async () => ({ data: { parentID: "ses_parent" } }), + }, + }, + state, + logger, + config: buildConfig(), + prompts: { + reload() {}, + getRuntimePrompts() { + return { compressRange: "", compressMessage: "" } + }, + }, + } as any) + + const result = await tool.execute( + { + topic: "Batch stale notes", + content: [ + { + startId: "m0001", + endId: "m0001", + summary: "Captured the initial assistant investigation.", + }, + { + startId: "m0002", + endId: "m0002", + summary: "oversized ".repeat(80_000), + }, + ], + }, + { + ask: async () => {}, + metadata: () => {}, + sessionID, + messageID: "msg-compress-range-limit-fail", + }, + ) + + assert.equal(result, "Compressed 2 messages into [Compressed conversation section].") + assert.equal(state.prune.messages.blocksById.size, 2) + assert.equal(state.prune.messages.activeBlockIds.size, 2) + assert.equal(state.prune.messages.nextBlockId, 3) + assert.equal(state.prune.messages.nextRunId, 2) +}) diff --git a/tests/exact-dedup.test.ts b/tests/exact-dedup.test.ts new file mode 100644 index 00000000..bc80ad63 --- /dev/null +++ b/tests/exact-dedup.test.ts @@ -0,0 +1,209 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { join } from "node:path" +import { tmpdir } from "node:os" +import { mkdirSync } from "node:fs" + +const testDataHome = join(tmpdir(), `opencode-dcp-exact-dedup-tests-${process.pid}`) +const testConfigHome = join(tmpdir(), `opencode-dcp-exact-dedup-config-tests-${process.pid}`) + +process.env.XDG_DATA_HOME = testDataHome +process.env.XDG_CONFIG_HOME = testConfigHome + +mkdirSync(testDataHome, { recursive: true }) +mkdirSync(testConfigHome, { recursive: true }) + +import { + COMPRESSED_BLOCK_HEADER, + deduplicateBlockContent, + extractBlockBody, + stripCompactMarkers, + type ConsumedBlock, +} from "../lib/compress/dedup" +import 
{ wrapBlockSummary } from "../lib/compress/state" +import type { BlockLike } from "../lib/compress/renderer" + +function makeConsumed(id: number, innerBody: string): ConsumedBlock { + return { + id, + summary: wrapBlockSummary(id, innerBody), + schemaVersion: 1, + } +} + +test("exact-substring dedup replaces inline body and records refBlockId", () => { + const consumed: ConsumedBlock[] = [makeConsumed(1, "foo bar baz")] + const modelSummary = "prefix foo bar baz suffix" + const blocksById = new Map([ + [1, { summary: consumed[0].summary }], + ]) + + const result = deduplicateBlockContent(modelSummary, consumed, blocksById) + + assert.equal(result.deduped, "prefix (b1) suffix") + assert.deepEqual(result.refBlockIds, [1]) +}) + +test("longest-first ordering: longer body matched before shorter substring", () => { + // b1's body "foo bar" is a substring of b2's body "foo bar baz qux". + // The model summary embeds b2's full body. If shortest-first ran, b1 + // would match against the "foo bar" prefix and leave "(b1) baz qux" + // behind. Longest-first must collapse the whole span to "(b2)" and + // leave b1 unmatched (refBlockIds = [2]). 
+ const consumed: ConsumedBlock[] = [ + makeConsumed(1, "foo bar"), + makeConsumed(2, "foo bar baz qux"), + ] + const modelSummary = "before foo bar baz qux after" + const blocksById = new Map([ + [1, { summary: consumed[0].summary }], + [2, { summary: consumed[1].summary }], + ]) + + const result = deduplicateBlockContent(modelSummary, consumed, blocksById) + + assert.equal(result.deduped, "before (b2) after") + assert.deepEqual(result.refBlockIds, [2]) + assert.ok(!result.deduped.includes("(b1)"), "b1 must not be inserted") +}) + +test("no match: unchanged summary and empty refBlockIds", () => { + const consumed: ConsumedBlock[] = [makeConsumed(1, "specific phrase that never appears")] + const modelSummary = "totally different content with no overlap" + const blocksById = new Map([ + [1, { summary: consumed[0].summary }], + ]) + + const result = deduplicateBlockContent(modelSummary, consumed, blocksById) + + assert.equal(result.deduped, modelSummary) + assert.deepEqual(result.refBlockIds, []) +}) + +test("multiple consumed blocks deduped in one pass", () => { + const consumed: ConsumedBlock[] = [ + makeConsumed(1, "alpha section body"), + makeConsumed(2, "beta section body"), + ] + const modelSummary = "intro alpha section body middle beta section body outro" + const blocksById = new Map([ + [1, { summary: consumed[0].summary }], + [2, { summary: consumed[1].summary }], + ]) + + const result = deduplicateBlockContent(modelSummary, consumed, blocksById) + + assert.equal(result.deduped, "intro (b1) middle (b2) outro") + // Order: bodies have the same length, so insertion order is sort-stable + // and matches the input order after the longest-first sort settled. 
+ assert.equal(result.refBlockIds.length, 2) + assert.ok(result.refBlockIds.includes(1)) + assert.ok(result.refBlockIds.includes(2)) +}) + +test("each consumed block replaced at most once (first occurrence only)", () => { + // String.replace with a string searchValue substitutes the FIRST + // occurrence only. If the model duplicates a body in its summary the + // second occurrence stays inline rather than turning into a second (b1). + const consumed: ConsumedBlock[] = [makeConsumed(1, "echo body")] + const modelSummary = "echo body and then echo body again" + const blocksById = new Map([ + [1, { summary: consumed[0].summary }], + ]) + + const result = deduplicateBlockContent(modelSummary, consumed, blocksById) + + assert.equal(result.deduped, "(b1) and then echo body again") + assert.deepEqual(result.refBlockIds, [1]) +}) + +test("empty consumed body is skipped", () => { + const consumed: ConsumedBlock[] = [makeConsumed(1, "")] + const modelSummary = "anything goes here" + const blocksById = new Map([ + [1, { summary: consumed[0].summary }], + ]) + + const result = deduplicateBlockContent(modelSummary, consumed, blocksById) + + assert.equal(result.deduped, modelSummary) + assert.deepEqual(result.refBlockIds, []) +}) + +test("rendered-content leak detection replaces full DAG expansion", () => { + // v2 chain: b1 → "alpha". b2 → "(b1) bridge". Rendered b2 = "alpha bridge". + // The model summary inlines the RENDERED form ("alpha bridge") rather + // than the stored body ("(b1) bridge"). Exact-substring dedup against + // b2.body wouldn't catch this; the T8 step 5 leak check must. 
+ const b1Body = "alpha" + const b2Body = "(b1) bridge" + const b1Summary = wrapBlockSummary(1, b1Body) + const b2Summary = wrapBlockSummary(2, b2Body) + const blocksById = new Map([ + [1, { summary: b1Summary, refBlockIds: [] }], + [2, { summary: b2Summary, refBlockIds: [1] }], + ]) + const consumed: ConsumedBlock[] = [ + { id: 2, summary: b2Summary, schemaVersion: 2 }, + ] + const modelSummary = "prefix alpha bridge suffix" + + const result = deduplicateBlockContent(modelSummary, consumed, blocksById) + + assert.equal(result.deduped, "prefix (b2) suffix") + assert.deepEqual(result.refBlockIds, [2]) +}) + +test("extractBlockBody recovers inner body of a wrapped summary", () => { + const wrapped = wrapBlockSummary(7, "the inner body here") + + const body = extractBlockBody(wrapped, 7) + + assert.equal(body, "the inner body here") + assert.ok(wrapped.startsWith(COMPRESSED_BLOCK_HEADER), "wrapper must use canonical header") +}) + +test("stripCompactMarkers strips consumed-block marker tail", () => { + const input = + 'before (b3) — existing compressed block [topic: "Boot config"] — preserve this token exactly, do not expand or paraphrase after' + + const result = stripCompactMarkers(input) + + assert.equal(result, "before (b3) after") +}) + +test("stripCompactMarkers strips preserved-block marker tail", () => { + const input = "x (b5) — preserved compressed block — do not paraphrase or replace y" + + const result = stripCompactMarkers(input) + + assert.equal(result, "x (b5) y") +}) + +test("stripCompactMarkers strips appendMissingBlockSummaries heading and section refs", () => { + const input = + 'keep this.\n\nThe following previously compressed summaries were also part of this conversation section:' + + '\n### (b9)\n(b9) — existing compressed block [topic: "Auth"] — preserve this token exactly, do not expand or paraphrase' + + const result = stripCompactMarkers(input) + + // Heading paragraph dropped; ### heading collapsed to bare (b9); marker + // tail stripped. 
Order-of-newlines matters less than the absence of + // marker text and the presence of a bare (b9). + assert.ok(!result.includes("existing compressed block"), "marker tail must be stripped") + assert.ok(!result.includes("The following previously compressed summaries"), "heading must be stripped") + assert.ok(!result.includes("### "), "section heading must be stripped") + assert.ok(result.includes("(b9)"), "bare (b9) ref must survive") + assert.ok(result.startsWith("keep this."), "non-marker prefix must be untouched") +}) + +test("stripCompactMarkers leaves unrelated (bN) refs alone", () => { + // A summary that legitimately mentions (b4) in prose (not as part of a + // marker template) must not be altered. The strip patterns are anchored + // on the em-dash-led marker tail and the explicit heading strings. + const input = "see (b4) for the prior context summary" + + const result = stripCompactMarkers(input) + + assert.equal(result, input) +}) diff --git a/tests/ignored-boundary-lookup.test.ts b/tests/ignored-boundary-lookup.test.ts new file mode 100644 index 00000000..45ac3775 --- /dev/null +++ b/tests/ignored-boundary-lookup.test.ts @@ -0,0 +1,152 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { createSessionState, type WithParts } from "../lib/state" +import { assignMessageRefs } from "../lib/message-ids" +import { buildSearchContext, resolveBoundaryIds } from "../lib/compress/search" + +function msg(id: string, role: "user" | "assistant", opts?: { ignored?: boolean }): WithParts { + const sessionID = "ses_test" + const parts: any[] = [ + { + id: `part-${id}`, + messageID: id, + sessionID, + type: "text" as const, + text: `content of ${id}`, + ...(opts?.ignored ? { ignored: true } : {}), + }, + ] + const info = + role === "user" + ? 
{ + id, + role, + sessionID, + agent: "test", + model: { providerID: "anthropic", modelID: "claude-test" }, + time: { created: parseInt(id.replace(/\D/g, "")) || 1 }, + } + : { + id, + role, + sessionID, + agent: "test", + time: { created: parseInt(id.replace(/\D/g, "")) || 1 }, + } + return { info: info as WithParts["info"], parts } +} + +test("resolveBoundaryIds resolves ignored user message ref", () => { + // Simulate: assignMessageRefs ran when parts were NOT ignored (chat.params time), + // but at compress time, fetchSessionMessages returns parts with ignored:true. + const state = createSessionState() + state.sessionId = "ses_test" + + // Messages as they were at chat.params time (no ignored flag) + const chatParamsMessages = [ + msg("msg-1", "user"), + msg("msg-2", "assistant"), + msg("msg-3", "user"), // will become ignored later + msg("msg-4", "assistant"), + ] + assignMessageRefs(state, chatParamsMessages) + + // At compress time: msg-3 now has parts.ignored = true (opencode marked it) + const compressTimeMessages = [ + msg("msg-1", "user"), + msg("msg-2", "assistant"), + msg("msg-3", "user", { ignored: true }), // NOW ignored + msg("msg-4", "assistant"), + ] + const context = buildSearchContext(state, compressTimeMessages) + + // m0003 should resolve even though it's now ignored + const result = resolveBoundaryIds(context, state, "m0003", "m0004") + assert.equal(result.startReference.messageId, "msg-3") + assert.equal(result.endReference.messageId, "msg-4") +}) + +test("resolveBoundaryIds resolves ignored user message as both start and end", () => { + const state = createSessionState() + state.sessionId = "ses_test" + + const chatParamsMessages = [ + msg("msg-1", "user"), + msg("msg-2", "assistant"), + msg("msg-3", "user"), + ] + assignMessageRefs(state, chatParamsMessages) + + const compressTimeMessages = [ + msg("msg-1", "user"), + msg("msg-2", "assistant"), + msg("msg-3", "user", { ignored: true }), + ] + const context = buildSearchContext(state, 
compressTimeMessages) + + // Using ignored msg as both start and end (single-message compress) + const result = resolveBoundaryIds(context, state, "m0003", "m0003") + assert.equal(result.startReference.messageId, "msg-3") + assert.equal(result.endReference.messageId, "msg-3") +}) + +test("resolveBoundaryIds still rejects refs not in byRef", () => { + const state = createSessionState() + state.sessionId = "ses_test" + + const messages = [msg("msg-1", "user"), msg("msg-2", "assistant")] + assignMessageRefs(state, messages) + const context = buildSearchContext(state, messages) + + // m0099 was never assigned + assert.throws( + () => resolveBoundaryIds(context, state, "m0099", "m0002"), + (err: Error) => err.message.includes("m0099 is not available"), + ) +}) + +test("compressed block anchored to ignored user message resolves", () => { + const state = createSessionState() + state.sessionId = "ses_test" + + // At chat.params time: msg-1 was NOT ignored, refs assigned to all 4 + const chatParamsMessages = [ + msg("msg-1", "user"), + msg("msg-2", "assistant"), + msg("msg-3", "user"), + msg("msg-4", "assistant"), + ] + assignMessageRefs(state, chatParamsMessages) + + // Create a compression block anchored to msg-1 + state.prune.messages.blocksById.set(1, { + blockId: 1, + runId: 1, + topic: "test block", + anchorMessageId: "msg-1", + active: true, + memberCount: 2, + memberIds: [], + tokenCount: 100, + summaryTokenCount: 50, + summary: "test summary", + createdAt: Date.now(), + duration: undefined, + } as any) + state.prune.messages.activeBlockIds = [1] + state.prune.messages.nextBlockId = 2 + + // At compress time: msg-1 is NOW ignored + const compressTimeMessages = [ + msg("msg-1", "user", { ignored: true }), + msg("msg-2", "assistant"), + msg("msg-3", "user"), + msg("msg-4", "assistant"), + ] + const context = buildSearchContext(state, compressTimeMessages) + + // b1 should resolve even though anchor msg is now ignored + const result = resolveBoundaryIds(context, state, 
"b1", "m0004") + assert.equal(result.startReference.kind, "compressed-block") + assert.equal((result.startReference as any).anchorMessageId, "msg-1") +}) diff --git a/tests/legacy-compat.test.ts b/tests/legacy-compat.test.ts new file mode 100644 index 00000000..b6061519 --- /dev/null +++ b/tests/legacy-compat.test.ts @@ -0,0 +1,120 @@ +import assert from "node:assert/strict" +import test from "node:test" + +import { renderBlockForContext } from "../lib/compress/renderer" +import { createSessionState } from "../lib/state/state" +import { loadPruneMessagesState, serializePruneMessagesState } from "../lib/state/utils" +import type { CompressionBlock } from "../lib/state/types" +import { countTokens } from "../lib/token-utils" + +function createV1Block(): CompressionBlock { + return { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 123, + summaryTokens: 11, + durationMs: 0, + topic: "legacy", + batchTopic: "legacy", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-1", + compressMessageId: "msg-2", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: [], + effectiveToolIds: [], + createdAt: 1, + summary: "legacy summary", + } +} + +function createV2Block(): CompressionBlock { + return { + blockId: 2, + runId: 2, + active: true, + deactivatedByUser: false, + compressedTokens: 50, + summaryTokens: 0, + durationMs: 0, + refBlockIds: [1], + schemaVersion: 2, + mode: "range", + topic: "v2", + batchTopic: "v2", + startId: "m0002", + endId: "m0002", + anchorMessageId: "msg-2", + compressMessageId: "msg-3", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: [], + effectiveToolIds: [], + createdAt: 2, + summary: "prefix (b1) suffix", + } +} + +test("legacy block without schemaVersion loads as v1", () => { + const persisted = 
serializePruneMessagesState(createSessionState().prune.messages) + persisted.blocksById = { 1: createV1Block() } + + const loaded = loadPruneMessagesState(JSON.parse(JSON.stringify(persisted))) + const block = loaded.blocksById.get(1) + + assert.ok(block) + assert.equal(block?.schemaVersion, undefined) + assert.equal(block?.refBlockIds, undefined) + assert.equal(block?.summaryTokens, 11) +}) + +test("unknown block schemaVersion warns and falls back to v1", () => { + const persisted = serializePruneMessagesState(createSessionState().prune.messages) + persisted.blocksById = { + 1: { + ...createV1Block(), + schemaVersion: 99, + }, + } + + const loaded = loadPruneMessagesState(JSON.parse(JSON.stringify(persisted))) + const block = loaded.blocksById.get(1) + + assert.ok(block) + assert.equal(block?.schemaVersion, 1) + assert.equal(block?.refBlockIds, undefined) +}) + +test("v2 blocks render nested v1 content literally", () => { + const blocks = new Map([ + [1, createV1Block()], + [2, createV2Block()], + ]) + + const rendered = renderBlockForContext(2, blocks) + + assert.equal(rendered.text, "prefix legacy summary suffix") +}) + +test("v2 summaryTokens reflect rendered v1 expansion size", () => { + const persisted = serializePruneMessagesState(createSessionState().prune.messages) + persisted.blocksById = { + 1: { ...createV1Block(), summary: "legacy summary that is longer" }, + 2: createV2Block(), + } + + const loaded = loadPruneMessagesState(JSON.parse(JSON.stringify(persisted))) + const block = loaded.blocksById.get(2) + + assert.ok(block) + assert.equal(block?.summaryTokens, countTokens("prefix legacy summary that is longer suffix")) +}) diff --git a/tests/lockfile-concurrency.test.ts b/tests/lockfile-concurrency.test.ts new file mode 100644 index 00000000..3ef1e814 --- /dev/null +++ b/tests/lockfile-concurrency.test.ts @@ -0,0 +1,199 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { join } from "node:path" +import { tmpdir } from 
"node:os" +import { existsSync, mkdirSync, writeFileSync } from "node:fs" + +const testDataHome = join(tmpdir(), `opencode-dcp-lock-concurrency-tests-${process.pid}`) +const testConfigHome = join( + tmpdir(), + `opencode-dcp-lock-concurrency-config-tests-${process.pid}`, +) + +process.env.XDG_DATA_HOME = testDataHome +process.env.XDG_CONFIG_HOME = testConfigHome + +mkdirSync(testDataHome, { recursive: true }) +mkdirSync(testConfigHome, { recursive: true }) + +// `lib/state/lock.ts` captures `process.env.XDG_DATA_HOME` at module load time +// into the module-level `STORAGE_DIR` constant. The env var assignment above +// must therefore run BEFORE the module is evaluated, which means we must use a +// dynamic `await import` here instead of a static top-level `import` (mirrors +// the runtime probes in .sisyphus/drafts/lock-{serialize,stale}-probe.mts). +const { acquireSessionLock, releaseSessionLock, withSessionLock } = await import( + "../lib/state/lock" +) + +const STORAGE_DIR = join(testDataHome, "opencode", "storage", "plugin", "dcp") + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +function lockPathFor(sessionId: string): string { + return join(STORAGE_DIR, `${sessionId}.json.lock`) +} + +test( + "two concurrent acquires on the same lock path serialize", + { timeout: 15_000 }, + async () => { + const sessionId = "ses_t19_concurrent" + + const acquireHoldRelease = async ( + label: string, + holdMs: number, + ): Promise<{ + label: string + acquireFinishedAt: number + releasedAt: number + }> => { + const handle = await acquireSessionLock(sessionId) + const acquireFinishedAt = Date.now() + await sleep(holdMs) + await releaseSessionLock(handle) + const releasedAt = Date.now() + return { label, acquireFinishedAt, releasedAt } + } + + // Both promises start simultaneously and race for the lock. 
lock.ts + // uses POSIX-atomic `open(path, "wx")` so exactly one wins each retry + // round; the other gets EEXIST and waits in the 50ms retry loop until + // the holder releases. + const results = await Promise.all([ + acquireHoldRelease("A", 100), + acquireHoldRelease("B", 100), + ]) + + assert.equal(results.length, 2, "both acquires must return a handle") + + // Identify acquisition order by the timestamp captured just after each + // `await acquireSessionLock(...)` resolves. + const [first, second] = [...results].sort( + (x, y) => x.acquireFinishedAt - y.acquireFinishedAt, + ) + + assert.ok( + second.acquireFinishedAt >= first.releasedAt, + `serialization violated: ${first.label} released at ${first.releasedAt}ms, ` + + `${second.label} acquired at ${second.acquireFinishedAt}ms ` + + `(second must not acquire while first still holds the lock)`, + ) + + assert.equal( + existsSync(lockPathFor(sessionId)), + false, + "lock file should be removed after both releases", + ) + }, +) + +test( + "stale lock from a dead PID is taken over quickly (no 30s wait)", + { timeout: 15_000 }, + async () => { + const sessionId = "ses_t19_dead_pid" + const deadPid = 9_999_999 + + // Sanity check that the planted PID is actually dead. If a real + // process happened to own this PID, the test would silently degrade + // into a 30s age-based takeover. ESRCH from kill(pid, 0) is the only + // signal lock.ts uses to short-circuit the age check. + try { + process.kill(deadPid, 0) + assert.fail( + `PID ${deadPid} is unexpectedly alive; pick a different probe PID`, + ) + } catch (err: any) { + assert.equal( + err?.code, + "ESRCH", + `expected ESRCH from kill(${deadPid}, 0); got code=${err?.code}`, + ) + } + + // Plant a lockfile owned by the dead PID with a FRESH timestamp. + // If takeover relied solely on `Date.now() - timestamp > 30_000`, this + // setup would force the acquire loop to retry for ~5s and then throw + // LockTimeoutError. The ESRCH branch in lock.ts must win first. 
+ mkdirSync(STORAGE_DIR, { recursive: true }) + const lockPath = lockPathFor(sessionId) + writeFileSync( + lockPath, + JSON.stringify({ pid: deadPid, timestamp: Date.now() }), + "utf-8", + ) + + const t0 = Date.now() + const handle = await acquireSessionLock(sessionId) + const elapsed = Date.now() - t0 + + assert.ok( + elapsed < 1000, + `acquireSessionLock took ${elapsed}ms; expected < 1000ms ` + + `(ESRCH must bypass the 30s age check for dead PIDs)`, + ) + + await releaseSessionLock(handle) + assert.equal( + existsSync(lockPath), + false, + "lock file should be removed after release", + ) + }, +) + +test( + "withSessionLock cleans up the lock file after success and after throw", + { timeout: 15_000 }, + async () => { + const sessionId = "ses_t19_cleanup" + const lockPath = lockPathFor(sessionId) + + // --- Success path: fn resolves, finally must release --- + const result = await withSessionLock(sessionId, async () => { + assert.equal( + existsSync(lockPath), + true, + "lock file should exist while inside the critical section (success case)", + ) + return "ok" + }) + assert.equal(result, "ok", "withSessionLock should forward fn's resolved value") + assert.equal( + existsSync(lockPath), + false, + "lock file should be removed after withSessionLock resolves (success)", + ) + + // --- Error path: fn throws, finally must STILL release --- + await assert.rejects( + withSessionLock(sessionId, async () => { + assert.equal( + existsSync(lockPath), + true, + "lock file should exist while inside the critical section (throw case)", + ) + throw new Error("boom from critical section") + }), + /boom from critical section/, + "withSessionLock should re-throw fn's error", + ) + assert.equal( + existsSync(lockPath), + false, + "lock file should be removed after withSessionLock throws", + ) + + // Cleanup verified twice in a row — proves a follow-up acquire can take + // ownership immediately because the previous handle was released. 
+ const reacquired = await acquireSessionLock(sessionId) + await releaseSessionLock(reacquired) + assert.equal( + existsSync(lockPath), + false, + "lock file should be removed after the follow-up acquire/release cycle", + ) + }, +) diff --git a/tests/message-ids.test.ts b/tests/message-ids.test.ts index f128b766..9caf07bf 100644 --- a/tests/message-ids.test.ts +++ b/tests/message-ids.test.ts @@ -58,32 +58,34 @@ function buildCompactedMessages(sessionID: string): WithParts[] { ] } -test("checkSession resets message id aliases after native compaction", async () => { +test("checkSession preserves message id aliases after native compaction", async () => { const sessionID = `ses_message_ids_after_compaction_${Date.now()}` const messages = buildCompactedMessages(sessionID) const state = createSessionState() const logger = new Logger(false) state.sessionId = sessionID - state.messageIds.byRawId.set("old-message-9998", "m9998") - state.messageIds.byRawId.set("old-message-9999", "m9999") - state.messageIds.byRef.set("m9998", "old-message-9998") - state.messageIds.byRef.set("m9999", "old-message-9999") - state.messageIds.nextRef = 9999 + state.messageIds.byRawId.set("old-message-1", "m0001") + state.messageIds.byRawId.set("old-message-2", "m0002") + state.messageIds.byRef.set("m0001", "old-message-1") + state.messageIds.byRef.set("m0002", "old-message-2") + state.messageIds.nextRef = 3 await checkSession({} as any, state, logger, messages, false) assert.equal(state.lastCompaction, 2) - assert.equal(state.messageIds.byRawId.size, 0) - assert.equal(state.messageIds.byRef.size, 0) - assert.equal(state.messageIds.nextRef, 1) + assert.equal(state.messageIds.byRawId.get("old-message-1"), "m0001") + assert.equal(state.messageIds.byRawId.get("old-message-2"), "m0002") + assert.equal(state.messageIds.byRef.get("m0001"), "old-message-1") + assert.equal(state.messageIds.byRef.get("m0002"), "old-message-2") + assert.equal(state.messageIds.nextRef, 3) const assigned = 
assignMessageRefs(state, messages) assert.equal(assigned, 2) - assert.equal(state.messageIds.byRawId.get("msg-assistant-summary"), "m0001") - assert.equal(state.messageIds.byRawId.get("msg-user-follow-up"), "m0002") - assert.equal(state.messageIds.byRef.get("m0001"), "msg-assistant-summary") - assert.equal(state.messageIds.byRef.get("m0002"), "msg-user-follow-up") - assert.equal(state.messageIds.nextRef, 3) + assert.equal(state.messageIds.byRawId.get("msg-assistant-summary"), "m0003") + assert.equal(state.messageIds.byRawId.get("msg-user-follow-up"), "m0004") + assert.equal(state.messageIds.byRef.get("m0003"), "msg-assistant-summary") + assert.equal(state.messageIds.byRef.get("m0004"), "msg-user-follow-up") + assert.equal(state.messageIds.nextRef, 5) }) diff --git a/tests/prune-tool-metadata.test.ts b/tests/prune-tool-metadata.test.ts new file mode 100644 index 00000000..ebc6e76b --- /dev/null +++ b/tests/prune-tool-metadata.test.ts @@ -0,0 +1,111 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { prune } from "../lib/messages/prune" +import { createSessionState, type WithParts } from "../lib/state" +import type { PluginConfig } from "../lib/config" +import { Logger } from "../lib/logger" + +function buildConfig(): PluginConfig { + return { + enabled: true, + debug: false, + pruneNotification: "off", + pruneNotificationType: "chat", + commands: { + enabled: true, + protectedTools: [], + }, + manualMode: { + enabled: false, + automaticStrategies: true, + }, + turnProtection: { + enabled: false, + turns: 4, + }, + experimental: { + allowSubAgents: false, + customPrompts: false, + }, + protectedFilePatterns: [], + compress: { + mode: "range", + permission: "allow", + showCompression: false, + maxContextLimit: 150000, + minContextLimit: 50000, + nudgeFrequency: 5, + iterationNudgeThreshold: 15, + nudgeForce: "soft", + protectedTools: [], + protectUserMessages: false, + }, + strategies: { + deduplication: { + enabled: true, + 
protectedTools: [], + }, + purgeErrors: { + enabled: true, + turns: 4, + protectedTools: [], + }, + }, + } +} + +function buildMessages(): WithParts[] { + return [ + { + info: { + id: "msg-assistant-edit", + role: "assistant", + sessionID: "ses-prune-metadata", + agent: "assistant", + time: { created: 1 }, + } as WithParts["info"], + parts: [ + { + id: "part-edit", + messageID: "msg-assistant-edit", + sessionID: "ses-prune-metadata", + type: "tool" as const, + tool: "edit", + callID: "call-edit-1", + state: { + status: "completed" as const, + input: { filePath: "demo.ts" }, + output: "edited", + metadata: { + filediff: { + before: "before snapshot", + after: "after snapshot", + patch: "@@ diff metadata @@", + }, + }, + }, + }, + ], + }, + ] +} + +test("file snapshot metadata is pruned only for pruned edit or write tools", () => { + const state = createSessionState() + const logger = new Logger(false) + const config = buildConfig() + const messages = buildMessages() + + prune(state, logger, config, messages) + + const filediff = (messages[0]?.parts[0] as any)?.state?.metadata?.filediff + assert.equal(filediff.before, "before snapshot") + assert.equal(filediff.after, "after snapshot") + + state.prune.tools.set("call-edit-1", 1) + prune(state, logger, config, messages) + + assert.match(filediff.before, /full file snapshot removed/) + assert.match(filediff.after, /full file snapshot removed/) + assert.equal(filediff.patch, "@@ diff metadata @@") +}) diff --git a/tests/reasoning-strip.test.ts b/tests/reasoning-strip.test.ts new file mode 100644 index 00000000..d70af41a --- /dev/null +++ b/tests/reasoning-strip.test.ts @@ -0,0 +1,144 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { stripStaleMetadata } from "../lib/messages/reasoning-strip" +import type { WithParts } from "../lib/state" + +const SESSION_ID = "ses-reasoning-strip" + +type MessagePart = WithParts["parts"][number] +type ReasoningPart = Extract +type TextPart = Extract 
+type ToolPart = Extract + +function userMessage(modelID = "claude-sonnet-4-5", providerID = "anthropic"): WithParts { + return { + info: { + id: "msg-user", + sessionID: SESSION_ID, + role: "user", + time: { created: 2 }, + agent: "assistant", + model: { providerID, modelID }, + }, + parts: [textPart("part-user", "msg-user", "next turn")], + } +} + +function assistantMessage( + parts: MessagePart[], + modelID = "claude-opus-4-7", + providerID = "anthropic", +): WithParts { + return { + info: { + id: "msg-assistant", + sessionID: SESSION_ID, + role: "assistant", + time: { created: 1 }, + parentID: "msg-parent", + modelID, + providerID, + mode: "build", + agent: "assistant", + path: { cwd: "/tmp/project", root: "/tmp/project" }, + cost: 0, + tokens: { + input: 0, + output: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + }, + parts, + } +} + +function textPart( + id: string, + messageID: string, + text: string, + metadata?: Record, +): TextPart { + return { + id, + sessionID: SESSION_ID, + messageID, + type: "text", + text, + ...(metadata ? { metadata } : {}), + } +} + +function reasoningPart(text: string, metadata?: Record): ReasoningPart { + return { + id: "part-reasoning", + sessionID: SESSION_ID, + messageID: "msg-assistant", + type: "reasoning", + text, + ...(metadata ? { metadata } : {}), + time: { start: 1, end: 2 }, + } +} + +function toolPart(metadata?: Record): ToolPart { + return { + id: "part-tool", + sessionID: SESSION_ID, + messageID: "msg-assistant", + type: "tool", + callID: "call-tool", + tool: "bash", + state: { + status: "completed", + input: {}, + output: "ok", + title: "bash", + metadata: {}, + time: { start: 1, end: 2 }, + }, + ...(metadata ? 
{ metadata } : {}), + } +} + +test("reasoning.metadata.anthropic.signature preserved when model differs", () => { + const metadata = { anthropic: { signature: "sig123" } } + const messages = [assistantMessage([reasoningPart("thinking", metadata)]), userMessage()] + + stripStaleMetadata(messages) + + assert.deepEqual(messages[0].parts[0], reasoningPart("thinking", metadata)) +}) + +test("text/tool metadata still stripped when model differs", () => { + const messages = [ + assistantMessage([ + textPart("part-text", "msg-assistant", "hello", { provider: "stale" }), + toolPart({ provider: "stale" }), + ]), + userMessage(), + ] + + stripStaleMetadata(messages) + + assert.equal("metadata" in messages[0].parts[0], false) + assert.equal("metadata" in messages[0].parts[1], false) +}) + +test("reasoning with empty text remains untouched", () => { + const part = reasoningPart("", { anthropic: { signature: "sig-empty" } }) + const messages = [assistantMessage([part]), userMessage()] + + stripStaleMetadata(messages) + + assert.strictEqual(messages[0].parts[0], part) +}) + +test("sameModel: reasoning untouched", () => { + const part = reasoningPart("thinking", { anthropic: { signature: "sig-same" } }) + const messages = [assistantMessage([part], "claude-sonnet-4-5"), userMessage()] + + stripStaleMetadata(messages) + + assert.strictEqual(messages[0].parts[0], part) +}) diff --git a/tests/snowball-regression.test.ts b/tests/snowball-regression.test.ts new file mode 100644 index 00000000..f93e0cd5 --- /dev/null +++ b/tests/snowball-regression.test.ts @@ -0,0 +1,202 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { join } from "node:path" +import { tmpdir } from "node:os" +import { mkdirSync } from "node:fs" + +import { createSessionState } from "../lib/state" +import { + allocateRunId, + applyCompressionState, + COMPRESSED_BLOCK_HEADER, + reserveBlockIds, + wrapCompressedSummary, +} from "../lib/compress/state" +import { renderBlockForContext, type 
BlockLike } from "../lib/compress/renderer" +import { getActiveSummaryTokenUsage } from "../lib/state/utils" +import { countTokens } from "../lib/token-utils" +import type { SelectionResolution } from "../lib/compress/types" + +const testDataHome = join(tmpdir(), `opencode-dcp-snowball-tests-${process.pid}`) +const testConfigHome = join(tmpdir(), `opencode-dcp-snowball-config-tests-${process.pid}`) + +process.env.XDG_DATA_HOME = testDataHome +process.env.XDG_CONFIG_HOME = testConfigHome + +mkdirSync(testDataHome, { recursive: true }) +mkdirSync(testConfigHome, { recursive: true }) + +/** + * Extract the inner body of a stored block.summary, stripping the standard + * header and dcp-message-id footer. Mirrors the private extractBlockBody + * helper inside lib/compress/state.ts so the test can simulate a "snowball" + * model summary that embeds the prior block's body verbatim. + */ +function extractInnerBody(blockSummary: string, blockId: number): string { + const header = `${COMPRESSED_BLOCK_HEADER}\n` + const footer = `\nb${blockId}` + let body = blockSummary + if (body.startsWith(header)) body = body.slice(header.length) + else if (body.startsWith(COMPRESSED_BLOCK_HEADER)) body = body.slice(COMPRESSED_BLOCK_HEADER.length) + if (body.endsWith(footer)) body = body.slice(0, -footer.length) + return body.trim() +} + +function buildSelection(messageId: string, rawIndex: number, requiredBlockIds: number[]): SelectionResolution { + return { + startReference: { kind: "message", rawIndex, messageId }, + endReference: { kind: "message", rawIndex, messageId }, + messageIds: [messageId], + messageTokenById: new Map([[messageId, 100]]), + toolIds: [], + requiredBlockIds, + } +} + +/** + * Drive one sequential compress round, mirroring the wrap → render → apply + * pipeline that lib/compress/range.ts executes for real range compressions. + * + * Each round constructs a modelSummary that VERBATIM embeds the inner body of + * the prior block. 
wrapCompressedSummary's exact-substring dedup then collapses + * that body to a `(bN)` placeholder, which is the snowball fix under test: + * stored block summaries stay compact even when the model writes verbose + * inlined-prior-content summaries. + */ +function runCompressRound( + state: ReturnType, + roundNumber: number, + consumedBlockIds: number[], +): number { + const [blockId] = reserveBlockIds(state, 1) + if (blockId === undefined) { + throw new Error("Failed to reserve block id") + } + + // Substantial unique content per round so summaryTokens are nontrivial and + // the rendered expansion is materially larger than the compact stored form. + const newContent = `Round ${roundNumber} fresh investigation notes about subsystem ${roundNumber}, with findings, hypotheses, and follow-up actions repeated for token weight. `.repeat(8) + + let modelSummary = `Round ${roundNumber}: ${newContent}` + for (const consumedId of consumedBlockIds) { + const consumedBlock = state.prune.messages.blocksById.get(consumedId) + if (!consumedBlock) continue + const priorBody = extractInnerBody(consumedBlock.summary, consumedId) + // Embedding the prior body verbatim is the snowball pattern; the dedup + // pass inside wrapCompressedSummary should collapse it to `(bN)`. + modelSummary += `\n\nPrior context:\n${priorBody}` + } + + const consumedBlocks = consumedBlockIds + .map((id) => state.prune.messages.blocksById.get(id)) + .filter((b): b is NonNullable => b !== undefined) + .map((b) => ({ id: b.blockId, summary: b.summary, schemaVersion: b.schemaVersion })) + + const wrapResult = wrapCompressedSummary({ + blockId, + modelSummary, + consumedBlocks, + blocksById: state.prune.messages.blocksById, + mode: "range", + }) + + // Render against a draft Map (Phase 0 Contract A): never mutate + // state.prune.messages.blocksById before validation succeeds. 
+ const draftBlock: BlockLike & { summaryTokens: number } = wrapResult.draftBlock + const draftBlocksById = new Map(state.prune.messages.blocksById) + draftBlocksById.set(blockId, draftBlock) + const { renderedTokens } = renderBlockForContext(blockId, draftBlocksById) + draftBlock.summaryTokens = renderedTokens + + const messageId = `msg-r${roundNumber}` + const selection = buildSelection(messageId, roundNumber, consumedBlockIds) + const anchorMessageId = messageId + const runId = allocateRunId(state) + + applyCompressionState( + state, + { + topic: `Round ${roundNumber}`, + batchTopic: `Round ${roundNumber}`, + startId: `m000${roundNumber}`, + endId: `m000${roundNumber}`, + mode: "range", + runId, + compressMessageId: `compress-msg-${roundNumber}`, + compressCallId: `compress-call-${roundNumber}`, + summaryTokens: renderedTokens, + refBlockIds: wrapResult.refBlockIds, + }, + selection, + anchorMessageId, + blockId, + wrapResult.storedSummary, + consumedBlockIds, + ) + + return blockId +} + +test("snowball regression: 5 sequential compresses keep stored summaries compact", () => { + const state = createSessionState() + state.sessionId = "ses_snowball_regression" + + const ids: number[] = [] + // Round 1: no prior block to consume. + ids.push(runCompressRound(state, 1, [])) + // Rounds 2–5: each consumes its immediate predecessor, so only the latest + // block remains active. The model summary embeds the prior body verbatim; + // wrapCompressedSummary must dedup it to `(bN)` for the chain to stay + // bounded instead of doubling each round (the snowball symptom). 
+ for (let round = 2; round <= 5; round++) { + ids.push(runCompressRound(state, round, [ids[round - 2]])) + } + + const block1 = state.prune.messages.blocksById.get(ids[0]) + const block4 = state.prune.messages.blocksById.get(ids[3]) + const block5 = state.prune.messages.blocksById.get(ids[4]) + assert.ok(block1 && block4 && block5, "expected all 5 blocks to exist") + + // Assertion 1 — bounded growth: if dedup is working, block5 stores roughly + // the same compact body as block4 (each round adds only a fixed + // "Prior context:\n(bN)" suffix). The 1.5x bound is a generous guard + // against any future regression where prior bodies get re-inlined. + assert.ok( + block5.summary.length < block4.summary.length * 1.5, + `block5.summary.length (${block5.summary.length}) must be < 1.5x block4.summary.length (${block4.summary.length}) to prove no snowball`, + ) + + // Assertion 2 — structural reference: block5 must record block4 in + // refBlockIds so renderBlockForContext can expand `(b4)` at read time. + assert.ok( + Array.isArray(block5.refBlockIds) && block5.refBlockIds.includes(block4.blockId), + `block5.refBlockIds must include block4.id (${block4.blockId}); got ${JSON.stringify(block5.refBlockIds)}`, + ) + assert.equal( + block5.schemaVersion, + 2, + "block5 must be persisted as schemaVersion 2 (refBlockIds-aware)", + ) + + // Assertion 3 — rendered expansion: getActiveSummaryTokenUsage sums + // summaryTokens (rendered token counts for v2 blocks). With block5 active + // and blocks 1–4 consumed, the rendered expansion of block5 walks the full + // (b4)→(b3)→(b2)→(b1) chain and produces materially more tokens than the + // compact stored summary's own token count. + const activeRenderedTokens = getActiveSummaryTokenUsage(state) + const activeCompactTokenSum = Array.from(state.prune.messages.activeBlockIds).reduce( + (sum, blockId) => { + const block = state.prune.messages.blocksById.get(blockId) + return sum + (block ? 
countTokens(block.summary) : 0) + }, + 0, + ) + assert.ok( + activeRenderedTokens > activeCompactTokenSum, + `getActiveSummaryTokenUsage (${activeRenderedTokens}) must exceed compact summary token sum (${activeCompactTokenSum}) to prove rendered expansion is in effect`, + ) + + // Sanity: only block5 should be active after the consume chain. + assert.equal(state.prune.messages.activeBlockIds.size, 1) + assert.ok(state.prune.messages.activeBlockIds.has(block5.blockId)) +}) diff --git a/tests/structural-refs.test.ts b/tests/structural-refs.test.ts new file mode 100644 index 00000000..45e35618 --- /dev/null +++ b/tests/structural-refs.test.ts @@ -0,0 +1,161 @@ +import assert from "node:assert/strict" +import test from "node:test" + +import { + extractBlockPlaceholders, + formatBlockPlaceholder, + parseBlockPlaceholder, +} from "../lib/message-ids" +import { DAGValidationError, validateBlockRefs } from "../lib/compress/dag" +import { renderBlockForContext, type BlockLike } from "../lib/compress/renderer" + +test("formatBlockPlaceholder uses parenthesized form", () => { + assert.equal(formatBlockPlaceholder(7), "(b7)") +}) + +test("parseBlockPlaceholder accepts only parenthesized refs", () => { + assert.equal(parseBlockPlaceholder("(b7)"), 7) + assert.equal(parseBlockPlaceholder("b7"), null) +}) + +test("extractBlockPlaceholders returns block ids in order", () => { + assert.deepEqual(extractBlockPlaceholders("see (b1) and (b3)"), [1, 3]) +}) + +test("validateBlockRefs throws on self reference", () => { + assert.throws( + () => validateBlockRefs(3, [3], new Map()), + (err: unknown) => err instanceof DAGValidationError && /self-ref/.test(String(err.message)), + ) +}) + +test("validateBlockRefs throws on forward reference", () => { + assert.throws( + () => validateBlockRefs(3, [4], new Map()), + (err: unknown) => err instanceof DAGValidationError && /forward-ref/.test(String(err.message)), + ) +}) + +test("renderBlockForContext expands a linear chain once", () => { + const 
blocks = new Map([ + [1, { summary: "A", refBlockIds: [] }], + [2, { summary: "(b1) B", refBlockIds: [1] }], + [3, { summary: "(b2) C", refBlockIds: [2] }], + ]) + + const result = renderBlockForContext(3, blocks) + + assert.equal(result.text, "A B C") +}) + +test("renderBlockForContext dedups diamonds with already expanded marker", () => { + const blocks = new Map([ + [1, { summary: "X", refBlockIds: [] }], + [2, { summary: "(b1) D", refBlockIds: [1] }], + [3, { summary: "(b1) E", refBlockIds: [1] }], + [4, { summary: "(b2) + (b3)", refBlockIds: [2, 3] }], + ]) + + const result = renderBlockForContext(4, blocks) + + assert.equal(result.text, "X D + (b1) [already expanded above] E") +}) + +test("renderBlockForContext returns legacy v1 summaries verbatim", () => { + const blocks = new Map([[1, { summary: "legacy (b9) text" }]]) + + const result = renderBlockForContext(1, blocks) + + assert.equal(result.text, "legacy (b9) text") +}) + +test("renderBlockForContext marks missing blocks", () => { + const blocks = new Map() + + const result = renderBlockForContext(42, blocks) + + assert.equal(result.text, "(b42) [not found]") +}) + +test("renderBlockForContext does NOT expand prose (bN) absent from refBlockIds", () => { + // Regression: v2 block with refBlockIds=[] containing prose (b27) mentions. + // Previously the renderer regex-scanned summary text and expanded ANY (bN), + // inflating rendered output (see ses_1d3a77f50ffeaS1NcyP2XlA9lB B28/B29). 
+ const blocks = new Map([ + [27, { summary: "FULL CONTENT OF B27", refBlockIds: [] }], + [28, { + summary: "prompt includes full trap packet decode (b27); also see (b27) again", + refBlockIds: [], + }], + ]) + + const result = renderBlockForContext(28, blocks) + + // Prose (b27) stays literal, FULL CONTENT OF B27 is NOT inlined + assert.equal( + result.text, + "prompt includes full trap packet decode (b27); also see (b27) again", + ) + assert.ok(!result.text.includes("FULL CONTENT OF B27")) +}) + +test("renderBlockForContext expands refs in refBlockIds even with prose mentions", () => { + // Counterpoint: when refBlockIds DOES contain the id, all (bN) in summary expand. + const blocks = new Map([ + [1, { summary: "CHILD", refBlockIds: [] }], + [2, { + summary: "sees (b1) once and (b1) twice", + refBlockIds: [1], + }], + ]) + + const result = renderBlockForContext(2, blocks) + + assert.equal(result.text, "sees CHILD once and CHILD twice") +}) + +test("renderBlockForContext mixes allowlisted and prose refs in same summary", () => { + // Oracle test case: '(b1) plus prose (b2)' with refBlockIds=[1] → b1 expands, b2 stays literal + const blocks = new Map([ + [1, { summary: "CHILD1", refBlockIds: [] }], + [2, { summary: "CHILD2", refBlockIds: [] }], + [3, { + summary: "(b1) plus prose (b2)", + refBlockIds: [1], + }], + ]) + + const result = renderBlockForContext(3, blocks) + + assert.equal(result.text, "CHILD1 plus prose (b2)") +}) + +test("renderBlockForContext absent refBlockIds do not poison renderedOnce", () => { + // Oracle regression: refBlockIds contains an id whose (bN) is absent from summary. + // The earlier draft fix would have called renderInner on b2 (no-op split), + // mutating renderedOnce. Then a sibling that legitimately references b2 would + // get '(b2) [already expanded above]' instead of CHILD2. 
+ const blocks = new Map([ + [1, { summary: "CHILD1", refBlockIds: [] }], + [2, { summary: "CHILD2", refBlockIds: [] }], + [3, { + summary: "(b1) only — but lists b2 in refs too", + refBlockIds: [1, 2], + }], + [4, { summary: "(b2)", refBlockIds: [2] }], + [5, { summary: "(b3) then (b4)", refBlockIds: [3, 4] }], + ]) + + const result = renderBlockForContext(5, blocks) + + // b3 expands to 'CHILD1 only — but lists b2 in refs too' (b2 not in text) + // b4 expands to 'CHILD2' — must NOT show 'already expanded above' + assert.ok( + result.text.includes("CHILD2"), + `expected CHILD2 in output, got: ${result.text}`, + ) + assert.ok( + !result.text.includes("[already expanded above]"), + `b2 should not be marked already-expanded, got: ${result.text}`, + ) +}) diff --git a/tests/summary-limits.test.ts b/tests/summary-limits.test.ts new file mode 100644 index 00000000..1b495719 --- /dev/null +++ b/tests/summary-limits.test.ts @@ -0,0 +1,62 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { estimateSelectedTokens } from "../lib/compress/summary-limits" +import { createSessionState, type CompressionBlock } from "../lib/state" +import type { SelectionResolution } from "../lib/compress/types" + +function createSelection(): SelectionResolution { + return { + startReference: { + kind: "message", + rawIndex: 0, + messageId: "msg-a", + }, + endReference: { + kind: "message", + rawIndex: 1, + messageId: "msg-b", + }, + messageIds: ["msg-a", "msg-b"], + messageTokenById: new Map([ + ["msg-a", 10_000], + ["msg-b", 50], + ]), + toolIds: [], + requiredBlockIds: [1], + } +} + +function createBlock(): CompressionBlock { + return { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 10_000, + summaryTokens: 100, + durationMs: 0, + mode: "range", + topic: "Existing block", + batchTopic: "Existing block", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "msg-compress", + 
includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: ["msg-a"], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "existing compressed summary", + } +} + +test("selected token estimate counts consumed blocks as current summaries", () => { + const state = createSessionState() + state.prune.messages.blocksById.set(1, createBlock()) + + assert.equal(estimateSelectedTokens(state, createSelection(), [1]), 150) +}) diff --git a/tests/token-accounting.test.ts b/tests/token-accounting.test.ts new file mode 100644 index 00000000..16edd1d6 --- /dev/null +++ b/tests/token-accounting.test.ts @@ -0,0 +1,154 @@ +import assert from "node:assert/strict" +import test from "node:test" + +import { renderBlockForContext } from "../lib/compress/renderer" +import { assertUsefulCompressedSummary } from "../lib/compress/summary-limits" +import { createSessionState, type CompressionBlock } from "../lib/state" +import { getActiveSummaryTokenUsage } from "../lib/state/utils" +import { countTokens } from "../lib/token-utils" + +test("v2 blocks use rendered token counts", () => { + const blocks = new Map([ + [ + 1, + { + blockId: 1, + summary: Array.from({ length: 20 }, (_, index) => `token${index + 1}`).join(" "), + refBlockIds: [], + schemaVersion: 2, + } as CompressionBlock, + ], + [ + 2, + { + blockId: 2, + summary: "(b1)", + refBlockIds: [1], + schemaVersion: 2, + } as CompressionBlock, + ], + ]) + + const { text, renderedTokens } = renderBlockForContext(2, blocks) + const compactSummary = blocks.get(2)?.summary ?? 
"" + + assert.ok(text.includes("token1 token2 token3")) + assert.ok(renderedTokens > compactSummary.length) +}) + +test("legacy v1 blocks keep stored summaryTokens unchanged", () => { + const blocks = new Map([ + [ + 1, + { + blockId: 1, + summary: "compact legacy summary", + summaryTokens: 7, + } as CompressionBlock, + ], + ]) + + const { renderedTokens } = renderBlockForContext(1, blocks) + + assert.equal(renderedTokens, countTokens("compact legacy summary")) + assert.equal(blocks.get(1)?.summaryTokens, 7) +}) + +test("assertUsefulCompressedSummary accepts a much smaller summary", () => { + assert.doesNotThrow(() => { + assertUsefulCompressedSummary(120, 20_000) + }) +}) + +test("assertUsefulCompressedSummary rejects summaries that are not smaller", () => { + assert.throws(() => { + assertUsefulCompressedSummary(10_000, 10_000) + }, /not smaller than the selected content/) +}) + +test("getActiveSummaryTokenUsage sums active block summaryTokens", () => { + const state = createSessionState() + state.prune.messages.blocksById.set( + 1, + { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 111, + durationMs: 0, + topic: "one", + startId: "m1", + endId: "m2", + anchorMessageId: "", + compressMessageId: "", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: [], + effectiveToolIds: [], + createdAt: 1, + summary: "summary one", + } as CompressionBlock, + ) + state.prune.messages.blocksById.set( + 2, + { + blockId: 2, + runId: 2, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 222, + durationMs: 0, + topic: "two", + startId: "m3", + endId: "m4", + anchorMessageId: "", + compressMessageId: "", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: [], + effectiveToolIds: [], + createdAt: 2, + summary: "summary 
two", + } as CompressionBlock, + ) + state.prune.messages.blocksById.set( + 3, + { + blockId: 3, + runId: 3, + active: false, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 999, + durationMs: 0, + topic: "inactive", + startId: "m5", + endId: "m6", + anchorMessageId: "", + compressMessageId: "", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: [], + effectiveToolIds: [], + createdAt: 3, + summary: "summary three", + } as CompressionBlock, + ) + state.prune.messages.activeBlockIds.add(1) + state.prune.messages.activeBlockIds.add(2) + state.prune.messages.activeBlockIds.add(3) + + assert.equal(getActiveSummaryTokenUsage(state), 333) +}) diff --git a/tests/token-usage.test.ts b/tests/token-usage.test.ts index 549edeae..cdc8cf6e 100644 --- a/tests/token-usage.test.ts +++ b/tests/token-usage.test.ts @@ -2,7 +2,7 @@ import assert from "node:assert/strict" import test from "node:test" import type { PluginConfig } from "../lib/config" import { isContextOverLimits } from "../lib/messages/inject/utils" -import { wrapCompressedSummary } from "../lib/compress/state" +import { wrapBlockSummary } from "../lib/compress/state" import { createSessionState, type WithParts } from "../lib/state" import type { CompressionBlock } from "../lib/state" import { getCurrentTokenUsage } from "../lib/token-utils" @@ -252,7 +252,7 @@ test("isContextOverLimits extends the max threshold by active summary tokens", ( const state = createSessionState() state.lastCompaction = 2 - const storedSummary = wrapCompressedSummary(7, repeatedWord("summary", 120)) + const storedSummary = wrapBlockSummary(7, repeatedWord("summary", 120)) state.prune.messages.blocksById.set(7, createActiveBlock(7, storedSummary, 1000)) state.prune.messages.activeBlockIds.add(7) @@ -286,7 +286,7 @@ test("isContextOverLimits does not extend the max threshold when summaryBuffer i const state = createSessionState() 
state.lastCompaction = 2 - const storedSummary = wrapCompressedSummary(7, repeatedWord("summary", 120)) + const storedSummary = wrapBlockSummary(7, repeatedWord("summary", 120)) state.prune.messages.blocksById.set(7, createActiveBlock(7, storedSummary, 1000)) state.prune.messages.activeBlockIds.add(7)