From 5748fbaccf455732b421af8fd9e7bafc9d47255b Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 30 Jan 2026 13:10:11 +0100 Subject: [PATCH 1/6] feat: script implementation --- .../script_executor_worker/src/activities.ts | 14 ++ .../src/activities/cleanup/segments-agg.ts | 190 +++++++++++++++ .../apps/script_executor_worker/src/main.ts | 19 +- .../src/schedules/scheduleCleanup.ts | 124 ++++++++++ .../apps/script_executor_worker/src/types.ts | 1 + .../script_executor_worker/src/workflows.ts | 6 + .../src/workflows/cleanup/segments-agg.ts | 223 ++++++++++++++++++ 7 files changed, 573 insertions(+), 4 deletions(-) create mode 100644 services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts create mode 100644 services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index a8fe72ae3e..dee36f7bec 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -10,6 +10,14 @@ import { moveMemberActivityRelations, } from './activities/cleanup/duplicate-members' import { deleteMember, getMembersToCleanup, syncRemoveMember } from './activities/cleanup/member' +import { + deleteOrphanMembersSegmentsAgg, + deleteOrphanOrganizationSegmentsAgg, + getOrphanMembersSegmentsAgg, + getOrphanOrganizationSegmentsAgg, + startOrphanCleanupRun, + updateOrphanCleanupRun, +} from './activities/cleanup/segments-agg' import { deleteOrganization, getOrganizationsToCleanup, @@ -86,4 +94,10 @@ export { markMemberForAffiliationRecalc, getMembersForAffiliationRecalc, calculateMemberAffiliations, + startOrphanCleanupRun, + updateOrphanCleanupRun, + getOrphanMembersSegmentsAgg, + deleteOrphanMembersSegmentsAgg, + getOrphanOrganizationSegmentsAgg, + deleteOrphanOrganizationSegmentsAgg, } diff --git 
a/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts new file mode 100644 index 0000000000..35a842dedf --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts @@ -0,0 +1,190 @@ +import { svc } from '../../main' + +interface OrphanCleanupRun { + id?: string + tableName: string + startedAt: Date + completedAt?: Date + status: 'running' | 'completed' | 'failed' + orphansFound: number + orphansDeleted: number + executionTimeMs?: number + errorMessage?: string +} + +export async function startOrphanCleanupRun(tableName: string): Promise { + try { + // First, mark any stale runs (running for more than 2 hours) as failed + await svc.postgres.writer.connection().none( + ` + UPDATE "orphanCleanupRuns" + SET + "status" = 'failed', + "completedAt" = NOW(), + "errorMessage" = 'Cleanup run timed out or worker crashed', + "executionTimeMs" = EXTRACT(EPOCH FROM (NOW() - "startedAt")) * 1000 + WHERE "tableName" = $1 + AND "status" = 'running' + AND "startedAt" < NOW() - INTERVAL '2 hours' + `, + [tableName], + ) + + // Check if there's already a running cleanup for this table + const existingRun = await svc.postgres.reader.connection().oneOrNone( + ` + SELECT id, "startedAt" + FROM "orphanCleanupRuns" + WHERE "tableName" = $1 + AND "status" = 'running' + LIMIT 1 + `, + [tableName], + ) + + if (existingRun) { + svc.log.warn( + { tableName, existingRunId: existingRun.id }, + 'Found existing running cleanup, reusing it', + ) + return existingRun.id + } + + // Create new cleanup run + const result = await svc.postgres.writer.connection().one( + ` + INSERT INTO "orphanCleanupRuns" ( + "tableName", + "startedAt", + "status", + "orphansFound", + "orphansDeleted" + ) VALUES ($1, NOW(), 'running', 0, 0) + RETURNING id + `, + [tableName], + ) + return result.id + } catch (error) { + svc.log.error(error, 'Error starting orphan cleanup run!') + throw 
error + } +} + +export async function updateOrphanCleanupRun( + runId: string, + updates: Partial, +): Promise { + try { + const setClauses: string[] = [] + const values: any[] = [] + let paramIndex = 1 + + if (updates.completedAt !== undefined) { + setClauses.push(`"completedAt" = $${paramIndex++}`) + values.push(updates.completedAt) + } + if (updates.status !== undefined) { + setClauses.push(`"status" = $${paramIndex++}`) + values.push(updates.status) + } + if (updates.orphansFound !== undefined) { + setClauses.push(`"orphansFound" = $${paramIndex++}`) + values.push(updates.orphansFound) + } + if (updates.orphansDeleted !== undefined) { + setClauses.push(`"orphansDeleted" = $${paramIndex++}`) + values.push(updates.orphansDeleted) + } + if (updates.executionTimeMs !== undefined) { + setClauses.push(`"executionTimeMs" = $${paramIndex++}`) + values.push(updates.executionTimeMs) + } + if (updates.errorMessage !== undefined) { + setClauses.push(`"errorMessage" = $${paramIndex++}`) + values.push(updates.errorMessage) + } + + values.push(runId) + + await svc.postgres.writer.connection().none( + ` + UPDATE "orphanCleanupRuns" + SET ${setClauses.join(', ')} + WHERE id = $${paramIndex} + `, + values, + ) + } catch (error) { + svc.log.error(error, 'Error updating orphan cleanup run!') + throw error + } +} + +export async function getOrphanMembersSegmentsAgg(batchSize: number): Promise { + try { + const result = await svc.postgres.reader.connection().any( + ` + SELECT msa."memberId" + FROM "memberSegmentsAgg" msa + LEFT JOIN members m ON msa."memberId" = m.id + WHERE m.id IS NULL + LIMIT $1 + `, + [batchSize], + ) + return result.map((r) => r.memberId) + } catch (error) { + svc.log.error(error, 'Error getting orphan memberSegmentsAgg records!') + throw error + } +} + +export async function deleteOrphanMembersSegmentsAgg(memberId: string): Promise { + try { + await svc.postgres.writer.connection().none( + ` + DELETE FROM "memberSegmentsAgg" + WHERE "memberId" = $1 + `, + 
[memberId], + ) + } catch (error) { + svc.log.error(error, 'Error deleting orphan memberSegmentsAgg record!') + throw error + } +} + +export async function getOrphanOrganizationSegmentsAgg(batchSize: number): Promise { + try { + const result = await svc.postgres.reader.connection().any( + ` + SELECT osa."organizationId" + FROM "organizationSegmentsAgg" osa + LEFT JOIN organizations o ON osa."organizationId" = o.id + WHERE o.id IS NULL + LIMIT $1 + `, + [batchSize], + ) + return result.map((r) => r.organizationId) + } catch (error) { + svc.log.error(error, 'Error getting orphan organizationSegmentsAgg records!') + throw error + } +} + +export async function deleteOrphanOrganizationSegmentsAgg(organizationId: string): Promise { + try { + await svc.postgres.writer.connection().none( + ` + DELETE FROM "organizationSegmentsAgg" + WHERE "organizationId" = $1 + `, + [organizationId], + ) + } catch (error) { + svc.log.error(error, 'Error deleting orphan organizationSegmentsAgg record!') + throw error + } +} diff --git a/services/apps/script_executor_worker/src/main.ts b/services/apps/script_executor_worker/src/main.ts index aad2b6f3c9..78eb49e1f5 100644 --- a/services/apps/script_executor_worker/src/main.ts +++ b/services/apps/script_executor_worker/src/main.ts @@ -1,10 +1,15 @@ import { Config } from '@crowd/archetype-standard' import { Options, ServiceWorker } from '@crowd/archetype-worker' -import { scheduleMembersCleanup, scheduleOrganizationsCleanup } from './schedules/scheduleCleanup' +import { + scheduleMemberSegmentsAggCleanup, + scheduleMembersCleanup, + scheduleOrganizationSegmentAggCleanup, + scheduleOrganizationsCleanup, +} from './schedules/scheduleCleanup' const config: Config = { - envvars: ['CROWD_TINYBIRD_ACCESS_TOKEN'], + // envvars: ['CROWD_TINYBIRD_ACCESS_TOKEN'], producer: { enabled: false, }, @@ -33,8 +38,14 @@ export const svc = new ServiceWorker(config, options) setImmediate(async () => { await svc.init() - await scheduleMembersCleanup() - await 
scheduleOrganizationsCleanup() + console.log('🔄 Registering schedules...') + + // await scheduleMembersCleanup() + // await scheduleOrganizationsCleanup() + await scheduleMemberSegmentsAggCleanup() + await scheduleOrganizationSegmentAggCleanup() + + console.log('✅ Schedules registered, starting worker...') await svc.start() }) diff --git a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts index d09e9f136e..9756aa4ef8 100644 --- a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts +++ b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts @@ -3,9 +3,22 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien import { svc } from '../main' import { cleanupMembers } from '../workflows/cleanup/members' import { cleanupOrganizations } from '../workflows/cleanup/organizations' +import { + cleanupMemberSegmentsAgg, + cleanupOrganizationSegmentAgg, +} from '../workflows/cleanup/segments-agg' export const scheduleMembersCleanup = async () => { try { + // Try to delete existing schedule first to ensure fresh config + try { + const handle = svc.temporal.schedule.getHandle('cleanupMembers') + await handle.delete() + svc.log.info('Deleted existing cleanupMembers schedule to recreate with new config') + } catch (err) { + // Schedule doesn't exist, that's fine + } + await svc.temporal.schedule.create({ scheduleId: 'cleanupMembers', spec: { @@ -43,6 +56,15 @@ export const scheduleMembersCleanup = async () => { export const scheduleOrganizationsCleanup = async () => { try { + // Try to delete existing schedule first to ensure fresh config + try { + const handle = svc.temporal.schedule.getHandle('cleanupOrganizations') + await handle.delete() + svc.log.info('Deleted existing cleanupOrganizations schedule to recreate with new config') + } catch (err) { + // Schedule doesn't exist, that's fine + } + await 
svc.temporal.schedule.create({ scheduleId: 'cleanupOrganizations', spec: { @@ -77,3 +99,105 @@ export const scheduleOrganizationsCleanup = async () => { } } } + +export const scheduleMemberSegmentsAggCleanup = async () => { + try { + svc.log.info('Creating schedule for member segments agg cleanup...') + + // Try to delete existing schedule first to ensure fresh config + try { + const handle = svc.temporal.schedule.getHandle('cleanupMemberSegmentsAgg') + await handle.delete() + svc.log.info('Deleted existing cleanupMemberSegmentsAgg schedule to recreate with new config') + } catch (err) { + // Schedule doesn't exist, that's fine + } + + await svc.temporal.schedule.create({ + scheduleId: 'cleanupMemberSegmentsAgg', + spec: { + // Run every minute for testing - change to '0 3 * * *' for daily at 3 AM + cronExpressions: ['* * * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.BUFFER_ONE, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: cleanupMemberSegmentsAgg, + taskQueue: 'script-executor', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [ + { + batchSize: 500, + }, + ], + }, + }) + svc.log.info('Schedule for member segments agg cleanup created successfully!') + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('Schedule cleanupMemberSegmentsAgg already registered in Temporal.') + svc.log.info('Configuration may have changed since. 
Please make sure they are in sync.') + } else { + svc.log.error({ err }, 'Error creating schedule for member segments agg cleanup') + throw new Error(err) + } + } +} + +export const scheduleOrganizationSegmentAggCleanup = async () => { + try { + svc.log.info('Creating schedule for organization segment agg cleanup...') + + // Try to delete existing schedule first to ensure fresh config + try { + const handle = svc.temporal.schedule.getHandle('cleanupOrganizationSegmentAgg') + await handle.delete() + svc.log.info('Deleted existing cleanupOrganizationSegmentAgg schedule to recreate with new config') + } catch (err) { + // Schedule doesn't exist, that's fine + } + + await svc.temporal.schedule.create({ + scheduleId: 'cleanupOrganizationSegmentAgg', + spec: { + // Run every minute for testing - change to '30 3 * * *' for daily at 3:30 AM + cronExpressions: ['* * * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.BUFFER_ONE, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: cleanupOrganizationSegmentAgg, + taskQueue: 'script-executor', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [ + { + batchSize: 500, + }, + ], + }, + }) + svc.log.info('Schedule for organization segment agg cleanup created successfully!') + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('Schedule cleanupOrganizationSegmentAgg already registered in Temporal.') + svc.log.info('Configuration may have changed since. 
Please make sure they are in sync.') + } else { + svc.log.error({ err }, 'Error creating schedule for organization segment agg cleanup') + throw new Error(err) + } + } +} diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 2465f4dd17..6a12517196 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -27,6 +27,7 @@ export interface IPopulateActivityRelationsArgs { export interface IScriptBatchTestArgs { batchSize?: number testRun?: boolean + cleanupRunId?: string } export interface IFixActivityForiegnKeysArgs extends IScriptBatchTestArgs { diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index a7e9307e64..5a14a2a01e 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -2,6 +2,10 @@ import { blockProjectOrganizationAffiliations } from './workflows/block-project- import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members' import { cleanupMembers } from './workflows/cleanup/members' import { cleanupOrganizations } from './workflows/cleanup/organizations' +import { + cleanupMemberSegmentsAgg, + cleanupOrganizationSegmentAgg, +} from './workflows/cleanup/segments-agg' import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' @@ -17,6 +21,8 @@ export { fixOrgIdentitiesWithWrongUrls, cleanupMembers, cleanupOrganizations, + cleanupMemberSegmentsAgg, + cleanupOrganizationSegmentAgg, processLLMVerifiedMerges, cleanupDuplicateMembers, fixBotMembersAffiliation, 
diff --git a/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts b/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts new file mode 100644 index 0000000000..75a98db5b6 --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts @@ -0,0 +1,223 @@ +import { continueAsNew, proxyActivities } from '@temporalio/workflow' + +import * as activities from '../../activities' +import { IScriptBatchTestArgs } from '../../types' +import { chunkArray } from '../../utils/common' + +const { + startOrphanCleanupRun, + updateOrphanCleanupRun, + getOrphanMembersSegmentsAgg, + deleteOrphanMembersSegmentsAgg, + getOrphanOrganizationSegmentsAgg, + deleteOrphanOrganizationSegmentsAgg, +} = proxyActivities({ + startToCloseTimeout: '30 minutes', + retry: { maximumAttempts: 3, backoffCoefficient: 3 }, +}) + +export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Promise { + const BATCH_SIZE = args.batchSize ?? 100 + const TABLE_NAME = 'memberSegmentsAgg' + + let runId: string | undefined + let totalOrphansFound = 0 + let totalOrphansDeleted = 0 + const startTime = Date.now() + + try { + // Initialize the cleanup run only on the first iteration + if (!args.cleanupRunId) { + runId = await startOrphanCleanupRun(TABLE_NAME) + console.log(`Started cleanup run for ${TABLE_NAME} with ID: ${runId}`) + } else { + runId = args.cleanupRunId + } + + // Get orphaned records + const orphanIds = await getOrphanMembersSegmentsAgg(BATCH_SIZE) + + if (orphanIds.length === 0) { + console.log(`No more orphan ${TABLE_NAME} records to cleanup!`) + + // Update the cleanup run as completed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'completed', + executionTimeMs: Date.now() - startTime, + }) + } + + return + } + + console.log(`Found ${orphanIds.length} orphan ${TABLE_NAME} records`) + totalOrphansFound += orphanIds.length + + // Process orphans in chunks + const 
CHUNK_SIZE = 25 + let deletedCount = 0 + + for (const chunk of chunkArray(orphanIds, CHUNK_SIZE)) { + const deleteTasks = chunk.map(async (id) => { + await deleteOrphanMembersSegmentsAgg(id) + deletedCount++ + }) + + await Promise.all(deleteTasks).catch((err) => { + console.error(`Error cleaning up orphan ${TABLE_NAME} records!`, err) + throw err + }) + } + + totalOrphansDeleted += deletedCount + console.log(`Deleted ${deletedCount} orphan ${TABLE_NAME} records in this batch`) + + // Update the cleanup run with current progress + if (runId) { + await updateOrphanCleanupRun(runId, { + orphansFound: totalOrphansFound, + orphansDeleted: totalOrphansDeleted, + }) + } + + if (args.testRun) { + console.log(`Test run completed for ${TABLE_NAME} - stopping after first batch!`) + + // Mark as completed for test runs + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'completed', + executionTimeMs: Date.now() - startTime, + }) + } + + return + } + + // Continue as new for the next batch + await continueAsNew({ + ...args, + cleanupRunId: runId, + }) + } catch (error) { + console.error(`Error during ${TABLE_NAME} cleanup workflow!`, error) + + // Update the cleanup run as failed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'failed', + errorMessage: error.message || String(error), + executionTimeMs: Date.now() - startTime, + }) + } + + throw error + } +} + +export async function cleanupOrganizationSegmentAgg(args: IScriptBatchTestArgs): Promise { + const BATCH_SIZE = args.batchSize ?? 
100 + const TABLE_NAME = 'organizationSegmentsAgg' + + let runId: string | undefined + let totalOrphansFound = 0 + let totalOrphansDeleted = 0 + const startTime = Date.now() + + try { + // Initialize the cleanup run only on the first iteration + if (!args.cleanupRunId) { + runId = await startOrphanCleanupRun(TABLE_NAME) + console.log(`Started cleanup run for ${TABLE_NAME} with ID: ${runId}`) + } else { + runId = args.cleanupRunId + } + + // Get orphaned records + const orphanIds = await getOrphanOrganizationSegmentsAgg(BATCH_SIZE) + + if (orphanIds.length === 0) { + console.log(`No more orphan ${TABLE_NAME} records to cleanup!`) + + // Update the cleanup run as completed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'completed', + executionTimeMs: Date.now() - startTime, + }) + } + + return + } + + console.log(`Found ${orphanIds.length} orphan ${TABLE_NAME} records`) + totalOrphansFound += orphanIds.length + + // Process orphans in chunks + const CHUNK_SIZE = 25 + let deletedCount = 0 + + for (const chunk of chunkArray(orphanIds, CHUNK_SIZE)) { + const deleteTasks = chunk.map(async (id) => { + await deleteOrphanOrganizationSegmentsAgg(id) + deletedCount++ + }) + + await Promise.all(deleteTasks).catch((err) => { + console.error(`Error cleaning up orphan ${TABLE_NAME} records!`, err) + throw err + }) + } + + totalOrphansDeleted += deletedCount + console.log(`Deleted ${deletedCount} orphan ${TABLE_NAME} records in this batch`) + + // Update the cleanup run with current progress + if (runId) { + await updateOrphanCleanupRun(runId, { + orphansFound: totalOrphansFound, + orphansDeleted: totalOrphansDeleted, + }) + } + + if (args.testRun) { + console.log(`Test run completed for ${TABLE_NAME} - stopping after first batch!`) + + // Mark as completed for test runs + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'completed', + executionTimeMs: Date.now() - startTime, + }) + } + + return 
+ } + + // Continue as new for the next batch + await continueAsNew({ + ...args, + cleanupRunId: runId, + }) + } catch (error) { + console.error(`Error during ${TABLE_NAME} cleanup workflow!`, error) + + // Update the cleanup run as failed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'failed', + errorMessage: error.message || String(error), + executionTimeMs: Date.now() - startTime, + }) + } + + throw error + } +} From 232a4cd0e8f4ea4e1761e01d0c709e21153d05cc Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 30 Jan 2026 13:34:27 +0100 Subject: [PATCH 2/6] feat: uncomment other schedules --- services/apps/script_executor_worker/src/main.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/services/apps/script_executor_worker/src/main.ts b/services/apps/script_executor_worker/src/main.ts index 78eb49e1f5..9047acaf13 100644 --- a/services/apps/script_executor_worker/src/main.ts +++ b/services/apps/script_executor_worker/src/main.ts @@ -9,7 +9,7 @@ import { } from './schedules/scheduleCleanup' const config: Config = { - // envvars: ['CROWD_TINYBIRD_ACCESS_TOKEN'], + envvars: ['CROWD_TINYBIRD_ACCESS_TOKEN'], producer: { enabled: false, }, @@ -38,14 +38,10 @@ export const svc = new ServiceWorker(config, options) setImmediate(async () => { await svc.init() - console.log('🔄 Registering schedules...') - - // await scheduleMembersCleanup() - // await scheduleOrganizationsCleanup() + await scheduleMembersCleanup() + await scheduleOrganizationsCleanup() await scheduleMemberSegmentsAggCleanup() await scheduleOrganizationSegmentAggCleanup() - console.log('✅ Schedules registered, starting worker...') - await svc.start() }) From ac9ae5cd8fdf72af982b08f2abf093ebceca0f41 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 30 Jan 2026 13:46:48 +0100 Subject: [PATCH 3/6] fix: lint --- .../script_executor_worker/src/activities.ts | 12 +++++------ .../src/schedules/scheduleCleanup.ts | 20 
++++++++++--------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index dee36f7bec..3962faa836 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -10,6 +10,12 @@ import { moveMemberActivityRelations, } from './activities/cleanup/duplicate-members' import { deleteMember, getMembersToCleanup, syncRemoveMember } from './activities/cleanup/member' +import { + deleteOrganization, + getOrganizationsToCleanup, + queueOrgForAggComputation, + syncRemoveOrganization, +} from './activities/cleanup/organization' import { deleteOrphanMembersSegmentsAgg, deleteOrphanOrganizationSegmentsAgg, @@ -18,12 +24,6 @@ import { startOrphanCleanupRun, updateOrphanCleanupRun, } from './activities/cleanup/segments-agg' -import { - deleteOrganization, - getOrganizationsToCleanup, - queueOrgForAggComputation, - syncRemoveOrganization, -} from './activities/cleanup/organization' import { calculateMemberAffiliations, getWorkflowsCount, diff --git a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts index 9756aa4ef8..6ad7206035 100644 --- a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts +++ b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts @@ -103,7 +103,7 @@ export const scheduleOrganizationsCleanup = async () => { export const scheduleMemberSegmentsAggCleanup = async () => { try { svc.log.info('Creating schedule for member segments agg cleanup...') - + // Try to delete existing schedule first to ensure fresh config try { const handle = svc.temporal.schedule.getHandle('cleanupMemberSegmentsAgg') @@ -112,12 +112,12 @@ export const scheduleMemberSegmentsAggCleanup = async () => { } catch (err) { // Schedule doesn't exist, that's fine } - + await 
svc.temporal.schedule.create({ scheduleId: 'cleanupMemberSegmentsAgg', spec: { - // Run every minute for testing - change to '0 3 * * *' for daily at 3 AM - cronExpressions: ['* * * * *'], + // Run every 5 minutes + cronExpressions: ['*/5 * * * *'], }, policies: { overlap: ScheduleOverlapPolicy.BUFFER_ONE, @@ -154,21 +154,23 @@ export const scheduleMemberSegmentsAggCleanup = async () => { export const scheduleOrganizationSegmentAggCleanup = async () => { try { svc.log.info('Creating schedule for organization segment agg cleanup...') - + // Try to delete existing schedule first to ensure fresh config try { const handle = svc.temporal.schedule.getHandle('cleanupOrganizationSegmentAgg') await handle.delete() - svc.log.info('Deleted existing cleanupOrganizationSegmentAgg schedule to recreate with new config') + svc.log.info( + 'Deleted existing cleanupOrganizationSegmentAgg schedule to recreate with new config', + ) } catch (err) { // Schedule doesn't exist, that's fine } - + await svc.temporal.schedule.create({ scheduleId: 'cleanupOrganizationSegmentAgg', spec: { - // Run every minute for testing - change to '30 3 * * *' for daily at 3:30 AM - cronExpressions: ['* * * * *'], + // Run every 5 minutes + cronExpressions: ['*/5 * * * *'], }, policies: { overlap: ScheduleOverlapPolicy.BUFFER_ONE, From 3e4c17577ec9c650e610fa0be611024c05c2cd17 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 30 Jan 2026 13:56:24 +0100 Subject: [PATCH 4/6] fix: lint --- .../src/activities/cleanup/segments-agg.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts index 35a842dedf..8b22f6f4e0 100644 --- a/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts +++ b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts @@ -77,7 +77,7 @@ export async function 
updateOrphanCleanupRun( ): Promise { try { const setClauses: string[] = [] - const values: any[] = [] + const values: unknown[] = [] let paramIndex = 1 if (updates.completedAt !== undefined) { From 31a9ea7d6ec3829e54b593b5bfce304cebe42921 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 30 Jan 2026 16:25:51 +0100 Subject: [PATCH 5/6] feat: simplify code --- .../src/activities/cleanup/segments-agg.ts | 163 +++--------------- .../src/schedules/scheduleCleanup.ts | 42 ----- .../src/workflows/cleanup/segments-agg.ts | 62 +------ .../data-access-layer/src/members/index.ts | 1 + .../src/members/segmentsAgg.ts | 32 ++++ .../src/organizations/index.ts | 1 + .../src/organizations/segmentsAgg.ts | 32 ++++ .../src/orphanCleanupRuns/base.ts | 98 +++++++++++ .../src/orphanCleanupRuns/index.ts | 1 + 9 files changed, 196 insertions(+), 236 deletions(-) create mode 100644 services/libs/data-access-layer/src/members/segmentsAgg.ts create mode 100644 services/libs/data-access-layer/src/organizations/segmentsAgg.ts create mode 100644 services/libs/data-access-layer/src/orphanCleanupRuns/base.ts create mode 100644 services/libs/data-access-layer/src/orphanCleanupRuns/index.ts diff --git a/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts index 8b22f6f4e0..bf734bc84e 100644 --- a/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts +++ b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts @@ -1,70 +1,23 @@ -import { svc } from '../../main' +import { + deleteOrphanMemberSegmentsAgg as deleteMemberSegmentsAgg, + getOrphanMemberSegmentsAgg as getMemberSegmentsAgg, +} from '@crowd/data-access-layer/src/members/segmentsAgg' +import { + deleteOrphanOrganizationSegmentsAgg as deleteOrganizationSegmentsAgg, + getOrphanOrganizationSegmentsAgg as getOrganizationSegmentsAgg, +} from '@crowd/data-access-layer/src/organizations/segmentsAgg' 
+import { + IOrphanCleanupRun, + startOrphanCleanupRun as startCleanupRun, + updateOrphanCleanupRun as updateCleanupRun, +} from '@crowd/data-access-layer/src/orphanCleanupRuns' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' -interface OrphanCleanupRun { - id?: string - tableName: string - startedAt: Date - completedAt?: Date - status: 'running' | 'completed' | 'failed' - orphansFound: number - orphansDeleted: number - executionTimeMs?: number - errorMessage?: string -} +import { svc } from '../../main' -export async function startOrphanCleanupRun(tableName: string): Promise { +export async function startOrphanCleanupRun(aggregateName: string): Promise { try { - // First, mark any stale runs (running for more than 2 hours) as failed - await svc.postgres.writer.connection().none( - ` - UPDATE "orphanCleanupRuns" - SET - "status" = 'failed', - "completedAt" = NOW(), - "errorMessage" = 'Cleanup run timed out or worker crashed', - "executionTimeMs" = EXTRACT(EPOCH FROM (NOW() - "startedAt")) * 1000 - WHERE "tableName" = $1 - AND "status" = 'running' - AND "startedAt" < NOW() - INTERVAL '2 hours' - `, - [tableName], - ) - - // Check if there's already a running cleanup for this table - const existingRun = await svc.postgres.reader.connection().oneOrNone( - ` - SELECT id, "startedAt" - FROM "orphanCleanupRuns" - WHERE "tableName" = $1 - AND "status" = 'running' - LIMIT 1 - `, - [tableName], - ) - - if (existingRun) { - svc.log.warn( - { tableName, existingRunId: existingRun.id }, - 'Found existing running cleanup, reusing it', - ) - return existingRun.id - } - - // Create new cleanup run - const result = await svc.postgres.writer.connection().one( - ` - INSERT INTO "orphanCleanupRuns" ( - "tableName", - "startedAt", - "status", - "orphansFound", - "orphansDeleted" - ) VALUES ($1, NOW(), 'running', 0, 0) - RETURNING id - `, - [tableName], - ) - return result.id + return await startCleanupRun(dbStoreQx(svc.postgres.writer), aggregateName) } catch (error) 
{ svc.log.error(error, 'Error starting orphan cleanup run!') throw error @@ -73,48 +26,10 @@ export async function startOrphanCleanupRun(tableName: string): Promise export async function updateOrphanCleanupRun( runId: string, - updates: Partial, + updates: Partial, ): Promise { try { - const setClauses: string[] = [] - const values: unknown[] = [] - let paramIndex = 1 - - if (updates.completedAt !== undefined) { - setClauses.push(`"completedAt" = $${paramIndex++}`) - values.push(updates.completedAt) - } - if (updates.status !== undefined) { - setClauses.push(`"status" = $${paramIndex++}`) - values.push(updates.status) - } - if (updates.orphansFound !== undefined) { - setClauses.push(`"orphansFound" = $${paramIndex++}`) - values.push(updates.orphansFound) - } - if (updates.orphansDeleted !== undefined) { - setClauses.push(`"orphansDeleted" = $${paramIndex++}`) - values.push(updates.orphansDeleted) - } - if (updates.executionTimeMs !== undefined) { - setClauses.push(`"executionTimeMs" = $${paramIndex++}`) - values.push(updates.executionTimeMs) - } - if (updates.errorMessage !== undefined) { - setClauses.push(`"errorMessage" = $${paramIndex++}`) - values.push(updates.errorMessage) - } - - values.push(runId) - - await svc.postgres.writer.connection().none( - ` - UPDATE "orphanCleanupRuns" - SET ${setClauses.join(', ')} - WHERE id = $${paramIndex} - `, - values, - ) + return await updateCleanupRun(dbStoreQx(svc.postgres.writer), runId, updates) } catch (error) { svc.log.error(error, 'Error updating orphan cleanup run!') throw error @@ -123,17 +38,7 @@ export async function updateOrphanCleanupRun( export async function getOrphanMembersSegmentsAgg(batchSize: number): Promise { try { - const result = await svc.postgres.reader.connection().any( - ` - SELECT msa."memberId" - FROM "memberSegmentsAgg" msa - LEFT JOIN members m ON msa."memberId" = m.id - WHERE m.id IS NULL - LIMIT $1 - `, - [batchSize], - ) - return result.map((r) => r.memberId) + return await 
getMemberSegmentsAgg(dbStoreQx(svc.postgres.reader), batchSize) } catch (error) { svc.log.error(error, 'Error getting orphan memberSegmentsAgg records!') throw error @@ -142,13 +47,7 @@ export async function getOrphanMembersSegmentsAgg(batchSize: number): Promise { try { - await svc.postgres.writer.connection().none( - ` - DELETE FROM "memberSegmentsAgg" - WHERE "memberId" = $1 - `, - [memberId], - ) + return await deleteMemberSegmentsAgg(dbStoreQx(svc.postgres.writer), memberId) } catch (error) { svc.log.error(error, 'Error deleting orphan memberSegmentsAgg record!') throw error @@ -157,17 +56,7 @@ export async function deleteOrphanMembersSegmentsAgg(memberId: string): Promise< export async function getOrphanOrganizationSegmentsAgg(batchSize: number): Promise { try { - const result = await svc.postgres.reader.connection().any( - ` - SELECT osa."organizationId" - FROM "organizationSegmentsAgg" osa - LEFT JOIN organizations o ON osa."organizationId" = o.id - WHERE o.id IS NULL - LIMIT $1 - `, - [batchSize], - ) - return result.map((r) => r.organizationId) + return await getOrganizationSegmentsAgg(dbStoreQx(svc.postgres.reader), batchSize) } catch (error) { svc.log.error(error, 'Error getting orphan organizationSegmentsAgg records!') throw error @@ -176,13 +65,7 @@ export async function getOrphanOrganizationSegmentsAgg(batchSize: number): Promi export async function deleteOrphanOrganizationSegmentsAgg(organizationId: string): Promise { try { - await svc.postgres.writer.connection().none( - ` - DELETE FROM "organizationSegmentsAgg" - WHERE "organizationId" = $1 - `, - [organizationId], - ) + return await deleteOrganizationSegmentsAgg(dbStoreQx(svc.postgres.writer), organizationId) } catch (error) { svc.log.error(error, 'Error deleting orphan organizationSegmentsAgg record!') throw error diff --git a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts index 6ad7206035..d4918d6023 100644 --- 
a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts +++ b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts @@ -10,15 +10,6 @@ import { export const scheduleMembersCleanup = async () => { try { - // Try to delete existing schedule first to ensure fresh config - try { - const handle = svc.temporal.schedule.getHandle('cleanupMembers') - await handle.delete() - svc.log.info('Deleted existing cleanupMembers schedule to recreate with new config') - } catch (err) { - // Schedule doesn't exist, that's fine - } - await svc.temporal.schedule.create({ scheduleId: 'cleanupMembers', spec: { @@ -56,15 +47,6 @@ export const scheduleMembersCleanup = async () => { export const scheduleOrganizationsCleanup = async () => { try { - // Try to delete existing schedule first to ensure fresh config - try { - const handle = svc.temporal.schedule.getHandle('cleanupOrganizations') - await handle.delete() - svc.log.info('Deleted existing cleanupOrganizations schedule to recreate with new config') - } catch (err) { - // Schedule doesn't exist, that's fine - } - await svc.temporal.schedule.create({ scheduleId: 'cleanupOrganizations', spec: { @@ -102,17 +84,6 @@ export const scheduleOrganizationsCleanup = async () => { export const scheduleMemberSegmentsAggCleanup = async () => { try { - svc.log.info('Creating schedule for member segments agg cleanup...') - - // Try to delete existing schedule first to ensure fresh config - try { - const handle = svc.temporal.schedule.getHandle('cleanupMemberSegmentsAgg') - await handle.delete() - svc.log.info('Deleted existing cleanupMemberSegmentsAgg schedule to recreate with new config') - } catch (err) { - // Schedule doesn't exist, that's fine - } - await svc.temporal.schedule.create({ scheduleId: 'cleanupMemberSegmentsAgg', spec: { @@ -153,19 +124,6 @@ export const scheduleMemberSegmentsAggCleanup = async () => { export const scheduleOrganizationSegmentAggCleanup = async () => { try { - svc.log.info('Creating 
schedule for organization segment agg cleanup...') - - // Try to delete existing schedule first to ensure fresh config - try { - const handle = svc.temporal.schedule.getHandle('cleanupOrganizationSegmentAgg') - await handle.delete() - svc.log.info( - 'Deleted existing cleanupOrganizationSegmentAgg schedule to recreate with new config', - ) - } catch (err) { - // Schedule doesn't exist, that's fine - } - await svc.temporal.schedule.create({ scheduleId: 'cleanupOrganizationSegmentAgg', spec: { diff --git a/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts b/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts index 75a98db5b6..2e0c05a68d 100644 --- a/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts +++ b/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts @@ -18,7 +18,7 @@ const { export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Promise { const BATCH_SIZE = args.batchSize ?? 
100 - const TABLE_NAME = 'memberSegmentsAgg' + const AGGREGATE_NAME = 'memberSegmentsAgg' let runId: string | undefined let totalOrphansFound = 0 @@ -28,8 +28,8 @@ export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Prom try { // Initialize the cleanup run only on the first iteration if (!args.cleanupRunId) { - runId = await startOrphanCleanupRun(TABLE_NAME) - console.log(`Started cleanup run for ${TABLE_NAME} with ID: ${runId}`) + runId = await startOrphanCleanupRun(AGGREGATE_NAME) + console.log(`Started cleanup run for ${AGGREGATE_NAME} with ID: ${runId}`) } else { runId = args.cleanupRunId } @@ -38,8 +38,6 @@ export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Prom const orphanIds = await getOrphanMembersSegmentsAgg(BATCH_SIZE) if (orphanIds.length === 0) { - console.log(`No more orphan ${TABLE_NAME} records to cleanup!`) - // Update the cleanup run as completed if (runId) { await updateOrphanCleanupRun(runId, { @@ -51,8 +49,6 @@ export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Prom return } - - console.log(`Found ${orphanIds.length} orphan ${TABLE_NAME} records`) totalOrphansFound += orphanIds.length // Process orphans in chunks @@ -66,13 +62,11 @@ export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Prom }) await Promise.all(deleteTasks).catch((err) => { - console.error(`Error cleaning up orphan ${TABLE_NAME} records!`, err) + console.error(`Error cleaning up orphan ${AGGREGATE_NAME} records!`, err) throw err }) } - totalOrphansDeleted += deletedCount - console.log(`Deleted ${deletedCount} orphan ${TABLE_NAME} records in this batch`) // Update the cleanup run with current progress if (runId) { @@ -82,29 +76,12 @@ export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Prom }) } - if (args.testRun) { - console.log(`Test run completed for ${TABLE_NAME} - stopping after first batch!`) - - // Mark as completed for test runs - if (runId) { - await 
updateOrphanCleanupRun(runId, { - completedAt: new Date(), - status: 'completed', - executionTimeMs: Date.now() - startTime, - }) - } - - return - } - // Continue as new for the next batch await continueAsNew({ ...args, cleanupRunId: runId, }) } catch (error) { - console.error(`Error during ${TABLE_NAME} cleanup workflow!`, error) - // Update the cleanup run as failed if (runId) { await updateOrphanCleanupRun(runId, { @@ -121,7 +98,7 @@ export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Prom export async function cleanupOrganizationSegmentAgg(args: IScriptBatchTestArgs): Promise { const BATCH_SIZE = args.batchSize ?? 100 - const TABLE_NAME = 'organizationSegmentsAgg' + const AGGREGATE_NAME = 'organizationSegmentsAgg' let runId: string | undefined let totalOrphansFound = 0 @@ -131,8 +108,8 @@ export async function cleanupOrganizationSegmentAgg(args: IScriptBatchTestArgs): try { // Initialize the cleanup run only on the first iteration if (!args.cleanupRunId) { - runId = await startOrphanCleanupRun(TABLE_NAME) - console.log(`Started cleanup run for ${TABLE_NAME} with ID: ${runId}`) + runId = await startOrphanCleanupRun(AGGREGATE_NAME) + console.log(`Started cleanup run for ${AGGREGATE_NAME} with ID: ${runId}`) } else { runId = args.cleanupRunId } @@ -141,8 +118,6 @@ export async function cleanupOrganizationSegmentAgg(args: IScriptBatchTestArgs): const orphanIds = await getOrphanOrganizationSegmentsAgg(BATCH_SIZE) if (orphanIds.length === 0) { - console.log(`No more orphan ${TABLE_NAME} records to cleanup!`) - // Update the cleanup run as completed if (runId) { await updateOrphanCleanupRun(runId, { @@ -154,8 +129,6 @@ export async function cleanupOrganizationSegmentAgg(args: IScriptBatchTestArgs): return } - - console.log(`Found ${orphanIds.length} orphan ${TABLE_NAME} records`) totalOrphansFound += orphanIds.length // Process orphans in chunks @@ -169,14 +142,12 @@ export async function cleanupOrganizationSegmentAgg(args: 
IScriptBatchTestArgs): }) await Promise.all(deleteTasks).catch((err) => { - console.error(`Error cleaning up orphan ${TABLE_NAME} records!`, err) + console.error(`Error cleaning up orphan ${AGGREGATE_NAME} records!`, err) throw err }) } totalOrphansDeleted += deletedCount - console.log(`Deleted ${deletedCount} orphan ${TABLE_NAME} records in this batch`) - // Update the cleanup run with current progress if (runId) { await updateOrphanCleanupRun(runId, { @@ -185,29 +156,12 @@ export async function cleanupOrganizationSegmentAgg(args: IScriptBatchTestArgs): }) } - if (args.testRun) { - console.log(`Test run completed for ${TABLE_NAME} - stopping after first batch!`) - - // Mark as completed for test runs - if (runId) { - await updateOrphanCleanupRun(runId, { - completedAt: new Date(), - status: 'completed', - executionTimeMs: Date.now() - startTime, - }) - } - - return - } - // Continue as new for the next batch await continueAsNew({ ...args, cleanupRunId: runId, }) } catch (error) { - console.error(`Error during ${TABLE_NAME} cleanup workflow!`, error) - // Update the cleanup run as failed if (runId) { await updateOrphanCleanupRun(runId, { diff --git a/services/libs/data-access-layer/src/members/index.ts b/services/libs/data-access-layer/src/members/index.ts index 3d8a649365..346f476bb5 100644 --- a/services/libs/data-access-layer/src/members/index.ts +++ b/services/libs/data-access-layer/src/members/index.ts @@ -2,6 +2,7 @@ export * from './base' export * from './identities' export * from './organizations' export * from './segments' +export * from './segmentsAgg' export * from './others' export * from './attributes' export * from './dashboard' diff --git a/services/libs/data-access-layer/src/members/segmentsAgg.ts b/services/libs/data-access-layer/src/members/segmentsAgg.ts new file mode 100644 index 0000000000..f0ce3616ea --- /dev/null +++ b/services/libs/data-access-layer/src/members/segmentsAgg.ts @@ -0,0 +1,32 @@ +import { QueryExecutor } from '../queryExecutor' 
+ +export async function getOrphanMemberSegmentsAgg( + qx: QueryExecutor, + batchSize: number, +): Promise { + const rows = await qx.select( + ` + SELECT msa."memberId" + FROM "memberSegmentsAgg" msa + LEFT JOIN members m ON msa."memberId" = m.id + WHERE m.id IS NULL + LIMIT $(batchSize) + `, + { batchSize }, + ) + + return rows.map((r) => r.memberId) +} + +export async function deleteOrphanMemberSegmentsAgg( + qx: QueryExecutor, + memberId: string, +): Promise { + await qx.result( + ` + DELETE FROM "memberSegmentsAgg" + WHERE "memberId" = $(memberId) + `, + { memberId }, + ) +} diff --git a/services/libs/data-access-layer/src/organizations/index.ts b/services/libs/data-access-layer/src/organizations/index.ts index a65b8a0422..7c605616a8 100644 --- a/services/libs/data-access-layer/src/organizations/index.ts +++ b/services/libs/data-access-layer/src/organizations/index.ts @@ -3,5 +3,6 @@ export * from './base' export * from './attributes' export * from './identities' export * from './segments' +export * from './segmentsAgg' export * from './utils' export * from './enrichment' diff --git a/services/libs/data-access-layer/src/organizations/segmentsAgg.ts b/services/libs/data-access-layer/src/organizations/segmentsAgg.ts new file mode 100644 index 0000000000..9db09122cf --- /dev/null +++ b/services/libs/data-access-layer/src/organizations/segmentsAgg.ts @@ -0,0 +1,32 @@ +import { QueryExecutor } from '../queryExecutor' + +export async function getOrphanOrganizationSegmentsAgg( + qx: QueryExecutor, + batchSize: number, +): Promise { + const rows = await qx.select( + ` + SELECT osa."organizationId" + FROM "organizationSegmentsAgg" osa + LEFT JOIN organizations o ON osa."organizationId" = o.id + WHERE o.id IS NULL + LIMIT $(batchSize) + `, + { batchSize }, + ) + + return rows.map((r) => r.organizationId) +} + +export async function deleteOrphanOrganizationSegmentsAgg( + qx: QueryExecutor, + organizationId: string, +): Promise { + await qx.result( + ` + DELETE FROM 
"organizationSegmentsAgg" + WHERE "organizationId" = $(organizationId) + `, + { organizationId }, + ) +} diff --git a/services/libs/data-access-layer/src/orphanCleanupRuns/base.ts b/services/libs/data-access-layer/src/orphanCleanupRuns/base.ts new file mode 100644 index 0000000000..fdedee6c0b --- /dev/null +++ b/services/libs/data-access-layer/src/orphanCleanupRuns/base.ts @@ -0,0 +1,98 @@ +import { QueryExecutor } from '../queryExecutor' + +export interface IOrphanCleanupRun { + id?: string + aggregateName: string + startedAt: Date + completedAt?: Date + status: 'running' | 'completed' | 'failed' + orphansFound: number + orphansDeleted: number + executionTimeMs?: number + errorMessage?: string +} + +export async function startOrphanCleanupRun( + qx: QueryExecutor, + aggregateName: string, +): Promise { + // Check if there's already a running cleanup for this aggregate + const existingRun = await qx.selectOneOrNone( + ` + SELECT id, "startedAt" + FROM "orphanCleanupRuns" + WHERE "tableName" = $(aggregateName) + AND "status" = 'running' + LIMIT 1 + `, + { aggregateName }, + ) + + if (existingRun) { + return existingRun.id + } + + // Create new cleanup run + const result = await qx.selectOne( + ` + INSERT INTO "orphanCleanupRuns" ( + "tableName", + "startedAt", + "status", + "orphansFound", + "orphansDeleted" + ) VALUES ($(aggregateName), NOW(), 'running', 0, 0) + RETURNING id + `, + { aggregateName }, + ) + + return result.id +} + +export async function updateOrphanCleanupRun( + qx: QueryExecutor, + runId: string, + updates: Partial, +): Promise { + const setClauses: string[] = [] + const params: Record = { runId } + + if (updates.completedAt !== undefined) { + setClauses.push(`"completedAt" = $(completedAt)`) + params.completedAt = updates.completedAt + } + if (updates.status !== undefined) { + setClauses.push(`"status" = $(status)`) + params.status = updates.status + } + if (updates.orphansFound !== undefined) { + setClauses.push(`"orphansFound" = 
$(orphansFound)`) + params.orphansFound = updates.orphansFound + } + if (updates.orphansDeleted !== undefined) { + setClauses.push(`"orphansDeleted" = $(orphansDeleted)`) + params.orphansDeleted = updates.orphansDeleted + } + if (updates.executionTimeMs !== undefined) { + setClauses.push(`"executionTimeMs" = $(executionTimeMs)`) + params.executionTimeMs = updates.executionTimeMs + } + if (updates.errorMessage !== undefined) { + setClauses.push(`"errorMessage" = $(errorMessage)`) + params.errorMessage = updates.errorMessage + } + + if (setClauses.length === 0) { + return + } + + await qx.result( + ` + UPDATE "orphanCleanupRuns" + SET ${setClauses.join(', ')} + WHERE id = $(runId) + `, + params, + ) +} diff --git a/services/libs/data-access-layer/src/orphanCleanupRuns/index.ts b/services/libs/data-access-layer/src/orphanCleanupRuns/index.ts new file mode 100644 index 0000000000..85e56520d9 --- /dev/null +++ b/services/libs/data-access-layer/src/orphanCleanupRuns/index.ts @@ -0,0 +1 @@ +export * from './base' From 374cb58c751dcb5bd0972addbe9ee0424cc8a970 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 30 Jan 2026 17:37:27 +0100 Subject: [PATCH 6/6] feat: cron set to every day --- .../src/schedules/scheduleCleanup.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts index d4918d6023..8b26d7bdeb 100644 --- a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts +++ b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts @@ -87,8 +87,8 @@ export const scheduleMemberSegmentsAggCleanup = async () => { await svc.temporal.schedule.create({ scheduleId: 'cleanupMemberSegmentsAgg', spec: { - // Run every 5 minutes - cronExpressions: ['*/5 * * * *'], + // Run every day at midnight + cronExpressions: ['0 0 * * *'], }, policies: { overlap: ScheduleOverlapPolicy.BUFFER_ONE, @@ 
-127,8 +127,8 @@ export const scheduleOrganizationSegmentAggCleanup = async () => { await svc.temporal.schedule.create({ scheduleId: 'cleanupOrganizationSegmentAgg', spec: { - // Run every 5 minutes - cronExpressions: ['*/5 * * * *'], + // Run every day at midnight + cronExpressions: ['0 0 * * *'], }, policies: { overlap: ScheduleOverlapPolicy.BUFFER_ONE,