diff --git a/backend/src/database/migrations/U1769092368__changeMaintainersRepoReferenceToRepositories.sql b/backend/src/database/migrations/U1769092368__changeMaintainersRepoReferenceToRepositories.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/U1769097489__changeGitServiceExecutionRefreneceToRepositories.sql b/backend/src/database/migrations/U1769097489__changeGitServiceExecutionRefreneceToRepositories.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1769092368__changeMaintainersRepoReferenceToRepositories.sql b/backend/src/database/migrations/V1769092368__changeMaintainersRepoReferenceToRepositories.sql new file mode 100644 index 0000000000..c9c86fa3f5 --- /dev/null +++ b/backend/src/database/migrations/V1769092368__changeMaintainersRepoReferenceToRepositories.sql @@ -0,0 +1,9 @@ + +ALTER TABLE "maintainersInternal" +DROP CONSTRAINT IF EXISTS "maintainersInternal_repoId_fkey"; + +ALTER TABLE "maintainersInternal" +ADD CONSTRAINT "maintainersInternal_repoId_fkey" +FOREIGN KEY ("repoId") +REFERENCES repositories(id) +ON DELETE CASCADE; \ No newline at end of file diff --git a/backend/src/database/migrations/V1769097489__changeGitServiceExecutionRefreneceToRepositories.sql b/backend/src/database/migrations/V1769097489__changeGitServiceExecutionRefreneceToRepositories.sql new file mode 100644 index 0000000000..510b6fbd5a --- /dev/null +++ b/backend/src/database/migrations/V1769097489__changeGitServiceExecutionRefreneceToRepositories.sql @@ -0,0 +1,8 @@ +-- Drop the existing foreign key constraint to git.repositories +ALTER TABLE git."serviceExecutions" + DROP CONSTRAINT "serviceExecutions_repoId_fkey"; + +-- Add new foreign key constraint to public.repositories +ALTER TABLE git."serviceExecutions" + ADD CONSTRAINT "serviceExecutions_repoId_fkey" + FOREIGN KEY ("repoId") REFERENCES public.repositories(id) ON DELETE CASCADE; diff --git a/backend/src/database/repositories/gitReposRepository.ts b/backend/src/database/repositories/gitReposRepository.ts deleted file mode 100644 index 16fabf355b..0000000000 --- a/backend/src/database/repositories/gitReposRepository.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { QueryTypes } from 'sequelize' - -import { IRepositoryOptions } from './IRepositoryOptions' -import SequelizeRepository from './sequelizeRepository' - -export default class GitReposRepository { - /** - * Soft deletes repositories from git.repositories table - * @param integrationId The integration ID to delete repositories for - * @param options Repository options - */ - static async delete(integrationId: string, options: IRepositoryOptions) { - const seq = SequelizeRepository.getSequelize(options) - const transaction = SequelizeRepository.getTransaction(options) - - const [, affectedCount] = await seq.query( - ` - UPDATE git.repositories - SET "deletedAt" = NOW() - WHERE "integrationId" = :integrationId - AND "deletedAt" IS NULL - `, - { - replacements: { - integrationId, - }, - type: QueryTypes.UPDATE, - transaction, - }, - ) - - options.log.info(`Soft deleted ${affectedCount} repositories for integration ${integrationId}`) - - return affectedCount - } - - /** - * Upserts repositories into git.repositories table - * - * @param repositories - Array with id, url, integrationId, segmentId, forkedFrom (null preserves existing value) - * @param options - Repository options - */ - static async upsert( - repositories: Array<{ - id: string - url: string - integrationId: string - segmentId: string - forkedFrom?: string | null - }>, - options: IRepositoryOptions, - ) { - if (!repositories || repositories.length === 0) { - return - } - - const seq = SequelizeRepository.getSequelize(options) - const transaction = SequelizeRepository.getTransaction(options) - - // Build SQL placeholders and parameter replacements in a single loop - const placeholders: string[] = [] - const replacements: Record = {} - - repositories.forEach((repo, idx) => { - // Build placeholder for this repository - placeholders.push( - `(:id_${idx}, :url_${idx}, :integrationId_${idx}, :segmentId_${idx}, :forkedFrom_${idx})`, - ) - - // Build replacements for this repository - replacements[`id_${idx}`] = repo.id - replacements[`url_${idx}`] = repo.url - replacements[`integrationId_${idx}`] = repo.integrationId - replacements[`segmentId_${idx}`] = repo.segmentId - replacements[`forkedFrom_${idx}`] = repo.forkedFrom || null - }) - - const placeholdersString = placeholders.join(', ') - - await seq.query( - ` - INSERT INTO git.repositories (id, url, "integrationId", "segmentId", "forkedFrom") - VALUES ${placeholdersString} - ON CONFLICT (url) DO UPDATE SET - id = EXCLUDED.id, - "integrationId" = EXCLUDED."integrationId", - "segmentId" = EXCLUDED."segmentId", - "forkedFrom" = COALESCE(EXCLUDED."forkedFrom", git.repositories."forkedFrom"), - "updatedAt" = NOW(), - "deletedAt" = NULL, - state = 'pending', - priority = 1, - "lastProcessedAt" = NULL, - "lastProcessedCommit" = NULL - `, - { - replacements, - transaction, - }, - ) - } -} diff --git a/backend/src/database/repositories/githubReposRepository.ts b/backend/src/database/repositories/githubReposRepository.ts deleted file mode 100644 index 39ac7e4e75..0000000000 --- a/backend/src/database/repositories/githubReposRepository.ts +++ /dev/null @@ -1,129 +0,0 @@ -import trim from 'lodash/trim' -import { QueryTypes } from 'sequelize' - -import { DEFAULT_TENANT_ID, Error400 } from '@crowd/common' -import { RedisCache } from '@crowd/redis' - -import { IRepositoryOptions } from './IRepositoryOptions' -import SequelizeRepository from './sequelizeRepository' - -export default class GithubReposRepository { - private static getCache(options: IRepositoryOptions): RedisCache { - return new RedisCache('githubRepos', options.redis, options.log) - } - - private static async bulkInsert( - table, - columns, - placeholdersFn, - values, - options: IRepositoryOptions, - ) { - const transaction = SequelizeRepository.getTransaction(options) - const seq = SequelizeRepository.getSequelize(options) - - columns = columns.map((column) => trim(column, '"')).map((column) => `"${column}"`) - const joinedColumns = columns.join(', ') - - const placeholders = values.map((value, idx) => placeholdersFn(idx)) - - const replacements = values.reduce((acc, value) => { - Object.entries(value).forEach(([key, value]) => { - acc[key] = value - }) - return acc - }, {}) - - return seq.query( - ` - INSERT INTO "${table}" - (${joinedColumns}) - VALUES ${placeholders.join(', ')} - ON CONFLICT ("tenantId", "url") - DO UPDATE SET "segmentId" = EXCLUDED."segmentId", - "integrationId" = EXCLUDED."integrationId", - "deletedAt" = NULL - `, - { - replacements, - transaction, - }, - ) - } - - static async updateMapping(integrationId, mapping, options: IRepositoryOptions) { - const mappingEntries = Object.entries(mapping).map(([url, segmentId]) => ({ - url: url as string, - segmentId: segmentId as string, - })) - - const transaction = SequelizeRepository.getTransaction(options) - const seq = SequelizeRepository.getSequelize(options) - - const existingRows = await seq.query( - ` - select * from "githubRepos" where "tenantId" = :tenantId and "url" in (:urls) - `, - { - replacements: { - tenantId: DEFAULT_TENANT_ID, - urls: mappingEntries.map((e) => e.url), - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - for (const row of existingRows as any[]) { - if (!row.deletedAt && row.integrationId !== integrationId) { - options.log.warn( - `Trying to update github repo ${row.url} mapping with integrationId ${integrationId} but it is already mapped to integration ${row.integrationId}!`, - ) - throw new Error400( - options.language, - 'errors.integrations.repoAlreadyMapped', - row.url, - integrationId, - row.integrationId, - ) - } - } - - await GithubReposRepository.bulkInsert( - 'githubRepos', - ['tenantId', 'integrationId', 'segmentId', 'url'], - (idx) => `(:tenantId_${idx}, :integrationId_${idx}, :segmentId_${idx}, :url_${idx})`, - mappingEntries.map(({ url, segmentId }, idx) => ({ - [`tenantId_${idx}`]: DEFAULT_TENANT_ID, - [`integrationId_${idx}`]: integrationId, - [`segmentId_${idx}`]: segmentId, - [`url_${idx}`]: url, - })), - options, - ) - - await this.getCache(options).deleteAll() - } - - static async delete(integrationId, options: IRepositoryOptions) { - const seq = SequelizeRepository.getSequelize(options) - const transaction = SequelizeRepository.getTransaction(options) - - await seq.query( - ` - UPDATE "githubRepos" - SET "deletedAt" = NOW() - WHERE "integrationId" = :integrationId - `, - { - replacements: { - integrationId, - }, - type: QueryTypes.UPDATE, - transaction, - }, - ) - - await this.getCache(options).deleteAll() - } -} diff --git a/backend/src/database/repositories/gitlabReposRepository.ts b/backend/src/database/repositories/gitlabReposRepository.ts deleted file mode 100644 index 8554be315a..0000000000 --- a/backend/src/database/repositories/gitlabReposRepository.ts +++ /dev/null @@ -1,120 +0,0 @@ -import trim from 'lodash/trim' -import { QueryTypes } from 'sequelize' - -import { DEFAULT_TENANT_ID, Error400 } from '@crowd/common' -import { RedisCache } from '@crowd/redis' - -import { IRepositoryOptions } from './IRepositoryOptions' -import SequelizeRepository from './sequelizeRepository' - -export default class GitlabReposRepository { - private static getCache(options: IRepositoryOptions): RedisCache { - return new RedisCache('gitlabRepos', options.redis, options.log) - } - - private static async bulkInsert( - table, - columns, - placeholdersFn, - values, - options: IRepositoryOptions, - ) { - const transaction = SequelizeRepository.getTransaction(options) - const seq = SequelizeRepository.getSequelize(options) - - columns = columns.map((column) => trim(column, '"')).map((column) => `"${column}"`) - const joinedColumns = columns.join(', ') - - const placeholders = values.map((value, idx) => placeholdersFn(idx)) - - const replacements = values.reduce((acc, value) => { - Object.entries(value).forEach(([key, value]) => { - acc[key] = value - }) - return acc - }, {}) - - return seq.query( - ` - INSERT INTO "${table}" - (${joinedColumns}) - VALUES ${placeholders.join(', ')} - ON CONFLICT ("tenantId", "url") - DO UPDATE SET "segmentId" = EXCLUDED."segmentId", - "integrationId" = EXCLUDED."integrationId" - `, - { - replacements, - transaction, - }, - ) - } - - static async updateMapping(integrationId, mapping, options: IRepositoryOptions) { - const transaction = SequelizeRepository.getTransaction(options) - - // Check for repositories already mapped to other integrations - for (const url of Object.keys(mapping)) { - const existingRows = await options.database.sequelize.query( - `SELECT * FROM "gitlabRepos" WHERE url = :url AND "deletedAt" IS NULL`, - { - replacements: { url }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - for (const row of existingRows as any[]) { - if (!row.deletedAt && row.integrationId !== integrationId) { - options.log.warn( - `Trying to update gitlab repo ${row.url} mapping with integrationId ${integrationId} but it is already mapped to integration ${row.integrationId}!`, - ) - - throw new Error400( - options.language, - 'errors.integrations.repoAlreadyMapped', - row.url, - integrationId, - row.integrationId, - ) - } - } - } - await GitlabReposRepository.bulkInsert( - 'gitlabRepos', - ['tenantId', 'integrationId', 'segmentId', 'url'], - (idx) => `(:tenantId_${idx}, :integrationId_${idx}, :segmentId_${idx}, :url_${idx})`, - Object.entries(mapping).map(([url, segmentId], idx) => ({ - [`tenantId_${idx}`]: DEFAULT_TENANT_ID, - [`integrationId_${idx}`]: integrationId, - [`segmentId_${idx}`]: segmentId, - [`url_${idx}`]: url, - })), - options, - ) - - await this.getCache(options).deleteAll() - } - - static async delete(integrationId, options: IRepositoryOptions) { - const seq = SequelizeRepository.getSequelize(options) - const transaction = SequelizeRepository.getTransaction(options) - - await seq.query( - ` - UPDATE "gitlabRepos" - SET "deletedAt" = NOW() - WHERE "integrationId" = :integrationId - `, - { - replacements: { - integrationId, - }, - type: QueryTypes.UPDATE, - transaction, - }, - ) - - await this.getCache(options).deleteAll() - } -} diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index 053e1970a6..3ed0279c91 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -6,21 +6,21 @@ import lodash from 'lodash' import moment from 'moment' import { QueryTypes, Transaction } from 'sequelize' -import { EDITION, Error400, Error404, Error500, Error542, encryptData } from '@crowd/common' -import { CommonIntegrationService, getGithubInstallationToken } from '@crowd/common_services' -import { syncRepositoriesToGitV2 } from '@crowd/data-access-layer' import { - ICreateInsightsProject, - deleteMissingSegmentRepositories, - deleteSegmentRepositories, - upsertSegmentRepositories, -} from '@crowd/data-access-layer/src/collections' + EDITION, + Error400, + Error404, + Error542, + encryptData, + generateUUIDv4 as uuid, +} from '@crowd/common' +import { CommonIntegrationService, getGithubInstallationToken } from '@crowd/common_services' +import { ICreateInsightsProject } from '@crowd/data-access-layer/src/collections' import { findRepositoriesForSegment } from '@crowd/data-access-layer/src/integrations' import { ICreateRepository, IRepository, IRepositoryMapping, - getGitRepositoryIdsByUrl, getIntegrationReposMapping, getRepositoriesBySourceIntegrationId, getRepositoriesByUrl, @@ -47,7 +47,6 @@ import { CodePlatform, Edition, PlatformType } from '@crowd/types' import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions' import GithubInstallationsRepository from '@/database/repositories/githubInstallationsRepository' -import GitlabReposRepository from '@/database/repositories/gitlabReposRepository' import IntegrationProgressRepository from '@/database/repositories/integrationProgressRepository' import SegmentRepository from '@/database/repositories/segmentRepository' import { IntegrationProgress, Repos } from '@/serverless/integrations/types/regularTypes' @@ -66,8 +65,6 @@ import { } from '@/serverless/integrations/usecases/groupsio/types' import { DISCORD_CONFIG, GITHUB_CONFIG, GITLAB_CONFIG, IS_TEST_ENV, KUBE_MODE } from '../conf/index' -import GitReposRepository from '../database/repositories/gitReposRepository' -import GithubReposRepository from '../database/repositories/githubReposRepository' import IntegrationRepository from '../database/repositories/integrationRepository' import SequelizeRepository from '../database/repositories/sequelizeRepository' import telemetryTrack from '../segment/telemetryTrack' @@ -457,38 +454,11 @@ export default class IntegrationService { if (remainingRepos.length === 0) { // If no repos left, delete the Git integration entirely - // Soft delete git.repositories for git-integration V2 - await GitReposRepository.delete(gitIntegration.id, { - ...this.options, - transaction, - }) - - // Then delete the git integration await IntegrationRepository.destroy(gitIntegration.id, { ...this.options, transaction, }) } else { - // Soft delete from git.repositories only the repos owned by the deleted integration - const urlsToRemove = allGitRepos - .filter((repo) => repo.sourceIntegrationId === id) - .map((r) => r.url) - - if (urlsToRemove.length > 0) { - await qxForGit.result( - ` - UPDATE git.repositories - SET "deletedAt" = NOW() - WHERE url IN ($(urlsToRemove:csv)) - AND "deletedAt" IS NULL - `, - { urlsToRemove }, - ) - this.options.log.info( - `Soft deleted ${urlsToRemove.length} repos from git.repositories for integration ${id}`, - ) - } - // Update git integration settings with remaining remotes const remainingRemotes = remainingRepos.map((r) => r.url) await this.gitConnectOrUpdate( @@ -505,12 +475,6 @@ export default class IntegrationService { integration.platform === PlatformType.GITHUB || integration.platform === PlatformType.GITHUB_NANGO ) { - // soft delete github repos from legacy table - await GithubReposRepository.delete(integration.id, { - ...this.options, - transaction, - }) - // Soft delete from public.repositories only repos owned by this GitHub integration // This preserves native Git repos that aren't mirrored from GitHub const qx = SequelizeRepository.getQueryExecutor({ @@ -528,33 +492,19 @@ export default class IntegrationService { } } - if (integration.platform === PlatformType.GITLAB) { - if (integration.settings.webhooks) { - await removeGitlabWebhooks( - integration.token, - integration.settings.webhooks.map((hook) => hook.projectId), - integration.settings.webhooks.map((hook) => hook.hookId), - ) - } - - // soft delete gitlab repos - await GitlabReposRepository.delete(integration.id, { - ...this.options, - transaction, - }) + if (integration.platform === PlatformType.GITLAB && integration.settings.webhooks) { + await removeGitlabWebhooks( + integration.token, + integration.settings.webhooks.map((hook) => hook.projectId), + integration.settings.webhooks.map((hook) => hook.hookId), + ) } - // Soft delete git.repositories for git integration if (integration.platform === PlatformType.GIT) { await this.validateGitIntegrationDeletion(integration.id, { ...this.options, transaction, }) - - await GitReposRepository.delete(integration.id, { - ...this.options, - transaction, - }) } // Soft delete from public.repositories for code integrations @@ -572,8 +522,6 @@ export default class IntegrationService { const collectionService = new CollectionService({ ...this.options, transaction }) - const qx = SequelizeRepository.getQueryExecutor(this.options) - let insightsProject = null let widgets = [] @@ -582,9 +530,7 @@ export default class IntegrationService { insightsProject = project const widgetsResult = await collectionService.findSegmentsWidgetsById(segmentId) widgets = widgetsResult.widgets - await deleteSegmentRepositories(qx, { - segmentId, - }) + // Note: Repos are soft-deleted in public.repositories via mapUnifiedRepositories above } const insightsRepo = insightsProject?.repositories ?? [] @@ -997,9 +943,6 @@ export default class IntegrationService { ) } - // sync to public.repositories - await txService.mapUnifiedRepositories(PlatformType.GITHUB_NANGO, integration.id, mapping) - if (!existingTransaction) { await SequelizeRepository.commitTransaction(transaction) } @@ -1034,9 +977,6 @@ export default class IntegrationService { transaction, } try { - this.options.log.info(`Updating GitHub repos mapping for integration ${integrationId}!`) - await GithubReposRepository.updateMapping(integrationId, mapping, txOptions) - // add the repos to the git integration const repos: Record = Object.entries(mapping).reduce( (acc, [url, segmentId]) => { @@ -1048,27 +988,7 @@ export default class IntegrationService { }, {}, ) - - const qx = SequelizeRepository.getQueryExecutor(txOptions) - const collectionService = new CollectionService(txOptions) - - for (const [segmentId, repositories] of Object.entries(repos)) { - this.options.log.info(`Finding insights project for segment ${segmentId}!`) - const [insightsProject] = await collectionService.findInsightsProjectsBySegmentId(segmentId) - - if (insightsProject) { - this.options.log.info(`Upserting segment repositories for segment ${segmentId}!`) - await upsertSegmentRepositories(qx, { - insightsProjectId: insightsProject.id, - repositories, - segmentId, - }) - await deleteMissingSegmentRepositories(qx, { - repositories, - segmentId, - }) - } - } + // Note: Repos are synced to public.repositories via mapUnifiedRepositories at the end of this method // Get integration settings to access forkedFrom data from all orgs const integration = await IntegrationRepository.findById(integrationId, txOptions) @@ -1125,6 +1045,10 @@ export default class IntegrationService { } } + // sync to public.repositories + const txService = new IntegrationService(txOptions) + await txService.mapUnifiedRepositories(integration.platform, integrationId, mapping) + if (fireOnboarding) { this.options.log.info('Updating integration status to in-progress!') const integration = await IntegrationRepository.update( @@ -1389,7 +1313,7 @@ export default class IntegrationService { } /** - * Adds/updates Git integration and syncs repositories to git.repositories table (git-integration V2) + * Adds/updates Git integration and syncs repositories to repositories table * * @param integrationData.remotes - Repository objects with url and optional forkedFrom (parent repo URL). * If forkedFrom is null, existing DB value is preserved. @@ -1448,7 +1372,7 @@ export default class IntegrationService { const existingRows = await seq.query( ` - SELECT url, "integrationId" FROM git.repositories + SELECT url, "gitIntegrationId" AS "integrationId" FROM repositories WHERE url IN (:urls) AND "deletedAt" IS NULL `, { @@ -1474,48 +1398,7 @@ export default class IntegrationService { } } - // upsert repositories to git.repositories in order to be processed by git-integration V2 const currentSegmentId = (options || this.options).currentSegments[0].id - const qx = SequelizeRepository.getQueryExecutor({ - ...(options || this.options), - transaction, - }) - - // Soft-delete repos from git.repositories that are no longer in the remotes list - // Only delete repos owned by this Git integration (not mirrored from other integrations) - const newRemoteUrls = new Set(remotes.map((r) => r.url)) - const existingOwnedRepos: Array<{ url: string }> = await qx.select( - ` - SELECT gr.url - FROM git.repositories gr - JOIN public.repositories pr ON pr.url = gr.url AND pr."deletedAt" IS NULL - WHERE gr."integrationId" = $(integrationId) - AND gr."deletedAt" IS NULL - AND pr."sourceIntegrationId" = $(integrationId) - `, - { integrationId: integration.id }, - ) - const urlsToDelete = existingOwnedRepos - .map((r) => r.url) - .filter((url) => !newRemoteUrls.has(url)) - - if (urlsToDelete.length > 0) { - await qx.result( - ` - UPDATE git.repositories - SET "deletedAt" = NOW() - WHERE url IN ($(urlsToDelete:csv)) - AND "deletedAt" IS NULL - `, - { urlsToDelete }, - ) - this.options.log.info( - `Soft-deleted ${urlsToDelete.length} owned repos from git.repositories`, - ) - } - - await syncRepositoriesToGitV2(qx, remotes, integration.id, currentSegmentId) - // sync to public.repositories (only for direct GIT connections, other platforms handle it themselves) if (!sourcePlatform) { const mapping = remotes.reduce( @@ -2962,8 +2845,6 @@ export default class IntegrationService { } try { - await GitlabReposRepository.updateMapping(integrationId, mapping, txOptions) - // add the repos to the git integration if (EDITION === Edition.LFX) { const repos: Record = Object.entries(mapping).reduce( @@ -2977,25 +2858,7 @@ export default class IntegrationService { {}, ) - const qx = SequelizeRepository.getQueryExecutor(txOptions) - const collectionService = new CollectionService(txOptions) - - for (const [segmentId, repositories] of Object.entries(repos)) { - const [insightsProject] = - await collectionService.findInsightsProjectsBySegmentId(segmentId) - - if (insightsProject) { - await upsertSegmentRepositories(qx, { - insightsProjectId: insightsProject.id, - repositories, - segmentId, - }) - await deleteMissingSegmentRepositories(qx, { - repositories, - segmentId, - }) - } - } + // Note: Repos are written to public.repositories via mapUnifiedRepositories below for (const [segmentId, urls] of Object.entries(repos)) { let isGitintegrationConfigured @@ -3258,16 +3121,6 @@ export default class IntegrationService { const segmentIds = [...new Set(urls.map((url) => mapping[url]))] - const isGitHubPlatform = [PlatformType.GITHUB, PlatformType.GITHUB_NANGO].includes( - sourcePlatform, - ) - - const [gitRepoIdMap, sourceIntegration] = await Promise.all([ - // TODO: after migration, generate UUIDs instead of fetching from git.repositories - getGitRepositoryIdsByUrl(qx, urls), - isGitHubPlatform ? IntegrationRepository.findById(sourceIntegrationId, txOptions) : null, - ]) - const collectionService = new CollectionService(txOptions) const insightsProjectMap = new Map() const gitIntegrationMap = new Map() @@ -3306,6 +3159,12 @@ export default class IntegrationService { // Build forkedFrom map from integration settings (for GITHUB repositories) const forkedFromMap = new Map() + const isGitHubPlatform = [PlatformType.GITHUB, PlatformType.GITHUB_NANGO].includes( + sourcePlatform, + ) + const sourceIntegration = isGitHubPlatform + ? await IntegrationRepository.findById(sourceIntegrationId, txOptions) + : null if (sourceIntegration?.settings?.orgs) { const allRepos = sourceIntegration.settings.orgs.flatMap((org: any) => org.repos || []) for (const repo of allRepos) { @@ -3319,16 +3178,10 @@ export default class IntegrationService { const payloads: ICreateRepository[] = [] for (const url of urls) { const segmentId = mapping[url] - const id = gitRepoIdMap.get(url) + const id = uuid() const insightsProjectId = insightsProjectMap.get(segmentId) const gitIntegrationId = gitIntegrationMap.get(segmentId) - if (!id) { - // TODO: post migration generate id and remove lookup - this.options.log.warn(`No git.repositories ID found for URL ${url}`) - throw new Error500('Repo not found in git.repositories') - } - payloads.push({ id, url, diff --git a/services/apps/nango_worker/src/activities.ts b/services/apps/nango_worker/src/activities.ts index 55aad296de..73171ee9ef 100644 --- a/services/apps/nango_worker/src/activities.ts +++ b/services/apps/nango_worker/src/activities.ts @@ -4,7 +4,6 @@ import { createGithubConnection, deleteConnection, logInfo, - mapGithubRepo, mapGithubRepoToRepositories, processNangoWebhook, removeGithubConnection, @@ -23,7 +22,6 @@ export { removeGithubConnection, setGithubConnection, startNangoSync, - mapGithubRepo, mapGithubRepoToRepositories, unmapGithubRepo, canCreateGithubConnection, diff --git a/services/apps/nango_worker/src/activities/nangoActivities.ts b/services/apps/nango_worker/src/activities/nangoActivities.ts index 9ef10bc124..c055b2f3e5 100644 --- a/services/apps/nango_worker/src/activities/nangoActivities.ts +++ b/services/apps/nango_worker/src/activities/nangoActivities.ts @@ -1,12 +1,11 @@ import { IS_DEV_ENV, IS_STAGING_ENV, singleOrDefault } from '@crowd/common' +import { generateUUIDv4 as uuid } from '@crowd/common' import { CommonIntegrationService, GithubIntegrationService } from '@crowd/common_services' import { - addGitHubRepoMapping, addGithubNangoConnection, addRepoToGitIntegration, fetchIntegrationById, findIntegrationDataForNangoWebhookProcessing, - removeGitHubRepoMapping, removeGithubNangoConnection, setGithubIntegrationSettingsOrgs, setNangoIntegrationCursor, @@ -450,19 +449,6 @@ export async function deleteConnection( await deleteNangoConnection(providerConfigKey as NangoIntegration, connectionId) } -export async function mapGithubRepo(integrationId: string, repo: IGithubRepoData): Promise { - svc.log.info( - { integrationId }, - `Adding github repo mapping for integration ${integrationId} and repo ${repo.owner}/${repo.repoName}!`, - ) - await addGitHubRepoMapping( - dbStoreQx(svc.postgres.writer), - integrationId, - repo.owner, - repo.repoName, - ) -} - export async function unmapGithubRepo(integrationId: string, repo: IGithubRepoData): Promise { svc.log.info( { integrationId }, @@ -470,15 +456,6 @@ export async function unmapGithubRepo(integrationId: string, repo: IGithubRepoDa ) const repoUrl = `https://github.com/${repo.owner}/${repo.repoName}` - // remove repo from githubRepos mapping - await removeGitHubRepoMapping( - dbStoreQx(svc.postgres.writer), - svc.redis, - integrationId, - repo.owner, - repo.repoName, - ) - // soft-delete from public.repositories const affected = await softDeleteRepositories( dbStoreQx(svc.postgres.writer), @@ -500,8 +477,7 @@ export async function updateGitIntegrationWithRepo( `Updating git integration with repo ${repo.owner}/${repo.repoName} for integration ${integrationId}!`, ) const repoUrl = `https://github.com/${repo.owner}/${repo.repoName}` - const forkedFrom = await GithubIntegrationService.getForkedFrom(repo.owner, repo.repoName) - await addRepoToGitIntegration(dbStoreQx(svc.postgres.writer), integrationId, repoUrl, forkedFrom) + await addRepoToGitIntegration(dbStoreQx(svc.postgres.writer), integrationId, repoUrl) } function parseGithubUrl(url: string): IGithubRepoData { @@ -572,18 +548,9 @@ export async function mapGithubRepoToRepositories( throw new Error(`Insights project not found for segment ${githubIntegration.segmentId}!`) } - // TODO: Post migration, generate UUID instead of fetching from git.repositories - const gitRepo = await qx.selectOneOrNone( - `SELECT id FROM git.repositories WHERE url = $(url) AND "deletedAt" IS NULL`, - { url: repoUrl }, - ) - if (!gitRepo) { - throw new Error(`Repository ${repoUrl} not found in git.repositories!`) - } - try { const result = await upsertRepository(qx, { - id: gitRepo.id, + id: uuid(), url: repoUrl, segmentId: githubIntegration.segmentId, gitIntegrationId: gitIntegration.id, diff --git a/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts b/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts index 36a6e8de8c..33164bc9c9 100644 --- a/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts +++ b/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts @@ -21,7 +21,7 @@ export async function syncGithubIntegration(args: ISyncGithubIntegrationArgument // delete connection from integrations.settings.nangoMapping object await activity.removeGithubConnection(integrationId, repo.connectionId) - // delete githubRepos mapping + soft-delete from public.repositories + // delete from public.repositories await activity.unmapGithubRepo(integrationId, repo.repo) } @@ -53,9 +53,6 @@ export async function syncGithubIntegration(args: ISyncGithubIntegrationArgument // add connection to integrations.settings.nangoMapping object await activity.setGithubConnection(integrationId, repo, connectionId) - // add repo to githubRepos mapping if it's not already mapped - await activity.mapGithubRepo(integrationId, repo) - // add repo to git integration await activity.updateGitIntegrationWithRepo(integrationId, repo) diff --git a/services/cronjobs/archived_repositories/src/database.ts b/services/cronjobs/archived_repositories/src/database.ts index 2c6c6d0815..4c8e62af79 100644 --- a/services/cronjobs/archived_repositories/src/database.ts +++ b/services/cronjobs/archived_repositories/src/database.ts @@ -33,16 +33,17 @@ export async function fetchRepositoryUrls(config: Config): Promise { try { const result = await client.query( - `SELECT repository FROM "segmentRepositories" + `SELECT url FROM public.repositories WHERE - (starts_with(repository, 'https://github.com/') OR starts_with(repository, 'https://gitlab.com/')) AND - (last_archived_check IS NULL OR last_archived_check < NOW() - INTERVAL '3 days') - ORDER BY last_archived_check + "deletedAt" IS NULL AND + (starts_with(url, 'https://github.com/') OR starts_with(url, 'https://gitlab.com/')) AND + ("lastArchivedCheckAt" IS NULL OR "lastArchivedCheckAt" < NOW() - INTERVAL '3 days') + ORDER BY "lastArchivedCheckAt" NULLS FIRST LIMIT $1`, [config.BatchSize], ) - return result.rows.map((row) => row.repository) + return result.rows.map((row) => row.url) } catch (error) { console.error('Error fetching repository URLs:', error) throw error @@ -58,21 +59,12 @@ export async function updateRepositoryStatus( const client = getPool(config) try { - await Promise.all([ - // TODO: stop writing to segmentRepositories post migration - client.query( - `UPDATE "segmentRepositories" - SET archived = $1, excluded = $2, last_archived_check = NOW(), updated_at = NOW() - WHERE repository = $3`, - [isArchived, isExcluded, repository], - ), - client.query( - `UPDATE "repositories" - SET "archived" = $1, "excluded" = $2, "lastArchivedCheckAt" = NOW(), "updatedAt" = NOW() - WHERE "url" = $3`, - [isArchived, isExcluded, repository], - ), - ]) + await client.query( + `UPDATE public.repositories + SET "archived" = $1, "excluded" = $2, "lastArchivedCheckAt" = NOW(), "updatedAt" = NOW() + WHERE "url" = $3 AND "deletedAt" IS NULL`, + [isArchived, isExcluded, repository], + ) } catch (error) { console.error(`Error updating repository status for ${repository}:`, error) throw error diff --git a/services/libs/common_services/src/services/integration.service.ts b/services/libs/common_services/src/services/integration.service.ts index 474db65b02..40c22df72d 100644 --- a/services/libs/common_services/src/services/integration.service.ts +++ b/services/libs/common_services/src/services/integration.service.ts @@ -1,17 +1,13 @@ import { decryptData } from '@crowd/common' import { InsightsProjectField, - deleteMissingSegmentRepositories, queryInsightsProjects, updateInsightsProject, - upsertSegmentRepositories, } from '@crowd/data-access-layer/src/collections' import { fetchIntegrationById, findNangoRepositoriesToBeRemoved, findRepositoriesForSegment, - removePlainGitHubRepoMapping, - removePlainGitlabRepoMapping, } from '@crowd/data-access-layer/src/integrations' import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' import { getRepoUrlsMappedToOtherSegments } from '@crowd/data-access-layer/src/segments' @@ -76,7 +72,7 @@ export class CommonIntegrationService { } /** - * Syncs GitHub repositories to segmentRepositories table and updates insightsProject.repositories + * Syncs GitHub repositories to insightsProject.repositories field * @param qx - Query executor for database operations * @param redis - Redis client for cache invalidation * @param integrationId - The integration ID to sync repositories for @@ -132,31 +128,13 @@ export class CommonIntegrationService { // Find repos already mapped to other segments (conflicts) const alreadyMappedRepos = await getRepoUrlsMappedToOtherSegments(qx, currentUrls, segmentId) - // Unmap repositories that should be removed - for (const repo of reposToBeRemoved) { - await removePlainGitHubRepoMapping(qx, redis, integrationId, repo) - await removePlainGitlabRepoMapping(qx, redis, integrationId, repo) - } - // Filter valid repositories (dedupe, remove deleted, remove already mapped to other segments) const repositories = [...new Set(currentUrls)].filter( (url) => !reposToBeRemoved.includes(url) && !alreadyMappedRepos.includes(url), ) - // Upsert repositories to segmentRepositories table - await upsertSegmentRepositories(qx, { - insightsProjectId, - repositories, - segmentId, - }) - - // Delete missing repositories from segmentRepositories table - await deleteMissingSegmentRepositories(qx, { - repositories, - segmentId, - }) - // Update insightsProject.repositories field (this also sets updatedAt automatically) + // Note: Writes to public.repositories happen earlier via mapGithubRepoToRepositories() await updateInsightsProject(qx, insightsProjectId, { repositories, }) diff --git a/services/libs/data-access-layer/src/collections/index.ts b/services/libs/data-access-layer/src/collections/index.ts index 0dfbbb9be3..34fb8cc45b 100644 --- a/services/libs/data-access-layer/src/collections/index.ts +++ b/services/libs/data-access-layer/src/collections/index.ts @@ -329,73 +329,3 @@ export async function findBySlug(qx: QueryExecutor, slug: string) { }) return collections } - -export async function upsertSegmentRepositories( - qx: QueryExecutor, - { - insightsProjectId, - repositories, - segmentId, - }: { - insightsProjectId: string - repositories: string[] - segmentId: string - }, -) { - if (repositories.length === 0) { - return - } - - return qx.result( - ` - WITH "input" AS ( - SELECT DISTINCT unnest(ARRAY[$(repositories:csv)]::text[]) AS "repository" - ) - INSERT INTO "segmentRepositories" ("repository", "segmentId", "insightsProjectId") - SELECT "repository", $(segmentId), $(insightsProjectId) - FROM "input" - ON CONFLICT ("repository") - DO UPDATE SET - "segmentId" = EXCLUDED."segmentId", - "insightsProjectId" = EXCLUDED."insightsProjectId"; - `, - { insightsProjectId, repositories, segmentId }, - ) -} - -export async function deleteSegmentRepositories( - qx: QueryExecutor, - { - segmentId, - }: { - segmentId: string - }, -) { - return qx.result( - ` - DELETE FROM "segmentRepositories" - WHERE "segmentId" = '${segmentId}' - `, - { segmentId }, - ) -} - -export async function deleteMissingSegmentRepositories( - qx: QueryExecutor, - { - segmentId, - repositories, - }: { - segmentId: string - repositories: string[] - }, -) { - return qx.result( - ` - DELETE FROM "segmentRepositories" - WHERE "segmentId" = '${segmentId}' - AND ${repositories.length > 0 ? `"repository" != ALL(ARRAY[${repositories.map((repo) => `'${repo}'`).join(', ')}])` : 'TRUE'}; - `, - { segmentId, repositories }, - ) -} diff --git a/services/libs/data-access-layer/src/integrations/index.ts b/services/libs/data-access-layer/src/integrations/index.ts index 069362864a..e4253d70cd 100644 --- a/services/libs/data-access-layer/src/integrations/index.ts +++ b/services/libs/data-access-layer/src/integrations/index.ts @@ -1,6 +1,4 @@ -import { DEFAULT_TENANT_ID, generateUUIDv4 } from '@crowd/common' import { getServiceChildLogger } from '@crowd/logging' -import { RedisCache, RedisClient } from '@crowd/redis' import { IIntegration, PlatformType } from '@crowd/types' import { QueryExecutor } from '../queryExecutor' @@ -487,184 +485,10 @@ export async function addGithubNangoConnection( ) } -export async function removeGitHubRepoMapping( - qx: QueryExecutor, - redisClient: RedisClient, - integrationId: string, - owner: string, - repoName: string, -): Promise { - await qx.result( - ` - update "githubRepos" - set "deletedAt" = now() - where "integrationId" = $(integrationId) - and lower(url) = lower($(repo)) - `, - { - integrationId, - repo: `https://github.com/${owner}/${repoName}`, - }, - ) - - const cache = new RedisCache('githubRepos', redisClient, log) - await cache.deleteAll() -} - -export async function addGitHubRepoMapping( - qx: QueryExecutor, - integrationId: string, - owner: string, - repoName: string, -): Promise { - await qx.result( - ` - insert into "githubRepos"("tenantId", "integrationId", "segmentId", url) - values( - $(tenantId), - $(integrationId), - (select "segmentId" from integrations where id = $(integrationId) limit 1), - $(url) - ) - on conflict ("tenantId", url) do update - set - "deletedAt" = null, - "segmentId" = (select "segmentId" from integrations where id = $(integrationId) limit 1), - "integrationId" = $(integrationId), - "updatedAt" = now() - -- in case there is a row already only update it if it's deleted so deletedAt is not null - -- otherwise leave it as is - where "githubRepos"."deletedAt" is not null - `, - { - tenantId: DEFAULT_TENANT_ID, - integrationId, - url: `https://github.com/${owner}/${repoName}`, - }, - ) -} - -/** - * Syncs repositories to git.repositories table (git-integration V2) - * - * Finds existing repository IDs from githubRepos or gitlabRepos tables, - * or generates new UUIDs, then upserts to git.repositories table. - * - * @param qx - Query executor - * @param remotes - Array of repository objects with url and optional forkedFrom - * @param gitIntegrationId - The git integration ID - * @param segmentId - The segment ID for the repositories - */ -export async function syncRepositoriesToGitV2( - qx: QueryExecutor, - remotes: Array<{ url: string; forkedFrom?: string | null }>, - gitIntegrationId: string, - segmentId: string, -): Promise { - if (!remotes || remotes.length === 0) { - log.warn('No remotes provided to syncRepositoriesToGitV2') - return - } - - const urls = remotes.map((r) => r.url) - - // Check GitHub repos, GitLab repos, AND git.repositories for existing IDs - // Include soft-deleted repos to reuse their IDs on reconnect - const existingRepos: Array<{ - id: string - url: string - }> = await qx.select( - ` - WITH github_repos AS ( - SELECT id, url FROM "githubRepos" - WHERE url IN ($(urls:csv)) - ), - gitlab_repos AS ( - SELECT id, url FROM "gitlabRepos" - WHERE url IN ($(urls:csv)) - ), - git_repos AS ( - SELECT id, url FROM git.repositories - WHERE url IN ($(urls:csv)) - ) - SELECT DISTINCT ON (url) id, url FROM ( - SELECT id, url FROM github_repos - UNION ALL - SELECT id, url FROM gitlab_repos - UNION ALL - SELECT id, url FROM git_repos - ) combined - `, - { urls }, - ) - - // Create a map of existing url to id - const existingUrlToId = new Map(existingRepos.map((r) => [r.url, r.id])) - - // Build repositoriesToSync, using existing IDs where available - const repositoriesToSync: Array<{ - id: string - url: string - integrationId: string - segmentId: string - forkedFrom?: string | null - }> = remotes.map((remote) => { - const existingId = existingUrlToId.get(remote.url) - return { - id: existingId || generateUUIDv4(), - url: remote.url, - integrationId: gitIntegrationId, - segmentId, - forkedFrom: remote.forkedFrom || null, - } - }) - - if (existingRepos.length === 0) { - log.warn( - 'No existing repos found in githubRepos, gitlabRepos, or git.repositories - inserting new with generated UUIDs', - ) - } - - // Build SQL placeholders and parameters - const placeholders: string[] = [] - const params: Record = {} - - repositoriesToSync.forEach((repo, idx) => { - placeholders.push( - `($(id_${idx}), $(url_${idx}), $(integrationId_${idx}), $(segmentId_${idx}), $(forkedFrom_${idx}))`, - ) - params[`id_${idx}`] = repo.id - params[`url_${idx}`] = repo.url - params[`integrationId_${idx}`] = repo.integrationId - params[`segmentId_${idx}`] = repo.segmentId - params[`forkedFrom_${idx}`] = repo.forkedFrom || null - }) - - const placeholdersString = placeholders.join(', ') - - // Upsert to git.repositories - await qx.result( - ` - INSERT INTO git.repositories (id, url, "integrationId", "segmentId", "forkedFrom") - VALUES ${placeholdersString} - ON CONFLICT (id) DO UPDATE SET - "integrationId" = EXCLUDED."integrationId", - "segmentId" = EXCLUDED."segmentId", - "forkedFrom" = COALESCE(EXCLUDED."forkedFrom", git.repositories."forkedFrom"), - "updatedAt" = NOW(), - "deletedAt" = NULL - `, - params, - ) - - log.info(`Synced ${repositoriesToSync.length} repos to git.repositories`) -} - export async function addRepoToGitIntegration( qx: QueryExecutor, integrationId: string, repoUrl: string, - forkedFrom: string | null, ): Promise { // Get the github integration to find its segmentId const githubIntegration = await qx.selectOneOrNone( @@ -722,60 +546,6 @@ export async function addRepoToGitIntegration( ) log.info({ integrationId: gitIntegration.id, repoUrl }, 'Added repo to git integration settings!') - - // Also sync to git.repositories table (git-integration V2) - await syncRepositoriesToGitV2( - qx, - [{ url: repoUrl, forkedFrom }], - gitIntegration.id, - githubIntegration.segmentId, - ) -} - -export async function removePlainGitHubRepoMapping( - qx: QueryExecutor, - redisClient: RedisClient, - integrationId: string, - repo: string, -): Promise { - await qx.result( - ` - update "githubRepos" - set "deletedAt" = now() - where "integrationId" = $(integrationId) - and lower(url) = lower($(repo)) - `, - { - integrationId, - repo, - }, - ) - - const cache = new RedisCache('githubRepos', redisClient, log) - await cache.deleteAll() -} - -export async function removePlainGitlabRepoMapping( - qx: QueryExecutor, - redisClient: RedisClient, - integrationId: string, - repo: string, -): Promise { - await qx.result( - ` - update "gitlabRepos" - set "deletedAt" = now() - where "integrationId" = $(integrationId) - and lower(url) = lower($(repo)) - `, - { - integrationId, - repo, - }, - ) - - const cache = new RedisCache('gitlabRepos', redisClient, log) - await cache.deleteAll() } export function extractGithubRepoSlug(url: string): string { diff --git a/services/libs/data-access-layer/src/repositories/index.ts b/services/libs/data-access-layer/src/repositories/index.ts index 64e6d64eba..afececd502 100644 --- a/services/libs/data-access-layer/src/repositories/index.ts +++ b/services/libs/data-access-layer/src/repositories/index.ts @@ -156,32 +156,6 @@ export async function getRepositoriesBySourceIntegrationId( ) } -/** - * Get git repository IDs by URLs from git.repositories table - * @param qx - Query executor - * @param urls - Array of repository URLs - * @returns Map of URL to repository ID - */ -export async function getGitRepositoryIdsByUrl( - qx: QueryExecutor, - urls: string[], -): Promise> { - if (urls.length === 0) { - return new Map() - } - - const results = await qx.select( - ` - SELECT id, url - FROM git.repositories - WHERE url IN ($(urls:csv)) - `, - { urls }, - ) - - return new Map(results.map((row: { id: string; url: string }) => [row.url, row.id])) -} - /** * Get repositories by their URLs * @param qx - Query executor