From cb5b5f4d2f223a8bb31b7e2fc5a1aa9082254682 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 27 May 2026 15:42:37 -0700 Subject: [PATCH 1/3] improvement(cron): fire-and-forget for cron-invoked endpoints --- .../cron/renew-subscriptions/route.test.ts | 90 ++++++ .../app/api/cron/renew-subscriptions/route.ts | 291 ++++++++++-------- .../app/api/notifications/poll/route.test.ts | 93 ++++++ apps/sim/app/api/notifications/poll/route.ts | 33 +- .../app/api/schedules/execute/route.test.ts | 140 ++++----- apps/sim/app/api/schedules/execute/route.ts | 193 ++++++------ .../webhooks/poll/[provider]/route.test.ts | 107 +++++++ .../app/api/webhooks/poll/[provider]/route.ts | 54 ++-- apps/sim/lib/api/contracts/schedules.ts | 2 +- apps/sim/lib/core/utils/background.test.ts | 76 +++++ apps/sim/lib/core/utils/background.ts | 61 ++++ 11 files changed, 808 insertions(+), 332 deletions(-) create mode 100644 apps/sim/app/api/cron/renew-subscriptions/route.test.ts create mode 100644 apps/sim/app/api/notifications/poll/route.test.ts create mode 100644 apps/sim/app/api/webhooks/poll/[provider]/route.test.ts create mode 100644 apps/sim/lib/core/utils/background.test.ts create mode 100644 apps/sim/lib/core/utils/background.ts diff --git a/apps/sim/app/api/cron/renew-subscriptions/route.test.ts b/apps/sim/app/api/cron/renew-subscriptions/route.test.ts new file mode 100644 index 00000000000..1f4b74cbce1 --- /dev/null +++ b/apps/sim/app/api/cron/renew-subscriptions/route.test.ts @@ -0,0 +1,90 @@ +/** + * Tests for the Teams subscription renewal cron route. + * + * @vitest-environment node + */ +import { + authOAuthUtilsMock, + createMockRequest, + dbChainMock, + dbChainMockFns, + redisConfigMock, + redisConfigMockFns, + resetDbChainMock, +} from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockVerifyCronAuth } = vi.hoisted(() => ({ + mockVerifyCronAuth: vi.fn().mockReturnValue(null), +})) + +vi.mock('@/lib/auth/internal', () => ({ + verifyCronAuth: mockVerifyCronAuth, +})) + +vi.mock('@/lib/core/config/redis', () => redisConfigMock) +vi.mock('@sim/db', () => dbChainMock) +vi.mock('@/app/api/auth/oauth/utils', () => authOAuthUtilsMock) + +import { GET } from './route' + +function createRequest() { + return createMockRequest( + 'GET', + undefined, + {}, + 'http://localhost:3000/api/cron/renew-subscriptions' + ) +} + +const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0)) + +describe('Teams subscription renewal route (fire-and-forget)', () => { + beforeEach(() => { + vi.clearAllMocks() + resetDbChainMock() + redisConfigMockFns.mockAcquireLock.mockResolvedValue(true) + redisConfigMockFns.mockReleaseLock.mockResolvedValue(true) + mockVerifyCronAuth.mockReturnValue(null) + }) + + it('returns the auth error when cron auth fails', async () => { + mockVerifyCronAuth.mockReturnValueOnce(new Response(null, { status: 401 }) as never) + + const response = await GET(createRequest()) + + expect(response.status).toBe(401) + expect(redisConfigMockFns.mockAcquireLock).not.toHaveBeenCalled() + }) + + it('acknowledges with 202 and renews in the background after acquiring the lock', async () => { + const response = await GET(createRequest()) + + expect(response.status).toBe(202) + const data = await response.json() + expect(data).toMatchObject({ status: 'started' }) + expect(redisConfigMockFns.mockAcquireLock).toHaveBeenCalledWith( + 'teams-subscription-renewal-lock', + expect.any(String), + expect.any(Number) + ) + + await flushMicrotasks() + expect(dbChainMockFns.select).toHaveBeenCalled() + expect(redisConfigMockFns.mockReleaseLock).toHaveBeenCalledWith( + 'teams-subscription-renewal-lock', + expect.any(String) + ) + }) + + it('skips with 202 when the lock is already held', async () => { + redisConfigMockFns.mockAcquireLock.mockResolvedValueOnce(false) + + const response = await GET(createRequest()) + + expect(response.status).toBe(202) + const data = await response.json() + expect(data).toMatchObject({ status: 'skip' }) + expect(dbChainMockFns.select).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/app/api/cron/renew-subscriptions/route.ts b/apps/sim/app/api/cron/renew-subscriptions/route.ts index a22156b3c94..9dcc8375623 100644 --- a/apps/sim/app/api/cron/renew-subscriptions/route.ts +++ b/apps/sim/app/api/cron/renew-subscriptions/route.ts @@ -1,14 +1,21 @@ import { db } from '@sim/db' import { account, webhook as webhookTable } from '@sim/db/schema' import { createLogger } from '@sim/logger' +import { generateShortId } from '@sim/utils/id' import { and, eq, or } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { verifyCronAuth } from '@/lib/auth/internal' +import { acquireLock, releaseLock } from '@/lib/core/config/redis' +import { runDetached } from '@/lib/core/utils/background' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { refreshAccessTokenIfNeeded, resolveOAuthAccountId } from '@/app/api/auth/oauth/utils' const logger = createLogger('TeamsSubscriptionRenewal') +const LOCK_KEY = 'teams-subscription-renewal-lock' +/** Lock TTL in seconds — generous enough to cover the Graph API renewal loop. */ +const LOCK_TTL_SECONDS = 300 + async function getCredentialOwner( credentialId: string ): Promise<{ userId: string; accountId: string } | null> { @@ -29,159 +36,189 @@ async function getCredentialOwner( } /** - * Cron endpoint to renew Microsoft Teams chat subscriptions before they expire + * Renews Microsoft Teams chat subscriptions that are close to expiring. * - * Teams subscriptions expire after ~3 days and must be renewed. - * Configured in helm/sim/values.yaml under cronjobs.jobs.renewSubscriptions + * Teams subscriptions expire after ~3 days and must be renewed. Runs detached + * from the HTTP response so the cron caller does not wait for the Graph API loop. */ -export const GET = withRouteHandler(async (request: NextRequest) => { - try { - const authError = verifyCronAuth(request, 'Teams subscription renewal') - if (authError) { - return authError - } - - logger.info('Starting Teams subscription renewal job') - - let totalRenewed = 0 - let totalFailed = 0 - let totalChecked = 0 - - // Get all active Microsoft Teams webhooks - const webhooksWithWorkflows = await db - .select({ - webhook: webhookTable, - }) - .from(webhookTable) - .where( - and( - eq(webhookTable.isActive, true), - or( - eq(webhookTable.provider, 'microsoft-teams'), - eq(webhookTable.provider, 'microsoftteams') - ) +async function renewExpiringSubscriptions(): Promise<{ + checked: number + renewed: number + failed: number + total: number +}> { + logger.info('Starting Teams subscription renewal job') + + let totalRenewed = 0 + let totalFailed = 0 + let totalChecked = 0 + + // Get all active Microsoft Teams webhooks + const webhooksWithWorkflows = await db + .select({ + webhook: webhookTable, + }) + .from(webhookTable) + .where( + and( + eq(webhookTable.isActive, true), + or( + eq(webhookTable.provider, 'microsoft-teams'), + eq(webhookTable.provider, 'microsoftteams') ) ) - - logger.info( - `Found ${webhooksWithWorkflows.length} active Teams webhooks, checking for expiring subscriptions` ) - // Renewal threshold: 48 hours before expiration - const renewalThreshold = new Date(Date.now() + 48 * 60 * 60 * 1000) + logger.info( + `Found ${webhooksWithWorkflows.length} active Teams webhooks, checking for expiring subscriptions` + ) - for (const { webhook } of webhooksWithWorkflows) { - const config = (webhook.providerConfig as Record) || {} + // Renewal threshold: 48 hours before expiration + const renewalThreshold = new Date(Date.now() + 48 * 60 * 60 * 1000) - // Check if this is a Teams chat subscription that needs renewal - if (config.triggerId !== 'microsoftteams_chat_subscription') continue + for (const { webhook } of webhooksWithWorkflows) { + const config = (webhook.providerConfig as Record) || {} - const expirationStr = config.subscriptionExpiration as string | undefined - if (!expirationStr) continue + // Check if this is a Teams chat subscription that needs renewal + if (config.triggerId !== 'microsoftteams_chat_subscription') continue - const expiresAt = new Date(expirationStr) - if (expiresAt > renewalThreshold) continue // Not expiring soon + const expirationStr = config.subscriptionExpiration as string | undefined + if (!expirationStr) continue - totalChecked++ + const expiresAt = new Date(expirationStr) + if (expiresAt > renewalThreshold) continue // Not expiring soon - try { - logger.info( - `Renewing Teams subscription for webhook ${webhook.id} (expires: ${expiresAt.toISOString()})` - ) - - const credentialId = config.credentialId as string | undefined - const externalSubscriptionId = config.externalSubscriptionId as string | undefined - - if (!credentialId || !externalSubscriptionId) { - logger.error(`Missing credentialId or externalSubscriptionId for webhook ${webhook.id}`) - totalFailed++ - continue - } + totalChecked++ - const credentialOwner = await getCredentialOwner(credentialId) - if (!credentialOwner) { - logger.error(`Credential owner not found for credential ${credentialId}`) - totalFailed++ - continue - } + try { + logger.info( + `Renewing Teams subscription for webhook ${webhook.id} (expires: ${expiresAt.toISOString()})` + ) - // Get fresh access token - const accessToken = await refreshAccessTokenIfNeeded( - credentialOwner.accountId, - credentialOwner.userId, - `renewal-${webhook.id}` - ) + const credentialId = config.credentialId as string | undefined + const externalSubscriptionId = config.externalSubscriptionId as string | undefined - if (!accessToken) { - logger.error(`Failed to get access token for webhook ${webhook.id}`) - totalFailed++ - continue - } + if (!credentialId || !externalSubscriptionId) { + logger.error(`Missing credentialId or externalSubscriptionId for webhook ${webhook.id}`) + totalFailed++ + continue + } - // Extend subscription to maximum lifetime (4230 minutes = ~3 days) - const maxLifetimeMinutes = 4230 - const newExpirationDateTime = new Date( - Date.now() + maxLifetimeMinutes * 60 * 1000 - ).toISOString() - - const res = await fetch( - `https://graph.microsoft.com/v1.0/subscriptions/${externalSubscriptionId}`, - { - method: 'PATCH', - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ expirationDateTime: newExpirationDateTime }), - } - ) + const credentialOwner = await getCredentialOwner(credentialId) + if (!credentialOwner) { + logger.error(`Credential owner not found for credential ${credentialId}`) + totalFailed++ + continue + } - if (!res.ok) { - const error = await res.json() - logger.error( - `Failed to renew Teams subscription ${externalSubscriptionId} for webhook ${webhook.id}`, - { status: res.status, error: error.error } - ) - totalFailed++ - continue - } + // Get fresh access token + const accessToken = await refreshAccessTokenIfNeeded( + credentialOwner.accountId, + credentialOwner.userId, + `renewal-${webhook.id}` + ) - const payload = await res.json() + if (!accessToken) { + logger.error(`Failed to get access token for webhook ${webhook.id}`) + totalFailed++ + continue + } - // Update webhook config with new expiration - const updatedConfig = { - ...config, - subscriptionExpiration: payload.expirationDateTime, + // Extend subscription to maximum lifetime (4230 minutes = ~3 days) + const maxLifetimeMinutes = 4230 + const newExpirationDateTime = new Date( + Date.now() + maxLifetimeMinutes * 60 * 1000 + ).toISOString() + + const res = await fetch( + `https://graph.microsoft.com/v1.0/subscriptions/${externalSubscriptionId}`, + { + method: 'PATCH', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ expirationDateTime: newExpirationDateTime }), } + ) - await db - .update(webhookTable) - .set({ providerConfig: updatedConfig, updatedAt: new Date() }) - .where(eq(webhookTable.id, webhook.id)) - - logger.info( - `Successfully renewed Teams subscription for webhook ${webhook.id}. New expiration: ${payload.expirationDateTime}` + if (!res.ok) { + const error = await res.json() + logger.error( + `Failed to renew Teams subscription ${externalSubscriptionId} for webhook ${webhook.id}`, + { status: res.status, error: error.error } ) - totalRenewed++ - } catch (error) { - logger.error(`Error renewing subscription for webhook ${webhook.id}:`, error) totalFailed++ + continue + } + + const payload = await res.json() + + // Update webhook config with new expiration + const updatedConfig = { + ...config, + subscriptionExpiration: payload.expirationDateTime, } + + await db + .update(webhookTable) + .set({ providerConfig: updatedConfig, updatedAt: new Date() }) + .where(eq(webhookTable.id, webhook.id)) + + logger.info( + `Successfully renewed Teams subscription for webhook ${webhook.id}. New expiration: ${payload.expirationDateTime}` + ) + totalRenewed++ + } catch (error) { + logger.error(`Error renewing subscription for webhook ${webhook.id}:`, error) + totalFailed++ } + } - logger.info( - `Teams subscription renewal job completed. Checked: ${totalChecked}, Renewed: ${totalRenewed}, Failed: ${totalFailed}` - ) + logger.info( + `Teams subscription renewal job completed. Checked: ${totalChecked}, Renewed: ${totalRenewed}, Failed: ${totalFailed}` + ) - return NextResponse.json({ - success: true, - checked: totalChecked, - renewed: totalRenewed, - failed: totalFailed, - total: webhooksWithWorkflows.length, - }) - } catch (error) { - logger.error('Error in Teams subscription renewal job:', error) - return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + return { + checked: totalChecked, + renewed: totalRenewed, + failed: totalFailed, + total: webhooksWithWorkflows.length, } +} + +/** + * Cron endpoint to renew Microsoft Teams chat subscriptions before they expire. + * Configured in helm/sim/values.yaml under cronjobs.jobs.renewSubscriptions. + * + * Acknowledges the cron call immediately and renews subscriptions in the + * background; a Redis lock prevents overlapping runs. + */ +export const GET = withRouteHandler(async (request: NextRequest) => { + const authError = verifyCronAuth(request, 'Teams subscription renewal') + if (authError) { + return authError + } + + const lockValue = generateShortId() + const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) + if (!locked) { + return NextResponse.json( + { success: true, message: 'Renewal already in progress – skipped', status: 'skip' }, + { status: 202 } + ) + } + + runDetached('teams-subscription-renewal', async () => { + try { + await renewExpiringSubscriptions() + } finally { + await releaseLock(LOCK_KEY, lockValue).catch(() => {}) + } + }) + + return NextResponse.json( + { success: true, message: 'Teams subscription renewal started', status: 'started' }, + { status: 202 } + ) }) diff --git a/apps/sim/app/api/notifications/poll/route.test.ts b/apps/sim/app/api/notifications/poll/route.test.ts new file mode 100644 index 00000000000..fd41807c47e --- /dev/null +++ b/apps/sim/app/api/notifications/poll/route.test.ts @@ -0,0 +1,93 @@ +/** + * Tests for the inactivity-alert polling cron route. + * + * @vitest-environment node + */ +import { createMockRequest, redisConfigMock, redisConfigMockFns } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockVerifyCronAuth, mockPollInactivityAlerts } = vi.hoisted(() => ({ + mockVerifyCronAuth: vi.fn().mockReturnValue(null), + mockPollInactivityAlerts: vi.fn().mockResolvedValue({ checked: 0, delivered: 0 }), +})) + +vi.mock('@/lib/auth/internal', () => ({ + verifyCronAuth: mockVerifyCronAuth, +})) + +vi.mock('@/lib/core/config/redis', () => redisConfigMock) + +vi.mock('@/lib/notifications/inactivity-polling', () => ({ + pollInactivityAlerts: mockPollInactivityAlerts, +})) + +import { GET } from './route' + +function createRequest() { + return createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/notifications/poll') +} + +const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0)) + +describe('inactivity alert polling route (fire-and-forget)', () => { + beforeEach(() => { + vi.clearAllMocks() + redisConfigMockFns.mockAcquireLock.mockResolvedValue(true) + redisConfigMockFns.mockReleaseLock.mockResolvedValue(true) + mockVerifyCronAuth.mockReturnValue(null) + mockPollInactivityAlerts.mockResolvedValue({ checked: 0, delivered: 0 }) + }) + + it('returns the auth error when cron auth fails', async () => { + mockVerifyCronAuth.mockReturnValueOnce(new Response(null, { status: 401 }) as never) + + const response = await GET(createRequest()) + + expect(response.status).toBe(401) + expect(mockPollInactivityAlerts).not.toHaveBeenCalled() + }) + + it('acknowledges with 202 and polls in the background after acquiring the lock', async () => { + const response = await GET(createRequest()) + + expect(response.status).toBe(202) + const data = await response.json() + expect(data).toMatchObject({ status: 'started' }) + expect(redisConfigMockFns.mockAcquireLock).toHaveBeenCalledWith( + 'inactivity-alert-polling-lock', + expect.any(String), + expect.any(Number) + ) + + await flushMicrotasks() + expect(mockPollInactivityAlerts).toHaveBeenCalledTimes(1) + expect(redisConfigMockFns.mockReleaseLock).toHaveBeenCalledWith( + 'inactivity-alert-polling-lock', + expect.any(String) + ) + }) + + it('skips with 202 when the lock is already held', async () => { + redisConfigMockFns.mockAcquireLock.mockResolvedValueOnce(false) + + const response = await GET(createRequest()) + + expect(response.status).toBe(202) + const data = await response.json() + expect(data).toMatchObject({ status: 'skip' }) + expect(mockPollInactivityAlerts).not.toHaveBeenCalled() + }) + + it('releases the lock even when polling throws', async () => { + mockPollInactivityAlerts.mockRejectedValueOnce(new Error('poll failed')) + + const response = await GET(createRequest()) + + expect(response.status).toBe(202) + await flushMicrotasks() + expect(redisConfigMockFns.mockReleaseLock).toHaveBeenCalledWith( + 'inactivity-alert-polling-lock', + expect.any(String) + ) + }) +}) diff --git a/apps/sim/app/api/notifications/poll/route.ts b/apps/sim/app/api/notifications/poll/route.ts index f9e3c8c0c2b..d81707cb192 100644 --- a/apps/sim/app/api/notifications/poll/route.ts +++ b/apps/sim/app/api/notifications/poll/route.ts @@ -6,6 +6,7 @@ import { noInputSchema } from '@/lib/api/contracts/primitives' import { validationErrorResponse } from '@/lib/api/server' import { verifyCronAuth } from '@/lib/auth/internal' import { acquireLock, releaseLock } from '@/lib/core/config/redis' +import { runDetached } from '@/lib/core/utils/background' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { pollInactivityAlerts } from '@/lib/notifications/inactivity-polling' @@ -24,15 +25,13 @@ export const GET = withRouteHandler(async (request: NextRequest) => { ) if (!queryValidation.success) return validationErrorResponse(queryValidation.error) - let lockAcquired = false - try { const authError = verifyCronAuth(request, 'Inactivity alert polling') if (authError) { return authError } - lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS) + const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS) if (!lockAcquired) { return NextResponse.json( @@ -46,15 +45,23 @@ export const GET = withRouteHandler(async (request: NextRequest) => { ) } - const results = await pollInactivityAlerts() - - return NextResponse.json({ - success: true, - message: 'Inactivity alert polling completed', - requestId, - status: 'completed', - ...results, + runDetached('inactivity-alert-polling', async () => { + try { + await pollInactivityAlerts() + } finally { + await releaseLock(LOCK_KEY, requestId).catch(() => {}) + } }) + + return NextResponse.json( + { + success: true, + message: 'Inactivity alert polling started', + requestId, + status: 'started', + }, + { status: 202 } + ) } catch (error) { logger.error(`Error during inactivity alert polling (${requestId}):`, error) return NextResponse.json( @@ -66,9 +73,5 @@ export const GET = withRouteHandler(async (request: NextRequest) => { }, { status: 500 } ) - } finally { - if (lockAcquired) { - await releaseLock(LOCK_KEY, requestId).catch(() => {}) - } } }) diff --git a/apps/sim/app/api/schedules/execute/route.test.ts b/apps/sim/app/api/schedules/execute/route.test.ts index 877e564941e..3ad4338e545 100644 --- a/apps/sim/app/api/schedules/execute/route.test.ts +++ b/apps/sim/app/api/schedules/execute/route.test.ts @@ -4,7 +4,7 @@ * @vitest-environment node */ import { dbChainMock, dbChainMockFns, requestUtilsMockFns, resetDbChainMock } from '@sim/testing' -import type { NextRequest } from 'next/server' +import { type NextRequest, NextResponse } from 'next/server' import { beforeEach, describe, expect, it, vi } from 'vitest' const orderByLimitMock = vi.fn() @@ -131,7 +131,7 @@ vi.mock('@sim/utils/id', () => ({ ), })) -import { GET } from './route' +import { GET, runScheduleTick } from './route' const SINGLE_SCHEDULE = [ { @@ -284,13 +284,9 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) + const result = await runScheduleTick('test-request-id') - expect(response).toBeDefined() - expect(response.status).toBe(200) - const data = await response.json() - expect(data).toHaveProperty('message') - expect(data).toHaveProperty('processedCount', 1) + expect(result.processedCount).toBe(1) }) it('should queue schedules to Trigger.dev when enabled', async () => { @@ -300,23 +296,17 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) + const result = await runScheduleTick('test-request-id') - expect(response).toBeDefined() - expect(response.status).toBe(200) - const data = await response.json() - expect(data).toHaveProperty('processedCount', 1) + expect(result.processedCount).toBe(1) }) it('should handle case with no due schedules', async () => { dbChainMockFns.returning.mockReturnValueOnce([]).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) + const result = await runScheduleTick('test-request-id') - expect(response.status).toBe(200) - const data = await response.json() - expect(data).toHaveProperty('message') - expect(data).toHaveProperty('processedCount', 0) + expect(result.processedCount).toBe(0) }) it('should execute multiple schedules in parallel', async () => { @@ -328,20 +318,16 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce(MULTIPLE_SCHEDULES).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) + const result = await runScheduleTick('test-request-id') - expect(response.status).toBe(200) - const data = await response.json() - expect(data).toHaveProperty('processedCount', 2) + expect(result.processedCount).toBe(2) }) it('should execute mothership jobs inline', async () => { dbChainMockFns.limit.mockResolvedValueOnce([]).mockResolvedValueOnce([{ id: 'job-1' }]) dbChainMockFns.returning.mockReturnValueOnce(SINGLE_JOB) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockExecuteJobInline).toHaveBeenCalledWith( expect.objectContaining({ scheduleId: 'job-1', @@ -358,9 +344,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).toHaveBeenCalledWith( 'schedule-execution', expect.objectContaining({ @@ -398,9 +382,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([{ id: 'job-id-1' }]) try { - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).toHaveBeenCalledWith( 'schedule-execution', expect.objectContaining({ scheduleId: 'schedule-1' }), @@ -435,9 +417,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) try { - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).toHaveBeenCalled() expect(mockExecuteScheduleJob).not.toHaveBeenCalled() expect(mockCompleteJob).not.toHaveBeenCalled() @@ -485,9 +465,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([{ id: 'job-id-1' }]) try { - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockExecuteScheduleJob).toHaveBeenCalledWith( expect.objectContaining({ scheduleId: 'schedule-1' }) ) @@ -527,11 +505,9 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockResolvedValueOnce([{ id: 'pending-job-id' }]) - const response = await GET(createMockRequest()) + const result = await runScheduleTick('test-request-id') - expect(response.status).toBe(200) - const data = await response.json() - expect(data).toHaveProperty('processedCount', 1) + expect(result.processedCount).toBe(1) expect(mockEnqueue).not.toHaveBeenCalled() expect(mockExecuteScheduleJob).toHaveBeenCalledWith( expect.objectContaining({ @@ -563,9 +539,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce([]).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockExecuteScheduleJob).not.toHaveBeenCalled() expect(mockCompleteJob).toHaveBeenCalledWith( 'stale-pending-job-id', @@ -596,9 +570,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) .mockResolvedValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockExecuteScheduleJob).not.toHaveBeenCalled() expect(dbChainMockFns.set).toHaveBeenCalledWith( expect.objectContaining({ @@ -629,9 +601,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce([schedule]).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).not.toHaveBeenCalled() expect(mockReleaseScheduleLock).not.toHaveBeenCalled() expect(dbChainMockFns.set).toHaveBeenCalledWith( @@ -655,9 +625,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce([schedule]).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).not.toHaveBeenCalled() expect(dbChainMockFns.set).toHaveBeenCalledWith( expect.objectContaining({ @@ -685,9 +653,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([{ id: 'job-id-1' }]) try { - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockShouldExecuteInline).toHaveBeenCalledTimes(1) expect(mockExecuteScheduleJob).toHaveBeenCalledWith( expect.objectContaining({ scheduleId: 'schedule-1' }) @@ -718,9 +684,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce([schedule]).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).not.toHaveBeenCalled() expect(mockReleaseScheduleLock).not.toHaveBeenCalled() expect(dbChainMockFns.set).toHaveBeenCalledWith( @@ -769,9 +733,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockReturnValueOnce([schedule]) .mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).not.toHaveBeenCalled() expect(dbChainMockFns.set).not.toHaveBeenCalledWith( expect.objectContaining({ @@ -802,9 +764,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce([schedule]).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockEnqueue).toHaveBeenCalled() expect(dbChainMockFns.set).toHaveBeenCalledWith( expect.objectContaining({ @@ -835,9 +795,7 @@ describe('Scheduled Workflow Execution API Route', () => { .mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce([schedule]).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockCancelJob).toHaveBeenCalledWith('trigger-run-id') expect(mockReleaseScheduleLock).toHaveBeenCalledWith( 'schedule-1', @@ -871,11 +829,9 @@ describe('Scheduled Workflow Execution API Route', () => { dbChainMockFns.limit.mockResolvedValueOnce(claimedIds).mockResolvedValueOnce([]) dbChainMockFns.returning.mockReturnValueOnce(claimedSchedules).mockReturnValueOnce([]) - const response = await GET(createMockRequest()) + const result = await runScheduleTick('test-request-id') - expect(response.status).toBe(200) - const data = await response.json() - expect(data).toHaveProperty('processedCount', 100) + expect(result.processedCount).toBe(100) expect(dbChainMockFns.limit).toHaveBeenCalledWith(100) expect(mockEnqueue).toHaveBeenCalledTimes(100) }) @@ -892,9 +848,7 @@ describe('Scheduled Workflow Execution API Route', () => { dbChainMockFns.returning.mockReturnValueOnce([schedule]).mockReturnValueOnce([]) mockGetJob.mockResolvedValueOnce({ id: 'job-id-1', status: 'completed' }) - const response = await GET(createMockRequest()) - - expect(response.status).toBe(200) + await runScheduleTick('test-request-id') expect(mockReleaseScheduleLock).toHaveBeenCalledWith( 'schedule-1', 'test-request-id', @@ -904,4 +858,40 @@ describe('Scheduled Workflow Execution API Route', () => { { expectedLastQueuedAt: claimedAt } ) }) + + describe('GET handler (fire-and-forget)', () => { + it('returns the auth error when cron auth fails', async () => { + mockVerifyCronAuth.mockReturnValueOnce( + NextResponse.json({ error: 'unauthorized' }, { status: 401 }) + ) + + const response = await GET(createMockRequest()) + + expect(response.status).toBe(401) + }) + + it('acknowledges immediately with 202 and starts the tick in the background', async () => { + const response = await GET(createMockRequest()) + + expect(response.status).toBe(202) + const data = await response.json() + expect(data).toMatchObject({ status: 'started' }) + }) + + it('returns already_running while a tick is in flight in the same process', async () => { + // Both calls run synchronously up to their `return` (no awaits before the + // single-flight guard), so the detached tick from the first call cannot + // progress past its first `await` before the second call checks the guard. + const first = GET(createMockRequest()) + const second = GET(createMockRequest()) + + const [firstData, secondData] = await Promise.all([ + first.then((r) => r.json()), + second.then((r) => r.json()), + ]) + + expect(firstData).toMatchObject({ status: 'started' }) + expect(secondData).toMatchObject({ status: 'already_running' }) + }) + }) }) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index fa63b04935a..8126f0bfdca 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -14,6 +14,7 @@ import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' import { JOB_STATUS, type Job } from '@/lib/core/async-jobs/types' import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' +import { createSingleFlight } from '@/lib/core/utils/background' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { @@ -1071,111 +1072,127 @@ async function processJobItem(job: ClaimedJob, queuedAt: Date, requestId: string } } -export const GET = withRouteHandler(async (request: NextRequest) => { - const requestId = generateRequestId() - const tickStart = Date.now() - logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`) +const scheduleTickGuard = createSingleFlight() - const authError = verifyCronAuth(request, 'Schedule execution') - if (authError) { - return authError - } +interface ScheduleTickResult { + processedCount: number + totalSchedules: number + totalJobs: number +} - try { - const jobQueue = await getJobQueue() - const useDatabaseFallback = shouldExecuteInline() - let totalSchedules = 0 - let totalJobs = 0 - let iterations = 0 - let remainingWorkflowBudget = SCHEDULE_WORKFLOW_ENQUEUE_LIMIT - let schedulesExhausted = false - let jobsExhausted = false - - while (Date.now() - tickStart < MAX_TICK_DURATION_MS) { - if (schedulesExhausted && jobsExhausted) break - const queuedAt = new Date() - let resumedPendingSchedules = 0 - let databaseScheduleSlots = SCHEDULE_EXECUTION_CONCURRENCY_LIMIT - - if (useDatabaseFallback) { - await recoverStaleDatabaseScheduleJobs(queuedAt) - databaseScheduleSlots = await getDatabaseScheduleExecutionSlots() - resumedPendingSchedules = await resumePendingDatabaseScheduleJobs( - jobQueue, - requestId, - databaseScheduleSlots - ) - databaseScheduleSlots = await getDatabaseScheduleExecutionSlots() - } +/** + * Drains due schedules and jobs, claiming and enqueuing work until the tick + * budget is exhausted or no more items are due. Runs detached from the HTTP + * response so the cron caller does not wait; cross-replica safety is provided by + * the `FOR UPDATE SKIP LOCKED` claim layer, not this function. + */ +export async function runScheduleTick(requestId: string): Promise { + const tickStart = Date.now() - const workflowClaimLimit = Math.min( - WORKFLOW_CHUNK_SIZE, - remainingWorkflowBudget, - useDatabaseFallback ? databaseScheduleSlots : WORKFLOW_CHUNK_SIZE - ) + const jobQueue = await getJobQueue() + const useDatabaseFallback = shouldExecuteInline() + let totalSchedules = 0 + let totalJobs = 0 + let iterations = 0 + let remainingWorkflowBudget = SCHEDULE_WORKFLOW_ENQUEUE_LIMIT + let schedulesExhausted = false + let jobsExhausted = false + + while (Date.now() - tickStart < MAX_TICK_DURATION_MS) { + if (schedulesExhausted && jobsExhausted) break + const queuedAt = new Date() + let resumedPendingSchedules = 0 + let databaseScheduleSlots = SCHEDULE_EXECUTION_CONCURRENCY_LIMIT - if (useDatabaseFallback && workflowClaimLimit <= 0) { - schedulesExhausted = true - } + if (useDatabaseFallback) { + await recoverStaleDatabaseScheduleJobs(queuedAt) + databaseScheduleSlots = await getDatabaseScheduleExecutionSlots() + resumedPendingSchedules = await resumePendingDatabaseScheduleJobs( + jobQueue, + requestId, + databaseScheduleSlots + ) + databaseScheduleSlots = await getDatabaseScheduleExecutionSlots() + } - const [dueSchedules, dueJobs] = await Promise.all([ - schedulesExhausted ? [] : claimWorkflowSchedules(queuedAt, workflowClaimLimit), - jobsExhausted ? [] : claimJobSchedules(queuedAt, JOB_CHUNK_SIZE), - ]) + const workflowClaimLimit = Math.min( + WORKFLOW_CHUNK_SIZE, + remainingWorkflowBudget, + useDatabaseFallback ? databaseScheduleSlots : WORKFLOW_CHUNK_SIZE + ) - remainingWorkflowBudget -= dueSchedules.length - if (dueSchedules.length < workflowClaimLimit || remainingWorkflowBudget <= 0) { - schedulesExhausted = true - } - if (dueJobs.length < JOB_CHUNK_SIZE) jobsExhausted = true + if (useDatabaseFallback && workflowClaimLimit <= 0) { + schedulesExhausted = true + } - if (dueSchedules.length === 0 && dueJobs.length === 0 && resumedPendingSchedules === 0) break + const [dueSchedules, dueJobs] = await Promise.all([ + schedulesExhausted ? [] : claimWorkflowSchedules(queuedAt, workflowClaimLimit), + jobsExhausted ? [] : claimJobSchedules(queuedAt, JOB_CHUNK_SIZE), + ]) - iterations += 1 - totalSchedules += dueSchedules.length + resumedPendingSchedules - totalJobs += dueJobs.length + remainingWorkflowBudget -= dueSchedules.length + if (dueSchedules.length < workflowClaimLimit || remainingWorkflowBudget <= 0) { + schedulesExhausted = true + } + if (dueJobs.length < JOB_CHUNK_SIZE) jobsExhausted = true - logger.info( - `[${requestId}] Iteration ${iterations}: claimed ${dueSchedules.length} schedules, resumed ${resumedPendingSchedules} pending schedule jobs, ${dueJobs.length} jobs`, - { - remainingWorkflowBudget, - scheduleConcurrencyLimit: SCHEDULE_EXECUTION_CONCURRENCY_LIMIT, - databaseScheduleSlots, - } - ) + if (dueSchedules.length === 0 && dueJobs.length === 0 && resumedPendingSchedules === 0) break - const schedulePromises = - dueSchedules.length > 0 - ? dueSchedules.map((schedule) => - processScheduleItem(schedule, queuedAt, requestId, jobQueue, useDatabaseFallback) - ) - : [] - - await Promise.allSettled([ - ...schedulePromises, - ...dueJobs.map((job) => processJobItem(job, queuedAt, requestId)), - ]) - } + iterations += 1 + totalSchedules += dueSchedules.length + resumedPendingSchedules + totalJobs += dueJobs.length - const totalCount = totalSchedules + totalJobs - const durationMs = Date.now() - tickStart logger.info( - `[${requestId}] Processed ${totalCount} items across ${iterations} iteration(s) in ${durationMs}ms (${totalSchedules} schedules, ${totalJobs} jobs)`, + `[${requestId}] Iteration ${iterations}: claimed ${dueSchedules.length} schedules, resumed ${resumedPendingSchedules} pending schedule jobs, ${dueJobs.length} jobs`, { - scheduleConcurrencyLimit: SCHEDULE_EXECUTION_CONCURRENCY_LIMIT, - scheduleEnqueueBudget: SCHEDULE_WORKFLOW_ENQUEUE_LIMIT, remainingWorkflowBudget, + scheduleConcurrencyLimit: SCHEDULE_EXECUTION_CONCURRENCY_LIMIT, + databaseScheduleSlots, } ) - const response = { - message: 'Scheduled workflow executions processed', - processedCount: totalCount, - } satisfies ExecuteSchedulesResponse + const schedulePromises = + dueSchedules.length > 0 + ? dueSchedules.map((schedule) => + processScheduleItem(schedule, queuedAt, requestId, jobQueue, useDatabaseFallback) + ) + : [] - return NextResponse.json(response) - } catch (error) { - logger.error(`[${requestId}] Error in scheduled execution handler`, error) - return NextResponse.json({ error: toError(error).message }, { status: 500 }) + await Promise.allSettled([ + ...schedulePromises, + ...dueJobs.map((job) => processJobItem(job, queuedAt, requestId)), + ]) } + + const totalCount = totalSchedules + totalJobs + const durationMs = Date.now() - tickStart + logger.info( + `[${requestId}] Processed ${totalCount} items across ${iterations} iteration(s) in ${durationMs}ms (${totalSchedules} schedules, ${totalJobs} jobs)`, + { + scheduleConcurrencyLimit: SCHEDULE_EXECUTION_CONCURRENCY_LIMIT, + scheduleEnqueueBudget: SCHEDULE_WORKFLOW_ENQUEUE_LIMIT, + remainingWorkflowBudget, + } + ) + + return { processedCount: totalCount, totalSchedules, totalJobs } +} + +export const GET = withRouteHandler(async (request: NextRequest) => { + const requestId = generateRequestId() + logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`) + + const authError = verifyCronAuth(request, 'Schedule execution') + if (authError) { + return authError + } + + const started = scheduleTickGuard.run('schedule-execution-tick', () => runScheduleTick(requestId)) + + const response = { + message: started ? 'Scheduled execution started' : 'Scheduled execution already in progress', + status: started ? 'started' : 'already_running', + } satisfies ExecuteSchedulesResponse + + return NextResponse.json(response, { status: 202 }) }) diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.test.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.test.ts new file mode 100644 index 00000000000..e8d5fc91da4 --- /dev/null +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.test.ts @@ -0,0 +1,107 @@ +/** + * Tests for the webhook polling cron route. + * + * @vitest-environment node + */ +import { createMockRequest, redisConfigMock, redisConfigMockFns } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockVerifyCronAuth, mockPollProvider } = vi.hoisted(() => ({ + mockVerifyCronAuth: vi.fn().mockReturnValue(null), + mockPollProvider: vi.fn().mockResolvedValue({ processed: 0 }), +})) + +vi.mock('@/lib/auth/internal', () => ({ + verifyCronAuth: mockVerifyCronAuth, +})) + +vi.mock('@/lib/core/config/redis', () => redisConfigMock) + +vi.mock('@/lib/webhooks/polling', () => ({ + pollProvider: mockPollProvider, + VALID_POLLING_PROVIDERS: new Set(['gmail', 'outlook', 'rss']), +})) + +import { GET } from './route' + +function createRequest() { + return createMockRequest('GET', undefined, {}, 'http://localhost:3000/api/webhooks/poll/gmail') +} + +function createContext(provider: string) { + return { params: Promise.resolve({ provider }) } +} + +const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0)) + +describe('webhook polling route (fire-and-forget)', () => { + beforeEach(() => { + vi.clearAllMocks() + redisConfigMockFns.mockAcquireLock.mockResolvedValue(true) + redisConfigMockFns.mockReleaseLock.mockResolvedValue(true) + mockVerifyCronAuth.mockReturnValue(null) + mockPollProvider.mockResolvedValue({ processed: 0 }) + }) + + it('returns the auth error when cron auth fails', async () => { + mockVerifyCronAuth.mockReturnValueOnce( + new Response(null, { status: 401 }) as unknown as Response + ) + + const response = await GET(createRequest(), createContext('gmail')) + + expect(response.status).toBe(401) + expect(mockPollProvider).not.toHaveBeenCalled() + }) + + it('returns 404 for an unknown provider', async () => { + const response = await GET(createRequest(), createContext('unknown')) + + expect(response.status).toBe(404) + expect(redisConfigMockFns.mockAcquireLock).not.toHaveBeenCalled() + }) + + it('acknowledges with 202 and polls in the background after acquiring the lock', async () => { + const response = await GET(createRequest(), createContext('gmail')) + + expect(response.status).toBe(202) + const data = await response.json() + expect(data).toMatchObject({ status: 'started' }) + expect(redisConfigMockFns.mockAcquireLock).toHaveBeenCalledWith( + 'gmail-polling-lock', + expect.any(String), + expect.any(Number) + ) + + await flushMicrotasks() + expect(mockPollProvider).toHaveBeenCalledWith('gmail') + expect(redisConfigMockFns.mockReleaseLock).toHaveBeenCalledWith( + 'gmail-polling-lock', + expect.any(String) + ) + }) + + it('skips with 202 when the lock is already held', async () => { + redisConfigMockFns.mockAcquireLock.mockResolvedValueOnce(false) + + const response = await GET(createRequest(), createContext('gmail')) + + expect(response.status).toBe(202) + const data = await response.json() + expect(data).toMatchObject({ status: 'skip' }) + expect(mockPollProvider).not.toHaveBeenCalled() + }) + + it('releases the lock even when polling throws', async () => { + mockPollProvider.mockRejectedValueOnce(new Error('poll failed')) + + const response = await GET(createRequest(), createContext('gmail')) + + expect(response.status).toBe(202) + await flushMicrotasks() + expect(redisConfigMockFns.mockReleaseLock).toHaveBeenCalledWith( + 'gmail-polling-lock', + expect.any(String) + ) + }) +}) diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index 3c8415ea343..a55c1082724 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -6,6 +6,7 @@ import { webhookPollingContract } from '@/lib/api/contracts/webhooks' import { parseRequest } from '@/lib/api/server' import { verifyCronAuth } from '@/lib/auth/internal' import { acquireLock, releaseLock } from '@/lib/core/config/redis' +import { runDetached } from '@/lib/core/utils/background' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling' @@ -38,37 +39,38 @@ export const GET = withRouteHandler( } const LOCK_KEY = `${provider}-polling-lock` - let lockValue: string | undefined + const lockValue = requestId + const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) + if (!locked) { + return NextResponse.json( + { + success: true, + message: 'Polling already in progress – skipped', + requestId, + status: 'skip', + }, + { status: 202 } + ) + } - try { - lockValue = requestId - const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) - if (!locked) { - return NextResponse.json( - { - success: true, - message: 'Polling already in progress – skipped', - requestId, - status: 'skip', - }, - { status: 202 } - ) + const pollingProvider = provider + runDetached(`${pollingProvider}-polling`, async () => { + try { + await pollProvider(pollingProvider) + } finally { + await releaseLock(LOCK_KEY, lockValue).catch(() => {}) } + }) - const results = await pollProvider(provider) - - return NextResponse.json({ + return NextResponse.json( + { success: true, - message: `${provider} polling completed`, + message: `${provider} polling started`, requestId, - status: 'completed', - ...results, - }) - } finally { - if (lockValue) { - await releaseLock(LOCK_KEY, lockValue).catch(() => {}) - } - } + status: 'started', + }, + { status: 202 } + ) } catch (error) { const providerLabel = provider ?? 'webhook' logger.error(`Error during ${providerLabel} polling (${requestId}):`, error) diff --git a/apps/sim/lib/api/contracts/schedules.ts b/apps/sim/lib/api/contracts/schedules.ts index bac9c0b2d70..3737473ba74 100644 --- a/apps/sim/lib/api/contracts/schedules.ts +++ b/apps/sim/lib/api/contracts/schedules.ts @@ -134,7 +134,7 @@ const messageResponseSchema = z.object({ export const executeSchedulesResponseSchema = z.object({ message: z.string(), - processedCount: z.number().int().min(0), + status: z.enum(['started', 'already_running']), }) export type ExecuteSchedulesResponse = z.output diff --git a/apps/sim/lib/core/utils/background.test.ts b/apps/sim/lib/core/utils/background.test.ts new file mode 100644 index 00000000000..974344d167c --- /dev/null +++ b/apps/sim/lib/core/utils/background.test.ts @@ -0,0 +1,76 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it, vi } from 'vitest' +import { createSingleFlight, runDetached } from '@/lib/core/utils/background' + +const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0)) + +describe('runDetached', () => { + it('runs the work without the caller awaiting it', async () => { + const work = vi.fn().mockResolvedValue(undefined) + + runDetached('test', work) + + await flushMicrotasks() + expect(work).toHaveBeenCalledTimes(1) + }) + + it('swallows rejections so they do not surface as unhandled', async () => { + const work = vi.fn().mockRejectedValue(new Error('boom')) + + expect(() => runDetached('test', work)).not.toThrow() + await flushMicrotasks() + expect(work).toHaveBeenCalledTimes(1) + }) + + it('swallows synchronous throws from work', async () => { + const work = vi.fn(() => { + throw new Error('sync boom') + }) + + expect(() => runDetached('test', work)).not.toThrow() + await flushMicrotasks() + }) +}) + +describe('createSingleFlight', () => { + it('starts work and reports active while in flight', async () => { + const guard = createSingleFlight() + let release: () => void = () => {} + const gate = new Promise((resolve) => { + release = resolve + }) + + const started = guard.run('task', () => gate) + expect(started).toBe(true) + expect(guard.isActive()).toBe(true) + + release() + await flushMicrotasks() + expect(guard.isActive()).toBe(false) + }) + + it('refuses a second run while one is already in flight', async () => { + const guard = createSingleFlight() + let release: () => void = () => {} + const gate = new Promise((resolve) => { + release = resolve + }) + + expect(guard.run('task', () => gate)).toBe(true) + expect(guard.run('task', () => Promise.resolve())).toBe(false) + + release() + await flushMicrotasks() + expect(guard.run('task', () => Promise.resolve())).toBe(true) + }) + + it('clears the active flag even when work rejects', async () => { + const guard = createSingleFlight() + + expect(guard.run('task', () => Promise.reject(new Error('boom')))).toBe(true) + await flushMicrotasks() + expect(guard.isActive()).toBe(false) + }) +}) diff --git a/apps/sim/lib/core/utils/background.ts b/apps/sim/lib/core/utils/background.ts new file mode 100644 index 00000000000..a3d500cdbee --- /dev/null +++ b/apps/sim/lib/core/utils/background.ts @@ -0,0 +1,61 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' + +const logger = createLogger('BackgroundTask') + +/** + * Runs work detached from the HTTP response so a caller (e.g. a cron job with a + * short request timeout) receives an immediate response while processing + * continues on the long-lived server process. + * + * `withRouteHandler` only wraps awaited work in its try/catch, so a detached + * promise must catch its own rejection or it surfaces as an `unhandledRejection`. + * The request-scoped AsyncLocalStorage context (request ID) is captured when the + * work is scheduled and preserved across the detached continuation, so loggers + * inside `work` keep the originating request ID. + * + * @param label - Identifier used in the failure log line. + * @param work - The async work to run in the background. + */ +export function runDetached(label: string, work: () => Promise): void { + void Promise.resolve() + .then(work) + .catch((error) => { + logger.error(`Background task failed: ${label}`, toError(error)) + }) +} + +/** + * A per-process single-flight guard. Prevents a long-running detached task from + * piling up when it is invoked again before the previous run finishes. + * + * This guards a single Node process only — cross-replica deduplication must be + * handled by the underlying work (e.g. database row claiming or a distributed + * lock). + */ +export function createSingleFlight() { + let active = false + + return { + /** Whether a run is currently in flight in this process. */ + isActive: (): boolean => active, + + /** + * Starts `work` detached if no run is active. + * + * @returns `true` if a new run started, `false` if one was already in flight. + */ + run(label: string, work: () => Promise): boolean { + if (active) return false + active = true + runDetached(label, () => + Promise.resolve() + .then(work) + .finally(() => { + active = false + }) + ) + return true + }, + } +} From 1877f2c06bab49c5214edb77de064e3d4f2ac534 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 27 May 2026 16:02:12 -0700 Subject: [PATCH 2/3] fix(cron): add staleness takeover to single-flight guard --- apps/sim/app/api/schedules/execute/route.ts | 6 ++- apps/sim/lib/core/utils/background.test.ts | 50 +++++++++++++++++++-- apps/sim/lib/core/utils/background.ts | 45 +++++++++++++++---- 3 files changed, 88 insertions(+), 13 deletions(-) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 8126f0bfdca..8f83e5b157a 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -1072,7 +1072,11 @@ async function processJobItem(job: ClaimedJob, queuedAt: Date, requestId: string } } -const scheduleTickGuard = createSingleFlight() +/** + * A tick self-bounds at `MAX_TICK_DURATION_MS`; allow a grace window beyond that + * before a hung tick is considered stale and a new one is allowed to take over. + */ +const scheduleTickGuard = createSingleFlight({ staleAfterMs: MAX_TICK_DURATION_MS + 60_000 }) interface ScheduleTickResult { processedCount: number diff --git a/apps/sim/lib/core/utils/background.test.ts b/apps/sim/lib/core/utils/background.test.ts index 974344d167c..67ef0239497 100644 --- a/apps/sim/lib/core/utils/background.test.ts +++ b/apps/sim/lib/core/utils/background.test.ts @@ -36,7 +36,7 @@ describe('runDetached', () => { describe('createSingleFlight', () => { it('starts work and reports active while in flight', async () => { - const guard = createSingleFlight() + const guard = createSingleFlight({ staleAfterMs: 60_000 }) let release: () => void = () => {} const gate = new Promise((resolve) => { release = resolve @@ -52,7 +52,7 @@ describe('createSingleFlight', () => { }) it('refuses a second run while one is already in flight', async () => { - const guard = createSingleFlight() + const guard = createSingleFlight({ staleAfterMs: 60_000 }) let release: () => void = () => {} const gate = new Promise((resolve) => { release = resolve @@ -67,10 +67,54 @@ describe('createSingleFlight', () => { }) it('clears the active flag even when work rejects', async () => { - const guard = createSingleFlight() + const guard = createSingleFlight({ staleAfterMs: 60_000 }) expect(guard.run('task', () => Promise.reject(new Error('boom')))).toBe(true) await flushMicrotasks() expect(guard.isActive()).toBe(false) }) + + it('takes over a stale run whose work never settles', async () => { + const guard = createSingleFlight({ staleAfterMs: 10 }) + + // A run whose promise never settles — its `finally` never fires. + expect(guard.run('task', () => new Promise(() => {}))).toBe(true) + expect(guard.run('task', () => Promise.resolve())).toBe(false) + + await new Promise((resolve) => setTimeout(resolve, 20)) + + const second = vi.fn().mockResolvedValue(undefined) + expect(guard.run('task', second)).toBe(true) + await flushMicrotasks() + expect(second).toHaveBeenCalledTimes(1) + expect(guard.isActive()).toBe(false) + }) + + it('does not let a late stale run clear a newer run slot', async () => { + const guard = createSingleFlight({ staleAfterMs: 10 }) + + let releaseStale: () => void = () => {} + const stale = new Promise((resolve) => { + releaseStale = resolve + }) + expect(guard.run('task', () => stale)).toBe(true) + + await new Promise((resolve) => setTimeout(resolve, 20)) + + // New run takes over the stale slot. + let releaseFresh: () => void = () => {} + const fresh = new Promise((resolve) => { + releaseFresh = resolve + }) + expect(guard.run('task', () => fresh)).toBe(true) + + // The original stale run settling late must not release the newer slot. + releaseStale() + await flushMicrotasks() + expect(guard.isActive()).toBe(true) + + releaseFresh() + await flushMicrotasks() + expect(guard.isActive()).toBe(false) + }) }) diff --git a/apps/sim/lib/core/utils/background.ts b/apps/sim/lib/core/utils/background.ts index a3d500cdbee..c4e3c142ab1 100644 --- a/apps/sim/lib/core/utils/background.ts +++ b/apps/sim/lib/core/utils/background.ts @@ -25,6 +25,16 @@ export function runDetached(label: string, work: () => Promise): void { }) } +interface SingleFlightOptions { + /** + * How long a run may hold the slot before it is treated as stale. A later + * `run` call past this window takes over and starts a fresh run, so a hung + * task (one whose promise never settles) cannot wedge the slot permanently. + * This is the in-process equivalent of a distributed lock's TTL. + */ + staleAfterMs: number +} + /** * A per-process single-flight guard. Prevents a long-running detached task from * piling up when it is invoked again before the previous run finishes. @@ -32,27 +42,44 @@ export function runDetached(label: string, work: () => Promise): void { * This guards a single Node process only — cross-replica deduplication must be * handled by the underlying work (e.g. database row claiming or a distributed * lock). + * + * A held slot is released when its run settles, or — if the run hangs and never + * settles — taken over by the next `run` call after `staleAfterMs`. Ownership is + * tracked by token so a stale run that settles late cannot clear a newer run's + * slot. */ -export function createSingleFlight() { - let active = false +export function createSingleFlight({ staleAfterMs }: SingleFlightOptions) { + let activeToken: symbol | null = null + let activeSince = 0 return { - /** Whether a run is currently in flight in this process. */ - isActive: (): boolean => active, + /** Whether a run currently holds the slot in this process. */ + isActive: (): boolean => activeToken !== null, /** - * Starts `work` detached if no run is active. + * Starts `work` detached unless a non-stale run already holds the slot. * - * @returns `true` if a new run started, `false` if one was already in flight. + * @returns `true` if a new run started, `false` if a run was already in flight. */ run(label: string, work: () => Promise): boolean { - if (active) return false - active = true + const now = Date.now() + if (activeToken !== null) { + if (now - activeSince < staleAfterMs) return false + logger.warn( + `Single-flight "${label}" held for ${now - activeSince}ms (> ${staleAfterMs}ms); starting a new run` + ) + } + + const token = Symbol(label) + activeToken = token + activeSince = now runDetached(label, () => Promise.resolve() .then(work) .finally(() => { - active = false + if (activeToken === token) { + activeToken = null + } }) ) return true From 82d194cae584fcc6a67a30709b086baff3dd5de7 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 27 May 2026 16:07:00 -0700 Subject: [PATCH 3/3] improvement(cron): drop single-flight guard, rely on DB row claiming --- .../app/api/schedules/execute/route.test.ts | 16 ---- apps/sim/app/api/schedules/execute/route.ts | 14 +-- apps/sim/lib/api/contracts/schedules.ts | 2 +- apps/sim/lib/core/utils/background.test.ts | 87 +------------------ apps/sim/lib/core/utils/background.ts | 62 ------------- 5 files changed, 6 insertions(+), 175 deletions(-) diff --git a/apps/sim/app/api/schedules/execute/route.test.ts b/apps/sim/app/api/schedules/execute/route.test.ts index 3ad4338e545..2fe434e87b2 100644 --- a/apps/sim/app/api/schedules/execute/route.test.ts +++ b/apps/sim/app/api/schedules/execute/route.test.ts @@ -877,21 +877,5 @@ describe('Scheduled Workflow Execution API Route', () => { const data = await response.json() expect(data).toMatchObject({ status: 'started' }) }) - - it('returns already_running while a tick is in flight in the same process', async () => { - // Both calls run synchronously up to their `return` (no awaits before the - // single-flight guard), so the detached tick from the first call cannot - // progress past its first `await` before the second call checks the guard. - const first = GET(createMockRequest()) - const second = GET(createMockRequest()) - - const [firstData, secondData] = await Promise.all([ - first.then((r) => r.json()), - second.then((r) => r.json()), - ]) - - expect(firstData).toMatchObject({ status: 'started' }) - expect(secondData).toMatchObject({ status: 'already_running' }) - }) }) }) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 8f83e5b157a..6dedd8d7494 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -14,7 +14,7 @@ import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' import { JOB_STATUS, type Job } from '@/lib/core/async-jobs/types' import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' -import { createSingleFlight } from '@/lib/core/utils/background' +import { runDetached } from '@/lib/core/utils/background' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { @@ -1072,12 +1072,6 @@ async function processJobItem(job: ClaimedJob, queuedAt: Date, requestId: string } } -/** - * A tick self-bounds at `MAX_TICK_DURATION_MS`; allow a grace window beyond that - * before a hung tick is considered stale and a new one is allowed to take over. - */ -const scheduleTickGuard = createSingleFlight({ staleAfterMs: MAX_TICK_DURATION_MS + 60_000 }) - interface ScheduleTickResult { processedCount: number totalSchedules: number @@ -1191,11 +1185,11 @@ export const GET = withRouteHandler(async (request: NextRequest) => { return authError } - const started = scheduleTickGuard.run('schedule-execution-tick', () => runScheduleTick(requestId)) + runDetached('schedule-execution-tick', () => runScheduleTick(requestId)) const response = { - message: started ? 'Scheduled execution started' : 'Scheduled execution already in progress', - status: started ? 'started' : 'already_running', + message: 'Scheduled execution started', + status: 'started', } satisfies ExecuteSchedulesResponse return NextResponse.json(response, { status: 202 }) diff --git a/apps/sim/lib/api/contracts/schedules.ts b/apps/sim/lib/api/contracts/schedules.ts index 3737473ba74..28e938d30d1 100644 --- a/apps/sim/lib/api/contracts/schedules.ts +++ b/apps/sim/lib/api/contracts/schedules.ts @@ -134,7 +134,7 @@ const messageResponseSchema = z.object({ export const executeSchedulesResponseSchema = z.object({ message: z.string(), - status: z.enum(['started', 'already_running']), + status: z.literal('started'), }) export type ExecuteSchedulesResponse = z.output diff --git a/apps/sim/lib/core/utils/background.test.ts b/apps/sim/lib/core/utils/background.test.ts index 67ef0239497..dd4fad698ba 100644 --- a/apps/sim/lib/core/utils/background.test.ts +++ b/apps/sim/lib/core/utils/background.test.ts @@ -2,7 +2,7 @@ * @vitest-environment node */ import { describe, expect, it, vi } from 'vitest' -import { createSingleFlight, runDetached } from '@/lib/core/utils/background' +import { runDetached } from '@/lib/core/utils/background' const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0)) @@ -33,88 +33,3 @@ describe('runDetached', () => { await flushMicrotasks() }) }) - -describe('createSingleFlight', () => { - it('starts work and reports active while in flight', async () => { - const guard = createSingleFlight({ staleAfterMs: 60_000 }) - let release: () => void = () => {} - const gate = new Promise((resolve) => { - release = resolve - }) - - const started = guard.run('task', () => gate) - expect(started).toBe(true) - expect(guard.isActive()).toBe(true) - - release() - await flushMicrotasks() - expect(guard.isActive()).toBe(false) - }) - - it('refuses a second run while one is already in flight', async () => { - const guard = createSingleFlight({ staleAfterMs: 60_000 }) - let release: () => void = () => {} - const gate = new Promise((resolve) => { - release = resolve - }) - - expect(guard.run('task', () => gate)).toBe(true) - expect(guard.run('task', () => Promise.resolve())).toBe(false) - - release() - await flushMicrotasks() - expect(guard.run('task', () => Promise.resolve())).toBe(true) - }) - - it('clears the active flag even when work rejects', async () => { - const guard = createSingleFlight({ staleAfterMs: 60_000 }) - - expect(guard.run('task', () => Promise.reject(new Error('boom')))).toBe(true) - await flushMicrotasks() - expect(guard.isActive()).toBe(false) - }) - - it('takes over a stale run whose work never settles', async () => { - const guard = createSingleFlight({ staleAfterMs: 10 }) - - // A run whose promise never settles — its `finally` never fires. - expect(guard.run('task', () => new Promise(() => {}))).toBe(true) - expect(guard.run('task', () => Promise.resolve())).toBe(false) - - await new Promise((resolve) => setTimeout(resolve, 20)) - - const second = vi.fn().mockResolvedValue(undefined) - expect(guard.run('task', second)).toBe(true) - await flushMicrotasks() - expect(second).toHaveBeenCalledTimes(1) - expect(guard.isActive()).toBe(false) - }) - - it('does not let a late stale run clear a newer run slot', async () => { - const guard = createSingleFlight({ staleAfterMs: 10 }) - - let releaseStale: () => void = () => {} - const stale = new Promise((resolve) => { - releaseStale = resolve - }) - expect(guard.run('task', () => stale)).toBe(true) - - await new Promise((resolve) => setTimeout(resolve, 20)) - - // New run takes over the stale slot. - let releaseFresh: () => void = () => {} - const fresh = new Promise((resolve) => { - releaseFresh = resolve - }) - expect(guard.run('task', () => fresh)).toBe(true) - - // The original stale run settling late must not release the newer slot. - releaseStale() - await flushMicrotasks() - expect(guard.isActive()).toBe(true) - - releaseFresh() - await flushMicrotasks() - expect(guard.isActive()).toBe(false) - }) -}) diff --git a/apps/sim/lib/core/utils/background.ts b/apps/sim/lib/core/utils/background.ts index c4e3c142ab1..5f3d4f7d6c4 100644 --- a/apps/sim/lib/core/utils/background.ts +++ b/apps/sim/lib/core/utils/background.ts @@ -24,65 +24,3 @@ export function runDetached(label: string, work: () => Promise): void { logger.error(`Background task failed: ${label}`, toError(error)) }) } - -interface SingleFlightOptions { - /** - * How long a run may hold the slot before it is treated as stale. A later - * `run` call past this window takes over and starts a fresh run, so a hung - * task (one whose promise never settles) cannot wedge the slot permanently. - * This is the in-process equivalent of a distributed lock's TTL. - */ - staleAfterMs: number -} - -/** - * A per-process single-flight guard. Prevents a long-running detached task from - * piling up when it is invoked again before the previous run finishes. - * - * This guards a single Node process only — cross-replica deduplication must be - * handled by the underlying work (e.g. database row claiming or a distributed - * lock). - * - * A held slot is released when its run settles, or — if the run hangs and never - * settles — taken over by the next `run` call after `staleAfterMs`. Ownership is - * tracked by token so a stale run that settles late cannot clear a newer run's - * slot. - */ -export function createSingleFlight({ staleAfterMs }: SingleFlightOptions) { - let activeToken: symbol | null = null - let activeSince = 0 - - return { - /** Whether a run currently holds the slot in this process. */ - isActive: (): boolean => activeToken !== null, - - /** - * Starts `work` detached unless a non-stale run already holds the slot. - * - * @returns `true` if a new run started, `false` if a run was already in flight. - */ - run(label: string, work: () => Promise): boolean { - const now = Date.now() - if (activeToken !== null) { - if (now - activeSince < staleAfterMs) return false - logger.warn( - `Single-flight "${label}" held for ${now - activeSince}ms (> ${staleAfterMs}ms); starting a new run` - ) - } - - const token = Symbol(label) - activeToken = token - activeSince = now - runDetached(label, () => - Promise.resolve() - .then(work) - .finally(() => { - if (activeToken === token) { - activeToken = null - } - }) - ) - return true - }, - } -}