diff --git a/db/migrations/002_resumable_queue_jobs.sql b/db/migrations/002_resumable_queue_jobs.sql new file mode 100644 index 0000000..b521486 --- /dev/null +++ b/db/migrations/002_resumable_queue_jobs.sql @@ -0,0 +1,30 @@ +ALTER TABLE jobs ADD COLUMN IF NOT EXISTS check_run_completed_at TIMESTAMPTZ; +ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_owner TEXT; +ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_expires_at TIMESTAMPTZ; +ALTER TABLE jobs ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ; +ALTER TABLE jobs ADD COLUMN IF NOT EXISTS recovery_count INTEGER NOT NULL DEFAULT 0; +ALTER TABLE jobs ADD COLUMN IF NOT EXISTS last_queue_message_at TIMESTAMPTZ; +ALTER TABLE file_reviews ADD COLUMN IF NOT EXISTS transient_error_count INTEGER NOT NULL DEFAULT 0; + +CREATE INDEX IF NOT EXISTS jobs_lease_expiry_idx + ON jobs (lease_expires_at) + WHERE status = 'running' AND lease_expires_at IS NOT NULL; + +CREATE INDEX IF NOT EXISTS jobs_terminal_check_idx + ON jobs (status, check_run_completed_at) + WHERE check_run_id IS NOT NULL AND check_run_completed_at IS NULL; + +CREATE INDEX IF NOT EXISTS jobs_unleased_running_idx + ON jobs (last_queue_message_at, heartbeat_at) + WHERE status = 'running' AND lease_expires_at IS NULL; + +DELETE FROM file_reviews fr +USING ( + SELECT id, ROW_NUMBER() OVER (PARTITION BY job_id, file_path ORDER BY created_at ASC, id ASC) AS row_number + FROM file_reviews +) ranked +WHERE fr.id = ranked.id + AND ranked.row_number > 1; + +CREATE UNIQUE INDEX IF NOT EXISTS file_reviews_job_file_path_key + ON file_reviews (job_id, file_path); diff --git a/scripts/test.mjs b/scripts/test.mjs index 0eeed33..804fba1 100644 --- a/scripts/test.mjs +++ b/scripts/test.mjs @@ -71,7 +71,7 @@ if (!usableEnvValue(process.env.TEST_DATABASE_URL)) { process.exit(1); } -process.env.DATABASE_URL = usableEnvValue(process.env.DATABASE_URL) ?? process.env.TEST_DATABASE_URL; +process.env.DATABASE_URL = process.env.TEST_DATABASE_URL; run(process.execPath, ['scripts/migrate.mjs']); run(process.execPath, ['node_modules/vitest/vitest.mjs', 'run']); diff --git a/src/client/pages/settings.tsx b/src/client/pages/settings.tsx index 5394c67..cc75810 100644 --- a/src/client/pages/settings.tsx +++ b/src/client/pages/settings.tsx @@ -39,12 +39,12 @@ const DEFAULT_GLOBAL_CONFIG: ModelRouteConfig = { ], }; -function normalizeGlobalConfig(config: any): ModelRouteConfig { +export function normalizeGlobalConfig(config: any): ModelRouteConfig { if (!config || !config.main) return DEFAULT_GLOBAL_CONFIG; return { main: config.main, - fallbacks: config.fallbacks?.length ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks, - size_overrides: config.size_overrides ?? DEFAULT_GLOBAL_CONFIG.size_overrides, + fallbacks: Array.isArray(config.fallbacks) ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks, + size_overrides: Array.isArray(config.size_overrides) ? config.size_overrides : DEFAULT_GLOBAL_CONFIG.size_overrides, }; } diff --git a/src/server/core/job-recovery.ts b/src/server/core/job-recovery.ts new file mode 100644 index 0000000..92e0ae7 --- /dev/null +++ b/src/server/core/job-recovery.ts @@ -0,0 +1,53 @@ +import type { AppBindings } from '@server/env'; +import { getTerminalJobsNeedingCheckRunCompletion, markJobCheckRunCompleted, recoverExpiredJobLeases } from '@server/db/jobs'; +import { logger } from '@server/core/logger'; +import { GitHubService } from '@server/services/github'; + +const MAX_RECOVERY_COUNT = 3; + +export async function recoverJobs(env: AppBindings) { + try { + const recovered = await recoverExpiredJobLeases(env, MAX_RECOVERY_COUNT); + for (const jobId of recovered.requeuedJobIds) { + await env.REVIEW_QUEUE.send({ + jobId, + deliveryId: crypto.randomUUID(), + phase: 'review', + }); + } + + if (recovered.requeuedJobIds.length > 0 || recovered.failedJobs.length > 0) { + logger.warn('Expired job leases recovered', { + requeued: recovered.requeuedJobIds.length, + failed: recovered.failedJobs.length, + }); + } + } catch (err) { + logger.error('Failed to recover expired job leases', err instanceof Error ? err : new Error(String(err))); + } +} + +export async function completeTerminalCheckRuns(env: AppBindings) { + const jobs = await getTerminalJobsNeedingCheckRunCompletion(env); + for (const job of jobs) { + if (!job.check_run_id) continue; + + try { + const github = new GitHubService(env, job.installation_id); + await github.updateCheckRun(job.owner, job.repo, job.check_run_id, { + status: 'completed', + conclusion: job.status === 'superseded' ? 'neutral' : 'failure', + title: job.status === 'superseded' ? 'Review superseded' : 'Review failed', + summary: job.error_msg ?? (job.status === 'superseded' ? 'Superseded by a newer commit or job.' : 'Review failed.'), + }); + await markJobCheckRunCompleted(env, job.id); + } catch (error) { + logger.error(`Failed to complete terminal check run for job ${job.id}`, error instanceof Error ? error : new Error(String(error))); + } + } +} + +export async function runOpportunisticJobMaintenance(env: AppBindings) { + await recoverJobs(env); + await completeTerminalCheckRuns(env); +} diff --git a/src/server/core/model-output.ts b/src/server/core/model-output.ts index fc8ebc9..856912e 100644 --- a/src/server/core/model-output.ts +++ b/src/server/core/model-output.ts @@ -5,6 +5,13 @@ import { findClosestValidLine, findPositionForLine, getValidNewLines, getValidPo import type { FileDiff } from './diff'; import { jsonrepair } from 'jsonrepair'; +const MAX_LOGGED_MODEL_OUTPUT_CHARS = 2_000; + +function truncateForLog(value: string) { + if (value.length <= MAX_LOGGED_MODEL_OUTPUT_CHARS) return value; + return `${value.slice(0, MAX_LOGGED_MODEL_OUTPUT_CHARS)}... [truncated ${value.length - MAX_LOGGED_MODEL_OUTPUT_CHARS} chars]`; +} + function hasReviewKeys(input: string) { return /"(findings|overall_explanation|overall_correctness|overall_confidence_score|summary)"\s*:/.test(input); } @@ -253,7 +260,7 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): { throw new Error('Model response did not contain review JSON keys.'); } } catch (e) { - logger.error('Failed to extract JSON from model response', { raw, error: e }); + logger.error('Failed to extract JSON from model response', { raw: truncateForLog(raw), error: e }); throw new Error('Could not find JSON root in model response.'); } @@ -269,14 +276,14 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): { try { repaired = jsonrepair(preprocessed); } catch (e) { - logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed, error: e }); + logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed: truncateForLog(preprocessed), error: e }); } let parsedJson: any; try { parsedJson = JSON.parse(repaired); } catch (e) { - logger.error('Critical JSON parse error after extraction and repair', { repaired, error: e }); + logger.error('Critical JSON parse error after extraction and repair', { repaired: truncateForLog(repaired), error: e }); throw new Error(`Invalid JSON format: ${e instanceof Error ? e.message : 'Unknown error'}`); } diff --git a/src/server/core/review.ts b/src/server/core/review.ts index de56f0b..bd2d163 100644 --- a/src/server/core/review.ts +++ b/src/server/core/review.ts @@ -1,14 +1,14 @@ import { logger } from './logger'; import { isSupportedGitHubWebhookEvent, type GitHubWebhookEventName, type GitHubWebhookPayload, type IssueCommentWebhookPayload, type PullRequestWebhookPayload } from '@shared/github'; -import { defaultRepoConfig, type ParsedReviewComment, type RepoConfig, type ReviewJobMessage } from '@shared/schema'; +import { defaultRepoConfig, normalizeModelId, type ParsedReviewComment, type RepoConfig, type ReviewJobMessage } from '@shared/schema'; import type { AppBindings } from '@server/env'; -import { getFileReviewsForJobs } from '@server/db/file-reviews'; -import { completeJob, failJob, findExistingJobForHead, getJobForProcessing, insertJob, mapJob, startJobProcessing, completePreparationStep, supersedeOlderJobs, updateJobCheckRun, updateJobStep } from '@server/db/jobs'; +import { getFileReviewsForJobs, recordRetryableFileReviewFailure, upsertFileReview } from '@server/db/file-reviews'; +import { claimJobLease, completeJob, completePreparationStep, failJob, findExistingJobForHead, getJobForProcessing, heartbeatJobLease, insertJob, mapJob, markJobCheckRunCompleted, markJobContinuationQueued, releaseJobLease, supersedeOlderJobs, updateJobCheckRun, updateJobStep } from '@server/db/jobs'; import { filterReviewableFiles, parseUnifiedDiff } from './diff'; import { GitHubService } from '../services/github'; import { GitHubClient } from './github'; -import { ModelService } from '../services/model'; +import { isRetryableModelError, ModelService } from '../services/model'; import { FormatterService } from '../services/formatter'; import { TokenTracker } from './token-tracker'; import { loadRepoConfig } from './config'; @@ -16,6 +16,67 @@ import { getWebhookDelivery } from '@server/db/webhook-deliveries'; type PersistedReviewJob = ReturnType; +export type ReviewJobRunResult = { action: 'ack' } | { action: 'retry'; delaySeconds: number }; + +const REVIEW_CHUNK_FILE_LIMIT = 2; +const REVIEW_CHUNK_WALL_CLOCK_MS = 8 * 60 * 1000; +const JOB_LEASE_SECONDS = 10 * 60; +const BUSY_RETRY_SECONDS = 60; +const RETRYABLE_MODEL_FAILURE_RETRY_SECONDS = 60; +const MAX_RETRYABLE_FILE_REVIEW_FAILURES = 3; + +function isRetryableFileReviewErrorMessage(message: string | null | undefined) { + if (!message) return false; + const lower = message.toLowerCase(); + return ( + lower.includes('all configured review models failed') || + lower.includes('retrying later') || + lower.includes('google request failed with 5') || + lower.includes('cloudflare') || + lower.includes('timeout') || + lower.includes('timed out') || + lower.includes('internal error') || + lower.includes('unavailable') || + lower.includes('high demand') || + lower.includes('temporary') || + lower.includes('[redacted]') || + lower.includes('returned no review content') || + lower.includes('empty response') + ); +} + +function shouldRetryExistingFileReview(review: { file_status: string; error_msg: string | null }) { + return review.file_status === 'failed' && isRetryableFileReviewErrorMessage(review.error_msg); +} + +function countsAsHandledFileReview(review: { file_status: string; error_msg: string | null }) { + return !shouldRetryExistingFileReview(review); +} + +function configuredModelSet(config: RepoConfig) { + const models = new Set(); + const addModel = (model: string | null | undefined) => { + if (model) models.add(normalizeModelId(model)); + }; + + addModel(config.model?.main ?? 'gemma-4-31b-it'); + for (const fallback of config.model?.fallbacks ?? []) { + addModel(fallback); + } + for (const tier of config.model?.size_overrides ?? []) { + addModel(tier.model); + for (const fallback of tier.fallbacks ?? []) { + addModel(fallback); + } + } + + return models; +} + +function canInheritParentFileReview(config: RepoConfig, review: { model_used: string }) { + return configuredModelSet(config).has(normalizeModelId(review.model_used)); +} + function shouldTriggerFromPullRequest(action: PullRequestWebhookPayload['action'], config: RepoConfig['review']) { return (config.on as string[]).includes(action); } @@ -94,341 +155,395 @@ export function extractReviewRequest(input: { return null; } -export async function runReviewJob(env: AppBindings, message: ReviewJobMessage) { - let job: PersistedReviewJob; - - if (message.jobId) { - const row = await getJobForProcessing(env, message.jobId); - if (!row) { - logger.warn(`Job not found for processing: ${message.jobId}`); - return; - } - - job = mapJob(row); - if (job.status === 'superseded') { - logger.info(`Job ${job.id} is superseded, skipping processing.`); - return; - } - if (job.status === 'running') { - logger.info(`Job ${job.id} is already running, skipping duplicate queue delivery.`); - return; - } - } else { - if (!message.eventName) { - logger.warn('Queue message ignored: missing eventName'); - return; - } +export async function runReviewJob(env: AppBindings, message: ReviewJobMessage): Promise { + const resolved = await resolveQueuedJob(env, message); + if (!resolved) { + return { action: 'ack' }; + } - let eventName = message.eventName; - let payload = message.payload as GitHubWebhookPayload | undefined; + const leaseOwner = crypto.randomUUID(); + const claim = await claimJobLease(env, resolved.job.id, leaseOwner, JOB_LEASE_SECONDS); + if (claim.status === 'missing') { + logger.warn(`Job not found for processing: ${resolved.job.id}`); + return { action: 'ack' }; + } + if (claim.status === 'terminal') { + logger.info(`Job ${resolved.job.id} is already terminal (${claim.row.status}), acking queue delivery.`); + return { action: 'ack' }; + } + if (claim.status === 'busy') { + logger.info(`Job ${resolved.job.id} has a fresh lease; retrying queue delivery later.`); + return { action: 'retry', delaySeconds: Math.min(BUSY_RETRY_SECONDS, claim.retryAfterSeconds) }; + } - if (payload === undefined) { - const delivery = await getWebhookDelivery(env, message.deliveryId); - if (!delivery) { - logger.warn(`Queue message ignored: webhook delivery not found: ${message.deliveryId}`); - return; - } + const job = mapJob(claim.row); + const phase = resolved.phase; + const tracker = new TokenTracker(); + const github = new GitHubService(env, job.installationId, tracker); + const model = new ModelService(env, tracker); + const formatter = new FormatterService(env.APP_URL); - eventName = delivery.event_name; - payload = delivery.payload as GitHubWebhookPayload; + try { + if (phase === 'prepare') { + await runPreparePhase(env, job, leaseOwner, github); + } else if (phase === 'finalize') { + await runFinalizePhase(env, job, leaseOwner, github, model, formatter); + } else { + await runReviewPhase(env, job, leaseOwner, github, model); } - if (!isSupportedGitHubWebhookEvent(eventName)) { - logger.info(`Queue message ignored: unsupported GitHub event ${eventName}`); - return; + await releaseJobLease(env, job.id, leaseOwner); + return { action: 'ack' }; + } catch (error) { + const messageText = error instanceof Error ? error.message : 'Unknown review failure'; + if (messageText === 'JOB_SUPERSEDED') { + logger.info(`Job ${job.id} was superseded during execution, stopping.`); + await releaseJobLease(env, job.id, leaseOwner); + return { action: 'ack' }; } - const installationId = String(payload.installation?.id ?? ''); - if (!installationId || !('repository' in payload) || !payload.repository) { - logger.info('Queue message ignored: missing installation or repository info'); - return; + if (isRetryableModelError(error)) { + logger.warn(`Review job hit transient model/provider failure; scheduling delayed continuation: ${job.owner}/${job.repo} PR #${job.prNumber}`, { + error: messageText, + phase, + delaySeconds: RETRYABLE_MODEL_FAILURE_RETRY_SECONDS, + }); + await enqueueJobPhase(env, job.id, phase, RETRYABLE_MODEL_FAILURE_RETRY_SECONDS); + await releaseJobLease(env, job.id, leaseOwner); + return { action: 'ack' }; } - // 1. Load Repo Config - const repoConfig = await loadRepoConfig(env, { - installationId, - owner: payload.repository.owner.login, - repo: payload.repository.name, - }); + logger.error(`Review job failed: ${job.owner}/${job.repo} PR #${job.prNumber}`, error); + await failJobAndCheckRun(env, job, github, messageText); + return { action: 'ack' }; + } +} - if (repoConfig.enabled === false) { - logger.info(`Job ignored: repository ${payload.repository.owner.login}/${payload.repository.name} is disabled`); - return; - } +async function resolveQueuedJob( + env: AppBindings, + message: ReviewJobMessage, +): Promise<{ job: PersistedReviewJob; phase: 'prepare' | 'review' | 'finalize' } | null> { + if (message.jobId) { + const row = await getJobForProcessing(env, message.jobId); + return row ? { job: mapJob(row), phase: message.phase ?? 'review' } : null; + } - // 2. Extract Review Request - const extracted = extractReviewRequest({ - eventName, - payload, - botUsername: env.BOT_USERNAME, - config: repoConfig.parsedJson, - }); + if (!message.eventName) { + logger.warn('Queue message ignored: missing eventName'); + return null; + } - if (!extracted) { - // Handle specific PR closed events if needed (cleanup) - if (eventName === 'pull_request') { - const prPayload = payload as PullRequestWebhookPayload; - if (prPayload.action === 'closed' && repoConfig.parsedJson.review.labels !== false) { - const labels = repoConfig.parsedJson.review.labels; - const gh = new GitHubClient(env, installationId); - await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p1); - await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p2); - await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p3); - } - } - return; - } + let eventName = message.eventName; + let payload = message.payload as GitHubWebhookPayload | undefined; - // 3. Resolve full PR info for mentions - let resolved = extracted; - const githubClient = new GitHubClient(env, installationId); - if (eventName === 'issue_comment') { - const pr = await githubClient.getPullRequest(extracted.owner, extracted.repo, extracted.prNumber); - resolved = { - ...extracted, - prTitle: pr.title, - prAuthor: pr.user.login, - commitSha: pr.head.sha, - baseSha: pr.base.sha, - headRef: pr.head.ref, - baseRef: pr.base.ref, - }; + if (payload === undefined) { + const delivery = await getWebhookDelivery(env, message.deliveryId); + if (!delivery) { + logger.warn(`Queue message ignored: webhook delivery not found: ${message.deliveryId}`); + return null; } - // 4. Duplicate Check - const duplicateJob = await findExistingJobForHead(env, { - owner: resolved.owner, - repo: resolved.repo, - prNumber: resolved.prNumber, - commitSha: resolved.commitSha, - trigger: resolved.trigger, - }); - if (duplicateJob) { - if (duplicateJob.status === 'running') { - logger.info(`Duplicate in-flight job ${duplicateJob.id} is already running for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}.`); - return; - } - if (duplicateJob.status === 'queued') { - logger.info(`Resuming duplicate in-flight job ${duplicateJob.id} for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}.`); - job = duplicateJob; - } else { - logger.info(`Duplicate terminal job found for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}, skipping.`); - return; - } - } else { - // 5. Insert Job - job = await insertJob(env, { - installationId: resolved.installationId, - owner: resolved.owner, - repo: resolved.repo, - prNumber: resolved.prNumber, - prTitle: resolved.prTitle, - prAuthor: resolved.prAuthor, - commitSha: resolved.commitSha, - baseSha: resolved.baseSha, - trigger: resolved.trigger, - headRef: resolved.headRef, - baseRef: resolved.baseRef, - configSnapshot: repoConfig.parsedJson, - }); - - // 6. Supersede older jobs - await supersedeOlderJobs(env, { - installationId: resolved.installationId, - owner: resolved.owner, - repo: resolved.repo, - prNumber: resolved.prNumber, - newJobId: job.id, - }); - } + eventName = delivery.event_name; + payload = delivery.payload as GitHubWebhookPayload; } - const tracker = new TokenTracker(); - const github = new GitHubService(env, job.installationId, tracker); - const model = new ModelService(env, tracker); - const formatter = new FormatterService(env.APP_URL); - - let checkRunId = job.checkRunId; + if (!isSupportedGitHubWebhookEvent(eventName)) { + logger.info(`Queue message ignored: unsupported GitHub event ${eventName}`); + return null; + } - try { - tracker.incrementSubrequests(1); - const claimed = await startJobProcessing(env, job.id, 'Preparation'); - if (!claimed) { - logger.info(`Job ${job.id} was already claimed or no longer queued, skipping duplicate queue delivery.`); - return; - } + const installationId = String(payload.installation?.id ?? ''); + if (!installationId || !('repository' in payload) || !payload.repository) { + logger.info('Queue message ignored: missing installation or repository info'); + return null; + } - const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber); - const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig; + const repoConfig = await loadRepoConfig(env, { + installationId, + owner: payload.repository.owner.login, + repo: payload.repository.name, + }); - if (!checkRunId) { - const checkRun = await github.createCheckRun(job.owner, job.repo, { - headSha: pr.head.sha, - title: 'Review queued', - summary: 'Codra has started reviewing this pull request.', - }); - checkRunId = checkRun.id; + if (repoConfig.enabled === false) { + logger.info(`Job ignored: repository ${payload.repository.owner.login}/${payload.repository.name} is disabled`); + return null; + } - tracker.incrementSubrequests(1); - await updateJobCheckRun(env, job.id, checkRun.id); + const extracted = extractReviewRequest({ + eventName, + payload, + botUsername: env.BOT_USERNAME, + config: repoConfig.parsedJson, + }); + + if (!extracted) { + if (eventName === 'pull_request') { + const prPayload = payload as PullRequestWebhookPayload; + if (prPayload.action === 'closed' && repoConfig.parsedJson.review.labels !== false) { + const labels = repoConfig.parsedJson.review.labels; + const gh = new GitHubClient(env, installationId); + await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p1); + await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p2); + await gh.removeIssueLabel(prPayload.repository.owner.login, prPayload.repository.name, prPayload.pull_request.number, labels.p3); + } } + return null; + } - const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber); - const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review); - - tracker.incrementSubrequests(1); - await completePreparationStep(env, job.id, files.length); + let resolved = extracted; + const githubClient = new GitHubClient(env, installationId); + if (eventName === 'issue_comment') { + const pr = await githubClient.getPullRequest(extracted.owner, extracted.repo, extracted.prNumber); + resolved = { + ...extracted, + prTitle: pr.title, + prAuthor: pr.user.login, + commitSha: pr.head.sha, + baseSha: pr.base.sha, + headRef: pr.head.ref, + baseRef: pr.base.ref, + }; + } - tracker.incrementSubrequests(1); - const preparedJob = await getJobForProcessing(env, job.id); - if (preparedJob?.status === 'superseded') { - throw new Error('JOB_SUPERSEDED'); + const duplicateJob = await findExistingJobForHead(env, { + owner: resolved.owner, + repo: resolved.repo, + prNumber: resolved.prNumber, + commitSha: resolved.commitSha, + trigger: resolved.trigger, + }); + if (duplicateJob) { + if (duplicateJob.status === 'queued' || duplicateJob.status === 'running') { + logger.info(`Resuming duplicate in-flight job ${duplicateJob.id} for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}.`); + return { job: duplicateJob, phase: message.phase ?? 'prepare' }; } - tracker.incrementSubrequests(1); - await updateJobStep(env, job.id, 'Reviewing Files', { status: 'running' }); - const reviewedComments: ParsedReviewComment[] = []; - const fileSummaries: Array<{ path: string; summary: string; verdict: string }> = []; - const newReviewsToInsert: any[] = []; - let stoppedBeforeAllFiles = false; - - const jobIdsToQuery = [job.id]; - if (job.retryOfJobId) jobIdsToQuery.push(job.retryOfJobId); - const allExistingReviews = await getFileReviewsForJobs(env, jobIdsToQuery); - - const currentJobReviews = allExistingReviews.filter(r => r.job_id === job.id); - const existingReviews = [...currentJobReviews]; - for (const r of allExistingReviews) { - if (r.job_id !== job.id && !existingReviews.some(er => er.file_path === r.file_path)) { - existingReviews.push(r); - } - } + logger.info(`Duplicate terminal job found for ${resolved.owner}/${resolved.repo} PR #${resolved.prNumber}, skipping.`); + return null; + } - const totalLineCount = files.reduce((sum, f) => sum + f.lineCount, 0); - for (const [index, file] of files.entries()) { - // Safety break to avoid hitting Cloudflare 50-subrequest limit - if (!tracker.hasRemainingSubrequests(5)) { - logger.warn(`Approaching subrequest limit (${tracker.getSubrequestCount()}), stopping review loop at file ${index + 1}/${files.length}`); - stoppedBeforeAllFiles = true; - break; - } + const job = await insertJob(env, { + installationId: resolved.installationId, + owner: resolved.owner, + repo: resolved.repo, + prNumber: resolved.prNumber, + prTitle: resolved.prTitle, + prAuthor: resolved.prAuthor, + commitSha: resolved.commitSha, + baseSha: resolved.baseSha, + trigger: resolved.trigger, + headRef: resolved.headRef, + baseRef: resolved.baseRef, + configSnapshot: repoConfig.parsedJson, + }); + + await supersedeOlderJobs(env, { + installationId: resolved.installationId, + owner: resolved.owner, + repo: resolved.repo, + prNumber: resolved.prNumber, + newJobId: job.id, + }); + + return { job, phase: 'prepare' }; +} - // Periodic check for supersession (every 50 files - reduced frequency to save subrequests) - if (index % 50 === 0 && index > 0) { - tracker.incrementSubrequests(1); - const currentJob = await getJobForProcessing(env, job.id); - if (currentJob?.status === 'superseded') { - throw new Error('JOB_SUPERSEDED'); - } - } +async function runPreparePhase( + env: AppBindings, + job: PersistedReviewJob, + leaseOwner: string, + github: GitHubService, +) { + await updateJobStep(env, job.id, 'Preparation', { status: 'running' }); + const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber); + const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig; - const existing = existingReviews.find((r) => r.file_path === file.path && r.file_status === 'done'); + let checkRunId = job.checkRunId; + if (!checkRunId) { + const checkRun = await github.createCheckRun(job.owner, job.repo, { + headSha: pr.head.sha, + title: 'Review queued', + summary: 'Codra has started reviewing this pull request.', + }); + checkRunId = checkRun.id; + await updateJobCheckRun(env, job.id, checkRun.id); + } - if (existing) { - reviewedComments.push(...(existing.parsed_comments as ParsedReviewComment[])); - fileSummaries.push({ - path: file.path, - summary: existing.file_summary ?? '', - verdict: existing.verdict ?? 'comment', - }); + const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber); + const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review); + await completePreparationStep(env, job.id, files.length); + await heartbeatJobLease(env, job.id, leaseOwner, JOB_LEASE_SECONDS); - if (existing.model_used && (existing.input_tokens || existing.output_tokens)) { - tracker.record(existing.model_used, existing.input_tokens ?? 0, existing.output_tokens ?? 0); - } - - // If this review was from a parent job, we'll include it in our batch insert for the current job - if (!currentJobReviews.some((r) => r.file_path === file.path)) { - newReviewsToInsert.push({ - filePath: file.path, - fileStatus: 'done', - modelUsed: existing.model_used, - modelProvider: (existing as any).model_provider, - diffLineCount: existing.diff_line_count, - diffInput: existing.diff_input, - rawAiOutput: existing.raw_ai_output, - parsedComments: existing.parsed_comments as ParsedReviewComment[], - inputTokens: existing.input_tokens, - outputTokens: existing.output_tokens, - durationMs: existing.duration_ms, - verdict: existing.verdict, - fileSummary: existing.file_summary, - overallCorrectness: existing.overall_correctness, - confidenceScore: existing.confidence_score, - errorMessage: null, - }); - } - continue; - } + if (files.length === 0) { + await updateJobStep(env, job.id, 'Reviewing Files', { status: 'done' }); + await enqueueJobPhase(env, job.id, 'finalize'); + return; + } - // Update check run less frequently (every 50 files) - if ((index > 0 && index % 50 === 0) || index === files.length - 1) { - await github.updateCheckRun(job.owner, job.repo, checkRunId, { - title: `Reviewing (${index + 1}/${files.length})`, - summary: `Analyzing ${file.path}`, - }); - } + if (checkRunId) { + await github.updateCheckRun(job.owner, job.repo, checkRunId, { + title: `Reviewing (0/${files.length})`, + summary: 'Codra is analyzing changed files.', + }); + } + await enqueueJobPhase(env, job.id, 'review'); +} - const startedAt = Date.now(); - try { - // AI call (ModelService handles its own subrequest incrementing) - const response = await model.reviewFile({ - file, - prTitle: pr.title ?? null, - prDescription: pr.body ?? null, - config: config, - totalLineCount, - }); +async function runReviewPhase( + env: AppBindings, + job: PersistedReviewJob, + leaseOwner: string, + github: GitHubService, + model: ModelService, +) { + if (!hasCompletedStep(job, 'Preparation')) { + await runPreparePhase(env, job, leaseOwner, github); + return; + } - reviewedComments.push(...response.parsed.comments); - fileSummaries.push({ - path: file.path, - summary: response.parsed.fileSummary, - verdict: response.parsed.verdict, - }); + await updateJobStep(env, job.id, 'Reviewing Files', { status: 'running' }); + + const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber); + const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig; + const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber); + const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review); + const totalLineCount = files.reduce((sum, file) => sum + file.lineCount, 0); + const startedAt = Date.now(); + let processedThisChunk = 0; + + const jobIdsToQuery = [job.id]; + if (job.retryOfJobId) jobIdsToQuery.push(job.retryOfJobId); + const allExistingReviews = await getFileReviewsForJobs(env, jobIdsToQuery); + const currentReviews = new Map(allExistingReviews.filter((review) => review.job_id === job.id).map((review) => [review.file_path, review])); + const parentReviews = new Map(allExistingReviews.filter((review) => review.job_id !== job.id && review.file_status === 'done').map((review) => [review.file_path, review])); + + for (const file of files) { + const existingReview = currentReviews.get(file.path); + if (existingReview && countsAsHandledFileReview(existingReview)) { + continue; + } - newReviewsToInsert.push({ + const inherited = parentReviews.get(file.path); + if (inherited) { + if (!canInheritParentFileReview(config, inherited)) { + logger.info(`Ignoring inherited review for ${file.path}; parent model ${inherited.model_used} is not in the current model strategy`); + await reviewAndPersistFile(env, job, file, pr, config, totalLineCount, model); + currentReviews.set(file.path, true as any); + processedThisChunk += 1; + await heartbeatAndCheckSuperseded(env, job.id, leaseOwner); + } else { + await upsertFileReview(env, job.id, { filePath: file.path, fileStatus: 'done', - modelUsed: response.modelUsed, - modelProvider: response.provider, - diffLineCount: file.lineCount, - diffInput: response.userPrompt, - rawAiOutput: response.rawText, - parsedComments: response.parsed.comments, - inputTokens: response.inputTokens, - outputTokens: response.outputTokens, - durationMs: Date.now() - startedAt, - verdict: response.parsed.verdict, - fileSummary: response.parsed.fileSummary, - overallCorrectness: response.parsed.overallCorrectness, - confidenceScore: response.parsed.confidenceScore, + modelUsed: inherited.model_used, + modelProvider: inherited.model_provider, + diffLineCount: inherited.diff_line_count, + diffInput: inherited.diff_input, + rawAiOutput: inherited.raw_ai_output, + parsedComments: inherited.parsed_comments as ParsedReviewComment[], + inputTokens: inherited.input_tokens, + outputTokens: inherited.output_tokens, + durationMs: inherited.duration_ms, + verdict: inherited.verdict, + fileSummary: inherited.file_summary, + overallCorrectness: inherited.overall_correctness, + confidenceScore: inherited.confidence_score, errorMessage: null, }); - } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'Unknown file review error'; - logger.error(`File review failed for ${file.path}`, { error }); - - // If we hit a hard limit (subrequests or neuron quota), STOP EVERYTHING. - const isHardLimit = - errorMessage.toLowerCase().includes('subrequest') || - errorMessage.includes('4006') || - errorMessage.toLowerCase().includes('allocation'); - - if (isHardLimit) { - throw error; - } - - fileSummaries.push({ - path: file.path, - summary: `Review failed: ${errorMessage}`, - verdict: 'failed', - }); + currentReviews.set(file.path, inherited); + processedThisChunk += 1; + await heartbeatAndCheckSuperseded(env, job.id, leaseOwner); + } + } else { + await reviewAndPersistFile(env, job, file, pr, config, totalLineCount, model); + currentReviews.set(file.path, true as any); + processedThisChunk += 1; + await heartbeatAndCheckSuperseded(env, job.id, leaseOwner); + } + + if (processedThisChunk >= REVIEW_CHUNK_FILE_LIMIT || Date.now() - startedAt >= REVIEW_CHUNK_WALL_CLOCK_MS) { + break; + } + } + + const latestReviews = await getFileReviewsForJobs(env, [job.id]); + const reviewedPaths = new Set(latestReviews.filter(countsAsHandledFileReview).map((review) => review.file_path)); + const completedCount = files.filter((file) => reviewedPaths.has(file.path)).length; + + if (completedCount >= files.length) { + await updateJobStep(env, job.id, 'Reviewing Files', { status: 'done' }); + await enqueueJobPhase(env, job.id, 'finalize'); + return; + } + + if (job.checkRunId) { + await github.updateCheckRun(job.owner, job.repo, job.checkRunId, { + title: `Reviewing (${completedCount}/${files.length})`, + summary: 'Codra is continuing this review in the next queue chunk.', + }); + } + await enqueueJobPhase(env, job.id, 'review'); +} + +async function reviewAndPersistFile( + env: AppBindings, + job: PersistedReviewJob, + file: ReturnType[number], + pr: Awaited>, + config: RepoConfig, + totalLineCount: number, + model: ModelService, +) { + const startedAt = Date.now(); + try { + const response = await model.reviewFile({ + file, + prTitle: pr.title ?? null, + prDescription: pr.body ?? null, + config, + totalLineCount, + }); - newReviewsToInsert.push({ + await upsertFileReview(env, job.id, { + filePath: file.path, + fileStatus: 'done', + modelUsed: response.modelUsed, + modelProvider: response.provider, + diffLineCount: file.lineCount, + diffInput: response.userPrompt, + rawAiOutput: response.rawText, + parsedComments: response.parsed.comments, + inputTokens: response.inputTokens, + outputTokens: response.outputTokens, + durationMs: Date.now() - startedAt, + verdict: response.parsed.verdict, + fileSummary: response.parsed.fileSummary, + overallCorrectness: response.parsed.overallCorrectness, + confidenceScore: response.parsed.confidenceScore, + errorMessage: null, + }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown file review error'; + + if (isRetryableModelError(error)) { + const modelId = config.model?.main ?? 'gemma-4-31b-it'; + const failureCount = await recordRetryableFileReviewFailure(env, job.id, { + filePath: file.path, + modelUsed: modelId, + modelProvider: modelId.startsWith('@cf/') ? 'cloudflare' : 'google', + diffLineCount: file.lineCount, + diffInput: '', + durationMs: Date.now() - startedAt, + errorMessage, + }); + + if (failureCount >= MAX_RETRYABLE_FILE_REVIEW_FAILURES) { + const finalError = `Review skipped after ${failureCount} repeated model provider outages.`; + await upsertFileReview(env, job.id, { filePath: file.path, fileStatus: 'failed', - modelUsed: config.model?.main ?? 'gemma-4-31b-it', - modelProvider: (config.model?.main ?? 'gemma-4-31b-it').startsWith('@cf/') ? 'cloudflare' : 'google', + modelUsed: modelId, + modelProvider: modelId.startsWith('@cf/') ? 'cloudflare' : 'google', diffLineCount: file.lineCount, diffInput: '', rawAiOutput: null, @@ -438,137 +553,206 @@ export async function runReviewJob(env: AppBindings, message: ReviewJobMessage) durationMs: Date.now() - startedAt, verdict: null, fileSummary: null, - errorMessage, + errorMessage: finalError, + }); + logger.error(`File review failed permanently for ${file.path} after transient retries`, { + attempts: failureCount, + error: errorMessage, }); + return; } - } - - // Batch insert all NEW or parent-inherited reviews at once (1 subrequest for reviews, 1 for comments) - if (newReviewsToInsert.length > 0) { - const { batchInsertFileReviews } = await import('@server/db/file-reviews'); - tracker.incrementSubrequests(2); // 1 for reviews, 1 for comments - await batchInsertFileReviews(env, job.id, newReviewsToInsert); - } - if (stoppedBeforeAllFiles) { - tracker.incrementSubrequests(1); - await updateJobStep(env, job.id, 'Reviewing Files', { - status: 'failed', - error: 'Review stopped before all files were analyzed due to subrequest limits.', + logger.warn(`File review deferred for ${file.path}; transient model/provider failure will retry later`, { + error: errorMessage, + attempts: failureCount, }); - throw new Error('Review stopped before all files were analyzed due to subrequest limits.'); - } - - if (fileSummaries.length > 0 && fileSummaries.every((f) => f.verdict === 'failed')) { - tracker.incrementSubrequests(1); - await updateJobStep(env, job.id, 'Reviewing Files', { status: 'failed', error: 'All files failed to review' }); - throw new Error('All files failed to review'); + throw error; } - tracker.incrementSubrequests(1); - await updateJobStep(env, job.id, 'Reviewing Files', { status: 'done' }); + logger.error(`File review failed for ${file.path}`, { error }); - tracker.incrementSubrequests(1); - await updateJobStep(env, job.id, 'Generating Summary', { status: 'running' }); - const hasFailures = fileSummaries.some((f) => f.verdict === 'failed'); - const verdictSummary = formatter.summarizeVerdict(reviewedComments, hasFailures); + const isHardLimit = + errorMessage.toLowerCase().includes('subrequest') || + errorMessage.includes('4006') || + errorMessage.toLowerCase().includes('allocation'); - // Final check before generating summary and posting review - const finalJobCheck = await getJobForProcessing(env, job.id); - if (finalJobCheck?.status === 'superseded') { - throw new Error('JOB_SUPERSEDED'); + if (isHardLimit) { + throw error; } - const summaryResponse = await model.generateSummary({ - prTitle: pr.title ?? null, - verdict: verdictSummary.verdict, - fileSummaries, - config, + const modelId = config.model?.main ?? 'gemma-4-31b-it'; + await upsertFileReview(env, job.id, { + filePath: file.path, + fileStatus: 'failed', + modelUsed: modelId, + modelProvider: modelId.startsWith('@cf/') ? 'cloudflare' : 'google', + diffLineCount: file.lineCount, + diffInput: '', + rawAiOutput: null, + parsedComments: [], + inputTokens: null, + outputTokens: null, + durationMs: Date.now() - startedAt, + verdict: null, + fileSummary: null, + errorMessage, }); + } +} - await updateJobStep(env, job.id, 'Generating Summary', { status: 'done' }); - - const formattedSummary = formatter.formatReviewOverview(pr.head.sha, env.BOT_USERNAME); +async function runFinalizePhase( + env: AppBindings, + job: PersistedReviewJob, + leaseOwner: string, + github: GitHubService, + model: ModelService, + formatter: FormatterService, +) { + await updateJobStep(env, job.id, 'Generating Summary', { status: 'running' }); + + const pr = await github.getPullRequest(job.owner, job.repo, job.prNumber); + const config = (job.configSnapshot ?? defaultRepoConfig) as RepoConfig; + const rawDiff = await github.getPullRequestDiff(job.owner, job.repo, job.prNumber); + const files = filterReviewableFiles(parseUnifiedDiff(rawDiff), config.review); + const reviews = await getFileReviewsForJobs(env, [job.id]); + + if (reviews.length < files.length) { + await updateJobStep(env, job.id, 'Reviewing Files', { status: 'running' }); + await enqueueJobPhase(env, job.id, 'review'); + return; + } - await updateJobStep(env, job.id, 'Completing', { status: 'running' }); - const review = await github.createReview(job.owner, job.repo, job.prNumber, { - commitSha: pr.head.sha, - event: formatter.toReviewEvent(verdictSummary.verdict), - body: formattedSummary, - comments: reviewedComments.map(c => ({ - path: c.path, - position: c.position ?? undefined, - body: formatter.formatInlineComment(c) - })), - }); + const reviewedComments = reviews.flatMap((review) => review.parsed_comments as ParsedReviewComment[]); + const fileSummaries = reviews.map((review) => ({ + path: review.file_path, + summary: review.file_status === 'failed' + ? `Review failed: ${review.error_msg ?? 'Unknown file review error'}` + : (review.file_summary ?? ''), + verdict: review.file_status === 'failed' ? 'failed' : (review.verdict ?? 'comment'), + })); + + if (fileSummaries.length > 0 && fileSummaries.every((file) => file.verdict === 'failed')) { + await updateJobStep(env, job.id, 'Generating Summary', { status: 'failed', error: 'All files failed to review' }); + throw new Error('All files failed to review'); + } - if (config.review.labels !== false) { - const labels = config.review.labels; - const labelMap = { - comment: { name: labels.p1, color: 'f79009' }, - approve: { name: labels.p2, color: '027a48' }, - } as const; - const label = labelMap[verdictSummary.verdict]; - - // Remove other verdict labels if they exist - const allPotentialLabels = [labels.p1, labels.p2, labels.p3]; - for (const l of allPotentialLabels) { - if (l !== label.name) { - await github.removeIssueLabel(job.owner, job.repo, job.prNumber, l); - } + const hasFailures = fileSummaries.some((file) => file.verdict === 'failed'); + const verdictSummary = formatter.summarizeVerdict(reviewedComments, hasFailures); + const summaryResponse = await model.generateSummary({ + prTitle: pr.title ?? null, + verdict: verdictSummary.verdict, + fileSummaries, + config, + }); + await updateJobStep(env, job.id, 'Generating Summary', { status: 'done' }); + await heartbeatAndCheckSuperseded(env, job.id, leaseOwner); + + const formattedSummary = formatter.formatReviewOverview(pr.head.sha, env.BOT_USERNAME); + + await updateJobStep(env, job.id, 'Completing', { status: 'running' }); + const review = await github.createReview(job.owner, job.repo, job.prNumber, { + commitSha: pr.head.sha, + event: formatter.toReviewEvent(verdictSummary.verdict), + body: formattedSummary, + comments: reviewedComments.map(comment => ({ + path: comment.path, + position: comment.position ?? undefined, + body: formatter.formatInlineComment(comment), + })), + }); + + if (config.review.labels !== false) { + const labels = config.review.labels; + const labelMap = { + comment: { name: labels.p1, color: 'f79009' }, + approve: { name: labels.p2, color: '027a48' }, + } as const; + const label = labelMap[verdictSummary.verdict]; + + for (const possibleLabel of [labels.p1, labels.p2, labels.p3]) { + if (possibleLabel !== label.name) { + await github.removeIssueLabel(job.owner, job.repo, job.prNumber, possibleLabel); } - - await github.ensureLabel(job.owner, job.repo, label.name, label.color); - await github.addIssueLabels(job.owner, job.repo, job.prNumber, [label.name]); } - await github.updateCheckRun(job.owner, job.repo, checkRunId, { + await github.ensureLabel(job.owner, job.repo, label.name, label.color); + await github.addIssueLabels(job.owner, job.repo, job.prNumber, [label.name]); + } + + if (job.checkRunId) { + await github.updateCheckRun(job.owner, job.repo, job.checkRunId, { status: 'completed', conclusion: hasFailures ? 'failure' : (verdictSummary.verdict === 'approve' ? 'success' : 'neutral'), title: hasFailures ? 'Review partially failed' : (verdictSummary.verdict === 'approve' ? 'LGTM' : 'Comments posted'), summary: `${reviewedComments.length} inline comments across ${files.length} files.${hasFailures ? ' Some files failed to parse.' : ''}`, }); + } - const finalUsage = tracker.getTotalUsage(); - logger.info(`Final token usage for job ${job.id}:`, { - total: finalUsage, - breakdown: tracker.getBreakdown() - }); + const fileInputTokens = reviews.reduce((sum, review) => sum + (review.input_tokens ?? 0), 0); + const fileOutputTokens = reviews.reduce((sum, review) => sum + (review.output_tokens ?? 0), 0); + await completeJob(env, job.id, { + verdict: verdictSummary.verdict, + fileCount: files.length, + commentCount: reviewedComments.length, + totalInputTokens: fileInputTokens + (summaryResponse.inputTokens ?? 0), + totalOutputTokens: fileOutputTokens + (summaryResponse.outputTokens ?? 0), + summaryMarkdown: formattedSummary, + reviewId: review.id, + summaryModel: summaryResponse.modelUsed, + }); + await updateJobStep(env, job.id, 'Completing', { status: 'done' }); + logger.info(`Review job completed: ${job.owner}/${job.repo} PR #${job.prNumber}`); +} - await completeJob(env, job.id, { - verdict: verdictSummary.verdict, - fileCount: files.length, - commentCount: reviewedComments.length, - totalInputTokens: finalUsage.input, - totalOutputTokens: finalUsage.output, - summaryMarkdown: formattedSummary, - reviewId: review.id, - summaryModel: summaryResponse.modelUsed, - }); - await updateJobStep(env, job.id, 'Completing', { status: 'done' }); - logger.info(`Review job completed: ${job.owner}/${job.repo} PR #${job.prNumber}`); - } catch (error) { - const message = error instanceof Error ? error.message : 'Unknown review failure'; - if (message === 'JOB_SUPERSEDED') { - logger.info(`Job ${job.id} was superseded during execution, stopping.`); - return; - } +async function heartbeatAndCheckSuperseded(env: AppBindings, jobId: string, leaseOwner: string) { + await heartbeatJobLease(env, jobId, leaseOwner, JOB_LEASE_SECONDS); + const currentJob = await getJobForProcessing(env, jobId); + if (currentJob?.status === 'superseded') { + throw new Error('JOB_SUPERSEDED'); + } +} - logger.error(`Review job failed: ${job.owner}/${job.repo} PR #${job.prNumber}`, error); +async function enqueueJobPhase( + env: AppBindings, + jobId: string, + phase: 'prepare' | 'review' | 'finalize', + delaySeconds = 0, +) { + await markJobContinuationQueued(env, jobId); + await env.REVIEW_QUEUE.send( + { + jobId, + deliveryId: crypto.randomUUID(), + phase, + }, + delaySeconds > 0 ? { delaySeconds } : undefined, + ); +} - // Attempt to record failure, but don't crash if we are out of subrequests - try { - await failJob(env, job.id, message); - if (checkRunId) { - await github.updateCheckRun(job.owner, job.repo, checkRunId, { - status: 'completed', - conclusion: 'failure', - title: 'Review failed', - summary: message, - }); - } - } catch (innerError) { - logger.error('Failed to record job failure in DB/GitHub (likely subrequest limit reached)', innerError); +function hasCompletedStep(job: PersistedReviewJob, stepName: string) { + return job.steps.some((step) => step.name === stepName && step.status === 'done'); +} + +async function failJobAndCheckRun( + env: AppBindings, + job: PersistedReviewJob, + github: GitHubService, + message: string, +) { + try { + await failJob(env, job.id, message); + const latest = await getJobForProcessing(env, job.id); + const checkRunId = latest?.check_run_id ?? job.checkRunId; + if (checkRunId) { + await github.updateCheckRun(job.owner, job.repo, checkRunId, { + status: 'completed', + conclusion: 'failure', + title: 'Review failed', + summary: message, + }); + await markJobCheckRunCompleted(env, job.id); } + } catch (innerError) { + logger.error('Failed to record job failure in DB/GitHub', innerError); } } diff --git a/src/server/db/client.ts b/src/server/db/client.ts index 934d487..f37a6fd 100644 --- a/src/server/db/client.ts +++ b/src/server/db/client.ts @@ -13,13 +13,13 @@ function createDbClient(env: DbEnv): DbClient { const sql = postgres(env.HYPERDRIVE.connectionString, { max: 5, fetch_types: false, - prepare: true, + prepare: false, onnotice: () => {}, }); return { async query(sqlText: string, params: unknown[] = []) { - return (await sql.unsafe(sqlText, params.map(normalizeParam) as any[], { prepare: true })) as T[]; + return (await sql.unsafe(sqlText, params.map(normalizeParam) as any[], { prepare: false })) as T[]; }, }; } diff --git a/src/server/db/file-reviews.ts b/src/server/db/file-reviews.ts index bc99aac..19d2ff1 100644 --- a/src/server/db/file-reviews.ts +++ b/src/server/db/file-reviews.ts @@ -93,6 +93,184 @@ export async function insertFileReview( } } +export async function upsertFileReview( + env: Pick, + jobId: string, + input: { + filePath: string; + fileStatus: 'pending' | 'done' | 'skipped' | 'failed'; + modelUsed: string; + modelProvider?: string | null; + diffLineCount: number; + diffInput: string | null; + rawAiOutput: string | null; + parsedComments: ParsedReviewComment[]; + inputTokens: number | null; + outputTokens: number | null; + durationMs: number | null; + verdict: 'approve' | 'comment' | null; + fileSummary: string | null; + overallCorrectness?: string | null; + confidenceScore?: number | null; + errorMessage: string | null; + }, +) { + const [review] = await queryRows<{ id: string }>( + env, + ` + INSERT INTO file_reviews ( + job_id, + file_path, + file_status, + model_used, + diff_line_count, + diff_input, + raw_ai_output, + input_tokens, + output_tokens, + duration_ms, + verdict, + file_summary, + overall_correctness, + confidence_score, + error_msg, + model_provider + ) + VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + ON CONFLICT (job_id, file_path) DO UPDATE SET + file_status = EXCLUDED.file_status, + model_used = EXCLUDED.model_used, + diff_line_count = EXCLUDED.diff_line_count, + diff_input = EXCLUDED.diff_input, + raw_ai_output = EXCLUDED.raw_ai_output, + input_tokens = EXCLUDED.input_tokens, + output_tokens = EXCLUDED.output_tokens, + duration_ms = EXCLUDED.duration_ms, + verdict = EXCLUDED.verdict, + file_summary = EXCLUDED.file_summary, + overall_correctness = EXCLUDED.overall_correctness, + confidence_score = EXCLUDED.confidence_score, + error_msg = EXCLUDED.error_msg, + model_provider = EXCLUDED.model_provider, + transient_error_count = 0 + RETURNING id + `, + [ + jobId, + input.filePath, + input.fileStatus, + input.modelUsed, + input.diffLineCount, + input.diffInput, + input.rawAiOutput, + input.inputTokens, + input.outputTokens, + input.durationMs, + input.verdict, + input.fileSummary, + input.overallCorrectness ?? null, + input.confidenceScore ?? null, + input.errorMessage, + input.modelProvider ?? null, + ], + ); + + await queryRows(env, 'DELETE FROM review_comments WHERE file_review_id = $1::uuid', [review.id]); + + if (input.parsedComments.length > 0) { + await queryRows( + env, + ` + INSERT INTO review_comments ( + file_review_id, path, line, position, severity, category, title, body, code_suggestion + ) + SELECT $1::uuid, * FROM UNNEST($2::text[], $3::int[], $4::int[], $5::text[], $6::text[], $7::text[], $8::text[], $9::text[]) + `, + [ + review.id, + input.parsedComments.map(c => c.path), + input.parsedComments.map(c => c.line ?? null), + input.parsedComments.map(c => c.position ?? null), + input.parsedComments.map(c => c.severity), + input.parsedComments.map(c => c.category), + input.parsedComments.map(c => c.title), + input.parsedComments.map(c => c.body), + input.parsedComments.map(c => c.codeSuggestion ?? null), + ], + ); + } +} + +export async function recordRetryableFileReviewFailure( + env: Pick, + jobId: string, + input: { + filePath: string; + modelUsed: string; + modelProvider?: string | null; + diffLineCount: number; + diffInput: string | null; + durationMs: number | null; + errorMessage: string; + }, +) { + const [review] = await queryRows<{ id: string; transient_error_count: number }>( + env, + ` + INSERT INTO file_reviews ( + job_id, + file_path, + file_status, + model_used, + model_provider, + diff_line_count, + diff_input, + raw_ai_output, + input_tokens, + output_tokens, + duration_ms, + verdict, + file_summary, + overall_correctness, + confidence_score, + error_msg, + transient_error_count + ) + VALUES ($1::uuid, $2, 'failed', $3, $4, $5, $6, NULL, NULL, NULL, $7, NULL, NULL, NULL, NULL, $8, 1) + ON CONFLICT (job_id, file_path) DO UPDATE SET + file_status = 'failed', + model_used = EXCLUDED.model_used, + model_provider = EXCLUDED.model_provider, + diff_line_count = EXCLUDED.diff_line_count, + diff_input = EXCLUDED.diff_input, + raw_ai_output = NULL, + input_tokens = NULL, + output_tokens = NULL, + duration_ms = EXCLUDED.duration_ms, + verdict = NULL, + file_summary = NULL, + overall_correctness = NULL, + confidence_score = NULL, + error_msg = EXCLUDED.error_msg, + transient_error_count = file_reviews.transient_error_count + 1 + RETURNING id, transient_error_count + `, + [ + jobId, + input.filePath, + input.modelUsed, + input.modelProvider ?? null, + input.diffLineCount, + input.diffInput, + input.durationMs, + input.errorMessage, + ], + ); + + await queryRows(env, 'DELETE FROM review_comments WHERE file_review_id = $1::uuid', [review.id]); + return review.transient_error_count; +} + export async function getModelUsageStats(env: Pick) { return queryRows<{ model_used: string; @@ -262,6 +440,7 @@ export async function getFileReviewsForJobs(env: Pick confidence_score: number | null; error_msg: string | null; model_provider: string | null; + transient_error_count: number; }>( env, ` diff --git a/src/server/db/jobs.ts b/src/server/db/jobs.ts index 76e1944..52941bc 100644 --- a/src/server/db/jobs.ts +++ b/src/server/db/jobs.ts @@ -3,7 +3,7 @@ import { parseJsonColumn, queryRows } from './client'; import { defaultRepoConfig, jobDetailSchema, jobSummarySchema, repoConfigSchema, type RepoConfig } from '@shared/schema'; import { getOrCreateRepository } from './repositories'; -type JobRow = { +export type JobRow = { id: string; installation_id: string; owner: string; @@ -17,9 +17,15 @@ type JobRow = { status: 'queued' | 'running' | 'done' | 'failed' | 'superseded'; config_snapshot: { review?: RepoConfig['review']; model?: RepoConfig['model'] } | string | null; check_run_id: number | null; + check_run_completed_at: string | null; created_at: string; started_at: string | null; finished_at: string | null; + lease_owner: string | null; + lease_expires_at: string | null; + heartbeat_at: string | null; + recovery_count: number | null; + last_queue_message_at: string | null; total_input_tokens: number | null; total_output_tokens: number | null; verdict: 'approve' | 'comment' | null; @@ -50,6 +56,12 @@ type JobDetailRow = JobRow & { type ByteaValue = ArrayBuffer | ArrayBufferView | string; +export type JobLeaseClaim = + | { status: 'claimed'; row: JobRow } + | { status: 'busy'; row: JobRow; retryAfterSeconds: number } + | { status: 'terminal'; row: JobRow } + | { status: 'missing' }; + function hexToBytes(hex: string) { const bytes = new Uint8Array(hex.length / 2); for (let index = 0; index < bytes.length; index += 1) { @@ -366,6 +378,108 @@ export async function startJobProcessing(env: Pick, j return rows.length > 0; } +export async function claimJobLease( + env: Pick, + jobId: string, + leaseOwner: string, + leaseSeconds: number, +): Promise { + const [claimed] = await queryRows( + env, + ` + WITH claimed AS ( + UPDATE jobs + SET status = CASE WHEN status = 'queued' THEN 'running' ELSE status END, + started_at = COALESCE(started_at, now()), + lease_owner = $2, + lease_expires_at = now() + ($3 || ' seconds')::interval, + heartbeat_at = now(), + last_queue_message_at = now() + WHERE id = $1 + AND status IN ('queued', 'running') + AND ( + lease_expires_at IS NULL + OR lease_expires_at < now() + OR lease_owner = $2 + ) + RETURNING * + ) + SELECT c.*, r.owner, r.repo, r.installation_id + FROM claimed c + JOIN repositories r ON c.repository_id = r.id + `, + [jobId, leaseOwner, String(leaseSeconds)], + ); + + if (claimed) { + return { status: 'claimed', row: claimed }; + } + + const row = await getJobForProcessing(env, jobId); + if (!row) { + return { status: 'missing' }; + } + + if (!['queued', 'running'].includes(row.status)) { + return { status: 'terminal', row }; + } + + const expiresAt = row.lease_expires_at ? new Date(row.lease_expires_at).getTime() : 0; + const secondsUntilExpiry = Math.ceil((expiresAt - Date.now()) / 1000); + return { + status: 'busy', + row, + retryAfterSeconds: Math.max(15, Math.min(60, Number.isFinite(secondsUntilExpiry) ? secondsUntilExpiry : 60)), + }; +} + +export async function heartbeatJobLease( + env: Pick, + jobId: string, + leaseOwner: string, + leaseSeconds: number, +) { + await queryRows( + env, + ` + UPDATE jobs + SET heartbeat_at = now(), + lease_expires_at = now() + ($3 || ' seconds')::interval + WHERE id = $1 + AND lease_owner = $2 + AND status = 'running' + `, + [jobId, leaseOwner, String(leaseSeconds)], + ); +} + +export async function releaseJobLease(env: Pick, jobId: string, leaseOwner: string) { + await queryRows( + env, + ` + UPDATE jobs + SET lease_owner = NULL, + lease_expires_at = NULL + WHERE id = $1 + AND lease_owner = $2 + `, + [jobId, leaseOwner], + ); +} + +export async function markJobContinuationQueued(env: Pick, jobId: string) { + await queryRows( + env, + ` + UPDATE jobs + SET last_queue_message_at = now() + WHERE id = $1 + AND status = 'running' + `, + [jobId], + ); +} + export async function updateJobCheckRun(env: Pick, jobId: string, checkRunId: number) { await queryRows( env, @@ -399,6 +513,9 @@ export async function completeJob( UPDATE jobs SET status = 'done', finished_at = now(), + check_run_completed_at = now(), + lease_owner = NULL, + lease_expires_at = NULL, verdict = $2, file_count = $3, comment_count = $4, @@ -433,6 +550,8 @@ export async function failJob(env: Pick, jobId: strin UPDATE jobs SET status = 'failed', finished_at = now(), + lease_owner = NULL, + lease_expires_at = NULL, error_msg = $2, steps = CASE WHEN steps IS NOT NULL THEN ( @@ -452,6 +571,18 @@ export async function failJob(env: Pick, jobId: strin ); } +export async function markJobCheckRunCompleted(env: Pick, jobId: string) { + await queryRows( + env, + ` + UPDATE jobs + SET check_run_completed_at = now() + WHERE id = $1 + `, + [jobId], + ); +} + export async function updateJobFileCount(env: Pick, jobId: string, fileCount: number) { await queryRows( env, @@ -580,6 +711,126 @@ export async function recoverStaleJobs( return rows.length; } +export async function recoverExpiredJobLeases( + env: Pick, + maxRecoveryCount = 3, + unleasedGraceSeconds = 300, +) { + const requeued = await queryRows<{ id: string }>( + env, + ` + WITH expired AS ( + SELECT id + FROM jobs + WHERE status = 'running' + AND ( + ( + lease_expires_at IS NOT NULL + AND lease_expires_at < now() + ) + OR ( + lease_expires_at IS NULL + AND COALESCE(last_queue_message_at, heartbeat_at, started_at, created_at) < now() - ($2 || ' seconds')::interval + ) + ) + AND recovery_count < $1 + ORDER BY COALESCE(lease_expires_at, last_queue_message_at, heartbeat_at, started_at, created_at) ASC + LIMIT 25 + FOR UPDATE SKIP LOCKED + ) + UPDATE jobs j + SET lease_owner = NULL, + lease_expires_at = NULL, + heartbeat_at = NULL, + recovery_count = recovery_count + 1, + last_queue_message_at = now(), + error_msg = NULL + FROM expired + WHERE j.id = expired.id + RETURNING j.id + `, + [maxRecoveryCount, String(unleasedGraceSeconds)], + ); + + const failed = await queryRows( + env, + ` + WITH expired AS ( + SELECT id + FROM jobs + WHERE status = 'running' + AND ( + ( + lease_expires_at IS NOT NULL + AND lease_expires_at < now() + ) + OR ( + lease_expires_at IS NULL + AND COALESCE(last_queue_message_at, heartbeat_at, started_at, created_at) < now() - ($2 || ' seconds')::interval + ) + ) + AND recovery_count >= $1 + ORDER BY COALESCE(lease_expires_at, last_queue_message_at, heartbeat_at, started_at, created_at) ASC + LIMIT 25 + FOR UPDATE SKIP LOCKED + ), + updated AS ( + UPDATE jobs j + SET status = 'failed', + finished_at = now(), + lease_owner = NULL, + lease_expires_at = NULL, + heartbeat_at = NULL, + error_msg = 'Job timed out: worker crashed or was evicted.', + steps = CASE + WHEN steps IS NOT NULL THEN ( + SELECT jsonb_agg( + CASE + WHEN s->>'status' = 'running' + THEN s || jsonb_build_object('status', 'failed', 'finishedAt', now(), 'error', 'Job timed out: worker crashed or was evicted.') + ELSE s + END + ) FROM jsonb_array_elements(steps) s + ) + ELSE steps + END + FROM expired + WHERE j.id = expired.id + RETURNING j.* + ) + SELECT u.*, r.owner, r.repo, r.installation_id + FROM updated u + JOIN repositories r ON u.repository_id = r.id + `, + [maxRecoveryCount, String(unleasedGraceSeconds)], + ); + + return { + requeuedJobIds: requeued.map((row) => row.id), + failedJobs: failed, + }; +} + +export async function getTerminalJobsNeedingCheckRunCompletion( + env: Pick, + limit = 25, +) { + return queryRows( + env, + ` + SELECT j.*, r.owner, r.repo, r.installation_id + FROM jobs j + JOIN repositories r ON j.repository_id = r.id + WHERE j.status IN ('failed', 'superseded') + AND j.check_run_id IS NOT NULL + AND j.check_run_completed_at IS NULL + ORDER BY COALESCE(j.finished_at, j.started_at, j.created_at) ASC + LIMIT $1 + `, + [limit], + ); +} + export async function supersedeOlderJobs( env: Pick, input: { @@ -596,6 +847,8 @@ export async function supersedeOlderJobs( UPDATE jobs j SET status = 'superseded', finished_at = now(), + lease_owner = NULL, + lease_expires_at = NULL, error_msg = 'Superseded by a newer commit or job.' FROM repositories r WHERE j.repository_id = r.id diff --git a/src/server/env.ts b/src/server/env.ts index d591cb6..03a765b 100644 --- a/src/server/env.ts +++ b/src/server/env.ts @@ -5,7 +5,7 @@ export interface WorkersAiBinding { } export interface QueueProducer { - send(message: T): Promise; + send(message: T, options?: { delaySeconds?: number }): Promise; } export interface AssetsBinding { diff --git a/src/server/index.ts b/src/server/index.ts index df53f8e..be5157a 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -2,19 +2,12 @@ import { createApp } from './app'; import { runReviewJob } from './core/review'; import type { AppBindings } from './env'; import { reviewJobMessageSchema } from '@shared/schema'; -import { recoverStaleJobs } from '@server/db/jobs'; import { logger } from '@server/core/logger'; import { runWithDb } from '@server/db/client'; +import { runOpportunisticJobMaintenance } from '@server/core/job-recovery'; const app = createApp(); -/** - * Jobs left in 'running' after a worker crash must be recovered before the - * next batch is processed. The threshold is set to 20 minutes — well above - * the longest expected review job but below Cloudflare's 30-minute CPU limit. - */ -const STALE_JOB_THRESHOLD_MINUTES = 20; - export default { fetch(request: Request, env: AppBindings, ctx: ExecutionContext) { return runWithDb(env, () => app.fetch(request, env, ctx)); @@ -22,44 +15,34 @@ export default { async queue(batch: MessageBatch, env: AppBindings, _ctx: ExecutionContext) { return runWithDb(env, async () => { - // ── Stale-job recovery ────────────────────────────────────────────────── - // Run once per batch. Any job that was 'running' for > threshold is a - // leftover from a previous crashed invocation; mark it failed now so the - // dashboard and future retries see an accurate state. - try { - const recovered = await recoverStaleJobs(env, STALE_JOB_THRESHOLD_MINUTES); - if (recovered > 0) { - logger.warn('Stale jobs recovered', { count: recovered, thresholdMinutes: STALE_JOB_THRESHOLD_MINUTES }); - } - } catch (err) { - // Non-fatal: log and continue processing the batch. - logger.error('Failed to recover stale jobs', err instanceof Error ? err : new Error(String(err))); - } + await runOpportunisticJobMaintenance(env); - // ── Process messages ──────────────────────────────────────────────────── - for (const message of batch.messages) { - const parseResult = reviewJobMessageSchema.safeParse(message.body); + for (const message of batch.messages) { + const parseResult = reviewJobMessageSchema.safeParse(message.body); - if (!parseResult.success) { - // Malformed message — cannot be retried meaningfully. Ack it so - // Cloudflare delivers it to the DLQ for inspection instead of burning - // retries on something that will never be valid. - logger.error('Invalid queue message schema; discarding message', { - body: message.body, - error: parseResult.error.flatten(), - }); - message.ack(); - continue; - } + if (!parseResult.success) { + logger.error('Invalid queue message schema; retrying so it can reach the DLQ', { + body: message.body, + error: parseResult.error.flatten(), + }); + message.retry(); + continue; + } - try { - await runReviewJob(env, parseResult.data); - message.ack(); - } catch (error) { - logger.error('Queue message processing failed; retrying', error instanceof Error ? error : new Error(String(error))); - message.retry(); + try { + const result = await runReviewJob(env, parseResult.data); + if (result.action === 'retry') { + message.retry({ delaySeconds: result.delaySeconds }); + } else { + message.ack(); + } + } catch (error) { + logger.error('Queue message processing failed; retrying', error instanceof Error ? error : new Error(String(error))); + message.retry(); + } } - } + + await runOpportunisticJobMaintenance(env); }); }, } satisfies ExportedHandler; diff --git a/src/server/models/cloudflare.ts b/src/server/models/cloudflare.ts index b0f20cc..52b831d 100644 --- a/src/server/models/cloudflare.ts +++ b/src/server/models/cloudflare.ts @@ -3,8 +3,51 @@ import type { AppBindings } from '@server/env'; import { TimeoutError } from '@server/core/timeout'; import type { ModelResponse } from './types'; -/** Max wall-clock time allowed for a single Workers-AI call (600 s). */ -const CLOUDFLARE_TIMEOUT_MS = 600_000; +/** Max wall-clock time allowed for a single Workers-AI call. */ +const CLOUDFLARE_TIMEOUT_MS = 45_000; +const CLOUDFLARE_MAX_RETRIES = 1; + +function isText(value: unknown): value is string { + return typeof value === 'string' && value.trim().length > 0; +} + +function extractMessageContent(content: unknown): string | null { + if (isText(content)) return content.trim(); + + if (Array.isArray(content)) { + const text = content + .map((part) => { + if (isText(part)) return part; + if (part && typeof part === 'object' && isText((part as any).text)) return (part as any).text; + return ''; + }) + .join('') + .trim(); + return text || null; + } + + return null; +} + +function extractCloudflareText(result: any, model: string): string { + if (isText(result)) return result.trim(); + if (isText(result?.response)) return result.response.trim(); + if (isText(result?.result?.response)) return result.result.response.trim(); + + const choice = result?.choices?.[0]; + const content = extractMessageContent(choice?.message?.content); + if (content) return content; + + const finishReason = choice?.finish_reason ?? choice?.stop_reason; + if (finishReason) { + throw new Error(`Cloudflare model ${model} returned no review content (finish_reason=${finishReason}).`); + } + if (isText(choice?.message?.reasoning) || isText(choice?.message?.reasoning_content)) { + throw new Error(`Cloudflare model ${model} returned reasoning without review content.`); + } + + throw new Error(`Cloudflare model ${model} returned an empty response.`); +} export async function reviewWithCloudflare( env: Pick, @@ -12,7 +55,7 @@ export async function reviewWithCloudflare( input: { systemPrompt: string; userPrompt: string }, tracker?: { incrementSubrequests(count?: number): void }, ): Promise { - const maxRetries = 2; + const maxRetries = CLOUDFLARE_MAX_RETRIES; let lastError: any; for (let attempt = 0; attempt <= maxRetries; attempt++) { @@ -39,17 +82,14 @@ export async function reviewWithCloudflare( { role: 'user', content: input.userPrompt }, ], max_completion_tokens: 4096, + temperature: 0, }), timeoutPromise, ]); const durationMs = Date.now() - startTime; logger.info(`AI model ${model} responded in ${durationMs}ms`); - const rawText = - result?.response ?? - result?.result?.response ?? - result?.choices?.[0]?.message?.content ?? - (typeof result === 'string' ? result : JSON.stringify(result)); + const rawText = extractCloudflareText(result, model); return { rawText, diff --git a/src/server/models/google.ts b/src/server/models/google.ts index bdd1d30..53c20de 100644 --- a/src/server/models/google.ts +++ b/src/server/models/google.ts @@ -3,8 +3,9 @@ import type { AppBindings } from '@server/env'; import { withTimeout } from '@server/core/timeout'; import type { ModelResponse } from './types'; -/** Max wall-clock time allowed for a single Google AI Studio call (120 s). */ -const GOOGLE_TIMEOUT_MS = 120_000; +/** Max wall-clock time allowed for a single Google AI Studio call. */ +const GOOGLE_TIMEOUT_MS = 45_000; +const GOOGLE_MAX_RETRIES = 1; export async function reviewWithGoogle( env: Pick, @@ -15,7 +16,7 @@ export async function reviewWithGoogle( logger.info(`Calling Google AI model: ${model}`); const startTime = Date.now(); const url = `https://generativelanguage.googleapis.com/v1beta/models/${model}:generateContent?key=${env.GEMINI_API_KEY}`; - const maxRetries = 2; + const maxRetries = GOOGLE_MAX_RETRIES; let lastError: any; for (let attempt = 0; attempt <= maxRetries; attempt++) { diff --git a/src/server/routes/api/jobs.ts b/src/server/routes/api/jobs.ts index 1125d79..87174a2 100644 --- a/src/server/routes/api/jobs.ts +++ b/src/server/routes/api/jobs.ts @@ -1,13 +1,17 @@ import { Hono } from 'hono'; -import { defaultRepoConfig, jobsQuerySchema } from '@shared/schema'; +import { jobsQuerySchema } from '@shared/schema'; import type { AppEnv } from '@server/env'; import { bytesToHex, getJobDetail, getJobForProcessing, insertJob, listJobs, mapJob, supersedeOlderJobs } from '@server/db/jobs'; import { jsonError } from '@server/core/http'; +import { runOpportunisticJobMaintenance } from '@server/core/job-recovery'; +import { loadRepoConfig } from '@server/core/config'; export function createJobsRouter() { const app = new Hono(); app.get('/', async (c) => { + await runOpportunisticJobMaintenance(c.env); + const rawQuery = c.req.query(); const query = jobsQuerySchema.parse(rawQuery); @@ -16,6 +20,8 @@ export function createJobsRouter() { }); app.get('/:id', async (c) => { + await runOpportunisticJobMaintenance(c.env); + const job = await getJobDetail(c.env, c.req.param('id')); if (!job) { return jsonError('Job not found.', 404); @@ -30,6 +36,11 @@ export function createJobsRouter() { return jsonError('Job not found.', 404); } const source = mapJob(rawSource); + const currentConfig = await loadRepoConfig(c.env, { + installationId: source.installationId, + owner: source.owner, + repo: source.repo, + }); const job = await insertJob(c.env, { installationId: source.installationId, @@ -43,7 +54,7 @@ export function createJobsRouter() { trigger: 'retry', headRef: rawSource.head_ref, baseRef: rawSource.base_ref, - configSnapshot: source.configSnapshot ?? defaultRepoConfig, + configSnapshot: currentConfig.parsedJson, retryOfJobId: source.id, }); @@ -60,6 +71,7 @@ export function createJobsRouter() { await c.env.REVIEW_QUEUE.send({ jobId: job.id, deliveryId: crypto.randomUUID(), + phase: 'prepare', requestId: c.get('requestId'), }); diff --git a/src/server/routes/webhook.ts b/src/server/routes/webhook.ts index f1f7dc6..3241b73 100644 --- a/src/server/routes/webhook.ts +++ b/src/server/routes/webhook.ts @@ -115,6 +115,7 @@ export function createWebhookRouter() { await c.env.REVIEW_QUEUE.send({ jobId: job.id, deliveryId, + phase: 'prepare', requestId: c.get('requestId'), }); diff --git a/src/server/services/model.ts b/src/server/services/model.ts index afff3eb..663ca3e 100644 --- a/src/server/services/model.ts +++ b/src/server/services/model.ts @@ -16,6 +16,22 @@ const MODEL_ALIASES: Record = { 'gemma-4-26b': 'gemma-4-26b-a4b-it', }; +export class RetryableModelError extends Error { + readonly retryable = true; + + constructor(message: string, cause?: unknown) { + super(message); + this.name = 'RetryableModelError'; + if (cause !== undefined) { + (this as any).cause = cause; + } + } +} + +export function isRetryableModelError(error: unknown) { + return Boolean(error && typeof error === 'object' && (error as any).retryable === true); +} + function isCloudflareModel(model: string) { return model.startsWith('@cf/'); } @@ -38,6 +54,29 @@ function isGoogleRateLimitError(error: unknown) { return message.includes('429') || message.includes('RESOURCE_EXHAUSTED') || message.toLowerCase().includes('quota exceeded'); } +function isTransientModelFailure(error: unknown) { + if (isRetryableModelError(error)) return true; + if (isCloudflareAllocationError(error)) return false; + const message = error instanceof Error ? error.message : String(error); + const lower = message.toLowerCase(); + + return ( + isGoogleRateLimitError(error) || + /\b50[0-9]\b/.test(message) || + lower.includes('internal error') || + lower.includes('unavailable') || + lower.includes('high demand') || + lower.includes('timeout') || + lower.includes('timed out') || + lower.includes('fetch failed') || + lower.includes('network') || + lower.includes('temporar') || + lower.includes('returned no review content') || + lower.includes('empty response') || + lower.includes('[redacted]') + ); +} + export class ModelService { constructor(private env: AppBindings, private tracker?: TokenTracker) {} @@ -110,6 +149,7 @@ export class ModelService { const modelsToTry = [primary, ...fallbacks]; let lastError: any; + let sawTransientFailure = false; const unavailableProviders = new Set(); for (const currentModel of modelsToTry) { if (isCloudflareModel(currentModel) && unavailableProviders.has('cloudflare')) { @@ -118,7 +158,7 @@ export class ModelService { } let attempts = 0; - const maxAttempts = 2; + const maxAttempts = 1; while (attempts < maxAttempts) { try { @@ -136,6 +176,9 @@ export class ModelService { }; } catch (error: any) { lastError = error; + if (isTransientModelFailure(error)) { + sawTransientFailure = true; + } attempts++; if (isCloudflareModel(currentModel) && isCloudflareAllocationError(error)) { unavailableProviders.add('cloudflare'); @@ -159,6 +202,14 @@ export class ModelService { } } + if (sawTransientFailure) { + const lastMessage = lastError instanceof Error ? lastError.message : String(lastError ?? 'Unknown model error'); + throw new RetryableModelError( + `All configured review models failed for ${params.file.path}; retrying later. Last error: ${lastMessage}`, + lastError, + ); + } + throw lastError; } @@ -172,6 +223,7 @@ export class ModelService { const modelsToTry = [primary, ...fallbacks]; let lastError: any; + let sawTransientFailure = false; const unavailableProviders = new Set(); for (const currentModel of modelsToTry) { if (isCloudflareModel(currentModel) && unavailableProviders.has('cloudflare')) { @@ -192,6 +244,9 @@ export class ModelService { return response; } catch (error: any) { lastError = error; + if (isTransientModelFailure(error)) { + sawTransientFailure = true; + } if (isCloudflareModel(currentModel) && isCloudflareAllocationError(error)) { unavailableProviders.add('cloudflare'); } @@ -199,6 +254,14 @@ export class ModelService { } } + if (sawTransientFailure) { + const lastMessage = lastError instanceof Error ? lastError.message : String(lastError ?? 'Unknown model error'); + throw new RetryableModelError( + `All configured summary models failed; retrying later. Last error: ${lastMessage}`, + lastError, + ); + } + throw lastError; } } diff --git a/src/shared/schema.ts b/src/shared/schema.ts index cf12a90..63bc503 100644 --- a/src/shared/schema.ts +++ b/src/shared/schema.ts @@ -145,6 +145,7 @@ export const repoConfigSchema = z.object({ export const reviewJobMessageSchema = z.object({ jobId: z.string().uuid().optional(), deliveryId: z.string().min(1), + phase: z.enum(['prepare', 'review', 'finalize']).optional(), eventName: z.string().min(1).optional(), payload: z.any().optional(), installationId: z.string().min(1).optional(), diff --git a/test/api.spec.ts b/test/api.spec.ts index 1b0ccd5..ce29724 100644 --- a/test/api.spec.ts +++ b/test/api.spec.ts @@ -1,5 +1,5 @@ import { createApp } from '@server/app'; -import { insertJob } from '@server/db/jobs'; +import { getJobForProcessing, insertJob } from '@server/db/jobs'; import { insertFileReview } from '@server/db/file-reviews'; import { getRepoConfigRecord } from '@server/db/repo-configs'; import { loadRepoConfig, updateGlobalConfig } from '@server/core/config'; @@ -545,6 +545,7 @@ describe('Dashboard API Suite', () => { }); expect(loaded.parsedJson.model.main).toBe('@cf/zai-org/glm-4.7-flash'); + expect(loaded.parsedJson.model.fallbacks).toEqual([]); const record = await getRepoConfigRecord(env, 'api-test-owner', repo); expect(record?.mainModel).toBeNull(); @@ -566,6 +567,73 @@ describe('Dashboard API Suite', () => { expect(reloaded.parsedJson.model.main).toBe('gemma-4-26b-a4b-it'); }); + it('uses the current global model strategy when retrying an older job', async () => { + const env = createTestEnv(); + const token = await getAuthCookie(env); + const repo = `retry-current-config-${Date.now()}`; + + const source = await insertJob(env, { + installationId: '123', + owner: 'api-test-owner', + repo, + prNumber: 12, + prTitle: 'Retry Current Config', + prAuthor: 'author', + commitSha: 'a'.repeat(40), + baseSha: 'b'.repeat(40), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + configSnapshot: { + ...defaultRepoConfig, + model: { + main: 'gemma-4-31b-it', + fallbacks: ['gemma-4-26b-a4b-it', '@cf/zai-org/glm-4.7-flash'], + size_overrides: [], + }, + }, + }); + + await updateGlobalConfig(env, { + main: 'gemma-4-31b-it', + fallbacks: ['gemma-4-26b-a4b-it'], + size_overrides: [ + { + max_lines: 300, + model: 'gemma-4-31b-it', + fallbacks: ['gemma-4-26b-a4b-it'], + }, + ], + }); + + const response = await app.request(`/api/jobs/${source.id}/retry`, { + method: 'POST', + headers: { + Cookie: `codra_session=${token}`, + 'x-requested-with': 'XMLHttpRequest', + }, + }, env); + + expect(response.status).toBe(202); + const body = await response.json() as { job: { id: string } }; + const retry = await getJobForProcessing(env, body.job.id); + const snapshot = typeof retry?.config_snapshot === 'string' + ? JSON.parse(retry.config_snapshot) + : retry?.config_snapshot; + + expect(snapshot.model).toEqual({ + main: 'gemma-4-31b-it', + fallbacks: ['gemma-4-26b-a4b-it'], + size_overrides: [ + { + max_lines: 300, + model: 'gemma-4-31b-it', + fallbacks: ['gemma-4-26b-a4b-it'], + }, + ], + }); + }); + it('accepts legacy jobId-only queue messages during schema transition', () => { const parsed = reviewJobMessageSchema.safeParse({ jobId: crypto.randomUUID(), diff --git a/test/helpers.ts b/test/helpers.ts index 251b8b7..440fd50 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -49,8 +49,8 @@ export class MockAssets { export class MockQueue { public readonly sent: any[] = []; - async send(message: any) { - this.sent.push(message); + async send(message: any, options?: { delaySeconds?: number }) { + this.sent.push({ ...message, options }); } } @@ -74,6 +74,10 @@ export function getTestDatabaseUrl() { return requiredEnv('TEST_DATABASE_URL'); } +export function hasConfiguredTestDatabaseUrl() { + return Boolean(usableEnvValue(process.env.TEST_DATABASE_URL)); +} + export function createTestEnv(overrides: Partial = {}): AppBindings { return { AI: { diff --git a/test/model-service.spec.ts b/test/model-service.spec.ts index 22f522d..fa16c2d 100644 --- a/test/model-service.spec.ts +++ b/test/model-service.spec.ts @@ -1,6 +1,8 @@ import { describe, expect, it } from 'vitest'; -import { ModelService } from '@server/services/model'; +import { isRetryableModelError, ModelService } from '@server/services/model'; +import { reviewWithCloudflare } from '@server/models/cloudflare'; import { createTestEnv } from './helpers'; +import { defaultRepoConfig } from '@shared/schema'; describe('ModelService', () => { it('routes legacy Kimi K2.5 ids to Kimi K2.6 for new Cloudflare requests', async () => { @@ -23,4 +25,125 @@ describe('ModelService', () => { expect(requestedModel).toBe('@cf/moonshotai/kimi-k2.6'); expect(response.modelUsed).toBe('@cf/moonshotai/kimi-k2.6'); }); + + it('preserves an explicitly empty fallback chain', () => { + const service = new ModelService(createTestEnv()); + const selected = (service as any).selectModel({ + totalLineCount: 500, + config: { + ...defaultRepoConfig, + model: { + main: 'gemma-4-31b-it', + fallbacks: [], + size_overrides: [], + }, + }, + }); + + expect(selected).toEqual({ + primary: 'gemma-4-31b-it', + fallbacks: [], + }); + }); + + it('rejects Cloudflare reasoning-only responses instead of trying to parse the response envelope', async () => { + const env = createTestEnv({ + AI: { + async run() { + return { + choices: [ + { + message: { + content: null, + reasoning: 'Long reasoning that consumed the completion budget.', + }, + finish_reason: 'length', + }, + ], + usage: { prompt_tokens: 1, completion_tokens: 4096 }, + }; + }, + } as any, + }); + + await expect( + reviewWithCloudflare(env, '@cf/moonshotai/kimi-k2.6', { + systemPrompt: 'system', + userPrompt: 'user', + }), + ).rejects.toThrow('returned no review content'); + }); + + it('retries the same Cloudflare model once before failing it', async () => { + let attempts = 0; + const env = createTestEnv({ + AI: { + async run() { + attempts++; + throw new Error('temporary provider error'); + }, + } as any, + }); + + await expect( + reviewWithCloudflare(env, '@cf/zai-org/glm-4.7-flash', { + systemPrompt: 'system', + userPrompt: 'user', + }), + ).rejects.toThrow('temporary provider error'); + expect(attempts).toBe(2); + }); + + it('marks exhausted transient provider failures as retryable for the queue', async () => { + const env = createTestEnv({ + AI: { + async run() { + throw new Error('[REDACTED]'); + }, + } as any, + }); + + const service = new ModelService(env); + await expect( + service.reviewFile({ + file: { + path: 'test/setup.ts', + lineCount: 1, + hunks: [], + isDeleted: false, + isBinary: false, + isNew: false, + previousPath: null, + }, + prTitle: 'Test', + prDescription: null, + config: { + review: { + on: ['opened'], + ignore_drafts: true, + mention_trigger: '@codra-app', + skip_files: [], + max_files: 15, + large_file_threshold_lines: 200, + max_diff_lines_per_file: 800, + max_total_diff_chars: 150_000, + focus: ['quality'], + custom_rules: [], + labels: false, + exec: { + enabled: false, + on_file_types: ['.ts'], + command: 'npm run lint', + }, + }, + model: { + main: '@cf/zai-org/glm-4.7-flash', + fallbacks: [], + size_overrides: [], + }, + }, + totalLineCount: 1, + }), + ).rejects.toSatisfy(isRetryableModelError); + }); }); diff --git a/test/resumable-queue.spec.ts b/test/resumable-queue.spec.ts new file mode 100644 index 0000000..c494817 --- /dev/null +++ b/test/resumable-queue.spec.ts @@ -0,0 +1,293 @@ +import worker from '@server/index'; +import { claimJobLease, getJobForProcessing, insertJob, markJobContinuationQueued, recoverExpiredJobLeases, releaseJobLease } from '@server/db/jobs'; +import { getFileReviewsForJobs, recordRetryableFileReviewFailure, upsertFileReview } from '@server/db/file-reviews'; +import { getDb } from '@server/db/client'; +import { createTestEnv, hasConfiguredTestDatabaseUrl } from './helpers'; + +const sha = (char: string) => char.repeat(40); +const dbDescribe = hasConfiguredTestDatabaseUrl() ? describe : describe.skip; + +dbDescribe('resumable queue primitives', () => { + const env = createTestEnv(); + + it('sets a fresh lease when claiming a queued job', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `lease-${Date.now()}`, + prNumber: 1, + prTitle: 'Lease Test', + prAuthor: 'author', + commitSha: sha('a'), + baseSha: sha('b'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + const claim = await claimJobLease(env, job.id, 'lease-a', 600); + expect(claim.status).toBe('claimed'); + + const row = await getJobForProcessing(env, job.id); + expect(row?.status).toBe('running'); + expect(row?.lease_owner).toBe('lease-a'); + expect(row?.lease_expires_at).toBeTruthy(); + expect(row?.heartbeat_at).toBeTruthy(); + }); + + it('reports busy for a fresh duplicate delivery instead of reclaiming', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `busy-${Date.now()}`, + prNumber: 1, + prTitle: 'Busy Test', + prAuthor: 'author', + commitSha: sha('c'), + baseSha: sha('d'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + await claimJobLease(env, job.id, 'lease-a', 600); + const duplicate = await claimJobLease(env, job.id, 'lease-b', 600); + expect(duplicate.status).toBe('busy'); + }); + + it('reclaims an expired lease', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `expired-${Date.now()}`, + prNumber: 1, + prTitle: 'Expired Test', + prAuthor: 'author', + commitSha: sha('e'), + baseSha: sha('f'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + await claimJobLease(env, job.id, 'lease-a', 600); + await getDb(env).query(`UPDATE jobs SET lease_expires_at = now() - interval '1 minute' WHERE id = $1`, [job.id]); + + const reclaimed = await claimJobLease(env, job.id, 'lease-b', 600); + expect(reclaimed.status).toBe('claimed'); + + const row = await getJobForProcessing(env, job.id); + expect(row?.lease_owner).toBe('lease-b'); + }); + + it('fails repeatedly expired jobs after the recovery limit', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `recovery-${Date.now()}`, + prNumber: 1, + prTitle: 'Recovery Test', + prAuthor: 'author', + commitSha: sha('1'), + baseSha: sha('2'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + await claimJobLease(env, job.id, 'lease-a', 600); + await getDb(env).query( + `UPDATE jobs SET lease_expires_at = now() - interval '1 minute', recovery_count = 3 WHERE id = $1`, + [job.id], + ); + + const recovered = await recoverExpiredJobLeases(env, 3); + expect(recovered.failedJobs.map((row) => row.id)).toContain(job.id); + + const row = await getJobForProcessing(env, job.id); + expect(row?.status).toBe('failed'); + }); + + it('requeues running jobs that have no lease and an old continuation handoff', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `unleased-${Date.now()}`, + prNumber: 1, + prTitle: 'Unleased Test', + prAuthor: 'author', + commitSha: sha('5'), + baseSha: sha('6'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + await claimJobLease(env, job.id, 'lease-a', 600); + await getDb(env).query( + ` + UPDATE jobs + SET lease_owner = NULL, + lease_expires_at = NULL, + heartbeat_at = now() - interval '5 minutes', + last_queue_message_at = now() - interval '5 minutes' + WHERE id = $1 + `, + [job.id], + ); + + const recovered = await recoverExpiredJobLeases(env, 3, 120); + expect(recovered.requeuedJobIds).toContain(job.id); + + const row = await getJobForProcessing(env, job.id); + expect(row?.status).toBe('running'); + expect(row?.lease_owner).toBeNull(); + expect(row?.recovery_count).toBe(1); + expect(row?.error_msg).toBeNull(); + }); + + it('does not recover an unleased job that just scheduled a retry continuation', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `retry-handoff-${Date.now()}`, + prNumber: 1, + prTitle: 'Retry Handoff Test', + prAuthor: 'author', + commitSha: sha('7'), + baseSha: sha('8'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + await claimJobLease(env, job.id, 'lease-a', 600); + await getDb(env).query( + ` + UPDATE jobs + SET heartbeat_at = now() - interval '10 minutes', + last_queue_message_at = now() - interval '10 minutes' + WHERE id = $1 + `, + [job.id], + ); + + await markJobContinuationQueued(env, job.id); + await releaseJobLease(env, job.id, 'lease-a'); + + const recovered = await recoverExpiredJobLeases(env, 3, 120); + expect(recovered.requeuedJobIds).not.toContain(job.id); + + const row = await getJobForProcessing(env, job.id); + expect(row?.status).toBe('running'); + expect(row?.lease_owner).toBeNull(); + expect(row?.recovery_count).toBe(0); + }); + + it('upserts file reviews without duplicating the same file', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `upsert-${Date.now()}`, + prNumber: 1, + prTitle: 'Upsert Test', + prAuthor: 'author', + commitSha: sha('3'), + baseSha: sha('4'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + const baseReview = { + filePath: 'src/app.ts', + fileStatus: 'done' as const, + modelUsed: 'test-model', + modelProvider: 'test-provider', + diffLineCount: 1, + diffInput: 'diff', + rawAiOutput: '{}', + parsedComments: [], + inputTokens: 1, + outputTokens: 1, + durationMs: 1, + verdict: 'approve' as const, + fileSummary: 'ok', + errorMessage: null, + }; + + await upsertFileReview(env, job.id, baseReview); + await upsertFileReview(env, job.id, { ...baseReview, fileSummary: 'updated' }); + + const reviews = await getFileReviewsForJobs(env, [job.id]); + expect(reviews).toHaveLength(1); + expect(reviews[0].file_summary).toBe('updated'); + }); + + it('tracks retryable file review failures and resets the count after success', async () => { + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo: `transient-file-${Date.now()}`, + prNumber: 1, + prTitle: 'Transient File Test', + prAuthor: 'author', + commitSha: sha('9'), + baseSha: sha('0'), + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + }); + + const failureInput = { + filePath: 'src/app.ts', + modelUsed: 'gemma-4-31b-it', + modelProvider: 'google', + diffLineCount: 1, + diffInput: 'diff', + durationMs: 1, + errorMessage: 'All configured review models failed; retrying later.', + }; + + await expect(recordRetryableFileReviewFailure(env, job.id, failureInput)).resolves.toBe(1); + await expect(recordRetryableFileReviewFailure(env, job.id, failureInput)).resolves.toBe(2); + + await upsertFileReview(env, job.id, { + filePath: 'src/app.ts', + fileStatus: 'done', + modelUsed: 'gemma-4-31b-it', + modelProvider: 'google', + diffLineCount: 1, + diffInput: 'diff', + rawAiOutput: '{}', + parsedComments: [], + inputTokens: 1, + outputTokens: 1, + durationMs: 1, + verdict: 'approve', + fileSummary: 'ok', + errorMessage: null, + }); + + const reviews = await getFileReviewsForJobs(env, [job.id]); + expect(reviews).toHaveLength(1); + expect(reviews[0].file_status).toBe('done'); + expect(reviews[0].transient_error_count).toBe(0); + }); +}); + +describe('queue handler', () => { + it('retries invalid messages instead of acknowledging them', async () => { + const env = createTestEnv(); + const message = { + body: { bad: true }, + ack: vi.fn(), + retry: vi.fn(), + }; + + await worker.queue({ messages: [message] } as any, env, {} as ExecutionContext); + + expect(message.retry).toHaveBeenCalledTimes(1); + expect(message.ack).not.toHaveBeenCalled(); + }); +}); diff --git a/test/review-flow.spec.ts b/test/review-flow.spec.ts index e022d3b..f598f42 100644 --- a/test/review-flow.spec.ts +++ b/test/review-flow.spec.ts @@ -1,8 +1,10 @@ import { runReviewJob } from '@server/core/review'; -import { createTestEnv, generateMockDiff } from './helpers'; +import { createTestEnv, generateMockDiff, hasConfiguredTestDatabaseUrl } from './helpers'; import { vi } from 'vitest'; -import { findExistingJobForHead, getJobForProcessing, insertJob } from '@server/db/jobs'; +import { findExistingJobForHead, getJobForProcessing, insertJob, updateJobFileCount, updateJobStep } from '@server/db/jobs'; +import { getFileReviewsForJobs, upsertFileReview } from '@server/db/file-reviews'; import { defaultRepoConfig } from '@shared/schema'; +import { runWithDb } from '@server/db/client'; const sha = (char: string) => char.repeat(40); @@ -65,18 +67,36 @@ vi.mock('@server/services/model', () => { }; } } - return { ModelService: MockModelService }; + return { + ModelService: MockModelService, + isRetryableModelError: (error: unknown) => Boolean(error && typeof error === 'object' && (error as any).retryable === true), + }; }); -describe('Review Flow Lifecycle', () => { +const dbDescribe = hasConfiguredTestDatabaseUrl() ? describe : describe.skip; +const REVIEW_FLOW_TIMEOUT_MS = 60_000; + +dbDescribe('Review Flow Lifecycle', () => { const env = createTestEnv(); + async function runAndDrain(message: Parameters[1]) { + await runWithDb(env, async () => { + (env.REVIEW_QUEUE as any).sent.length = 0; + await runReviewJob(env, message); + const queue = env.REVIEW_QUEUE as any; + while (queue.sent.length > 0) { + const next = queue.sent.shift(); + await runReviewJob(env, next); + } + }); + } + it('completes a full review from pending job to finished', async () => { const repo = `test-repo-${Date.now()}-full`; const headSha = sha('a'); const baseSha = sha('b'); - await runReviewJob(env, { + await runAndDrain({ deliveryId: 'delivery-123', eventName: 'pull_request', payload: { @@ -102,7 +122,7 @@ describe('Review Flow Lifecycle', () => { trigger: 'auto', }); expect(finalJob?.status).toBe('done'); - }); + }, REVIEW_FLOW_TIMEOUT_MS); it('stops processing if the job is superseded mid-way', async () => { const { GitHubService } = await import('@server/services/github'); @@ -131,7 +151,7 @@ describe('Review Flow Lifecycle', () => { return generateMockDiff([{ path: 'test.ts', content: 'a' }]); }); - await runReviewJob(env, { + await runAndDrain({ deliveryId: 'delivery-456', eventName: 'pull_request', payload: { @@ -158,7 +178,7 @@ describe('Review Flow Lifecycle', () => { }); expect(finalJob?.status).toBe('superseded'); expect(finalJob?.verdict).toBeNull(); - }); + }, REVIEW_FLOW_TIMEOUT_MS); it('processes a pre-created retry job from a queue message', async () => { const repo = `test-repo-${Date.now()}-retry`; @@ -197,14 +217,95 @@ describe('Review Flow Lifecycle', () => { retryOfJobId: source.id, }); - await runReviewJob(env, { + await runAndDrain({ jobId: retry.id, deliveryId: 'delivery-retry', }); const finalJob = await getJobForProcessing(env, retry.id); expect(finalJob?.status).toBe('done'); - }); + }, REVIEW_FLOW_TIMEOUT_MS); + + it('does not inherit parent file reviews from models outside the current retry strategy', async () => { + const { ModelService } = await import('@server/services/model'); + const reviewSpy = vi.spyOn(ModelService.prototype, 'reviewFile'); + const repo = `test-repo-${Date.now()}-retry-model-filter`; + const sourceHeadSha = sha('8'); + const retryHeadSha = sha('9'); + const baseSha = sha('0'); + + const source = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo, + prNumber: 6, + prTitle: 'Retry Model Filter', + prAuthor: 'author', + commitSha: sourceHeadSha, + baseSha, + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + configSnapshot: { + ...defaultRepoConfig, + model: { + main: 'gemma-4-31b-it', + fallbacks: ['gemma-4-26b-a4b-it', '@cf/zai-org/glm-4.7-flash'], + size_overrides: [], + }, + }, + }); + + await upsertFileReview(env, source.id, { + filePath: 'src/app.ts', + fileStatus: 'done', + modelUsed: '@cf/zai-org/glm-4.7-flash', + modelProvider: 'cloudflare', + diffLineCount: 1, + diffInput: 'old diff', + rawAiOutput: '{}', + parsedComments: [], + inputTokens: 1, + outputTokens: 1, + durationMs: 1, + verdict: 'approve', + fileSummary: 'old', + errorMessage: null, + }); + + const retry = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo, + prNumber: 6, + prTitle: 'Retry Model Filter', + prAuthor: 'author', + commitSha: retryHeadSha, + baseSha, + trigger: 'retry', + headRef: 'feature', + baseRef: 'main', + configSnapshot: { + ...defaultRepoConfig, + model: { + main: 'gemma-4-31b-it', + fallbacks: ['gemma-4-26b-a4b-it'], + size_overrides: [], + }, + }, + retryOfJobId: source.id, + }); + + await runAndDrain({ + jobId: retry.id, + deliveryId: 'delivery-retry-model-filter', + }); + + expect(reviewSpy).toHaveBeenCalled(); + const reviews = await getFileReviewsForJobs(env, [retry.id]); + expect(reviews.find((review) => review.file_path === 'src/app.ts')?.model_used).toBe('test-model'); + reviewSpy.mockRestore(); + }, REVIEW_FLOW_TIMEOUT_MS); it('resumes an existing queued duplicate job instead of stranding it', async () => { const repo = `test-repo-${Date.now()}-duplicate`; @@ -226,7 +327,7 @@ describe('Review Flow Lifecycle', () => { configSnapshot: defaultRepoConfig, }); - await runReviewJob(env, { + await runAndDrain({ deliveryId: 'delivery-duplicate', eventName: 'pull_request', payload: { @@ -246,5 +347,55 @@ describe('Review Flow Lifecycle', () => { const finalJob = await getJobForProcessing(env, existing.id); expect(finalJob?.status).toBe('done'); - }); + }, REVIEW_FLOW_TIMEOUT_MS); + + it('schedules a delayed continuation instead of spending queue retries on transient model failures', async () => { + const { ModelService } = await import('@server/services/model'); + const retryableError = Object.assign(new Error('Google API timed out after 45000ms'), { retryable: true }); + const reviewSpy = vi.spyOn(ModelService.prototype, 'reviewFile').mockRejectedValue(retryableError); + const repo = `test-repo-${Date.now()}-transient`; + const headSha = sha('6'); + const baseSha = sha('7'); + + const job = await insertJob(env, { + installationId: '123', + owner: 'test-owner', + repo, + prNumber: 5, + prTitle: 'Transient Test', + prAuthor: 'author', + commitSha: headSha, + baseSha, + trigger: 'auto', + headRef: 'feature', + baseRef: 'main', + configSnapshot: defaultRepoConfig, + }); + await updateJobFileCount(env, job.id, 1); + await updateJobStep(env, job.id, 'Preparation', { status: 'done' }); + + await runWithDb(env, async () => { + (env.REVIEW_QUEUE as any).sent.length = 0; + const result = await runReviewJob(env, { + jobId: job.id, + deliveryId: 'delivery-transient', + phase: 'review', + }); + + expect(result).toEqual({ action: 'ack' }); + expect(reviewSpy).toHaveBeenCalled(); + expect((env.REVIEW_QUEUE as any).sent).toHaveLength(1); + expect((env.REVIEW_QUEUE as any).sent[0]).toMatchObject({ + jobId: job.id, + phase: 'review', + options: { delaySeconds: 60 }, + }); + }); + + const finalJob = await getJobForProcessing(env, job.id); + expect(finalJob?.status).toBe('running'); + expect(finalJob?.lease_owner).toBeNull(); + + reviewSpy.mockRestore(); + }, REVIEW_FLOW_TIMEOUT_MS); }); diff --git a/test/settings.spec.ts b/test/settings.spec.ts new file mode 100644 index 0000000..fbbe084 --- /dev/null +++ b/test/settings.spec.ts @@ -0,0 +1,21 @@ +import { describe, expect, it } from 'vitest'; +import { normalizeGlobalConfig } from '@client/pages/settings'; + +describe('settings model strategy', () => { + it('preserves an explicit empty global fallback list', () => { + const config = normalizeGlobalConfig({ + main: 'gemma-4-31b-it', + fallbacks: [], + size_overrides: [ + { + max_lines: 300, + model: 'gemma-4-31b-it', + fallbacks: [], + }, + ], + }); + + expect(config.fallbacks).toEqual([]); + expect(config.size_overrides[0].fallbacks).toEqual([]); + }); +}); diff --git a/test/webhook-handling.spec.ts b/test/webhook-handling.spec.ts index 86c86c9..66eb5ff 100644 --- a/test/webhook-handling.spec.ts +++ b/test/webhook-handling.spec.ts @@ -114,6 +114,7 @@ describe('Webhook Handling Suite', () => { expect(queue.sent).toHaveLength(1); expect(queue.sent[0].jobId).toBe(json.job.id); expect(queue.sent[0].deliveryId).toBeDefined(); + expect(queue.sent[0].phase).toBe('prepare'); expect(queue.sent[0].eventName).toBeUndefined(); expect(queue.sent[0].payload).toBeUndefined(); });