From 8d14a674300991d9168766c362c59c2beb875f19 Mon Sep 17 00:00:00 2001 From: Saffron Date: Fri, 12 Jun 2026 07:18:03 -0600 Subject: [PATCH 1/3] fix(pr-fix-queue): mark stale items when upstream PR is merged/closed --- src/app/api/issues/reconcile/route.ts | 30 ++++++++++ src/lib/pr-fix-queue.test.ts | 83 ++++++++++++++++++++++++++- src/lib/pr-fix-queue.ts | 57 ++++++++++++++++++ 3 files changed, 169 insertions(+), 1 deletion(-) diff --git a/src/app/api/issues/reconcile/route.ts b/src/app/api/issues/reconcile/route.ts index 126709b..64b109e 100644 --- a/src/app/api/issues/reconcile/route.ts +++ b/src/app/api/issues/reconcile/route.ts @@ -11,6 +11,7 @@ import { } from "@/lib/issue-reconciliation"; import { computeLinkedPrHealth, toPersistedLinkedPrHealth, type LinkedPrHealth } from "@/lib/linked-pr-health"; import { authorizeRequest } from "@/lib/auth"; +import { reconcileStalePrFixItems } from "@/lib/pr-fix-queue"; /** * Reconcile issue state against PR state for all tracked repos. @@ -52,6 +53,8 @@ export async function POST(request: Request) { } let totalIssuesReconciled = 0; + let totalPrFixStaleChecked = 0; + let totalPrFixStaleMarked = 0; let totalMergedPrsFound = 0; let totalOpenPrsChecked = 0; let totalIssuesClosed = 0; @@ -233,6 +236,31 @@ export async function POST(request: Request) { totalIssuesReconciled++; } + + // Reconcile pr-fix-queue items: mark stale when the upstream PR is + // merged or closed. Uses the mergedPrsMap already built above. No + // model judgment, deterministic. + const mergedOrClosedPrsByRepo = new Map>(); + const prStateByRepo = new Map>(); + for (const pr of mergedPrsMap.values()) { + if (!mergedOrClosedPrsByRepo.has(repo.fullName)) mergedOrClosedPrsByRepo.set(repo.fullName, new Set()); + if (!prStateByRepo.has(repo.fullName)) prStateByRepo.set(repo.fullName, new Map()); + mergedOrClosedPrsByRepo.get(repo.fullName)!.add(pr.number); + prStateByRepo.get(repo.fullName)!.set(pr.number, "merged"); + } + for (const pr of closedPrsList) { + if (pr.merged_at) continue; // already counted + if (!mergedOrClosedPrsByRepo.has(repo.fullName)) mergedOrClosedPrsByRepo.set(repo.fullName, new Set()); + if (!prStateByRepo.has(repo.fullName)) prStateByRepo.set(repo.fullName, new Map()); + mergedOrClosedPrsByRepo.get(repo.fullName)!.add(pr.number); + prStateByRepo.get(repo.fullName)!.set(pr.number, "closed"); + } + const staleResult = await reconcileStalePrFixItems(prisma, mergedOrClosedPrsByRepo, prStateByRepo); + totalPrFixStaleChecked += staleResult.checked; + totalPrFixStaleMarked += staleResult.markedStale; + if (staleResult.errored > 0) { + errors.push(`${repo.fullName}: pr-fix-queue reconcile errored on ${staleResult.errored} item(s)`); + } } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; console.error(`Reconciliation failed for ${repo.fullName}:`, error); @@ -249,6 +277,8 @@ export async function POST(request: Request) { issuesClosed: totalIssuesClosed, labelsChanged: totalLabelsChanged, lanesClassified: totalLaneClassified, + prFixQueueStaleChecked: totalPrFixStaleChecked, + prFixQueueStaleMarked: totalPrFixStaleMarked, errors, }); } catch (error) { diff --git a/src/lib/pr-fix-queue.test.ts b/src/lib/pr-fix-queue.test.ts index 964fea6..a3ca8f0 100644 --- a/src/lib/pr-fix-queue.test.ts +++ b/src/lib/pr-fix-queue.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, beforeEach } from "vitest"; -import { enqueuePrFixItem, listQueuedPrFixItems, markPrFixItem, toAgentQueuePrFixItem, PrFixQueueClient } from "./pr-fix-queue"; +import { enqueuePrFixItem, listQueuedPrFixItems, markPrFixItem, toAgentQueuePrFixItem, reconcileStalePrFixItems, PrFixQueueClient } from "./pr-fix-queue"; function makeClient(): PrFixQueueClient & { items: any[]; history: any[] } { const items: any[] = []; @@ -9,7 +9,15 @@ function makeClient(): PrFixQueueClient & { items: any[]; history: any[] } { items, history, $transaction: async (fn: any) => fn(client), + _findMany: async ({ where }: any) => { + let result = items.slice(); + if (where?.repo) result = result.filter((i) => i.repo === where.repo); + if (where?.pr?.in) result = result.filter((i) => where.pr.in.includes(i.pr)); + if (where?.status) result = result.filter((i) => i.status === where.status); + return result; + }, prFixQueueItem: { + findMany: async (args: any) => client._findMany(args), findUnique: async ({ where }: any) => items.find((i) => i.repo === where.repo_pr.repo && i.pr === where.repo_pr.pr) ?? null, create: async ({ data }: any) => { const item = { @@ -130,3 +138,76 @@ describe("PR review-fix queue", () => { expect(client.history.at(-1)).toMatchObject({ action: "mark", status: "FIXED", note: "pushed fix + validation" }); }); }); + +describe("reconcileStalePrFixItems", () => { + it("marks queued items stale when the upstream PR is merged/closed", async () => { + const client = makeClient(); + await enqueuePrFixItem(client, { + repo: "misospace/miso-chat", + pr: 566, + reason: "AI review failed", + feedback: "feedback", + evidenceKey: "k1", + }); + await enqueuePrFixItem(client, { + repo: "misospace/miso-chat", + pr: 567, + reason: "test 567", + feedback: "f", + evidenceKey: "k2", + }); + await enqueuePrFixItem(client, { + repo: "misospace/miso-chat", + pr: 568, + reason: "test 568", + feedback: "f", + evidenceKey: "k3", + }); + + const mergedOrClosed = new Map>([ + ["misospace/miso-chat", new Set([566, 568])], + ]); + const states = new Map>([ + ["misospace/miso-chat", new Map([[566, "merged"], [568, "closed"]])], + ]); + + const result = await reconcileStalePrFixItems(client, mergedOrClosed, states); + expect(result.checked).toBe(2); + expect(result.markedStale).toBe(2); + expect(result.errored).toBe(0); + + const items = await listQueuedPrFixItems(client, { lane: null, includeBlocked: true }); + const byId = new Map(items.map((i) => [i.pr, i])); + expect(byId.get(566)?.status).toBe("STALE"); + expect(byId.get(567)?.status).toBe("QUEUED"); // not in merged/closed set + expect(byId.get(568)?.status).toBe("STALE"); + }); + + it("does not touch items already in non-QUEUED status", async () => { + const client = makeClient(); + await enqueuePrFixItem(client, { + repo: "misospace/miso-chat", + pr: 570, + reason: "x", + feedback: "f", + evidenceKey: "k1", + }); + await markPrFixItem(client, { repo: "misospace/miso-chat", pr: 570, status: "FIXED" }); + const mergedOrClosed = new Map>([ + ["misospace/miso-chat", new Set([570])], + ]); + const states = new Map>([ + ["misospace/miso-chat", new Map([[570, "merged"]])], + ]); + const result = await reconcileStalePrFixItems(client, mergedOrClosed, states); + expect(result.checked).toBe(0); + expect(result.markedStale).toBe(0); + }); + + it("returns zero counts for repos with no merged/closed PRs", async () => { + const client = makeClient(); + const result = await reconcileStalePrFixItems(client, new Map(), new Map()); + expect(result.checked).toBe(0); + expect(result.markedStale).toBe(0); + }); +}); diff --git a/src/lib/pr-fix-queue.ts b/src/lib/pr-fix-queue.ts index 923184e..d874ebd 100644 --- a/src/lib/pr-fix-queue.ts +++ b/src/lib/pr-fix-queue.ts @@ -187,6 +187,63 @@ export async function markPrFixItem(client: PrFixQueueClient, input: MarkPrFixIn }); } +/** + * Mark queued pr-fix items as stale when the upstream PR is merged or closed. + * + * This is a deterministic cleanup that catches the failure mode where + * pr-followup/sync enqueues items without checking the upstream PR's state, + * leaving merged/closed PRs in the worker queue. The data source is whatever + * caller passes in — the issues/reconcile route already builds a + * `mergedOrClosedPrsByRepo` map per tracked repo, so we just consume it. + * + * Returns counts for logging/audit. No model judgment. + */ +export async function reconcileStalePrFixItems( + client: PrFixQueueClient, + mergedOrClosedPrsByRepo: Map>, + prStateByRepo: Map>, +): Promise<{ checked: number; markedStale: number; errored: number }> { + let checked = 0; + let markedStale = 0; + let errored = 0; + + for (const [repo, prNumbers] of mergedOrClosedPrsByRepo) { + if (prNumbers.size === 0) continue; + const queued = await client.prFixQueueItem.findMany({ + where: { + repo, + pr: { in: Array.from(prNumbers) }, + status: "QUEUED", + }, + }); + checked += queued.length; + for (const item of queued) { + try { + const state = prStateByRepo.get(repo)?.get(item.pr) ?? "merged"; + await client.$transaction(async (tx) => { + await tx.prFixQueueItem.update({ + where: { id: item.id }, + data: { status: "STALE" }, + }); + await tx.prFixHistory.create({ + data: { + itemId: item.id, + action: "mark", + status: "STALE", + note: `Upstream PR state=${state} at reconcile time`, + }, + }); + }); + markedStale++; + } catch (err) { + errored++; + } + } + } + + return { checked, markedStale, errored }; +} + export function toAgentQueuePrFixItem(item: any) { const fixType = normalizePrFixType(item.type); return { From bf7200c1588fe3da908d4b3b7099c9568fd0daae Mon Sep 17 00:00:00 2001 From: Saffron Date: Fri, 12 Jun 2026 08:06:42 -0600 Subject: [PATCH 2/3] fix(pr-fix-queue): extend test client findMany for where.repo / where.pr.in MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit added a duplicate `findMany` property to the `prFixQueueItem` test client (one via a \_findMany helper, one inline), which TypeScript flags as TS1117. The duplicate is redundant — `reconcileStalePrFixItems` only filters by `where.repo`, `where.pr.in`, and `where.status`, all of which the existing `findMany` can support with a couple of small additions. - Remove the duplicate findMany - Remove the unused \_findMany helper - Extend the original findMany to filter on where.repo / where.pr.in Fixes the validate CI failure on PR #362. --- src/lib/pr-fix-queue.test.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/lib/pr-fix-queue.test.ts b/src/lib/pr-fix-queue.test.ts index a3ca8f0..d674843 100644 --- a/src/lib/pr-fix-queue.test.ts +++ b/src/lib/pr-fix-queue.test.ts @@ -9,15 +9,7 @@ function makeClient(): PrFixQueueClient & { items: any[]; history: any[] } { items, history, $transaction: async (fn: any) => fn(client), - _findMany: async ({ where }: any) => { - let result = items.slice(); - if (where?.repo) result = result.filter((i) => i.repo === where.repo); - if (where?.pr?.in) result = result.filter((i) => where.pr.in.includes(i.pr)); - if (where?.status) result = result.filter((i) => i.status === where.status); - return result; - }, prFixQueueItem: { - findMany: async (args: any) => client._findMany(args), findUnique: async ({ where }: any) => items.find((i) => i.repo === where.repo_pr.repo && i.pr === where.repo_pr.pr) ?? null, create: async ({ data }: any) => { const item = { @@ -36,6 +28,8 @@ function makeClient(): PrFixQueueClient & { items: any[]; history: any[] } { }, findMany: async ({ where, orderBy }: any) => { let result = items.slice(); + if (where?.repo) result = result.filter((i) => i.repo === where.repo); + if (where?.pr?.in) result = result.filter((i) => where.pr.in.includes(i.pr)); if (where?.status) { result = Array.isArray(where.status.in) ? result.filter((i) => where.status.in.includes(i.status)) From 974a4d449cd2d0344926251acad5513bdaee10eb Mon Sep 17 00:00:00 2001 From: saffron-fix Date: Sat, 13 Jun 2026 09:29:27 -0600 Subject: [PATCH 3/3] fix(pr-fix-queue): verify stale state via client.items, not listQueuedPrFixItems MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit listQueuedPrFixItems filters by status (QUEUED, or [QUEUED,BLOCKED] with includeBlocked), so the post-reconcile assertion could not observe rows the function had just transitioned to STALE — byId.get(566) was undefined. Inspect the test client's items array directly to verify the state transition regardless of status, matching the existing test style that already reads client.items[0] / client.items[1]. Targets the CI failure on PR #362: src/lib/pr-fix-queue.test.ts > reconcileStalePrFixItems > marks queued items stale when the upstream PR is merged/closed expected PR 566 status STALE but got undefined --- src/lib/pr-fix-queue.test.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib/pr-fix-queue.test.ts b/src/lib/pr-fix-queue.test.ts index d674843..559fdf9 100644 --- a/src/lib/pr-fix-queue.test.ts +++ b/src/lib/pr-fix-queue.test.ts @@ -170,8 +170,10 @@ describe("reconcileStalePrFixItems", () => { expect(result.markedStale).toBe(2); expect(result.errored).toBe(0); - const items = await listQueuedPrFixItems(client, { lane: null, includeBlocked: true }); - const byId = new Map(items.map((i) => [i.pr, i])); + // listQueuedPrFixItems filters by status (only QUEUED/[QUEUED,BLOCKED]), + // so it would not return STALE rows after the reconcile. Inspect the + // test client's items array directly to verify the state transition. + const byId = new Map(client.items.map((i) => [i.pr, i])); expect(byId.get(566)?.status).toBe("STALE"); expect(byId.get(567)?.status).toBe("QUEUED"); // not in merged/closed set expect(byId.get(568)?.status).toBe("STALE");