Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,7 @@ export function createMemoryCore(
if (nowMs - lastDirtyClosedScan < 30_000) return;
lastDirtyClosedScan = nowMs;
try {
const dirtyClosed = handle.repos.episodes
.list({ status: "closed", limit: 500 })
.filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep));
const dirtyClosed = collectDirtyClosedEpisodes();
if (dirtyClosed.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosed);
}
Expand Down Expand Up @@ -914,9 +912,7 @@ export function createMemoryCore(
await recoverOpenEpisodesAsSessionEnd(stale);
}
}
const dirtyClosed = handle.repos.episodes
.list({ status: "closed", limit: 500 })
.filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep));
const dirtyClosed = collectDirtyClosedEpisodes();
if (dirtyClosed.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosed);
}
Expand Down Expand Up @@ -1175,7 +1171,15 @@ export function createMemoryCore(
continue;
}

const snapshot = snapshotFromRecoveredEpisode(ep, endedAt);
// Pre-stamp before emitting finalized: if the watchdog fires mid-scoring,
// the next startup's condition-4 check will see DIRTY_REWARD_RESCORE and
// skip this episode rather than looping indefinitely.
handle.repos.episodes.updateMeta(episodeId, {
recoveryReason: RECOVERY_REASONS.DIRTY_REWARD_RESCORE,
});
const snapshot = snapshotFromRecoveredEpisode(ep, endedAt, {
recoveryReason: RECOVERY_REASONS.DIRTY_REWARD_RESCORE,
});
debugStartupRecovery("H3", "startup_recovery_emit_finalized", {
episodeId,
sessionId: ep.sessionId,
Expand Down Expand Up @@ -1241,6 +1245,7 @@ export function createMemoryCore(
episodes: Array<EpisodeRow & { meta?: Record<string, unknown> }>,
): Promise<void> {
log.info("init.dirty_closed_episodes.rescore", { count: episodes.length });
const rescored: EpisodeId[] = [];
for (const ep of episodes) {
if (isLightweightEpisode(ep)) continue;
const episodeId = ep.id as EpisodeId;
Expand All @@ -1249,6 +1254,7 @@ export function createMemoryCore(
closeReason: "finalized",
recoveredAtStartup: endedAt,
recoveryReason: "dirty_reward_rescore",
rewardDirty: undefined,
});
const snapshot = snapshotFromRecoveredEpisode(ep, endedAt, {
recoveryReason: "dirty_reward_rescore",
Expand All @@ -1258,10 +1264,36 @@ export function createMemoryCore(
episode: snapshot,
closedBy: "finalized",
});
rescored.push(episodeId);
}
// Drain the capture pass (patches reflections + α onto existing traces).
await handle.flush();
// In lightweight mode flush() returns before draining the reward
// subscriber. Explicitly run reward for any episode whose trace count
// still mismatches — mirrors the pattern in recoverOpenEpisodesAsSessionEnd.
for (const episodeId of rescored) {
if (episodeRewardIsDirty(handle.repos.episodes.getById(episodeId) ?? {} as never)) {
await handle.rewardRunner.run({ episodeId, feedback: [], trigger: "manual" });
}
}
await handle.flush();
}

/**
 * Gathers every closed episode whose reward state is reported dirty by
 * `episodeRewardIsDirty`.
 *
 * Closed episodes are fetched from the episodes repo in batches of 500 via
 * `limit`/`offset`; fetching stops as soon as a batch comes back shorter
 * than the batch size.
 *
 * NOTE(review): termination relies on `list` honoring `offset` — confirm
 * against the repo implementation; a repo that kept returning full batches
 * would never end the loop.
 *
 * @returns The dirty closed episodes, in the order the repo returned them.
 */
function collectDirtyClosedEpisodes(): (EpisodeRow & { meta?: Record<string, unknown> })[] {
  const batchLimit = 500;
  const found: (EpisodeRow & { meta?: Record<string, unknown> })[] = [];
  let cursor = 0;
  let fetched: number;
  do {
    const batch = handle.repos.episodes.list({ status: "closed", limit: batchLimit, offset: cursor });
    // Keep only the episodes of this batch that still need a reward pass.
    found.push(...batch.filter((episode) => episodeRewardIsDirty(episode)));
    fetched = batch.length;
    cursor += batchLimit;
  } while (fetched >= batchLimit); // a short batch means the listing is exhausted
  return found;
}

function episodeRewardIsDirty(ep: EpisodeRow & { meta?: Record<string, unknown> }): boolean {
const meta = ep.meta ?? {};
if (meta.lightweightMemory === true) return false;
Expand All @@ -1274,7 +1306,14 @@ export function createMemoryCore(
if (
ep.rTask == null &&
(ep.traceIds?.length ?? 0) > 0 &&
(meta.closeReason === "finalized" || meta.recoveryReason === "missed_session_end")
// Episodes already attempted by a recovery path carry recoveryReason "dirty_reward_rescore".
// Excluding them prevents a crash-respawn loop when the watchdog fires
// mid-scoring and leaves rTask null: without this guard the next startup
// would re-pick the episode via closeReason="finalized" indefinitely.
meta.recoveryReason !== "dirty_reward_rescore" &&
(meta.closeReason === "finalized" ||
meta.closeReason === "abandoned" ||
meta.recoveryReason === "missed_session_end")
) {
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions apps/memos-local-plugin/core/reward/reward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner {
try {
const existingMeta = episode.meta ?? {};
const wasFinalized = existingMeta.closeReason === "finalized";
deps.episodesRepo.setRTask(input.episodeId, 0);
deps.episodesRepo.updateMeta(input.episodeId, {
...(wasFinalized ? {} : { closeReason: "abandoned", abandonReason: skipReason }),
reward: {
Expand All @@ -128,6 +129,7 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner {
trigger: input.trigger,
skipped: true,
},
rewardDirty: undefined,
});
} catch (err) {
warnings.push({
Expand Down
185 changes: 185 additions & 0 deletions apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1456,4 +1456,189 @@ algorithm:
expect(meta.reward?.traceCount).toBe(1);
expect(meta.reward?.traceIds).toEqual(["tr_missing_reward"]);
});

it("dirty-reward recovery does not insert orphan traces (regression: rescore loop guard)", async () => {
// Regression test for the rescore loop:
// When recoverDirtyClosedEpisodes re-emits episode.finalized, capture's
// runReflect used to insert new trace rows for "orphan steps" — steps
// whose timestamps didn't match any existing DB row. For recovered
// episodes this happens whenever a trace has tool calls with endedAt
// timestamps different from the trace's own ts, because the snapshot
// rebuilds a separate tool-role turn for each call.
//
// Without the guard the orphan insert grows trace_ids_json, keeping
// reward.traceCount != traceIds.length forever and looping on every
// bridge restart. The guard (meta.recoveryReason === "dirty_reward_rescore")
// skips the insert, so trace_ids_json stays stable and the episode
// stops appearing dirty after a single recovery pass.

home = await makeTmpHome({ agent: "openclaw" });

const seeder = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "rescore-loop-seed",
});
await seeder.init();
await seeder.shutdown();

const Sqlite = (await import("better-sqlite3")).default;
const writeDb = new Sqlite(home.home.dbFile);
const BASE = Date.now() - 5_000;

writeDb
.prepare(
`INSERT INTO sessions (id, agent, started_at, last_seen_at, meta_json) VALUES (?, ?, ?, ?, ?)`,
)
.run("se_loop", "openclaw", BASE, BASE, "{}");

// Episode is dirty: traceCount=1 but trace_ids_json has 2 IDs.
writeDb
.prepare(
`INSERT INTO episodes (id, session_id, started_at, ended_at, trace_ids_json, r_task, status, meta_json) VALUES (?, ?, ?, ?, ?, ?, 'closed', ?)`,
)
.run(
"ep_loop",
"se_loop",
BASE,
BASE + 1,
JSON.stringify(["tr_loop_a", "tr_loop_b"]),
0.5,
JSON.stringify({
closeReason: "finalized",
reward: { rHuman: 0.5, scoredAt: BASE - 1000, traceCount: 1 },
}),
);

// tr_loop_a: plain text trace — no orphan risk.
writeDb
.prepare(
`INSERT INTO traces (
id, episode_id, session_id, ts, user_text, agent_text, summary,
tool_calls_json, reflection, agent_thinking, value, alpha, r_human,
priority, tags_json, error_signatures_json, vec_summary, vec_action,
share_scope, share_target, shared_at, turn_id, schema_version
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?)`,
)
.run(
"tr_loop_a",
"ep_loop",
"se_loop",
BASE,
"帮我分析一下这段Python代码的性能瓶颈,并给出优化建议。",
"这段代码的主要性能问题在于嵌套循环,时间复杂度是O(n²),可以用哈希表将其优化到O(n)。",
"Python代码性能分析",
"[]",
null,
null,
0,
0,
null,
0.5,
"[]",
"[]",
BASE,
1,
);

// tr_loop_b: trace with a tool call whose endedAt differs from the trace ts.
// snapshotFromRecoveredEpisode creates a tool-role turn with ts=BASE+300,
// which does NOT appear in traceByTs (only BASE and BASE+100 are in the map).
// Without the guard this step is treated as an orphan and a new trace is
// inserted, growing trace_ids_json from 2 to 3 and keeping the episode dirty.
const toolCallWithDifferentTs = JSON.stringify([
{
name: "bash",
input: { command: "python -c 'import cProfile; cProfile.run(\"main()\")'"},
output: "ncalls tottime ... main 1 0.003",
endedAt: BASE + 300,
},
]);
writeDb
.prepare(
`INSERT INTO traces (
id, episode_id, session_id, ts, user_text, agent_text, summary,
tool_calls_json, reflection, agent_thinking, value, alpha, r_human,
priority, tags_json, error_signatures_json, vec_summary, vec_action,
share_scope, share_target, shared_at, turn_id, schema_version
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?)`,
)
.run(
"tr_loop_b",
"ep_loop",
"se_loop",
BASE + 100,
"请用cProfile验证一下",
"运行结果确认了瓶颈在内层循环,优化后耗时减少了约80%。",
"cProfile性能验证",
toolCallWithDifferentTs,
null,
null,
0,
0,
null,
0.5,
"[]",
"[]",
BASE + 100,
1,
);
writeDb.close();

// First recovery: episode is dirty (traceCount=1 != ids_len=2).
core = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "rescore-loop-recover-1",
});
await core.init();
await core.shutdown();
core = null;

const readDb1 = new Sqlite(home.home.dbFile, { readonly: true });
const ep1 = readDb1
.prepare("SELECT trace_ids_json, meta_json, r_task FROM episodes WHERE id = ?")
.get("ep_loop") as { trace_ids_json: string; meta_json: string; r_task: number | null } | undefined;
readDb1.close();

expect(ep1).toBeDefined();
const ids1 = JSON.parse(ep1!.trace_ids_json) as string[];
// Guard: no orphan trace was inserted during dirty-reward recovery.
expect(ids1.length).toBe(2);
const meta1 = JSON.parse(ep1!.meta_json) as {
recoveryReason?: string;
reward?: { traceCount?: number };
};
expect(meta1.recoveryReason).toBe(RECOVERY_REASONS.DIRTY_REWARD_RESCORE);
// After recovery traceCount matches ids_len: episode is no longer dirty.
expect(meta1.reward?.traceCount).toBe(2);

// Second recovery (simulates next bridge restart): episode should not
// be re-scored because traceCount(2) == trace_ids_json.length(2).
core = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "rescore-loop-recover-2",
});
await core.init();

const readDb2 = new Sqlite(home.home.dbFile, { readonly: true });
const ep2 = readDb2
.prepare("SELECT trace_ids_json, meta_json FROM episodes WHERE id = ?")
.get("ep_loop") as { trace_ids_json: string; meta_json: string } | undefined;
readDb2.close();

expect(ep2).toBeDefined();
const ids2 = JSON.parse(ep2!.trace_ids_json) as string[];
// Still 2 — no new orphan inserts on the second restart.
expect(ids2.length).toBe(2);
const meta2 = JSON.parse(ep2!.meta_json) as {
reward?: { traceCount?: number };
};
// traceCount unchanged: the episode was not re-scored.
expect(meta2.reward?.traceCount).toBe(2);
});
});