diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 4974ee16d..d05d02a9d 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -589,9 +589,7 @@ export function createMemoryCore( if (nowMs - lastDirtyClosedScan < 30_000) return; lastDirtyClosedScan = nowMs; try { - const dirtyClosed = handle.repos.episodes - .list({ status: "closed", limit: 500 }) - .filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep)); + const dirtyClosed = collectDirtyClosedEpisodes(); if (dirtyClosed.length > 0) { await recoverDirtyClosedEpisodes(dirtyClosed); } @@ -914,9 +912,7 @@ export function createMemoryCore( await recoverOpenEpisodesAsSessionEnd(stale); } } - const dirtyClosed = handle.repos.episodes - .list({ status: "closed", limit: 500 }) - .filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep)); + const dirtyClosed = collectDirtyClosedEpisodes(); if (dirtyClosed.length > 0) { await recoverDirtyClosedEpisodes(dirtyClosed); } @@ -1175,7 +1171,15 @@ export function createMemoryCore( continue; } - const snapshot = snapshotFromRecoveredEpisode(ep, endedAt); + // Pre-stamp before emitting finalized: if the watchdog fires mid-scoring, + // the next startup's condition-4 check will see DIRTY_REWARD_RESCORE and + // skip this episode rather than looping indefinitely. + handle.repos.episodes.updateMeta(episodeId, { + recoveryReason: RECOVERY_REASONS.DIRTY_REWARD_RESCORE, + }); + const snapshot = snapshotFromRecoveredEpisode(ep, endedAt, { + recoveryReason: RECOVERY_REASONS.DIRTY_REWARD_RESCORE, + }); debugStartupRecovery("H3", "startup_recovery_emit_finalized", { episodeId, sessionId: ep.sessionId, @@ -1241,6 +1245,7 @@ export function createMemoryCore( episodes: Array }>, ): Promise { log.info("init.dirty_closed_episodes.rescore", { count: episodes.length }); + const rescored: EpisodeId[] = []; for (const ep of episodes) { if (isLightweightEpisode(ep)) continue; const episodeId = ep.id as EpisodeId; @@ -1249,6 +1254,7 @@ export function createMemoryCore( closeReason: "finalized", recoveredAtStartup: endedAt, recoveryReason: "dirty_reward_rescore", + rewardDirty: undefined, }); const snapshot = snapshotFromRecoveredEpisode(ep, endedAt, { recoveryReason: "dirty_reward_rescore", @@ -1258,10 +1264,36 @@ export function createMemoryCore( episode: snapshot, closedBy: "finalized", }); + rescored.push(episodeId); + } + // Drain the capture pass (patches reflections + α onto existing traces). + await handle.flush(); + // In lightweight mode flush() returns before draining the reward + // subscriber. Explicitly run reward for any episode whose trace count + // still mismatches — mirrors the pattern in recoverOpenEpisodesAsSessionEnd. + for (const episodeId of rescored) { + if (episodeRewardIsDirty(handle.repos.episodes.getById(episodeId) ?? {} as never)) { + await handle.rewardRunner.run({ episodeId, feedback: [], trigger: "manual" }); + } } await handle.flush(); } + function collectDirtyClosedEpisodes(): (EpisodeRow & { meta?: Record })[] { + const dirty: (EpisodeRow & { meta?: Record })[] = []; + let offset = 0; + const pageSize = 500; + while (true) { + const page = handle.repos.episodes.list({ status: "closed", limit: pageSize, offset }); + for (const ep of page) { + if (episodeRewardIsDirty(ep)) dirty.push(ep); + } + if (page.length < pageSize) break; + offset += pageSize; + } + return dirty; + } + function episodeRewardIsDirty(ep: EpisodeRow & { meta?: Record }): boolean { const meta = ep.meta ?? {}; if (meta.lightweightMemory === true) return false; @@ -1274,7 +1306,14 @@ export function createMemoryCore( if ( ep.rTask == null && (ep.traceIds?.length ?? 0) > 0 && - (meta.closeReason === "finalized" || meta.recoveryReason === "missed_session_end") + // Episodes already attempted by a recovery path carry recoveryReason "dirty_reward_rescore". + // Excluding them prevents a crash-respawn loop when the watchdog fires + // mid-scoring and leaves rTask null: without this guard the next startup + // would re-pick the episode via closeReason="finalized" indefinitely. + meta.recoveryReason !== "dirty_reward_rescore" && + (meta.closeReason === "finalized" || + meta.closeReason === "abandoned" || + meta.recoveryReason === "missed_session_end") ) { return true; } diff --git a/apps/memos-local-plugin/core/reward/reward.ts b/apps/memos-local-plugin/core/reward/reward.ts index 1b6d3354c..d2247932c 100644 --- a/apps/memos-local-plugin/core/reward/reward.ts +++ b/apps/memos-local-plugin/core/reward/reward.ts @@ -119,6 +119,7 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner { try { const existingMeta = episode.meta ?? {}; const wasFinalized = existingMeta.closeReason === "finalized"; + deps.episodesRepo.setRTask(input.episodeId, 0); deps.episodesRepo.updateMeta(input.episodeId, { ...(wasFinalized ? {} : { closeReason: "abandoned", abandonReason: skipReason }), reward: { @@ -128,6 +129,7 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner { trigger: input.trigger, skipped: true, }, + rewardDirty: undefined, }); } catch (err) { warnings.push({ diff --git a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts index 88d5cbbd4..5d6f3237d 100644 --- a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts +++ b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts @@ -1456,4 +1456,189 @@ algorithm: expect(meta.reward?.traceCount).toBe(1); expect(meta.reward?.traceIds).toEqual(["tr_missing_reward"]); }); + + it("dirty-reward recovery does not insert orphan traces (regression: rescore loop guard)", async () => { + // Regression test for the rescore loop: + // When recoverDirtyClosedEpisodes re-emits episode.finalized, capture's + // runReflect used to insert new trace rows for "orphan steps" — steps + // whose timestamps didn't match any existing DB row. For recovered + // episodes this happens whenever a trace has tool calls with endedAt + // timestamps different from the trace's own ts, because the snapshot + // rebuilds a separate tool-role turn for each call. + // + // Without the guard the orphan insert grows trace_ids_json, keeping + // reward.traceCount != traceIds.length forever and looping on every + // bridge restart. The guard (meta.recoveryReason === "dirty_reward_rescore") + // skips the insert, so trace_ids_json stays stable and the episode + // stops appearing dirty after a single recovery pass. + + home = await makeTmpHome({ agent: "openclaw" }); + + const seeder = await bootstrapMemoryCore({ + agent: "openclaw", + home: home.home, + config: home.config, + pkgVersion: "rescore-loop-seed", + }); + await seeder.init(); + await seeder.shutdown(); + + const Sqlite = (await import("better-sqlite3")).default; + const writeDb = new Sqlite(home.home.dbFile); + const BASE = Date.now() - 5_000; + + writeDb + .prepare( + `INSERT INTO sessions (id, agent, started_at, last_seen_at, meta_json) VALUES (?, ?, ?, ?, ?)`, + ) + .run("se_loop", "openclaw", BASE, BASE, "{}"); + + // Episode is dirty: traceCount=1 but trace_ids_json has 2 IDs. + writeDb + .prepare( + `INSERT INTO episodes (id, session_id, started_at, ended_at, trace_ids_json, r_task, status, meta_json) VALUES (?, ?, ?, ?, ?, ?, 'closed', ?)`, + ) + .run( + "ep_loop", + "se_loop", + BASE, + BASE + 1, + JSON.stringify(["tr_loop_a", "tr_loop_b"]), + 0.5, + JSON.stringify({ + closeReason: "finalized", + reward: { rHuman: 0.5, scoredAt: BASE - 1000, traceCount: 1 }, + }), + ); + + // tr_loop_a: plain text trace — no orphan risk. + writeDb + .prepare( + `INSERT INTO traces ( + id, episode_id, session_id, ts, user_text, agent_text, summary, + tool_calls_json, reflection, agent_thinking, value, alpha, r_human, + priority, tags_json, error_signatures_json, vec_summary, vec_action, + share_scope, share_target, shared_at, turn_id, schema_version + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?)`, + ) + .run( + "tr_loop_a", + "ep_loop", + "se_loop", + BASE, + "帮我分析一下这段Python代码的性能瓶颈,并给出优化建议。", + "这段代码的主要性能问题在于嵌套循环,时间复杂度是O(n²),可以用哈希表将其优化到O(n)。", + "Python代码性能分析", + "[]", + null, + null, + 0, + 0, + null, + 0.5, + "[]", + "[]", + BASE, + 1, + ); + + // tr_loop_b: trace with a tool call whose endedAt differs from the trace ts. + // snapshotFromRecoveredEpisode creates a tool-role turn with ts=BASE+300, + // which does NOT appear in traceByTs (only BASE and BASE+100 are in the map). + // Without the guard this step is treated as an orphan and a new trace is + // inserted, growing trace_ids_json from 2 to 3 and keeping the episode dirty. + const toolCallWithDifferentTs = JSON.stringify([ + { + name: "bash", + input: { command: "python -c 'import cProfile; cProfile.run(\"main()\")'"}, + output: "ncalls tottime ... main 1 0.003", + endedAt: BASE + 300, + }, + ]); + writeDb + .prepare( + `INSERT INTO traces ( + id, episode_id, session_id, ts, user_text, agent_text, summary, + tool_calls_json, reflection, agent_thinking, value, alpha, r_human, + priority, tags_json, error_signatures_json, vec_summary, vec_action, + share_scope, share_target, shared_at, turn_id, schema_version + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?)`, + ) + .run( + "tr_loop_b", + "ep_loop", + "se_loop", + BASE + 100, + "请用cProfile验证一下", + "运行结果确认了瓶颈在内层循环,优化后耗时减少了约80%。", + "cProfile性能验证", + toolCallWithDifferentTs, + null, + null, + 0, + 0, + null, + 0.5, + "[]", + "[]", + BASE + 100, + 1, + ); + writeDb.close(); + + // First recovery: episode is dirty (traceCount=1 != ids_len=2). + core = await bootstrapMemoryCore({ + agent: "openclaw", + home: home.home, + config: home.config, + pkgVersion: "rescore-loop-recover-1", + }); + await core.init(); + await core.shutdown(); + core = null; + + const readDb1 = new Sqlite(home.home.dbFile, { readonly: true }); + const ep1 = readDb1 + .prepare("SELECT trace_ids_json, meta_json, r_task FROM episodes WHERE id = ?") + .get("ep_loop") as { trace_ids_json: string; meta_json: string; r_task: number | null } | undefined; + readDb1.close(); + + expect(ep1).toBeDefined(); + const ids1 = JSON.parse(ep1!.trace_ids_json) as string[]; + // Guard: no orphan trace was inserted during dirty-reward recovery. + expect(ids1.length).toBe(2); + const meta1 = JSON.parse(ep1!.meta_json) as { + recoveryReason?: string; + reward?: { traceCount?: number }; + }; + expect(meta1.recoveryReason).toBe(RECOVERY_REASONS.DIRTY_REWARD_RESCORE); + // After recovery traceCount matches ids_len: episode is no longer dirty. + expect(meta1.reward?.traceCount).toBe(2); + + // Second recovery (simulates next bridge restart): episode should not + // be re-scored because traceCount(2) == trace_ids_json.length(2). + core = await bootstrapMemoryCore({ + agent: "openclaw", + home: home.home, + config: home.config, + pkgVersion: "rescore-loop-recover-2", + }); + await core.init(); + + const readDb2 = new Sqlite(home.home.dbFile, { readonly: true }); + const ep2 = readDb2 + .prepare("SELECT trace_ids_json, meta_json FROM episodes WHERE id = ?") + .get("ep_loop") as { trace_ids_json: string; meta_json: string } | undefined; + readDb2.close(); + + expect(ep2).toBeDefined(); + const ids2 = JSON.parse(ep2!.trace_ids_json) as string[]; + // Still 2 — no new orphan inserts on the second restart. + expect(ids2.length).toBe(2); + const meta2 = JSON.parse(ep2!.meta_json) as { + reward?: { traceCount?: number }; + }; + // traceCount unchanged: the episode was not re-scored. + expect(meta2.reward?.traceCount).toBe(2); + }); });