diff --git a/src/skillify/skillopt-improve.ts b/src/skillify/skillopt-improve.ts index 881d6339..70782fa2 100644 --- a/src/skillify/skillopt-improve.ts +++ b/src/skillify/skillopt-improve.ts @@ -82,6 +82,38 @@ export interface ImproveOpts { prior?: (name: string, author: string) => string[]; alreadyProposed?: (name: string, author: string, edits: Edit[]) => boolean; recordEdit?: (name: string, author: string, edits: Edit[]) => void; + // Deeplake insert→read lag tolerance: the invocation row is written by a SEPARATE process + // (capture.js) and lands in Deeplake on a short visibility lag (expected, not a defect), so a + // worker firing on a fast reaction can read stale. Poll findInvocation with linear backoff + // before giving up. Injectable for tests. + invocationRetries?: number; // extra attempts after the first (default 5) + invocationBackoffMs?: number; // linear backoff base ms: sleep = base * attempt (default 3000) + sleep?: (ms: number) => Promise; // default real timer +} + +const DEFAULT_INVOCATION_RETRIES = 5; +const DEFAULT_INVOCATION_BACKOFF_MS = 3000; +const realSleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); + +/** + * findInvocation, tolerant of Deeplake's insert→read visibility lag (expected latency, not a + * defect). The window's skill-invocation row is captured by a SEPARATE process and may not be queryable the instant the + * worker fires on a fast reaction. The row is near-certain to land (capture is a reliable path), so + * poll with linear backoff; but it's NOT guaranteed (capture may be disabled/errored), so the + * attempts are BOUNDED — on exhaustion we return null and the caller gives up gracefully (no + * publish) instead of spinning. Runs inside the already-detached worker, so the waits block nothing. + * Only a not-found (null) result is retried — a query ERROR (e.g. 402) propagates immediately. + */ +async function findInvocationWithRetry(opts: ImproveOpts, name: string, author: string): Promise { + const retries = opts.invocationRetries ?? DEFAULT_INVOCATION_RETRIES; + const backoffMs = opts.invocationBackoffMs ?? DEFAULT_INVOCATION_BACKOFF_MS; + const sleep = opts.sleep ?? realSleep; + for (let attempt = 0; ; attempt++) { + const inv = await findInvocation(opts.query, opts.sessionsTable, opts.sessionId, name, author, opts.toolUseId); + if (inv) return inv; + if (attempt >= retries) return null; + await sleep(backoffMs * (attempt + 1)); + } } export async function improveSkillIfFailed(opts: ImproveOpts): Promise { @@ -89,7 +121,7 @@ export async function improveSkillIfFailed(opts: ImproveOpts): Promise { it("invocation not in the session → not judged", async () => { const { query } = makeQuery(); - expect(await improveSkillIfFailed(base(query, { skillRef: "ghost--x" }))).toMatchObject({ judged: false }); + // invocationRetries:0 → skip the real Bug#1 backoff; this exercises the immediate not-found path. + expect(await improveSkillIfFailed(base(query, { skillRef: "ghost--x", invocationRetries: 0 }))).toMatchObject({ judged: false }); }); it("failed but the skill isn't in the org table → judged, not improved", async () => { @@ -130,4 +131,59 @@ describe("improveSkillIfFailed", () => { expect(r).toMatchObject({ judged: true, failed: true, improved: false, reason: expect.stringContaining("dedup") }); expect(inserts).toHaveLength(0); }); + + // Deeplake insert→read visibility lag (expected latency, not a defect): the invocation row is + // written by a SEPARATE process (capture.js) and lands on a short visibility lag, so a worker + // that fires on a fast reaction reads stale and finds nothing. The window-retry (K=3) only helps + // if the user keeps typing; a single fast/final reaction would silently no-op. So the worker + // must retry-with-backoff itself. + it("retries findInvocation when the row hasn't propagated yet (Deeplake lag) → then judges + improves", async () => { + let sessionsCalls = 0; + const inserts: string[] = []; + const query = vi.fn(async (sql: string) => { + if (sql.includes("INSERT INTO")) { inserts.push(sql); return []; } + if (sql.includes("/sessions/")) { sessionsCalls++; return sessionsCalls <= 2 ? [] : sessionRows("s1"); } // miss twice, then visible + if (sql.includes('FROM "skills"')) return [SKILL_ROW]; + return []; + }); + const sleeps: number[] = []; + const r = await improveSkillIfFailed(base(query, { + invocationRetries: 5, invocationBackoffMs: 1000, sleep: async (ms: number) => { sleeps.push(ms); }, + })); + expect(r).toMatchObject({ judged: true, failed: true, improved: true, version: 3 }); + expect(sessionsCalls).toBe(4); // 3 retry polls (miss, miss, hit) + 1 windowAroundInvocation query + expect(sleeps).toEqual([1000, 2000]); // linear backoff between the two misses, then hit + expect(inserts).toHaveLength(1); + }); + + it("gives up gracefully (no publish) when the row never propagates — bounded retries, e.g. capture disabled", async () => { + let sessionsCalls = 0; + const inserts: string[] = []; + const query = vi.fn(async (sql: string) => { + if (sql.includes("INSERT INTO")) { inserts.push(sql); return []; } + if (sql.includes("/sessions/")) { sessionsCalls++; return []; } // never lands + if (sql.includes('FROM "skills"')) return [SKILL_ROW]; + return []; + }); + const sleeps: number[] = []; + const r = await improveSkillIfFailed(base(query, { + invocationRetries: 3, invocationBackoffMs: 1, sleep: async (ms: number) => { sleeps.push(ms); }, + })); + expect(r).toMatchObject({ judged: false, improved: false, reason: "invocation not found in session" }); + expect(sessionsCalls).toBe(4); // 1 initial + 3 bounded retries, then stop + expect(sleeps).toHaveLength(3); // backed off 3 times then gave up + expect(inserts).toHaveLength(0); // nothing inserted — graceful no-op + }); + + it("does NOT retry on a query error (e.g. 402) — fails fast, no spinning", async () => { + let sessionsCalls = 0; + const query = vi.fn(async (sql: string) => { + if (sql.includes("/sessions/")) { sessionsCalls++; throw new Error("402 insufficient balance"); } + if (sql.includes('FROM "skills"')) return [SKILL_ROW]; + return []; + }); + await expect(improveSkillIfFailed(base(query, { invocationRetries: 5, invocationBackoffMs: 1, sleep: async () => {} }))) + .rejects.toThrow(/402/); + expect(sessionsCalls).toBe(1); // threw on the first query — no retry loop + }); });