Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/app/api/issues/reconcile/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "@/lib/issue-reconciliation";
import { computeLinkedPrHealth, toPersistedLinkedPrHealth, type LinkedPrHealth } from "@/lib/linked-pr-health";
import { authorizeRequest } from "@/lib/auth";
import { reconcileStalePrFixItems } from "@/lib/pr-fix-queue";

/**
* Reconcile issue state against PR state for all tracked repos.
Expand Down Expand Up @@ -52,6 +53,8 @@ export async function POST(request: Request) {
}

let totalIssuesReconciled = 0;
let totalPrFixStaleChecked = 0;
let totalPrFixStaleMarked = 0;
let totalMergedPrsFound = 0;
let totalOpenPrsChecked = 0;
let totalIssuesClosed = 0;
Expand Down Expand Up @@ -233,6 +236,31 @@ export async function POST(request: Request) {

totalIssuesReconciled++;
}

// Reconcile pr-fix-queue items: mark stale when the upstream PR is
// merged or closed. Uses the mergedPrsMap already built above. No
// model judgment, deterministic.
const mergedOrClosedPrsByRepo = new Map<string, Set<number>>();
const prStateByRepo = new Map<string, Map<number, "merged" | "closed">>();
for (const pr of mergedPrsMap.values()) {
if (!mergedOrClosedPrsByRepo.has(repo.fullName)) mergedOrClosedPrsByRepo.set(repo.fullName, new Set());
if (!prStateByRepo.has(repo.fullName)) prStateByRepo.set(repo.fullName, new Map());
mergedOrClosedPrsByRepo.get(repo.fullName)!.add(pr.number);
prStateByRepo.get(repo.fullName)!.set(pr.number, "merged");
}
for (const pr of closedPrsList) {
if (pr.merged_at) continue; // already counted
if (!mergedOrClosedPrsByRepo.has(repo.fullName)) mergedOrClosedPrsByRepo.set(repo.fullName, new Set());
if (!prStateByRepo.has(repo.fullName)) prStateByRepo.set(repo.fullName, new Map());
mergedOrClosedPrsByRepo.get(repo.fullName)!.add(pr.number);
prStateByRepo.get(repo.fullName)!.set(pr.number, "closed");
}
const staleResult = await reconcileStalePrFixItems(prisma, mergedOrClosedPrsByRepo, prStateByRepo);
totalPrFixStaleChecked += staleResult.checked;
totalPrFixStaleMarked += staleResult.markedStale;
if (staleResult.errored > 0) {
errors.push(`${repo.fullName}: pr-fix-queue reconcile errored on ${staleResult.errored} item(s)`);
}
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
console.error(`Reconciliation failed for ${repo.fullName}:`, error);
Expand All @@ -249,6 +277,8 @@ export async function POST(request: Request) {
issuesClosed: totalIssuesClosed,
labelsChanged: totalLabelsChanged,
lanesClassified: totalLaneClassified,
prFixQueueStaleChecked: totalPrFixStaleChecked,
prFixQueueStaleMarked: totalPrFixStaleMarked,
errors,
});
} catch (error) {
Expand Down
79 changes: 78 additions & 1 deletion src/lib/pr-fix-queue.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it, beforeEach } from "vitest";
import { enqueuePrFixItem, listQueuedPrFixItems, markPrFixItem, toAgentQueuePrFixItem, PrFixQueueClient } from "./pr-fix-queue";
import { enqueuePrFixItem, listQueuedPrFixItems, markPrFixItem, toAgentQueuePrFixItem, reconcileStalePrFixItems, PrFixQueueClient } from "./pr-fix-queue";

function makeClient(): PrFixQueueClient & { items: any[]; history: any[] } {
const items: any[] = [];
Expand Down Expand Up @@ -28,6 +28,8 @@ function makeClient(): PrFixQueueClient & { items: any[]; history: any[] } {
},
findMany: async ({ where, orderBy }: any) => {
let result = items.slice();
if (where?.repo) result = result.filter((i) => i.repo === where.repo);
if (where?.pr?.in) result = result.filter((i) => where.pr.in.includes(i.pr));
if (where?.status) {
result = Array.isArray(where.status.in)
? result.filter((i) => where.status.in.includes(i.status))
Expand Down Expand Up @@ -130,3 +132,78 @@ describe("PR review-fix queue", () => {
expect(client.history.at(-1)).toMatchObject({ action: "mark", status: "FIXED", note: "pushed fix + validation" });
});
});

describe("reconcileStalePrFixItems", () => {
it("marks queued items stale when the upstream PR is merged/closed", async () => {
const client = makeClient();
await enqueuePrFixItem(client, {
repo: "misospace/miso-chat",
pr: 566,
reason: "AI review failed",
feedback: "feedback",
evidenceKey: "k1",
});
await enqueuePrFixItem(client, {
repo: "misospace/miso-chat",
pr: 567,
reason: "test 567",
feedback: "f",
evidenceKey: "k2",
});
await enqueuePrFixItem(client, {
repo: "misospace/miso-chat",
pr: 568,
reason: "test 568",
feedback: "f",
evidenceKey: "k3",
});

const mergedOrClosed = new Map<string, Set<number>>([
["misospace/miso-chat", new Set([566, 568])],
]);
const states = new Map<string, Map<number, "merged" | "closed">>([
["misospace/miso-chat", new Map([[566, "merged"], [568, "closed"]])],
]);

const result = await reconcileStalePrFixItems(client, mergedOrClosed, states);
expect(result.checked).toBe(2);
expect(result.markedStale).toBe(2);
expect(result.errored).toBe(0);

// listQueuedPrFixItems filters by status (only QUEUED/[QUEUED,BLOCKED]),
// so it would not return STALE rows after the reconcile. Inspect the
// test client's items array directly to verify the state transition.
const byId = new Map(client.items.map((i) => [i.pr, i]));
expect(byId.get(566)?.status).toBe("STALE");
expect(byId.get(567)?.status).toBe("QUEUED"); // not in merged/closed set
expect(byId.get(568)?.status).toBe("STALE");
});

it("does not touch items already in non-QUEUED status", async () => {
const client = makeClient();
await enqueuePrFixItem(client, {
repo: "misospace/miso-chat",
pr: 570,
reason: "x",
feedback: "f",
evidenceKey: "k1",
});
await markPrFixItem(client, { repo: "misospace/miso-chat", pr: 570, status: "FIXED" });
const mergedOrClosed = new Map<string, Set<number>>([
["misospace/miso-chat", new Set([570])],
]);
const states = new Map<string, Map<number, "merged" | "closed">>([
["misospace/miso-chat", new Map([[570, "merged"]])],
]);
const result = await reconcileStalePrFixItems(client, mergedOrClosed, states);
expect(result.checked).toBe(0);
expect(result.markedStale).toBe(0);
});

it("returns zero counts for repos with no merged/closed PRs", async () => {
const client = makeClient();
const result = await reconcileStalePrFixItems(client, new Map(), new Map());
expect(result.checked).toBe(0);
expect(result.markedStale).toBe(0);
});
});
57 changes: 57 additions & 0 deletions src/lib/pr-fix-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,63 @@ export async function markPrFixItem(client: PrFixQueueClient, input: MarkPrFixIn
});
}

/**
* Mark queued pr-fix items as stale when the upstream PR is merged or closed.
*
* This is a deterministic cleanup that catches the failure mode where
* pr-followup/sync enqueues items without checking the upstream PR's state,
* leaving merged/closed PRs in the worker queue. The data source is whatever
* caller passes in — the issues/reconcile route already builds a
* `mergedOrClosedPrsByRepo` map per tracked repo, so we just consume it.
*
* Returns counts for logging/audit. No model judgment.
*/
export async function reconcileStalePrFixItems(
client: PrFixQueueClient,
mergedOrClosedPrsByRepo: Map<string, Set<number>>,
prStateByRepo: Map<string, Map<number, "merged" | "closed">>,
): Promise<{ checked: number; markedStale: number; errored: number }> {
let checked = 0;
let markedStale = 0;
let errored = 0;

for (const [repo, prNumbers] of mergedOrClosedPrsByRepo) {
if (prNumbers.size === 0) continue;
const queued = await client.prFixQueueItem.findMany({
where: {
repo,
pr: { in: Array.from(prNumbers) },
status: "QUEUED",
},
});
checked += queued.length;
for (const item of queued) {
try {
const state = prStateByRepo.get(repo)?.get(item.pr) ?? "merged";
await client.$transaction(async (tx) => {
await tx.prFixQueueItem.update({
where: { id: item.id },
data: { status: "STALE" },
});
await tx.prFixHistory.create({
data: {
itemId: item.id,
action: "mark",
status: "STALE",
note: `Upstream PR state=${state} at reconcile time`,
},
});
});
markedStale++;
} catch (err) {
errored++;
}
}
}

return { checked, markedStale, errored };
}

export function toAgentQueuePrFixItem(item: any) {
const fixType = normalizePrFixType(item.type);
return {
Expand Down