Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ import {
queueOrgForAggComputation,
syncRemoveOrganization,
} from './activities/cleanup/organization'
import {
deleteOrphanMembersSegmentsAgg,
deleteOrphanOrganizationSegmentsAgg,
getOrphanMembersSegmentsAgg,
getOrphanOrganizationSegmentsAgg,
startOrphanCleanupRun,
updateOrphanCleanupRun,
} from './activities/cleanup/segments-agg'
import {
calculateMemberAffiliations,
getWorkflowsCount,
Expand Down Expand Up @@ -86,4 +94,10 @@ export {
markMemberForAffiliationRecalc,
getMembersForAffiliationRecalc,
calculateMemberAffiliations,
startOrphanCleanupRun,
updateOrphanCleanupRun,
getOrphanMembersSegmentsAgg,
deleteOrphanMembersSegmentsAgg,
getOrphanOrganizationSegmentsAgg,
deleteOrphanOrganizationSegmentsAgg,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import {
deleteOrphanMemberSegmentsAgg as deleteMemberSegmentsAgg,
getOrphanMemberSegmentsAgg as getMemberSegmentsAgg,
} from '@crowd/data-access-layer/src/members/segmentsAgg'
import {
deleteOrphanOrganizationSegmentsAgg as deleteOrganizationSegmentsAgg,
getOrphanOrganizationSegmentsAgg as getOrganizationSegmentsAgg,
} from '@crowd/data-access-layer/src/organizations/segmentsAgg'
import {
IOrphanCleanupRun,
startOrphanCleanupRun as startCleanupRun,
updateOrphanCleanupRun as updateCleanupRun,
} from '@crowd/data-access-layer/src/orphanCleanupRuns'
import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'

import { svc } from '../../main'

export async function startOrphanCleanupRun(aggregateName: string): Promise<string> {
try {
return startCleanupRun(dbStoreQx(svc.postgres.writer), aggregateName)
} catch (error) {
svc.log.error(error, 'Error starting orphan cleanup run!')
throw error
}
}

export async function updateOrphanCleanupRun(
runId: string,
updates: Partial<IOrphanCleanupRun>,
): Promise<void> {
try {
return updateCleanupRun(dbStoreQx(svc.postgres.writer), runId, updates)
} catch (error) {
svc.log.error(error, 'Error updating orphan cleanup run!')
throw error
}
}

export async function getOrphanMembersSegmentsAgg(batchSize: number): Promise<string[]> {
try {
return getMemberSegmentsAgg(dbStoreQx(svc.postgres.reader), batchSize)
} catch (error) {
svc.log.error(error, 'Error getting orphan memberSegmentsAgg records!')
throw error
}
}

export async function deleteOrphanMembersSegmentsAgg(memberId: string): Promise<void> {
try {
return deleteMemberSegmentsAgg(dbStoreQx(svc.postgres.writer), memberId)
} catch (error) {
svc.log.error(error, 'Error deleting orphan memberSegmentsAgg record!')
throw error
}
}

export async function getOrphanOrganizationSegmentsAgg(batchSize: number): Promise<string[]> {
try {
return getOrganizationSegmentsAgg(dbStoreQx(svc.postgres.reader), batchSize)
} catch (error) {
svc.log.error(error, 'Error getting orphan organizationSegmentsAgg records!')
throw error
}
}

export async function deleteOrphanOrganizationSegmentsAgg(organizationId: string): Promise<void> {
try {
return deleteOrganizationSegmentsAgg(dbStoreQx(svc.postgres.writer), organizationId)
} catch (error) {
svc.log.error(error, 'Error deleting orphan organizationSegmentsAgg record!')
throw error
}
}
9 changes: 8 additions & 1 deletion services/apps/script_executor_worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { Config } from '@crowd/archetype-standard'
import { Options, ServiceWorker } from '@crowd/archetype-worker'

import { scheduleMembersCleanup, scheduleOrganizationsCleanup } from './schedules/scheduleCleanup'
import {
scheduleMemberSegmentsAggCleanup,
scheduleMembersCleanup,
scheduleOrganizationSegmentAggCleanup,
scheduleOrganizationsCleanup,
} from './schedules/scheduleCleanup'

const config: Config = {
envvars: ['CROWD_TINYBIRD_ACCESS_TOKEN'],
Expand Down Expand Up @@ -35,6 +40,8 @@ setImmediate(async () => {

await scheduleMembersCleanup()
await scheduleOrganizationsCleanup()
await scheduleMemberSegmentsAggCleanup()
await scheduleOrganizationSegmentAggCleanup()

await svc.start()
})
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien
import { svc } from '../main'
import { cleanupMembers } from '../workflows/cleanup/members'
import { cleanupOrganizations } from '../workflows/cleanup/organizations'
import {
cleanupMemberSegmentsAgg,
cleanupOrganizationSegmentAgg,
} from '../workflows/cleanup/segments-agg'

export const scheduleMembersCleanup = async () => {
try {
Expand Down Expand Up @@ -77,3 +81,83 @@ export const scheduleOrganizationsCleanup = async () => {
}
}
}

export const scheduleMemberSegmentsAggCleanup = async () => {
try {
await svc.temporal.schedule.create({
scheduleId: 'cleanupMemberSegmentsAgg',
spec: {
// Run every day at midnight
cronExpressions: ['0 0 * * *'],
},
policies: {
overlap: ScheduleOverlapPolicy.BUFFER_ONE,
catchupWindow: '1 minute',
},
action: {
type: 'startWorkflow',
workflowType: cleanupMemberSegmentsAgg,
taskQueue: 'script-executor',
retry: {
initialInterval: '15 seconds',
backoffCoefficient: 2,
maximumAttempts: 3,
},
args: [
{
batchSize: 500,
},
],
},
})
svc.log.info('Schedule for member segments agg cleanup created successfully!')
} catch (err) {
if (err instanceof ScheduleAlreadyRunning) {
svc.log.info('Schedule cleanupMemberSegmentsAgg already registered in Temporal.')
svc.log.info('Configuration may have changed since. Please make sure they are in sync.')
} else {
svc.log.error({ err }, 'Error creating schedule for member segments agg cleanup')
throw new Error(err)
}
}
}

export const scheduleOrganizationSegmentAggCleanup = async () => {
try {
await svc.temporal.schedule.create({
scheduleId: 'cleanupOrganizationSegmentAgg',
spec: {
// Run every day at midnight
cronExpressions: ['0 0 * * *'],
},
policies: {
overlap: ScheduleOverlapPolicy.BUFFER_ONE,
catchupWindow: '1 minute',
},
action: {
type: 'startWorkflow',
workflowType: cleanupOrganizationSegmentAgg,
taskQueue: 'script-executor',
retry: {
initialInterval: '15 seconds',
backoffCoefficient: 2,
maximumAttempts: 3,
},
args: [
{
batchSize: 500,
},
],
},
})
svc.log.info('Schedule for organization segment agg cleanup created successfully!')
} catch (err) {
if (err instanceof ScheduleAlreadyRunning) {
svc.log.info('Schedule cleanupOrganizationSegmentAgg already registered in Temporal.')
svc.log.info('Configuration may have changed since. Please make sure they are in sync.')
} else {
svc.log.error({ err }, 'Error creating schedule for organization segment agg cleanup')
throw new Error(err)
}
}
}
1 change: 1 addition & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface IPopulateActivityRelationsArgs {
export interface IScriptBatchTestArgs {
batchSize?: number
testRun?: boolean
cleanupRunId?: string
}

export interface IFixActivityForiegnKeysArgs extends IScriptBatchTestArgs {
Expand Down
6 changes: 6 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { blockProjectOrganizationAffiliations } from './workflows/block-project-
import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members'
import { cleanupMembers } from './workflows/cleanup/members'
import { cleanupOrganizations } from './workflows/cleanup/organizations'
import {
cleanupMemberSegmentsAgg,
cleanupOrganizationSegmentAgg,
} from './workflows/cleanup/segments-agg'
import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
Expand All @@ -17,6 +21,8 @@ export {
fixOrgIdentitiesWithWrongUrls,
cleanupMembers,
cleanupOrganizations,
cleanupMemberSegmentsAgg,
cleanupOrganizationSegmentAgg,
processLLMVerifiedMerges,
cleanupDuplicateMembers,
fixBotMembersAffiliation,
Expand Down
Loading