Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions apps/sim/app/api/cron/cleanup-soft-deletes/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue } from '@/lib/core/async-jobs'

export const dynamic = 'force-dynamic'

const logger = createLogger('SoftDeleteCleanupAPI')

export async function GET(request: NextRequest) {
try {
const authError = verifyCronAuth(request, 'soft-delete cleanup')
if (authError) return authError

const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('cleanup-soft-deletes', {})

logger.info('Soft-delete cleanup job dispatched', { jobId })

return NextResponse.json({ triggered: true, jobId })
} catch (error) {
logger.error('Failed to dispatch soft-delete cleanup job:', { error })
return NextResponse.json({ error: 'Failed to dispatch soft-delete cleanup' }, { status: 500 })
}
}
25 changes: 25 additions & 0 deletions apps/sim/app/api/cron/cleanup-tasks/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue } from '@/lib/core/async-jobs'

export const dynamic = 'force-dynamic'

const logger = createLogger('TaskCleanupAPI')

export async function GET(request: NextRequest) {
try {
const authError = verifyCronAuth(request, 'task cleanup')
if (authError) return authError

const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('cleanup-tasks', {})

logger.info('Task cleanup job dispatched', { jobId })

return NextResponse.json({ triggered: true, jobId })
} catch (error) {
logger.error('Failed to dispatch task cleanup job:', { error })
return NextResponse.json({ error: 'Failed to dispatch task cleanup' }, { status: 500 })
}
}
28 changes: 28 additions & 0 deletions apps/sim/app/api/cron/redact-task-context/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue } from '@/lib/core/async-jobs'

export const dynamic = 'force-dynamic'

const logger = createLogger('TaskRedactionAPI')

export async function GET(request: NextRequest) {
try {
const authError = verifyCronAuth(request, 'task context redaction')
if (authError) return authError

const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('redact-task-context', {})

logger.info('Task context redaction job dispatched', { jobId })

return NextResponse.json({ triggered: true, jobId })
} catch (error) {
logger.error('Failed to dispatch task context redaction job:', { error })
return NextResponse.json(
{ error: 'Failed to dispatch task context redaction' },
{ status: 500 }
)
}
}
192 changes: 8 additions & 184 deletions apps/sim/app/api/logs/cleanup/route.ts
Original file line number Diff line number Diff line change
@@ -1,201 +1,25 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue } from '@/lib/core/async-jobs'

export const dynamic = 'force-dynamic'

const logger = createLogger('LogsCleanupAPI')

/**
 * Cron entry point: enqueues the background log-cleanup job.
 *
 * NOTE(review): this span in SOURCE was unified-diff residue mixing the old
 * inline archive/delete implementation with the new dispatch-only version
 * (duplicate auth checks, two returns, two catch bodies). Reconstructed here
 * as the coherent post-change route, matching the other `/api/cron` routes
 * in this changeset (per the `+1,25` hunk header, the new file is 25 lines).
 *
 * Requires cron authentication; on success returns `{ triggered, jobId }`,
 * on failure returns a 500 with a generic error payload.
 */
export async function GET(request: NextRequest) {
  try {
    const authError = verifyCronAuth(request, 'logs cleanup')
    if (authError) return authError

    const jobQueue = await getJobQueue()
    const jobId = await jobQueue.enqueue('cleanup-logs', {})

    logger.info('Log cleanup job dispatched', { jobId })

    return NextResponse.json({ triggered: true, jobId })
  } catch (error) {
    logger.error('Failed to dispatch log cleanup job:', { error })
    return NextResponse.json({ error: 'Failed to dispatch log cleanup' }, { status: 500 })
  }
}
Loading
Loading