diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index a8fe72ae3e..3962faa836 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -16,6 +16,14 @@ import { queueOrgForAggComputation, syncRemoveOrganization, } from './activities/cleanup/organization' +import { + deleteOrphanMembersSegmentsAgg, + deleteOrphanOrganizationSegmentsAgg, + getOrphanMembersSegmentsAgg, + getOrphanOrganizationSegmentsAgg, + startOrphanCleanupRun, + updateOrphanCleanupRun, +} from './activities/cleanup/segments-agg' import { calculateMemberAffiliations, getWorkflowsCount, @@ -86,4 +94,10 @@ export { markMemberForAffiliationRecalc, getMembersForAffiliationRecalc, calculateMemberAffiliations, + startOrphanCleanupRun, + updateOrphanCleanupRun, + getOrphanMembersSegmentsAgg, + deleteOrphanMembersSegmentsAgg, + getOrphanOrganizationSegmentsAgg, + deleteOrphanOrganizationSegmentsAgg, } diff --git a/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts new file mode 100644 index 0000000000..bf734bc84e --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/cleanup/segments-agg.ts @@ -0,0 +1,73 @@ +import { + deleteOrphanMemberSegmentsAgg as deleteMemberSegmentsAgg, + getOrphanMemberSegmentsAgg as getMemberSegmentsAgg, +} from '@crowd/data-access-layer/src/members/segmentsAgg' +import { + deleteOrphanOrganizationSegmentsAgg as deleteOrganizationSegmentsAgg, + getOrphanOrganizationSegmentsAgg as getOrganizationSegmentsAgg, +} from '@crowd/data-access-layer/src/organizations/segmentsAgg' +import { + IOrphanCleanupRun, + startOrphanCleanupRun as startCleanupRun, + updateOrphanCleanupRun as updateCleanupRun, +} from '@crowd/data-access-layer/src/orphanCleanupRuns' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' + +import { svc } from '../../main' + +export async function startOrphanCleanupRun(aggregateName: string): Promise { + try { + return startCleanupRun(dbStoreQx(svc.postgres.writer), aggregateName) + } catch (error) { + svc.log.error(error, 'Error starting orphan cleanup run!') + throw error + } +} + +export async function updateOrphanCleanupRun( + runId: string, + updates: Partial, +): Promise { + try { + return updateCleanupRun(dbStoreQx(svc.postgres.writer), runId, updates) + } catch (error) { + svc.log.error(error, 'Error updating orphan cleanup run!') + throw error + } +} + +export async function getOrphanMembersSegmentsAgg(batchSize: number): Promise { + try { + return getMemberSegmentsAgg(dbStoreQx(svc.postgres.reader), batchSize) + } catch (error) { + svc.log.error(error, 'Error getting orphan memberSegmentsAgg records!') + throw error + } +} + +export async function deleteOrphanMembersSegmentsAgg(memberId: string): Promise { + try { + return deleteMemberSegmentsAgg(dbStoreQx(svc.postgres.writer), memberId) + } catch (error) { + svc.log.error(error, 'Error deleting orphan memberSegmentsAgg record!') + throw error + } +} + +export async function getOrphanOrganizationSegmentsAgg(batchSize: number): Promise { + try { + return getOrganizationSegmentsAgg(dbStoreQx(svc.postgres.reader), batchSize) + } catch (error) { + svc.log.error(error, 'Error getting orphan organizationSegmentsAgg records!') + throw error + } +} + +export async function deleteOrphanOrganizationSegmentsAgg(organizationId: string): Promise { + try { + return deleteOrganizationSegmentsAgg(dbStoreQx(svc.postgres.writer), organizationId) + } catch (error) { + svc.log.error(error, 'Error deleting orphan organizationSegmentsAgg record!') + throw error + } +} diff --git a/services/apps/script_executor_worker/src/main.ts b/services/apps/script_executor_worker/src/main.ts index aad2b6f3c9..9047acaf13 100644 --- a/services/apps/script_executor_worker/src/main.ts +++ b/services/apps/script_executor_worker/src/main.ts @@ -1,7 +1,12 @@ import { Config } from '@crowd/archetype-standard' import { Options, ServiceWorker } from '@crowd/archetype-worker' -import { scheduleMembersCleanup, scheduleOrganizationsCleanup } from './schedules/scheduleCleanup' +import { + scheduleMemberSegmentsAggCleanup, + scheduleMembersCleanup, + scheduleOrganizationSegmentAggCleanup, + scheduleOrganizationsCleanup, +} from './schedules/scheduleCleanup' const config: Config = { envvars: ['CROWD_TINYBIRD_ACCESS_TOKEN'], @@ -35,6 +40,8 @@ setImmediate(async () => { await scheduleMembersCleanup() await scheduleOrganizationsCleanup() + await scheduleMemberSegmentsAggCleanup() + await scheduleOrganizationSegmentAggCleanup() await svc.start() }) diff --git a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts index d09e9f136e..8b26d7bdeb 100644 --- a/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts +++ b/services/apps/script_executor_worker/src/schedules/scheduleCleanup.ts @@ -3,6 +3,10 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien import { svc } from '../main' import { cleanupMembers } from '../workflows/cleanup/members' import { cleanupOrganizations } from '../workflows/cleanup/organizations' +import { + cleanupMemberSegmentsAgg, + cleanupOrganizationSegmentAgg, +} from '../workflows/cleanup/segments-agg' export const scheduleMembersCleanup = async () => { try { @@ -77,3 +81,83 @@ export const scheduleOrganizationsCleanup = async () => { } } } + +export const scheduleMemberSegmentsAggCleanup = async () => { + try { + await svc.temporal.schedule.create({ + scheduleId: 'cleanupMemberSegmentsAgg', + spec: { + // Run every day at midnight + cronExpressions: ['0 0 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.BUFFER_ONE, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: cleanupMemberSegmentsAgg, + taskQueue: 'script-executor', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [ + { + batchSize: 500, + }, + ], + }, + }) + svc.log.info('Schedule for member segments agg cleanup created successfully!') + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('Schedule cleanupMemberSegmentsAgg already registered in Temporal.') + svc.log.info('Configuration may have changed since. Please make sure they are in sync.') + } else { + svc.log.error({ err }, 'Error creating schedule for member segments agg cleanup') + throw new Error(err) + } + } +} + +export const scheduleOrganizationSegmentAggCleanup = async () => { + try { + await svc.temporal.schedule.create({ + scheduleId: 'cleanupOrganizationSegmentAgg', + spec: { + // Run every day at midnight + cronExpressions: ['0 0 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.BUFFER_ONE, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: cleanupOrganizationSegmentAgg, + taskQueue: 'script-executor', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [ + { + batchSize: 500, + }, + ], + }, + }) + svc.log.info('Schedule for organization segment agg cleanup created successfully!') + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('Schedule cleanupOrganizationSegmentAgg already registered in Temporal.') + svc.log.info('Configuration may have changed since. Please make sure they are in sync.') + } else { + svc.log.error({ err }, 'Error creating schedule for organization segment agg cleanup') + throw new Error(err) + } + } +} diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 2465f4dd17..6a12517196 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -27,6 +27,7 @@ export interface IPopulateActivityRelationsArgs { export interface IScriptBatchTestArgs { batchSize?: number testRun?: boolean + cleanupRunId?: string } export interface IFixActivityForiegnKeysArgs extends IScriptBatchTestArgs { diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index a7e9307e64..5a14a2a01e 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -2,6 +2,10 @@ import { blockProjectOrganizationAffiliations } from './workflows/block-project- import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members' import { cleanupMembers } from './workflows/cleanup/members' import { cleanupOrganizations } from './workflows/cleanup/organizations' +import { + cleanupMemberSegmentsAgg, + cleanupOrganizationSegmentAgg, +} from './workflows/cleanup/segments-agg' import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' @@ -17,6 +21,8 @@ export { fixOrgIdentitiesWithWrongUrls, cleanupMembers, cleanupOrganizations, + cleanupMemberSegmentsAgg, + cleanupOrganizationSegmentAgg, processLLMVerifiedMerges, cleanupDuplicateMembers, fixBotMembersAffiliation, diff --git a/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts b/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts new file mode 100644 index 0000000000..2e0c05a68d --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/cleanup/segments-agg.ts @@ -0,0 +1,177 @@ +import { continueAsNew, proxyActivities } from '@temporalio/workflow' + +import * as activities from '../../activities' +import { IScriptBatchTestArgs } from '../../types' +import { chunkArray } from '../../utils/common' + +const { + startOrphanCleanupRun, + updateOrphanCleanupRun, + getOrphanMembersSegmentsAgg, + deleteOrphanMembersSegmentsAgg, + getOrphanOrganizationSegmentsAgg, + deleteOrphanOrganizationSegmentsAgg, +} = proxyActivities({ + startToCloseTimeout: '30 minutes', + retry: { maximumAttempts: 3, backoffCoefficient: 3 }, +}) + +export async function cleanupMemberSegmentsAgg(args: IScriptBatchTestArgs): Promise { + const BATCH_SIZE = args.batchSize ?? 100 + const AGGREGATE_NAME = 'memberSegmentsAgg' + + let runId: string | undefined + let totalOrphansFound = 0 + let totalOrphansDeleted = 0 + const startTime = Date.now() + + try { + // Initialize the cleanup run only on the first iteration + if (!args.cleanupRunId) { + runId = await startOrphanCleanupRun(AGGREGATE_NAME) + console.log(`Started cleanup run for ${AGGREGATE_NAME} with ID: ${runId}`) + } else { + runId = args.cleanupRunId + } + + // Get orphaned records + const orphanIds = await getOrphanMembersSegmentsAgg(BATCH_SIZE) + + if (orphanIds.length === 0) { + // Update the cleanup run as completed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'completed', + executionTimeMs: Date.now() - startTime, + }) + } + + return + } + totalOrphansFound += orphanIds.length + + // Process orphans in chunks + const CHUNK_SIZE = 25 + let deletedCount = 0 + + for (const chunk of chunkArray(orphanIds, CHUNK_SIZE)) { + const deleteTasks = chunk.map(async (id) => { + await deleteOrphanMembersSegmentsAgg(id) + deletedCount++ + }) + + await Promise.all(deleteTasks).catch((err) => { + console.error(`Error cleaning up orphan ${AGGREGATE_NAME} records!`, err) + throw err + }) + } + totalOrphansDeleted += deletedCount + + // Update the cleanup run with current progress + if (runId) { + await updateOrphanCleanupRun(runId, { + orphansFound: totalOrphansFound, + orphansDeleted: totalOrphansDeleted, + }) + } + + // Continue as new for the next batch + await continueAsNew({ + ...args, + cleanupRunId: runId, + }) + } catch (error) { + // Update the cleanup run as failed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'failed', + errorMessage: error.message || String(error), + executionTimeMs: Date.now() - startTime, + }) + } + + throw error + } +} + +export async function cleanupOrganizationSegmentAgg(args: IScriptBatchTestArgs): Promise { + const BATCH_SIZE = args.batchSize ?? 100 + const AGGREGATE_NAME = 'organizationSegmentsAgg' + + let runId: string | undefined + let totalOrphansFound = 0 + let totalOrphansDeleted = 0 + const startTime = Date.now() + + try { + // Initialize the cleanup run only on the first iteration + if (!args.cleanupRunId) { + runId = await startOrphanCleanupRun(AGGREGATE_NAME) + console.log(`Started cleanup run for ${AGGREGATE_NAME} with ID: ${runId}`) + } else { + runId = args.cleanupRunId + } + + // Get orphaned records + const orphanIds = await getOrphanOrganizationSegmentsAgg(BATCH_SIZE) + + if (orphanIds.length === 0) { + // Update the cleanup run as completed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'completed', + executionTimeMs: Date.now() - startTime, + }) + } + + return + } + totalOrphansFound += orphanIds.length + + // Process orphans in chunks + const CHUNK_SIZE = 25 + let deletedCount = 0 + + for (const chunk of chunkArray(orphanIds, CHUNK_SIZE)) { + const deleteTasks = chunk.map(async (id) => { + await deleteOrphanOrganizationSegmentsAgg(id) + deletedCount++ + }) + + await Promise.all(deleteTasks).catch((err) => { + console.error(`Error cleaning up orphan ${AGGREGATE_NAME} records!`, err) + throw err + }) + } + + totalOrphansDeleted += deletedCount + // Update the cleanup run with current progress + if (runId) { + await updateOrphanCleanupRun(runId, { + orphansFound: totalOrphansFound, + orphansDeleted: totalOrphansDeleted, + }) + } + + // Continue as new for the next batch + await continueAsNew({ + ...args, + cleanupRunId: runId, + }) + } catch (error) { + // Update the cleanup run as failed + if (runId) { + await updateOrphanCleanupRun(runId, { + completedAt: new Date(), + status: 'failed', + errorMessage: error.message || String(error), + executionTimeMs: Date.now() - startTime, + }) + } + + throw error + } +} diff --git a/services/libs/data-access-layer/src/members/index.ts b/services/libs/data-access-layer/src/members/index.ts index 3d8a649365..346f476bb5 100644 --- a/services/libs/data-access-layer/src/members/index.ts +++ b/services/libs/data-access-layer/src/members/index.ts @@ -2,6 +2,7 @@ export * from './base' export * from './identities' export * from './organizations' export * from './segments' +export * from './segmentsAgg' export * from './others' export * from './attributes' export * from './dashboard' diff --git a/services/libs/data-access-layer/src/members/segmentsAgg.ts b/services/libs/data-access-layer/src/members/segmentsAgg.ts new file mode 100644 index 0000000000..f0ce3616ea --- /dev/null +++ b/services/libs/data-access-layer/src/members/segmentsAgg.ts @@ -0,0 +1,32 @@ +import { QueryExecutor } from '../queryExecutor' + +export async function getOrphanMemberSegmentsAgg( + qx: QueryExecutor, + batchSize: number, +): Promise { + const rows = await qx.select( + ` + SELECT msa."memberId" + FROM "memberSegmentsAgg" msa + LEFT JOIN members m ON msa."memberId" = m.id + WHERE m.id IS NULL + LIMIT $(batchSize) + `, + { batchSize }, + ) + + return rows.map((r) => r.memberId) +} + +export async function deleteOrphanMemberSegmentsAgg( + qx: QueryExecutor, + memberId: string, +): Promise { + await qx.result( + ` + DELETE FROM "memberSegmentsAgg" + WHERE "memberId" = $(memberId) + `, + { memberId }, + ) +} diff --git a/services/libs/data-access-layer/src/organizations/index.ts b/services/libs/data-access-layer/src/organizations/index.ts index a65b8a0422..7c605616a8 100644 --- a/services/libs/data-access-layer/src/organizations/index.ts +++ b/services/libs/data-access-layer/src/organizations/index.ts @@ -3,5 +3,6 @@ export * from './base' export * from './attributes' export * from './identities' export * from './segments' +export * from './segmentsAgg' export * from './utils' export * from './enrichment' diff --git a/services/libs/data-access-layer/src/organizations/segmentsAgg.ts b/services/libs/data-access-layer/src/organizations/segmentsAgg.ts new file mode 100644 index 0000000000..9db09122cf --- /dev/null +++ b/services/libs/data-access-layer/src/organizations/segmentsAgg.ts @@ -0,0 +1,32 @@ +import { QueryExecutor } from '../queryExecutor' + +export async function getOrphanOrganizationSegmentsAgg( + qx: QueryExecutor, + batchSize: number, +): Promise { + const rows = await qx.select( + ` + SELECT osa."organizationId" + FROM "organizationSegmentsAgg" osa + LEFT JOIN organizations o ON osa."organizationId" = o.id + WHERE o.id IS NULL + LIMIT $(batchSize) + `, + { batchSize }, + ) + + return rows.map((r) => r.organizationId) +} + +export async function deleteOrphanOrganizationSegmentsAgg( + qx: QueryExecutor, + organizationId: string, +): Promise { + await qx.result( + ` + DELETE FROM "organizationSegmentsAgg" + WHERE "organizationId" = $(organizationId) + `, + { organizationId }, + ) +} diff --git a/services/libs/data-access-layer/src/orphanCleanupRuns/base.ts b/services/libs/data-access-layer/src/orphanCleanupRuns/base.ts new file mode 100644 index 0000000000..fdedee6c0b --- /dev/null +++ b/services/libs/data-access-layer/src/orphanCleanupRuns/base.ts @@ -0,0 +1,98 @@ +import { QueryExecutor } from '../queryExecutor' + +export interface IOrphanCleanupRun { + id?: string + aggregateName: string + startedAt: Date + completedAt?: Date + status: 'running' | 'completed' | 'failed' + orphansFound: number + orphansDeleted: number + executionTimeMs?: number + errorMessage?: string +} + +export async function startOrphanCleanupRun( + qx: QueryExecutor, + aggregateName: string, +): Promise { + // Check if there's already a running cleanup for this aggregate + const existingRun = await qx.selectOneOrNone( + ` + SELECT id, "startedAt" + FROM "orphanCleanupRuns" + WHERE "tableName" = $(aggregateName) + AND "status" = 'running' + LIMIT 1 + `, + { aggregateName }, + ) + + if (existingRun) { + return existingRun.id + } + + // Create new cleanup run + const result = await qx.selectOne( + ` + INSERT INTO "orphanCleanupRuns" ( + "tableName", + "startedAt", + "status", + "orphansFound", + "orphansDeleted" + ) VALUES ($(aggregateName), NOW(), 'running', 0, 0) + RETURNING id + `, + { aggregateName }, + ) + + return result.id +} + +export async function updateOrphanCleanupRun( + qx: QueryExecutor, + runId: string, + updates: Partial, +): Promise { + const setClauses: string[] = [] + const params: Record = { runId } + + if (updates.completedAt !== undefined) { + setClauses.push(`"completedAt" = $(completedAt)`) + params.completedAt = updates.completedAt + } + if (updates.status !== undefined) { + setClauses.push(`"status" = $(status)`) + params.status = updates.status + } + if (updates.orphansFound !== undefined) { + setClauses.push(`"orphansFound" = $(orphansFound)`) + params.orphansFound = updates.orphansFound + } + if (updates.orphansDeleted !== undefined) { + setClauses.push(`"orphansDeleted" = $(orphansDeleted)`) + params.orphansDeleted = updates.orphansDeleted + } + if (updates.executionTimeMs !== undefined) { + setClauses.push(`"executionTimeMs" = $(executionTimeMs)`) + params.executionTimeMs = updates.executionTimeMs + } + if (updates.errorMessage !== undefined) { + setClauses.push(`"errorMessage" = $(errorMessage)`) + params.errorMessage = updates.errorMessage + } + + if (setClauses.length === 0) { + return + } + + await qx.result( + ` + UPDATE "orphanCleanupRuns" + SET ${setClauses.join(', ')} + WHERE id = $(runId) + `, + params, + ) +} diff --git a/services/libs/data-access-layer/src/orphanCleanupRuns/index.ts b/services/libs/data-access-layer/src/orphanCleanupRuns/index.ts new file mode 100644 index 0000000000..85e56520d9 --- /dev/null +++ b/services/libs/data-access-layer/src/orphanCleanupRuns/index.ts @@ -0,0 +1 @@ +export * from './base'