Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions db/migrations/002_resumable_queue_jobs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS check_run_completed_at TIMESTAMPTZ;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_owner TEXT;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_expires_at TIMESTAMPTZ;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS recovery_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS last_queue_message_at TIMESTAMPTZ;

CREATE INDEX IF NOT EXISTS jobs_lease_expiry_idx
ON jobs (lease_expires_at)
WHERE status = 'running' AND lease_expires_at IS NOT NULL;

CREATE INDEX IF NOT EXISTS jobs_terminal_check_idx
ON jobs (status, check_run_completed_at)
WHERE check_run_id IS NOT NULL AND check_run_completed_at IS NULL;

CREATE INDEX IF NOT EXISTS jobs_unleased_running_idx
ON jobs (last_queue_message_at, heartbeat_at)
WHERE status = 'running' AND lease_expires_at IS NULL;

DELETE FROM file_reviews fr
USING (
SELECT id, ROW_NUMBER() OVER (PARTITION BY job_id, file_path ORDER BY created_at ASC, id ASC) AS row_number
FROM file_reviews
) ranked
WHERE fr.id = ranked.id
AND ranked.row_number > 1;

CREATE UNIQUE INDEX IF NOT EXISTS file_reviews_job_file_path_key
ON file_reviews (job_id, file_path);
53 changes: 53 additions & 0 deletions src/server/core/job-recovery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import type { AppBindings } from '@server/env';
import { getTerminalJobsNeedingCheckRunCompletion, markJobCheckRunCompleted, recoverExpiredJobLeases } from '@server/db/jobs';
import { logger } from '@server/core/logger';
import { GitHubService } from '@server/services/github';

const MAX_RECOVERY_COUNT = 3;

export async function recoverJobs(env: AppBindings) {
try {
const recovered = await recoverExpiredJobLeases(env, MAX_RECOVERY_COUNT);
for (const jobId of recovered.requeuedJobIds) {
await env.REVIEW_QUEUE.send({
jobId,
deliveryId: crypto.randomUUID(),
phase: 'review',
});
}

if (recovered.requeuedJobIds.length > 0 || recovered.failedJobs.length > 0) {
logger.warn('Expired job leases recovered', {
requeued: recovered.requeuedJobIds.length,
failed: recovered.failedJobs.length,
});
}
} catch (err) {
logger.error('Failed to recover expired job leases', err instanceof Error ? err : new Error(String(err)));
}
}

export async function completeTerminalCheckRuns(env: AppBindings) {
const jobs = await getTerminalJobsNeedingCheckRunCompletion(env);
for (const job of jobs) {
if (!job.check_run_id) continue;

try {
const github = new GitHubService(env, job.installation_id);
await github.updateCheckRun(job.owner, job.repo, job.check_run_id, {
status: 'completed',
conclusion: job.status === 'superseded' ? 'neutral' : 'failure',
title: job.status === 'superseded' ? 'Review superseded' : 'Review failed',
summary: job.error_msg ?? (job.status === 'superseded' ? 'Superseded by a newer commit or job.' : 'Review failed.'),
});
await markJobCheckRunCompleted(env, job.id);
} catch (error) {
logger.error(`Failed to complete terminal check run for job ${job.id}`, error instanceof Error ? error : new Error(String(error)));
}
}
}

export async function runOpportunisticJobMaintenance(env: AppBindings) {
await recoverJobs(env);
await completeTerminalCheckRuns(env);
}
13 changes: 10 additions & 3 deletions src/server/core/model-output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import { findClosestValidLine, findPositionForLine, getValidNewLines, getValidPo
import type { FileDiff } from './diff';
import { jsonrepair } from 'jsonrepair';

const MAX_LOGGED_MODEL_OUTPUT_CHARS = 2_000;

function truncateForLog(value: string) {
if (value.length <= MAX_LOGGED_MODEL_OUTPUT_CHARS) return value;
return `${value.slice(0, MAX_LOGGED_MODEL_OUTPUT_CHARS)}... [truncated ${value.length - MAX_LOGGED_MODEL_OUTPUT_CHARS} chars]`;
}

function hasReviewKeys(input: string) {
return /"(findings|overall_explanation|overall_correctness|overall_confidence_score|summary)"\s*:/.test(input);
}
Expand Down Expand Up @@ -253,7 +260,7 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): {
throw new Error('Model response did not contain review JSON keys.');
}
} catch (e) {
logger.error('Failed to extract JSON from model response', { raw, error: e });
logger.error('Failed to extract JSON from model response', { raw: truncateForLog(raw), error: e });
throw new Error('Could not find JSON root in model response.');
}

Expand All @@ -269,14 +276,14 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): {
try {
repaired = jsonrepair(preprocessed);
} catch (e) {
logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed, error: e });
logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed: truncateForLog(preprocessed), error: e });
}

let parsedJson: any;
try {
parsedJson = JSON.parse(repaired);
} catch (e) {
logger.error('Critical JSON parse error after extraction and repair', { repaired, error: e });
logger.error('Critical JSON parse error after extraction and repair', { repaired: truncateForLog(repaired), error: e });
throw new Error(`Invalid JSON format: ${e instanceof Error ? e.message : 'Unknown error'}`);
}

Expand Down
Loading