Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 183 additions & 46 deletions apps/memos-local-plugin/adapters/openclaw/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,10 @@ const BEFORE_PROMPT_SOFT_TIMEOUT_MS = Number.parseInt(
process.env.MEMOS_BEFORE_PROMPT_SOFT_TIMEOUT_MS ?? "12000",
10,
);
const AGENT_END_BINDING_CAP_MS = Math.min(
60_000,
BEFORE_PROMPT_SOFT_TIMEOUT_MS * 5,
);
const TOOL_FAILURE_REPAIR_HINT =
"This tool has failed multiple times in a row. You may want to call `memos_search` for relevant past experience before deciding what to do next.";
const TOOL_FAILURE_HINT_THRESHOLD = 3;
Expand Down Expand Up @@ -1029,12 +1033,17 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
// clear its own turn without deleting a newer turn's mapping.
type EpisodeBinding = {
sessionId: SessionId;
episodeId: EpisodeId;
episodeId?: EpisodeId;
seq: number;
keys: string[];
turnStart?: Promise<RetrievalResultDTO>;
turnGeneration: number;
runId?: string;
userText?: string;
};
const latestEpisodeBySession = new Map<SessionId, EpisodeBinding>();
const episodeBindingByTurnKey = new Map<string, EpisodeBinding>();
const sessionTurnGeneration = new Map<SessionId, number>();
let episodeBindingSeq = 0;
// Per-toolCallId start timestamps so `after_tool_call` can compute duration
// when the host doesn't populate `durationMs`.
Expand Down Expand Up @@ -1127,20 +1136,69 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
return keys;
}

function rememberEpisodeBinding(
sessionId: SessionId,
episodeId: EpisodeId,
ctx: { runId?: string },
userText: string | undefined,
seq: number = ++episodeBindingSeq,
): EpisodeBinding {
function bindingStillLive(binding: EpisodeBinding): boolean {
for (const key of binding.keys) {
if (episodeBindingByTurnKey.get(key)?.seq === binding.seq) return true;
}
return false;
}

function commitEpisodeId(
binding: EpisodeBinding,
pkt: RetrievalResultDTO,
): void {
if (!bindingStillLive(binding)) return;
const candidate = pkt.query?.episodeId as EpisodeId | undefined;
if (!candidate) return;
const routedSessionId = (pkt.query?.sessionId ?? binding.sessionId) as SessionId;
if (opts.core.episodeExists(candidate)) {
binding.episodeId = candidate;
} else {
const fallback = opts.core.resolveOpenEpisodeId(routedSessionId);
if (fallback) binding.episodeId = fallback;
}
if (routedSessionId !== binding.sessionId) {
for (const key of turnBindingKeys(
routedSessionId,
{ runId: binding.runId },
binding.userText,
)) {
episodeBindingByTurnKey.set(key, binding);
}
}
}

function rememberEpisodeBinding(input: {
sessionId: SessionId;
ctx: { runId?: string };
userText: string | undefined;
turnStart?: Promise<RetrievalResultDTO>;
episodeId?: EpisodeId;
turnGeneration: number;
seq?: number;
}): EpisodeBinding {
if (!input.turnStart && input.episodeId === undefined) {
throw new Error(
"rememberEpisodeBinding: turnStart required when episodeId is pending",
);
}
if (input.ctx.runId === undefined && input.userText?.trim()) {
opts.log.warn("memos.binding.missing_run_id", {
sessionId: input.sessionId,
});
}
const seq = input.seq ?? ++episodeBindingSeq;
const binding: EpisodeBinding = {
sessionId,
episodeId,
sessionId: input.sessionId,
episodeId: input.episodeId,
seq,
keys: turnBindingKeys(sessionId, ctx, userText),
keys: turnBindingKeys(input.sessionId, input.ctx, input.userText),
turnStart: input.turnStart,
turnGeneration: input.turnGeneration,
runId: input.ctx.runId,
userText: input.userText,
};
latestEpisodeBySession.set(sessionId, binding);
latestEpisodeBySession.set(input.sessionId, binding);
for (const key of binding.keys) {
episodeBindingByTurnKey.set(key, binding);
}
Expand All @@ -1150,17 +1208,20 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
function findEpisodeBinding(
sessionId: SessionId,
ctx: { runId?: string },
userText?: string,
userText: string | undefined,
mode: "strict_run" | "legacy" = "legacy",
): EpisodeBinding | undefined {
for (const key of turnBindingKeys(sessionId, ctx, userText)) {
const binding = episodeBindingByTurnKey.get(key);
if (binding) return binding;
}
if (mode === "strict_run") return undefined;
return latestEpisodeBySession.get(sessionId);
}

function forgetEpisodeBinding(binding: EpisodeBinding | undefined): void {
if (!binding) return;
binding.turnStart = undefined;
for (const key of binding.keys) {
if (episodeBindingByTurnKey.get(key)?.seq === binding.seq) {
episodeBindingByTurnKey.delete(key);
Expand All @@ -1171,6 +1232,49 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
}
}

async function resolveEpisodeIdForTurnEnd(
sessionId: SessionId,
ctx: { runId?: string },
userText: string,
): Promise<{ episodeId: EpisodeId; binding: EpisodeBinding | undefined } | null> {
if (!ctx.runId) {
opts.log.warn("memos.agent_end.missing_run_id", { sessionId });
}
const binding = findEpisodeBinding(sessionId, ctx, userText, "strict_run");

if (binding?.episodeId && bindingStillLive(binding)) {
if (opts.core.episodeExists(binding.episodeId)) {
return { episodeId: binding.episodeId, binding };
}
}

if (binding?.turnStart) {
const awaited = await withSoftTimeout(binding.turnStart, AGENT_END_BINDING_CAP_MS);
if (awaited.ok) {
commitEpisodeId(binding, awaited.value);
const id = binding.episodeId;
if (id && opts.core.episodeExists(id)) {
return { episodeId: id, binding };
}
} else {
opts.log.warn("memos.agent_end.turn_start_await_timeout", {
sessionId,
timeoutMs: AGENT_END_BINDING_CAP_MS,
});
}
}

const opened = await opts.core.openEpisode({
sessionId,
userMessage: userText,
});
const canonical = opts.core.reconcileEpisodeId(sessionId, opened);
if (canonical && opts.core.isEpisodeWritable(canonical)) {
return { episodeId: canonical, binding };
}
return null;
}

function forgetSessionBindings(sessionId: SessionId): void {
latestEpisodeBySession.delete(sessionId);
for (const [key, binding] of episodeBindingByTurnKey.entries()) {
Expand All @@ -1179,7 +1283,9 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
}

function currentEpisodeId(sessionId: SessionId): EpisodeId | undefined {
return latestEpisodeBySession.get(sessionId)?.episodeId;
const id = latestEpisodeBySession.get(sessionId)?.episodeId;
if (!id) return undefined;
return opts.core.isEpisodeWritable(id) ? id : undefined;
}

async function handleBeforePrompt(
Expand Down Expand Up @@ -1256,12 +1362,28 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
},
};

const turnGeneration = (sessionTurnGeneration.get(sessionId) ?? 0) + 1;
sessionTurnGeneration.set(sessionId, turnGeneration);

const turnStartPromise = opts.core.onTurnStart(turn);
turnStartPromise.catch((err) => {
opts.log.warn("memos.onTurnStart.late_failure", {
err: err instanceof Error ? err.message : String(err),
});
const binding = rememberEpisodeBinding({
sessionId,
ctx,
userText: prompt,
turnStart: turnStartPromise,
turnGeneration,
});

turnStartPromise
.then((packet) => {
commitEpisodeId(binding, packet);
})
.catch((err) => {
opts.log.warn("memos.onTurnStart.late_failure", {
err: err instanceof Error ? err.message : String(err),
});
});

const turnStartResult = await withSoftTimeout(
turnStartPromise,
BEFORE_PROMPT_SOFT_TIMEOUT_MS,
Expand All @@ -1273,20 +1395,14 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
agentId: ctx.agentId,
timeoutMs: BEFORE_PROMPT_SOFT_TIMEOUT_MS,
});
} else if (packet) {
commitEpisodeId(binding, packet);
}
// The pipeline orchestrator (V7 §0.1) may have migrated the
// session id (new-task → new session) or reopened a closed
// episode (revision). We trust the ids returned in the packet,
// not our own derivation, so `onTurnEnd` lands on the same row.

const routedSessionId = (packet?.query.sessionId ?? sessionId) as SessionId;
const routedEpisodeId = packet?.query.episodeId as EpisodeId | undefined;
if (routedEpisodeId) {
const seq = ++episodeBindingSeq;
rememberEpisodeBinding(routedSessionId, routedEpisodeId, ctx, prompt, seq);
if (routedSessionId !== sessionId) {
rememberEpisodeBinding(sessionId, routedEpisodeId, ctx, prompt, seq);
}
}
const routedEpisodeId =
binding.episodeId ??
(packet?.query.episodeId as EpisodeId | undefined);

const renderedBlock = renderContextBlock(packet, {
// Avoid making OpenClaw do a second tool-driven search when
Expand Down Expand Up @@ -1422,30 +1538,51 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
const isSubagentAnnouncement = isOpenClawSubagentAnnouncementPrompt(turn.userText);
const hasSubagentSpawn = toolCalls.some((tc) => tc.name === "sessions_spawn");

// Resolve (or lazily open) the target episode. Three cases:
// 1. `before_prompt_build` already ran this turn → we have the
// routed episode binding for this run/user turn.
// 2. The host skipped `before_prompt_build` (e.g. /new with no
// prompt build) → create an episode on the fly so the write
// path has a real row to hang traces on.
// 3. Any failure here falls back to opening a new episode —
// better to capture under a fresh id than to drop the turn.
let binding = findEpisodeBinding(sessionId, ctx, turn.userText);
let episodeId = binding?.episodeId;
if (!episodeId) {
if (isSubagentAnnouncement) {
if (isSubagentAnnouncement) {
const probe = findEpisodeBinding(
sessionId,
ctx,
turn.userText,
"strict_run",
);
if (!probe?.episodeId && !probe?.turnStart) {
opts.log.info("memos.agent_end.skipped", {
reason: "subagent_announcement_without_parent_episode",
sessionKey: ctx.sessionKey,
});
return;
}
await opts.core.openSession({ agent: opts.agent, sessionId, namespace, meta: { namespace } });
episodeId = await opts.core.openEpisode({
}

await opts.core.openSession({
agent: opts.agent,
sessionId,
namespace,
meta: { namespace },
});

const resolved = await resolveEpisodeIdForTurnEnd(
sessionId,
ctx,
turn.userText,
);
if (!resolved) {
opts.log.warn("memos.agent_end.skipped", {
reason: "no_writable_episode",
sessionKey: ctx.sessionKey,
});
return;
}
const { episodeId } = resolved;
let binding = resolved.binding;
if (!binding) {
binding = rememberEpisodeBinding({
sessionId,
userMessage: turn.userText,
ctx,
userText: turn.userText,
episodeId,
turnGeneration: sessionTurnGeneration.get(sessionId) ?? 0,
});
binding = rememberEpisodeBinding(sessionId, episodeId, ctx, turn.userText);
}

const turnResult: TurnResultDTO = {
Expand Down
2 changes: 1 addition & 1 deletion apps/memos-local-plugin/agent-contract/episode-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export type DerivedTaskStatus = Exclude<TaskStatusFilter, "">;
* the task list — the soft-fail framing (未达沉淀阈值) lives on the
* skill pipeline pill, not the main task status.
*/
export const R_NEGATIVE_FLOOR = -0.5;
export const R_NEGATIVE_FLOOR = -0.15;

/**
* Recently-finalized grace window: a closed-but-just-ended episode
Expand Down
11 changes: 11 additions & 0 deletions apps/memos-local-plugin/agent-contract/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ export interface MemoryCore {
/** Optional initial user text (for adapters that know it). */
userMessage?: string;
}): Promise<EpisodeId>;
/** Current open episode for the session, if any (no new row). */
resolveOpenEpisodeId(sessionId: SessionId): EpisodeId | undefined;
/** Whether the episode exists and is still `open`. */
isEpisodeWritable(episodeId: EpisodeId): boolean;
/** Whether the episode row exists (any status). */
episodeExists(episodeId: EpisodeId): boolean;
/** Prefer orchestrator open episode; else a writable candidate. */
reconcileEpisodeId(
sessionId: SessionId,
candidate?: EpisodeId,
): EpisodeId | undefined;
closeEpisode(episodeId: EpisodeId): Promise<void>;

// ── pipeline (per turn) ──
Expand Down
Loading