diff --git a/apps/api/src/integration-platform/controllers/checks.controller.ts b/apps/api/src/integration-platform/controllers/checks.controller.ts index da27103c06..66bcfcdfd5 100644 --- a/apps/api/src/integration-platform/controllers/checks.controller.ts +++ b/apps/api/src/integration-platform/controllers/checks.controller.ts @@ -10,6 +10,7 @@ import { UseGuards, } from '@nestjs/common'; import { ApiTags, ApiSecurity } from '@nestjs/swagger'; +import type { Prisma } from '@prisma/client'; import { HybridAuthGuard } from '../../auth/hybrid-auth.guard'; import { PermissionGuard } from '../../auth/permission.guard'; import { RequirePermission } from '../../auth/require-permission.decorator'; @@ -271,7 +272,18 @@ export class ChecksController { await this.checkRunRepository.addResults(resultsToStore); } - // Update the check run status + // Collect execution logs from all check results + const allLogs = result.results.flatMap((checkResult) => + checkResult.result.logs.map((log) => ({ + check: checkResult.checkName, + level: log.level, + message: log.message, + ...(log.data ? { data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })), + ); + + // Update the check run status with logs const startTime = checkRun.startedAt?.getTime() || Date.now(); await this.checkRunRepository.complete(checkRun.id, { status: result.totalFindings > 0 ? 'failed' : 'success', @@ -279,6 +291,9 @@ export class ChecksController { totalChecked: result.results.length, passedCount: result.totalPassing, failedCount: result.totalFindings, + logs: allLogs.length > 0 + ? (allLogs as unknown as Prisma.InputJsonValue) + : undefined, }); return { @@ -288,20 +303,29 @@ export class ChecksController { ...result, }; } catch (error) { - // Mark the check run as failed + // Mark the check run as failed with error details const startTime = checkRun.startedAt?.getTime() || Date.now(); + const errorMessage = error instanceof Error ? 
error.message : String(error); + const errorStack = error instanceof Error ? error.stack : undefined; await this.checkRunRepository.complete(checkRun.id, { status: 'failed', durationMs: Date.now() - startTime, totalChecked: 0, passedCount: 0, failedCount: 0, - errorMessage: error instanceof Error ? error.message : String(error), + errorMessage, + logs: [{ + check: body.checkId || 'all', + level: 'error', + message: errorMessage, + ...(errorStack ? { data: { stack: errorStack } } : {}), + timestamp: new Date().toISOString(), + }] as unknown as Prisma.InputJsonValue, }); this.logger.error(`Check execution failed: ${error}`); throw new HttpException( - `Check execution failed: ${error instanceof Error ? error.message : String(error)}`, + `Check execution failed: ${errorMessage}`, HttpStatus.INTERNAL_SERVER_ERROR, ); } diff --git a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts index a7e1427bc2..6869bc6800 100644 --- a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts +++ b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts @@ -13,12 +13,17 @@ import { UseGuards, } from '@nestjs/common'; import type { Prisma } from '@prisma/client'; +import { db } from '@db'; import { InternalTokenGuard } from '../../auth/internal-token.guard'; import { DynamicIntegrationRepository } from '../repositories/dynamic-integration.repository'; import { DynamicCheckRepository } from '../repositories/dynamic-check.repository'; import { ProviderRepository } from '../repositories/provider.repository'; +import { CheckRunRepository } from '../repositories/check-run.repository'; import { DynamicManifestLoaderService } from '../services/dynamic-manifest-loader.service'; -import { validateIntegrationDefinition } from '@trycompai/integration-platform'; +import { + validateIntegrationDefinition, + SyncDefinitionSchema, +} from 
'@trycompai/integration-platform'; @Controller({ path: 'internal/dynamic-integrations', version: '1' }) @UseGuards(InternalTokenGuard) @@ -29,6 +34,7 @@ export class DynamicIntegrationsController { private readonly dynamicIntegrationRepo: DynamicIntegrationRepository, private readonly dynamicCheckRepo: DynamicCheckRepository, private readonly providerRepo: ProviderRepository, + private readonly checkRunRepo: CheckRunRepository, private readonly loaderService: DynamicManifestLoaderService, ) {} @@ -48,6 +54,12 @@ export class DynamicIntegrationsController { const def = validation.data!; + // Validate and store syncDefinition through Zod to apply defaults (e.g., employeesPath) + const rawSyncDef = (body as Record).syncDefinition; + const validatedSyncDef = rawSyncDef + ? SyncDefinitionSchema.parse(rawSyncDef) + : undefined; + // Upsert the integration const integration = await this.dynamicIntegrationRepo.upsertBySlug({ slug: def.slug, @@ -61,6 +73,9 @@ export class DynamicIntegrationsController { authConfig: def.authConfig as unknown as Prisma.InputJsonValue, capabilities: def.capabilities as unknown as Prisma.InputJsonValue, supportsMultipleConnections: def.supportsMultipleConnections, + syncDefinition: validatedSyncDef + ? (JSON.parse(JSON.stringify(validatedSyncDef)) as Prisma.InputJsonValue) + : null, }); // Delete checks not in the new definition, then upsert the rest @@ -132,6 +147,10 @@ export class DynamicIntegrationsController { ); } + const rawSyncDefCreate = (body as Record).syncDefinition; + const validatedSyncDefCreate = rawSyncDefCreate + ? 
SyncDefinitionSchema.parse(rawSyncDefCreate) + : undefined; const integration = await this.dynamicIntegrationRepo.create({ slug: def.slug, name: def.name, @@ -144,6 +163,9 @@ export class DynamicIntegrationsController { authConfig: def.authConfig as unknown as Prisma.InputJsonValue, capabilities: def.capabilities as unknown as Prisma.InputJsonValue, supportsMultipleConnections: def.supportsMultipleConnections, + syncDefinition: validatedSyncDefCreate + ? (JSON.parse(JSON.stringify(validatedSyncDefCreate)) as Prisma.InputJsonValue) + : undefined, }); for (const [index, check] of def.checks.entries()) { @@ -365,4 +387,139 @@ export class DynamicIntegrationsController { this.logger.log(`Deactivated dynamic integration: ${integration.slug}`); return { success: true }; } + + // ==================== Agent Debugging Endpoints ==================== + + /** + * Validate a definition without saving. + * Agents use this to check syntax/structure before committing. + */ + @Post('validate') + async validate(@Body() body: Record) { + const result = validateIntegrationDefinition(body); + + if (!result.success) { + return { + valid: false, + errors: result.errors, + }; + } + + // validateIntegrationDefinition validates everything via Zod: + // the manifest fields, all check definitions, and syncDefinition. + // If we got here, the entire definition is valid. + const definition = result.data!; + + return { + valid: true, + summary: { + slug: definition.slug, + name: definition.name, + category: definition.category, + capabilities: definition.capabilities, + checksCount: definition.checks.length, + checkSlugs: definition.checks.map((c) => c.checkSlug), + hasSyncDefinition: !!(body as Record).syncDefinition, + }, + }; + } + + /** + * Get recent check run history for a dynamic integration. + * Agents use this to debug failing checks — includes full logs and results. 
+ */ + @Get(':id/check-runs') + async getCheckRuns(@Param('id') id: string) { + const integration = await this.dynamicIntegrationRepo.findById(id); + if (!integration) { + throw new HttpException('Dynamic integration not found', HttpStatus.NOT_FOUND); + } + + // Find all connections for this provider + const connections = await db.integrationConnection.findMany({ + where: { + provider: { slug: integration.slug }, + status: 'active', + }, + select: { id: true, organizationId: true }, + }); + + if (connections.length === 0) { + return { runs: [], total: 0 }; + } + + // Get recent runs across all connections + const runs = await db.integrationCheckRun.findMany({ + where: { + connectionId: { in: connections.map((c) => c.id) }, + }, + include: { + results: { + select: { + id: true, + passed: true, + title: true, + resourceType: true, + resourceId: true, + severity: true, + remediation: true, + }, + }, + }, + orderBy: { createdAt: 'desc' }, + take: 20, + }); + + return { + runs: runs.map((run) => ({ + id: run.id, + checkId: run.checkId, + checkName: run.checkName, + connectionId: run.connectionId, + status: run.status, + startedAt: run.startedAt, + completedAt: run.completedAt, + durationMs: run.durationMs, + totalChecked: run.totalChecked, + passedCount: run.passedCount, + failedCount: run.failedCount, + errorMessage: run.errorMessage, + logs: run.logs, + results: run.results, + })), + total: runs.length, + }; + } + + /** + * Get a single check run with full details (logs, results, error info). + * Agents use this to debug a specific failed run. 
+ */ + @Get('check-runs/:runId') + async getCheckRunById(@Param('runId') runId: string) { + const run = await this.checkRunRepo.findById(runId); + if (!run) { + throw new HttpException('Check run not found', HttpStatus.NOT_FOUND); + } + + return { + id: run.id, + checkId: run.checkId, + checkName: run.checkName, + connectionId: run.connectionId, + status: run.status, + startedAt: run.startedAt, + completedAt: run.completedAt, + durationMs: run.durationMs, + totalChecked: run.totalChecked, + passedCount: run.passedCount, + failedCount: run.failedCount, + errorMessage: run.errorMessage, + logs: run.logs, + results: run.results, + provider: run.connection?.provider + ? { slug: run.connection.provider.slug, name: run.connection.provider.name } + : null, + }; + } } diff --git a/apps/api/src/integration-platform/controllers/sync.controller.ts b/apps/api/src/integration-platform/controllers/sync.controller.ts index d3af5fd10d..6907fdb440 100644 --- a/apps/api/src/integration-platform/controllers/sync.controller.ts +++ b/apps/api/src/integration-platform/controllers/sync.controller.ts @@ -2,6 +2,7 @@ import { Controller, Post, Get, + Param, Query, Body, HttpException, @@ -19,22 +20,30 @@ import { } from '../../auth/auth-context.decorator'; import type { AuthContext as AuthContextType } from '../../auth/types'; import { db } from '@db'; +import type { Prisma } from '@prisma/client'; import { ConnectionRepository } from '../repositories/connection.repository'; import { CredentialVaultService } from '../services/credential-vault.service'; import { OAuthCredentialsService } from '../services/oauth-credentials.service'; import { getManifest, + registry, matchesSyncFilterTerms, parseSyncFilterTerms, + interpretDeclarativeSync, type OAuthConfig, type RampUser, type RampUserStatus, type RampUsersResponse, type RoleMappingEntry, + type SyncDefinition, } from '@trycompai/integration-platform'; import { RampRoleMappingService } from '../services/ramp-role-mapping.service'; import { 
IntegrationSyncLoggerService } from '../services/integration-sync-logger.service'; import { RampApiService } from '../services/ramp-api.service'; +import { GenericEmployeeSyncService } from '../services/generic-employee-sync.service'; +import { DynamicIntegrationRepository } from '../repositories/dynamic-integration.repository'; +import { CheckRunRepository } from '../repositories/check-run.repository'; +import { createCheckContext } from '@trycompai/integration-platform'; import { filterUsersByOrgUnits } from './sync-ou-filter'; interface GoogleWorkspaceUser { @@ -74,6 +83,9 @@ export class SyncController { private readonly rampRoleMappingService: RampRoleMappingService, private readonly syncLoggerService: IntegrationSyncLoggerService, private readonly rampApiService: RampApiService, + private readonly genericSyncService: GenericEmployeeSyncService, + private readonly dynamicIntegrationRepo: DynamicIntegrationRepository, + private readonly checkRunRepo: CheckRunRepository, ) {} /** @@ -2067,12 +2079,10 @@ export class SyncController { // Validate provider if set if (provider) { - const validProviders = [ - 'google-workspace', - 'rippling', - 'jumpcloud', - 'ramp', - ]; + const allManifests = registry.getActiveManifests(); + const validProviders = allManifests + .filter((m) => m.capabilities?.includes('sync')) + .map((m) => m.id); if (!validProviders.includes(provider)) { throw new HttpException( `Invalid provider. Must be one of: ${validProviders.join(', ')}`, @@ -2108,4 +2118,280 @@ export class SyncController { provider, }; } + + // ============================================================================ + // Dynamic sync endpoints (for dynamic integrations with syncDefinition) + // ============================================================================ + + /** + * Get all providers that support employee sync (both code-based and dynamic). + * Used by the frontend to render the provider selector dynamically. 
+ */ + @Get('available-providers') + @RequirePermission('integration', 'read') + async getAvailableSyncProviders( + @OrganizationId() organizationId: string, + ) { + const allManifests = registry.getActiveManifests(); + const syncProviders = allManifests.filter((m) => + m.capabilities?.includes('sync'), + ); + + const results = await Promise.all( + syncProviders.map(async (m) => { + const connection = await db.integrationConnection.findFirst({ + where: { + organizationId, + status: 'active', + provider: { slug: m.id }, + }, + select: { + id: true, + status: true, + lastSyncAt: true, + nextSyncAt: true, + }, + }); + return { + slug: m.id, + name: m.name, + logoUrl: m.logoUrl, + connected: !!connection, + connectionId: connection?.id ?? null, + lastSyncAt: connection?.lastSyncAt?.toISOString() ?? null, + nextSyncAt: connection?.nextSyncAt?.toISOString() ?? null, + }; + }), + ); + + return { providers: results }; + } + + /** + * Generic sync endpoint for dynamic integrations. + * Runs the syncDefinition (DSL/code steps) and processes the resulting employees. + * + * NOTE: This route uses :providerSlug param. NestJS matches routes in order, + * so the hardcoded routes above (google-workspace/employees, etc.) take priority. + * This only matches for slugs that don't match the 4 built-in providers. + */ + @Post('dynamic/:providerSlug/employees') + @RequirePermission('integration', 'update') + async syncDynamicProviderEmployees( + @OrganizationId() organizationId: string, + @Param('providerSlug') providerSlug: string, + @Query('connectionId') connectionId: string, + ) { + if (!connectionId) { + throw new HttpException( + 'connectionId is required', + HttpStatus.BAD_REQUEST, + ); + } + + this.logger.log( + `[DynamicSync] Starting sync for provider="${providerSlug}" connection="${connectionId}" org="${organizationId}"`, + ); + + // 1. 
Validate connection + const connection = await this.connectionRepository.findById(connectionId); + if (!connection || connection.organizationId !== organizationId) { + throw new HttpException('Connection not found', HttpStatus.NOT_FOUND); + } + + // 2. Get manifest from registry — must have 'sync' capability + const manifest = getManifest(providerSlug); + if (!manifest) { + throw new HttpException( + `Integration "${providerSlug}" not found`, + HttpStatus.NOT_FOUND, + ); + } + if (!manifest.capabilities?.includes('sync')) { + throw new HttpException( + `Integration "${providerSlug}" does not support sync`, + HttpStatus.BAD_REQUEST, + ); + } + + // 3. Get dynamic integration — must have syncDefinition + const dynamicIntegration = + await this.dynamicIntegrationRepo.findBySlug(providerSlug); + if (!dynamicIntegration?.syncDefinition) { + throw new HttpException( + `Integration "${providerSlug}" has no sync definition`, + HttpStatus.BAD_REQUEST, + ); + } + + // 4. Get & refresh credentials + let credentials = + await this.credentialVaultService.getDecryptedCredentials(connectionId); + if (!credentials) { + throw new HttpException( + 'No valid credentials found. 
Please reconnect the integration.', + HttpStatus.UNAUTHORIZED, + ); + } + + // Try to refresh OAuth token if applicable + if (manifest.auth.type === 'oauth2' && credentials.refresh_token) { + const oauthConfig = manifest.auth.config as OAuthConfig; + try { + const oauthCredentials = + await this.oauthCredentialsService.getCredentials( + providerSlug, + organizationId, + ); + if (oauthCredentials) { + const newToken = + await this.credentialVaultService.refreshOAuthTokens( + connectionId, + { + tokenUrl: oauthConfig.tokenUrl, + refreshUrl: oauthConfig.refreshUrl, + clientId: oauthCredentials.clientId, + clientSecret: oauthCredentials.clientSecret, + clientAuthMethod: oauthConfig.clientAuthMethod, + }, + ); + if (newToken) { + credentials = + await this.credentialVaultService.getDecryptedCredentials( + connectionId, + ); + } + } + } catch (refreshError) { + this.logger.warn( + `Token refresh failed for ${providerSlug}, trying with existing token: ${refreshError}`, + ); + } + } + + const accessToken = credentials?.access_token; + this.logger.log( + `[DynamicSync] Credentials ready for "${providerSlug}" (auth=${manifest.auth.type}, hasToken=${!!accessToken})`, + ); + + // 5. Create a sync run record (same table as check runs — agent reads both the same way) + const syncRun = await this.checkRunRepo.create({ + connectionId, + checkId: `sync:${providerSlug}`, + checkName: `Employee Sync: ${manifest.name}`, + }); + + // 6. Create CheckContext with logging that captures to the run + const { ctx, getResults } = createCheckContext({ + manifest, + accessToken: typeof accessToken === 'string' ? accessToken : undefined, + credentials: (credentials ?? {}) as Record, + variables: + ((connection.variables as Record) ?? {}) as Record, + connectionId, + organizationId, + metadata: + (connection.metadata as Record) ?? 
{}, + logger: { + info: (msg, data) => this.logger.log(msg, data), + warn: (msg, data) => this.logger.warn(msg, data), + error: (msg, data) => this.logger.error(msg, data), + }, + }); + + try { + // 7. Run sync definition → get SyncEmployee[] + const syncDefinition = + dynamicIntegration.syncDefinition as unknown as SyncDefinition; + const syncRunner = interpretDeclarativeSync({ + definition: syncDefinition, + }); + + const employees = await syncRunner.run(ctx); + + this.logger.log( + `[DynamicSync] Sync definition produced ${employees.length} employees for "${providerSlug}"`, + ); + + // 8. Process employees via generic service + const result = await this.genericSyncService.processEmployees({ + organizationId, + employees, + options: { + providerName: manifest.name, + }, + }); + + // 9. Persist execution logs + results to the run record + const executionLogs = getResults().logs.map((log) => ({ + level: log.level, + message: log.message, + ...(log.data ? { data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })); + + const startTime = syncRun.startedAt?.getTime() || Date.now(); + await this.checkRunRepo.complete(syncRun.id, { + status: result.errors > 0 ? 'failed' : 'success', + durationMs: Date.now() - startTime, + totalChecked: result.totalFound, + passedCount: result.imported + result.reactivated, + failedCount: result.errors, + logs: executionLogs.length > 0 + ? (executionLogs as unknown as Prisma.InputJsonValue) + : undefined, + }); + + this.logger.log( + `[DynamicSync] Sync complete for "${providerSlug}": imported=${result.imported} skipped=${result.skipped} deactivated=${result.deactivated} reactivated=${result.reactivated} errors=${result.errors}`, + ); + + return { + ...result, + syncRunId: syncRun.id, + }; + } catch (error) { + // Persist error + whatever logs were captured before the failure + const executionLogs = getResults().logs.map((log) => ({ + level: log.level, + message: log.message, + ...(log.data ? 
{ data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })); + + const errorMessage = error instanceof Error ? error.message : String(error); + const errorStack = error instanceof Error ? error.stack : undefined; + + const startTime = syncRun.startedAt?.getTime() || Date.now(); + await this.checkRunRepo.complete(syncRun.id, { + status: 'failed', + durationMs: Date.now() - startTime, + totalChecked: 0, + passedCount: 0, + failedCount: 0, + errorMessage, + logs: [ + ...executionLogs, + { + level: 'error', + message: `Sync execution failed: ${errorMessage}`, + ...(errorStack ? { data: { stack: errorStack } } : {}), + timestamp: new Date().toISOString(), + }, + ] as unknown as Prisma.InputJsonValue, + }); + + this.logger.error( + `[DynamicSync] Sync failed for "${providerSlug}": ${errorMessage}`, + ); + + throw new HttpException( + { + message: `Sync execution failed: ${errorMessage}`, + syncRunId: syncRun.id, + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } } diff --git a/apps/api/src/integration-platform/integration-platform.module.ts b/apps/api/src/integration-platform/integration-platform.module.ts index 1632dbfdd6..6277efbc5d 100644 --- a/apps/api/src/integration-platform/integration-platform.module.ts +++ b/apps/api/src/integration-platform/integration-platform.module.ts @@ -30,6 +30,7 @@ import { DynamicCheckRepository } from './repositories/dynamic-check.repository' import { RampRoleMappingService } from './services/ramp-role-mapping.service'; import { IntegrationSyncLoggerService } from './services/integration-sync-logger.service'; import { RampApiService } from './services/ramp-api.service'; +import { GenericEmployeeSyncService } from './services/generic-employee-sync.service'; @Module({ imports: [AuthModule], @@ -58,6 +59,7 @@ import { RampApiService } from './services/ramp-api.service'; RampRoleMappingService, IntegrationSyncLoggerService, RampApiService, + GenericEmployeeSyncService, // Repositories ProviderRepository, 
ConnectionRepository, diff --git a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts index 1867afe10f..9e23e222c8 100644 --- a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts +++ b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts @@ -1,6 +1,7 @@ import { Injectable } from '@nestjs/common'; import { db } from '@db'; -import type { DynamicIntegration, DynamicCheck, Prisma } from '@prisma/client'; +import { Prisma } from '@prisma/client'; +import type { DynamicIntegration, DynamicCheck } from '@prisma/client'; export type DynamicIntegrationWithChecks = DynamicIntegration & { checks: DynamicCheck[]; @@ -54,6 +55,7 @@ export class DynamicIntegrationRepository { authConfig: Prisma.InputJsonValue; capabilities?: Prisma.InputJsonValue; supportsMultipleConnections?: boolean; + syncDefinition?: Prisma.InputJsonValue; }): Promise { return db.dynamicIntegration.create({ data: { @@ -68,6 +70,7 @@ export class DynamicIntegrationRepository { authConfig: data.authConfig, capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, + syncDefinition: data.syncDefinition ?? undefined, }, }); } @@ -98,6 +101,7 @@ export class DynamicIntegrationRepository { authConfig: Prisma.InputJsonValue; capabilities?: Prisma.InputJsonValue; supportsMultipleConnections?: boolean; + syncDefinition?: Prisma.InputJsonValue | null; }): Promise { return db.dynamicIntegration.upsert({ where: { slug: data.slug }, @@ -113,6 +117,7 @@ export class DynamicIntegrationRepository { authConfig: data.authConfig, capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, + syncDefinition: data.syncDefinition ?? 
undefined, }, update: { name: data.name, @@ -125,6 +130,9 @@ export class DynamicIntegrationRepository { authConfig: data.authConfig, capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, + syncDefinition: data.syncDefinition === null + ? Prisma.DbNull + : (data.syncDefinition ?? undefined), }, }); } diff --git a/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts b/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts index b6f858655d..50d3f0e22f 100644 --- a/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts +++ b/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts @@ -82,6 +82,11 @@ export class DynamicManifestLoaderService implements OnModuleInit { this.convertCheck(check, integration.slug), ); + // Collect manifest-level variables from syncDefinition (if present) + // These appear in the customer configuration UI (ManageIntegrationDialog) + const syncDef = integration.syncDefinition as Record | null; + const syncVariables = syncDef?.variables as CheckVariable[] | undefined; + return { id: integration.slug, name: integration.name, @@ -94,6 +99,7 @@ export class DynamicManifestLoaderService implements OnModuleInit { defaultHeaders: (integration.defaultHeaders as Record) ?? undefined, capabilities: (integration.capabilities as unknown as IntegrationCapability[]) ?? ['checks'], supportsMultipleConnections: integration.supportsMultipleConnections, + variables: syncVariables && syncVariables.length > 0 ? 
syncVariables : undefined, checks, isActive: integration.isActive, }; diff --git a/apps/api/src/integration-platform/services/generic-employee-sync.service.ts b/apps/api/src/integration-platform/services/generic-employee-sync.service.ts new file mode 100644 index 0000000000..b66cbd6e8b --- /dev/null +++ b/apps/api/src/integration-platform/services/generic-employee-sync.service.ts @@ -0,0 +1,276 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { db } from '@trycompai/db'; +import type { SyncEmployee } from '@trycompai/integration-platform'; + +// ============================================================================ +// Types +// ============================================================================ + +export interface SyncResultDetail { + email: string; + status: + | 'imported' + | 'skipped' + | 'deactivated' + | 'reactivated' + | 'error'; + reason?: string; +} + +export interface SyncResult { + success: boolean; + totalFound: number; + imported: number; + skipped: number; + deactivated: number; + reactivated: number; + errors: number; + details: SyncResultDetail[]; +} + +export interface ProcessEmployeesOptions { + /** Default role for new members. Defaults to 'employee'. */ + defaultRole?: string; + /** Whether to reactivate previously deactivated members. Defaults to false. */ + allowReactivation?: boolean; + /** Roles that should never be auto-deactivated. Defaults to owner/admin/auditor. */ + protectedRoles?: string[]; + /** Provider slug for deactivation reason messages. 
*/ + providerName?: string; +} + +const DEFAULT_PROTECTED_ROLES = ['owner', 'admin', 'auditor']; + +// ============================================================================ +// Service +// ============================================================================ + +/** + * Generic employee sync service that handles the platform-generic operations: + * - Creating users and members from a standardized employee list + * - Deactivating members no longer present in the provider + * - Safety guards (never deactivate privileged roles) + * + * This extracts the common pattern from SyncController's 4 provider-specific + * implementations (Google Workspace, Rippling, JumpCloud, Ramp). + * + * The provider-specific logic (fetching users, normalizing fields) is handled + * by the dynamic integration's syncDefinition (DSL/code steps). + */ +@Injectable() +export class GenericEmployeeSyncService { + private readonly logger = new Logger(GenericEmployeeSyncService.name); + + /** + * Process a standardized employee list into DB operations. + * + * Phase 1: Import active employees (create User if needed, create Member if needed) + * Phase 2: Deactivate members no longer present in the provider + */ + async processEmployees({ + organizationId, + employees, + options = {}, + }: { + organizationId: string; + employees: SyncEmployee[]; + options?: ProcessEmployeesOptions; + }): Promise { + const defaultRole = options.defaultRole ?? 'employee'; + const allowReactivation = options.allowReactivation ?? false; + const protectedRoles = options.protectedRoles ?? DEFAULT_PROTECTED_ROLES; + const providerName = options.providerName ?? 
'provider'; + + const results: SyncResult = { + success: true, + totalFound: employees.length, + imported: 0, + skipped: 0, + deactivated: 0, + reactivated: 0, + errors: 0, + details: [], + }; + + this.logger.log( + `[GenericSync] Processing ${employees.length} employees for org="${organizationId}" provider="${providerName}"`, + ); + + // Separate employees by status + const activeEmployees = employees.filter((e) => e.status === 'active'); + const inactiveEmails = new Set( + employees + .filter((e) => e.status !== 'active') + .map((e) => e.email.toLowerCase()), + ); + const activeEmails = new Set( + activeEmployees.map((e) => e.email.toLowerCase()), + ); + + // Build domain set from all employees (for domain-scoped deactivation) + const providerDomains = new Set(); + for (const emp of employees) { + const domain = emp.email.toLowerCase().split('@')[1]; + if (domain) providerDomains.add(domain); + } + + this.logger.log( + `[GenericSync] Employee breakdown: active=${activeEmployees.length} inactive/suspended=${inactiveEmails.size} domains=${Array.from(providerDomains).join(',')}`, + ); + + // ==================================================================== + // Phase 1: Import active employees + // ==================================================================== + for (const employee of activeEmployees) { + const normalizedEmail = employee.email.toLowerCase(); + const displayName = + employee.name || + [employee.firstName, employee.lastName].filter(Boolean).join(' ') || + normalizedEmail.split('@')[0] || + normalizedEmail; + + try { + // Find or create User + let existingUser = await db.user.findUnique({ + where: { email: normalizedEmail }, + }); + + if (!existingUser) { + existingUser = await db.user.create({ + data: { + email: normalizedEmail, + name: displayName, + emailVerified: true, + }, + }); + } + + // Check if Member already exists in this org + const existingMember = await db.member.findFirst({ + where: { + organizationId, + userId: existingUser.id, 
+ }, + }); + + if (existingMember) { + if (existingMember.deactivated && allowReactivation) { + // Reactivate the member + await db.member.update({ + where: { id: existingMember.id }, + data: { deactivated: false, isActive: true }, + }); + results.reactivated++; + results.details.push({ + email: normalizedEmail, + status: 'reactivated', + }); + } else { + results.skipped++; + results.details.push({ + email: normalizedEmail, + status: 'skipped', + reason: existingMember.deactivated + ? 'Member is deactivated' + : 'Already a member', + }); + } + continue; + } + + // Create new member + await db.member.create({ + data: { + organizationId, + userId: existingUser.id, + role: employee.role || defaultRole, + isActive: true, + }, + }); + + results.imported++; + results.details.push({ + email: normalizedEmail, + status: 'imported', + }); + } catch (error) { + this.logger.error( + `Error importing employee ${normalizedEmail}: ${error}`, + ); + results.errors++; + results.details.push({ + email: normalizedEmail, + status: 'error', + reason: error instanceof Error ? 
error.message : 'Unknown error', + }); + } + } + + this.logger.log( + `[GenericSync] Phase 1 complete: imported=${results.imported} skipped=${results.skipped} reactivated=${results.reactivated} errors=${results.errors}`, + ); + + // ==================================================================== + // Phase 2: Deactivate members no longer in provider + // ==================================================================== + const allOrgMembers = await db.member.findMany({ + where: { + organizationId, + deactivated: false, + }, + include: { + user: true, + }, + }); + + for (const member of allOrgMembers) { + const memberEmail = member.user.email.toLowerCase(); + const memberDomain = memberEmail.split('@')[1]; + + // Only check members whose email domain matches the provider's domains + if (!memberDomain || !providerDomains.has(memberDomain)) { + continue; + } + + // Safety guard: never auto-deactivate privileged members + const memberRoles = member.role + .split(',') + .map((role) => role.trim().toLowerCase()); + if (protectedRoles.some((pr) => memberRoles.includes(pr))) { + continue; + } + + // If member is not in active set AND not already accounted for, deactivate + const isSuspended = inactiveEmails.has(memberEmail); + const isRemoved = !activeEmails.has(memberEmail) && !isSuspended; + + if (isSuspended || isRemoved) { + try { + await db.member.update({ + where: { id: member.id }, + data: { deactivated: true, isActive: false }, + }); + results.deactivated++; + results.details.push({ + email: memberEmail, + status: 'deactivated', + reason: isSuspended + ? 
`User is suspended in ${providerName}` + : `User was removed from ${providerName}`, + }); + } catch (error) { + this.logger.error(`Error deactivating member ${memberEmail}: ${error}`); + results.errors++; + } + } + } + + results.success = results.errors === 0; + + this.logger.log( + `Sync complete for ${providerName}: ${results.imported} imported, ${results.reactivated} reactivated, ${results.deactivated} deactivated, ${results.skipped} skipped, ${results.errors} errors`, + ); + + return results; + } +} diff --git a/apps/api/src/scripts/seed-dynamic-integration.ts b/apps/api/src/scripts/seed-dynamic-integration.ts index cdc8d8419b..ec14a6168e 100644 --- a/apps/api/src/scripts/seed-dynamic-integration.ts +++ b/apps/api/src/scripts/seed-dynamic-integration.ts @@ -69,6 +69,9 @@ async function main() { authConfig: toJson(def.authConfig), capabilities: toJson(def.capabilities), supportsMultipleConnections: def.supportsMultipleConnections ?? false, + syncDefinition: (rawJson as Record).syncDefinition + ? toJson((rawJson as Record).syncDefinition) + : undefined, isActive: true, }, update: { @@ -82,6 +85,9 @@ async function main() { authConfig: toJson(def.authConfig), capabilities: toJson(def.capabilities), supportsMultipleConnections: def.supportsMultipleConnections ?? false, + syncDefinition: (rawJson as Record).syncDefinition + ? 
toJson((rawJson as Record).syncDefinition) + : undefined, }, }); diff --git a/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts b/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts index 7e16b0a05d..11b9fa5e8e 100644 --- a/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts +++ b/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts @@ -201,7 +201,8 @@ async function syncProvider(params: SyncProviderParams): Promise { return syncRamp({ connectionId, organizationId }); default: - throw new Error(`No sync handler for provider: ${providerSlug}`); + // Try generic dynamic sync endpoint for non-built-in providers + return syncDynamicProvider({ providerSlug, connectionId, organizationId }); } } @@ -352,3 +353,44 @@ async function syncRamp({ errors: data.errors || 0, }; } + +async function syncDynamicProvider({ + providerSlug, + connectionId, + organizationId, +}: { + providerSlug: string; + connectionId: string; + organizationId: string; +}): Promise { + const url = new URL( + `${API_BASE_URL}/v1/integrations/sync/dynamic/${providerSlug}/employees`, + ); + url.searchParams.set('connectionId', connectionId); + + const response = await fetch(url.toString(), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-service-token': process.env.SERVICE_TOKEN_TRIGGER!, + 'x-organization-id': organizationId, + }, + }); + + if (!response.ok) { + const errorBody = await response.text(); + throw new Error( + `Dynamic sync failed for ${providerSlug}: ${response.status} - ${errorBody}`, + ); + } + + const data = await response.json(); + return { + success: data.success, + imported: data.imported || 0, + reactivated: data.reactivated || 0, + deactivated: data.deactivated || 0, + skipped: data.skipped || 0, + errors: data.errors || 0, + }; +} diff --git a/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx 
b/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx index ff75c30a52..0436678794 100644 --- a/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx +++ b/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx @@ -121,13 +121,14 @@ export function TeamMembersClient({ handleRoleMappingClose, handleRoleMappingSaved, openRoleMappingEditor, + availableProviders, } = useEmployeeSync({ organizationId, initialData: employeeSyncData }); const lastSyncAt = employeeSyncData.lastSyncAt; const nextSyncAt = employeeSyncData.nextSyncAt; const handleEmployeeSync = async ( - provider: 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp', + provider: string, ) => { const result = await syncEmployees(provider); if (result?.success) { @@ -474,6 +475,29 @@ export function TeamMembersClient({ )} + {/* Dynamic sync providers (from dynamic integrations) */} + {availableProviders + .filter((p) => p.connected && !['google-workspace', 'rippling', 'jumpcloud', 'ramp'].includes(p.slug)) + .map((provider) => ( + +
+ {provider.logoUrl && ( + {provider.name} + )} + {provider.name} + {selectedProvider === provider.slug && ( + Active + )} +
+
+ ))} diff --git a/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts b/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts index 9b12c0e2b8..a964403da2 100644 --- a/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts +++ b/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts @@ -1,13 +1,25 @@ import { serverApi } from '@/lib/server-api-client'; +export interface SyncProviderInfo { + slug: string; + name: string; + logoUrl: string; + connected: boolean; + connectionId: string | null; + lastSyncAt: string | null; + nextSyncAt: string | null; +} + export interface EmployeeSyncConnectionsData { googleWorkspaceConnectionId: string | null; ripplingConnectionId: string | null; jumpcloudConnectionId: string | null; rampConnectionId: string | null; - selectedProvider: 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp' | null | undefined; + selectedProvider: string | null | undefined; lastSyncAt: Date | null; nextSyncAt: Date | null; + /** All providers that support sync (built-in + dynamic) */ + availableProviders: SyncProviderInfo[]; } interface ConnectionStatus { @@ -20,7 +32,7 @@ interface ConnectionStatus { export async function getEmployeeSyncConnections( organizationId: string, ): Promise { - const [gwResponse, ripplingResponse, jumpcloudResponse, rampResponse, providerResponse] = + const [gwResponse, ripplingResponse, jumpcloudResponse, rampResponse, providerResponse, availableResponse] = await Promise.all([ serverApi.post( `/v1/integrations/sync/google-workspace/status?organizationId=${organizationId}`, @@ -34,23 +46,36 @@ export async function getEmployeeSyncConnections( serverApi.post( `/v1/integrations/sync/ramp/status?organizationId=${organizationId}`, ), - serverApi.get<{ provider: 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp' | null }>( + serverApi.get<{ provider: string | null }>( `/v1/integrations/sync/employee-sync-provider?organizationId=${organizationId}`, ), + serverApi.get<{ providers: SyncProviderInfo[] }>( + 
`/v1/integrations/sync/available-providers?organizationId=${organizationId}`, + ).catch(() => ({ data: null, error: null, status: 500 })), ]); - // Get sync times from the selected provider's connection + const availableProviders = availableResponse.data?.providers ?? []; + + // Get sync times from the selected provider + // Check built-in providers first, then fall back to available-providers data const selectedProviderSlug = providerResponse.data?.provider; - const selectedConnection = - selectedProviderSlug === 'google-workspace' - ? gwResponse.data - : selectedProviderSlug === 'rippling' - ? ripplingResponse.data - : selectedProviderSlug === 'jumpcloud' - ? jumpcloudResponse.data - : selectedProviderSlug === 'ramp' - ? rampResponse.data - : null; + let selectedSyncTimes: { lastSyncAt?: string | null; nextSyncAt?: string | null } | null = null; + + if (selectedProviderSlug === 'google-workspace') { + selectedSyncTimes = gwResponse.data ?? null; + } else if (selectedProviderSlug === 'rippling') { + selectedSyncTimes = ripplingResponse.data ?? null; + } else if (selectedProviderSlug === 'jumpcloud') { + selectedSyncTimes = jumpcloudResponse.data ?? null; + } else if (selectedProviderSlug === 'ramp') { + selectedSyncTimes = rampResponse.data ?? null; + } else if (selectedProviderSlug) { + // Dynamic provider — get sync times from available-providers data + const dynProvider = availableProviders.find((p) => p.slug === selectedProviderSlug); + if (dynProvider) { + selectedSyncTimes = { lastSyncAt: dynProvider.lastSyncAt, nextSyncAt: dynProvider.nextSyncAt }; + } + } return { googleWorkspaceConnectionId: @@ -70,7 +95,8 @@ export async function getEmployeeSyncConnections( ? rampResponse.data.connectionId : null, selectedProvider: selectedProviderSlug, - lastSyncAt: selectedConnection?.lastSyncAt ? new Date(selectedConnection.lastSyncAt) : null, - nextSyncAt: selectedConnection?.nextSyncAt ? 
new Date(selectedConnection.nextSyncAt) : null, + lastSyncAt: selectedSyncTimes?.lastSyncAt ? new Date(selectedSyncTimes.lastSyncAt) : null, + nextSyncAt: selectedSyncTimes?.nextSyncAt ? new Date(selectedSyncTimes.nextSyncAt) : null, + availableProviders, }; } diff --git a/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts b/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts index 8dd6254620..887971e01c 100644 --- a/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts +++ b/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts @@ -5,9 +5,10 @@ import { useState } from 'react'; import { toast } from 'sonner'; import useSWR from 'swr'; -import type { EmployeeSyncConnectionsData } from '../data/queries'; +import type { EmployeeSyncConnectionsData, SyncProviderInfo } from '../data/queries'; -type SyncProvider = 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp'; +type BuiltInSyncProvider = 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp'; +type SyncProvider = string; interface SyncResult { success: boolean; @@ -68,8 +69,17 @@ interface UseEmployeeSyncReturn { handleRoleMappingClose: () => void; handleRoleMappingSaved: () => void; openRoleMappingEditor: () => Promise; + /** All available sync providers (built-in + dynamic) */ + availableProviders: SyncProviderInfo[]; } +const BUILT_IN_PROVIDERS = new Set([ + 'google-workspace', + 'rippling', + 'jumpcloud', + 'ramp', +]); + const PROVIDER_CONFIG = { 'google-workspace': { name: 'Google Workspace', @@ -125,7 +135,10 @@ export const useEmployeeSync = ({ mutate({ ...data!, selectedProvider: provider }, false); if (provider) { - toast.success(`${PROVIDER_CONFIG[provider].name} set as your employee sync provider`); + const name = provider in PROVIDER_CONFIG + ? PROVIDER_CONFIG[provider as BuiltInSyncProvider].name + : (availableProviders.find((p) => p.slug === provider)?.name ?? 
provider); + toast.success(`${name} set as your employee sync provider`); } } catch (error) { toast.error('Failed to set sync provider'); @@ -133,23 +146,36 @@ export const useEmployeeSync = ({ } }; + const availableProviders = data?.availableProviders ?? []; + + const getConnectionIdForProvider = (provider: SyncProvider): string | null => { + if (provider === 'google-workspace') return googleWorkspaceConnectionId; + if (provider === 'rippling') return ripplingConnectionId; + if (provider === 'jumpcloud') return jumpcloudConnectionId; + if (provider === 'ramp') return rampConnectionId; + // Dynamic provider — look up from availableProviders + const dynProvider = availableProviders.find((p) => p.slug === provider); + return dynProvider?.connectionId ?? null; + }; + + const getSyncUrl = (provider: SyncProvider, connectionId: string): string => { + if (BUILT_IN_PROVIDERS.has(provider)) { + return `/v1/integrations/sync/${provider}/employees?organizationId=${organizationId}&connectionId=${connectionId}`; + } + return `/v1/integrations/sync/dynamic/${provider}/employees?organizationId=${organizationId}&connectionId=${connectionId}`; + }; + const syncEmployees = async (provider: SyncProvider): Promise => { - const connectionId = - provider === 'google-workspace' - ? googleWorkspaceConnectionId - : provider === 'rippling' - ? ripplingConnectionId - : provider === 'jumpcloud' - ? 
jumpcloudConnectionId - : rampConnectionId; + const connectionId = getConnectionIdForProvider(provider); if (!connectionId) { - toast.error(`${PROVIDER_CONFIG[provider].name} is not connected`); + const providerName = getProviderName(provider); + toast.error(`${providerName} is not connected`); return null; } setIsSyncing(true); - const config = PROVIDER_CONFIG[provider]; + const providerName = getProviderName(provider); try { // Set as sync provider if not already @@ -158,7 +184,7 @@ export const useEmployeeSync = ({ } const response = await apiClient.post( - `/v1/integrations/sync/${provider}/employees?organizationId=${organizationId}&connectionId=${connectionId}`, + getSyncUrl(provider, connectionId), ); // Handle role mapping requirement (Ramp only) @@ -205,7 +231,7 @@ export const useEmployeeSync = ({ } if (deactivated > 0) { toast.info( - `Deactivated ${deactivated} employee${deactivated > 1 ? 's' : ''} (no longer in ${config.name})`, + `Deactivated ${deactivated} employee${deactivated > 1 ? 's' : ''} (no longer in ${providerName})`, ); } if (imported === 0 && updated === 0 && reactivated === 0 && deactivated === 0 && skipped > 0) { @@ -224,15 +250,27 @@ export const useEmployeeSync = ({ return null; } catch (error) { - toast.error(`Failed to sync employees from ${config.name}`); + toast.error(`Failed to sync employees from ${providerName}`); return null; } finally { setIsSyncing(false); } }; - const getProviderName = (provider: SyncProvider) => PROVIDER_CONFIG[provider].shortName; - const getProviderLogo = (provider: SyncProvider) => PROVIDER_CONFIG[provider].logo; + const getProviderName = (provider: SyncProvider): string => { + if (provider in PROVIDER_CONFIG) { + return PROVIDER_CONFIG[provider as BuiltInSyncProvider].shortName; + } + const dynProvider = availableProviders.find((p) => p.slug === provider); + return dynProvider?.name ?? 
provider; + }; + const getProviderLogo = (provider: SyncProvider): string => { + if (provider in PROVIDER_CONFIG) { + return PROVIDER_CONFIG[provider as BuiltInSyncProvider].logo; + } + const dynProvider = availableProviders.find((p) => p.slug === provider); + return dynProvider?.logoUrl ?? ''; + }; const openRoleMappingEditor = async () => { if (!rampConnectionId) { @@ -285,6 +323,16 @@ export const useEmployeeSync = ({ } }; + const hasAnyBuiltInConnection = !!( + googleWorkspaceConnectionId || + ripplingConnectionId || + jumpcloudConnectionId || + rampConnectionId + ); + const hasDynamicConnection = availableProviders.some( + (p) => p.connected && !BUILT_IN_PROVIDERS.has(p.slug), + ); + return { googleWorkspaceConnectionId, ripplingConnectionId, @@ -294,12 +342,7 @@ export const useEmployeeSync = ({ isSyncing, syncEmployees, setSyncProvider, - hasAnyConnection: !!( - googleWorkspaceConnectionId || - ripplingConnectionId || - jumpcloudConnectionId || - rampConnectionId - ), + hasAnyConnection: hasAnyBuiltInConnection || hasDynamicConnection, getProviderName, getProviderLogo, showRoleMappingSheet, @@ -307,5 +350,6 @@ export const useEmployeeSync = ({ handleRoleMappingClose, handleRoleMappingSaved, openRoleMappingEditor, + availableProviders, }; }; diff --git a/packages/db/prisma/migrations/20260330000000_add_dynamic_sync_definition/migration.sql b/packages/db/prisma/migrations/20260330000000_add_dynamic_sync_definition/migration.sql new file mode 100644 index 0000000000..2dfb24aea7 --- /dev/null +++ b/packages/db/prisma/migrations/20260330000000_add_dynamic_sync_definition/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "DynamicIntegration" ADD COLUMN "syncDefinition" JSONB; diff --git a/packages/db/prisma/schema/dynamic-integration.prisma b/packages/db/prisma/schema/dynamic-integration.prisma index 04fb814ee9..3bea2bc600 100644 --- a/packages/db/prisma/schema/dynamic-integration.prisma +++ b/packages/db/prisma/schema/dynamic-integration.prisma @@ -32,6 
+32,10 @@ model DynamicIntegration { /// Whether multiple connections per org are allowed supportsMultipleConnections Boolean @default(false) + /// Declarative sync definition (JSON — DSL steps that produce employee list) + /// When present and capabilities includes 'sync', enables employee sync + syncDefinition Json? + /// Whether this dynamic integration is active isActive Boolean @default(true) diff --git a/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts b/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts index 2381acd1fd..7b17d2ba8f 100644 --- a/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts +++ b/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts @@ -603,4 +603,483 @@ describe('interpretDeclarativeCheck', () => { expect(ctx._fails[0]!.remediation).toContain('Microsoft 365 admin center'); }); }); + + describe('code step', () => { + it('sets scope values used by subsequent DSL steps', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/users', + as: 'rawUsers', + }, + { + type: 'code', + code: 'scope.users = scope.rawUsers.filter(u => u.active);', + }, + { + type: 'forEach', + collection: 'users', + itemAs: 'user', + resourceType: 'user', + resourceIdPath: 'user.email', + conditions: [ + { field: 'user.mfa', operator: 'eq', value: true }, + ], + onPass: { + title: 'MFA OK for {{user.email}}', + resourceType: 'user', + resourceId: '{{user.email}}', + }, + onFail: { + title: 'MFA missing for {{user.email}}', + resourceType: 'user', + resourceId: '{{user.email}}', + severity: 'high', + remediation: 'Enable MFA', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_filter', + name: 'Code Filter Test', + description: 'Tests code step filtering', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/users', [ + { email: 'alice@test.com', active: true, mfa: true }, + { email: 
'bob@test.com', active: false, mfa: false }, + { email: 'carol@test.com', active: true, mfa: false }, + ]); + + await check.run(ctx); + + // Bob is filtered out by the code step (active: false) + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('MFA OK for alice@test.com'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('MFA missing for carol@test.com'); + }); + + it('calls ctx.pass() and ctx.fail() directly', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: ` + ctx.pass({ + title: 'Direct pass', + description: 'From code', + resourceType: 'test', + resourceId: 'test-1', + evidence: { source: 'code' }, + }); + ctx.fail({ + title: 'Direct fail', + description: 'From code', + resourceType: 'test', + resourceId: 'test-2', + severity: 'critical', + remediation: 'Fix it', + }); + `, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_direct', + name: 'Code Direct Test', + description: 'Tests code step direct results', + definition, + }); + + const ctx = createMockContext(); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('Direct pass'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('Direct fail'); + expect(ctx._fails[0]!.severity).toBe('critical'); + }); + + it('uses async/await with ctx.fetch()', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: ` + const data = await ctx.fetch('/api/settings'); + scope.ssoEnabled = data.sso_enabled; + `, + }, + { + type: 'branch', + condition: { field: 'ssoEnabled', operator: 'eq', value: true }, + then: [ + { + type: 'emit', + result: 'pass', + template: { + title: 'SSO is enabled', + resourceType: 'settings', + resourceId: 'sso', + }, + }, + ], + else: [ + { + type: 'emit', + result: 'fail', + template: { + title: 'SSO is not enabled', + resourceType: 'settings', + resourceId: 'sso', + 
severity: 'high', + remediation: 'Enable SSO', + }, + }, + ], + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_async', + name: 'Code Async Test', + description: 'Tests code step async fetch', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/settings', { sso_enabled: true }); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('SSO is enabled'); + expect(ctx._fails).toHaveLength(0); + }); + + it('supports Promise.all for parallel fetches', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: ` + const [users, roles] = await Promise.all([ + ctx.fetch('/api/users'), + ctx.fetch('/api/roles'), + ]); + scope.userCount = users.length; + scope.roleCount = roles.length; + `, + }, + { + type: 'emit', + result: 'pass', + template: { + title: 'Fetched {{userCount}} users and {{roleCount}} roles', + resourceType: 'system', + resourceId: 'parallel-fetch', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_parallel', + name: 'Code Parallel Test', + description: 'Tests Promise.all', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/users', [{ id: 1 }, { id: 2 }, { id: 3 }]); + ctx._fetchResponses.set('/api/roles', [{ id: 'admin' }, { id: 'user' }]); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('Fetched 3 users and 2 roles'); + }); + + it('reads scope from prior fetch step', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/config', + as: 'config', + }, + { + type: 'code', + code: ` + const cfg = scope.config; + scope.isCompliant = cfg.encryption && cfg.audit_logging; + `, + }, + { + type: 'branch', + condition: { field: 'isCompliant', operator: 'truthy' }, + then: [ + { + type: 'emit', + result: 'pass', + template: { + title: 'System is compliant', + 
resourceType: 'config', + resourceId: 'compliance', + }, + }, + ], + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_scope_read', + name: 'Code Scope Read', + description: 'Tests reading prior scope', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/config', { encryption: true, audit_logging: true }); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('System is compliant'); + }); + + it('works nested inside forEach', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/repos', + as: 'repos', + }, + { + type: 'forEach', + collection: 'repos', + itemAs: 'repo', + resourceType: 'repository', + resourceIdPath: 'repo.name', + steps: [ + { + type: 'code', + code: ` + const details = await ctx.fetch('/api/repos/' + scope.repo.name + '/settings'); + scope.repo.branchProtection = details.branch_protection; + `, + }, + ], + conditions: [ + { field: 'repo.branchProtection', operator: 'eq', value: true }, + ], + onPass: { + title: '{{repo.name}} has branch protection', + resourceType: 'repository', + resourceId: '{{repo.name}}', + }, + onFail: { + title: '{{repo.name}} missing branch protection', + resourceType: 'repository', + resourceId: '{{repo.name}}', + severity: 'high', + remediation: 'Enable branch protection', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_in_foreach', + name: 'Code in ForEach', + description: 'Tests nested code step', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/repos', [ + { name: 'frontend' }, + { name: 'backend' }, + ]); + ctx._fetchResponses.set('/api/repos/frontend/settings', { branch_protection: true }); + ctx._fetchResponses.set('/api/repos/backend/settings', { branch_protection: false }); + + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('frontend has branch 
protection'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('backend missing branch protection'); + }); + + it('works nested inside branch', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/org', + as: 'org', + }, + { + type: 'branch', + condition: { field: 'org.plan', operator: 'eq', value: 'enterprise' }, + then: [ + { + type: 'code', + code: ` + ctx.pass({ + title: 'Enterprise plan detected', + description: 'Advanced checks available', + resourceType: 'org', + resourceId: scope.org.id, + evidence: { plan: scope.org.plan }, + }); + `, + }, + ], + else: [ + { + type: 'emit', + result: 'fail', + template: { + title: 'Not enterprise plan', + resourceType: 'org', + resourceId: '{{org.id}}', + severity: 'info', + remediation: 'Upgrade to enterprise', + }, + }, + ], + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_in_branch', + name: 'Code in Branch', + description: 'Tests code in branch', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/org', { id: 'org-1', plan: 'enterprise' }); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('Enterprise plan detected'); + expect(ctx._fails).toHaveLength(0); + }); + + it('propagates errors from code step', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: 'throw new Error("Something went wrong");', + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_error', + name: 'Code Error Test', + description: 'Tests error propagation', + definition, + }); + + const ctx = createMockContext(); + await expect(check.run(ctx)).rejects.toThrow('Something went wrong'); + }); + + it('handles complex data transformation (Map, filter, reduce)', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/users', + as: 'rawUsers', + }, + { + type: 'fetch', + 
path: '/api/roles', + as: 'rawRoles', + }, + { + type: 'code', + code: ` + // Build lookup map (like Google Workspace employee-access check) + const roleMap = new Map(); + for (const role of scope.rawRoles) { + roleMap.set(role.id, role.name); + } + + // Enrich users with role names + scope.users = scope.rawUsers.map(u => ({ + ...u, + roleName: roleMap.get(u.roleId) || 'Unknown', + })); + `, + }, + { + type: 'forEach', + collection: 'users', + itemAs: 'user', + resourceType: 'user', + resourceIdPath: 'user.email', + conditions: [ + { field: 'user.roleName', operator: 'neq', value: 'Unknown' }, + ], + onPass: { + title: '{{user.email}} has role {{user.roleName}}', + resourceType: 'user', + resourceId: '{{user.email}}', + }, + onFail: { + title: '{{user.email}} has unknown role', + resourceType: 'user', + resourceId: '{{user.email}}', + severity: 'medium', + remediation: 'Assign a valid role', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_transform', + name: 'Code Transform Test', + description: 'Tests Map + data enrichment', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/users', [ + { email: 'alice@test.com', roleId: 'r1' }, + { email: 'bob@test.com', roleId: 'r999' }, + ]); + ctx._fetchResponses.set('/api/roles', [ + { id: 'r1', name: 'Admin' }, + { id: 'r2', name: 'User' }, + ]); + + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('alice@test.com has role Admin'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('bob@test.com has unknown role'); + }); + }); }); diff --git a/packages/integration-platform/src/dsl/index.ts b/packages/integration-platform/src/dsl/index.ts index c01b470523..4a3544dcbc 100644 --- a/packages/integration-platform/src/dsl/index.ts +++ b/packages/integration-platform/src/dsl/index.ts @@ -1,5 +1,5 @@ -// DSL Engine — Declarative check definitions -export { interpretDeclarativeCheck } from 
'./interpreter'; +// DSL Engine — Declarative check and sync definitions +export { interpretDeclarativeCheck, interpretDeclarativeSync } from './interpreter'; export { evaluateCondition, evaluateOperator, resolvePath } from './expression-evaluator'; export { interpolate, interpolateTemplate } from './template-engine'; export { validateIntegrationDefinition, type ValidationResult } from './validate'; @@ -13,7 +13,10 @@ export type { AggregateStep, BranchStep, EmitStep, + CodeStep, CheckDefinition, + SyncEmployee, + SyncDefinition, Condition, FieldCondition, LogicalCondition, @@ -27,8 +30,11 @@ export type { export { DSLStepSchema, CheckDefinitionSchema, + SyncEmployeeSchema, + SyncDefinitionSchema, DynamicIntegrationDefinitionSchema, ConditionSchema, ResultTemplateSchema, PaginationConfigSchema, + CodeStepSchema, } from './types'; diff --git a/packages/integration-platform/src/dsl/interpreter.ts b/packages/integration-platform/src/dsl/interpreter.ts index 0be2210c46..0d65b0ad34 100644 --- a/packages/integration-platform/src/dsl/interpreter.ts +++ b/packages/integration-platform/src/dsl/interpreter.ts @@ -2,13 +2,17 @@ import type { CheckContext, IntegrationCheck, FindingSeverity } from '../types'; import type { DSLStep, CheckDefinition, + SyncDefinition, + SyncEmployee, FetchStep, FetchPagesStep, ForEachStep, AggregateStep, BranchStep, EmitStep, + CodeStep, } from './types'; +import { SyncEmployeeSchema } from './types'; import { evaluateCondition, evaluateOperator, resolvePath } from './expression-evaluator'; import { interpolate, interpolateTemplate } from './template-engine'; @@ -51,6 +55,64 @@ export function interpretDeclarativeCheck(opts: { }; } +/** + * Converts a declarative SyncDefinition (JSON DSL) into a function + * that produces a validated list of SyncEmployee objects. 
+ * + * The sync definition runs the same DSL steps as checks, but instead of + * emitting pass/fail results, it produces a standardized employee list + * at `scope[employeesPath]` that the generic sync service can process. + */ +export function interpretDeclarativeSync(opts: { + definition: SyncDefinition; + defaultSeverity?: FindingSeverity; +}): { + run: (ctx: CheckContext) => Promise; +} { + return { + run: async (ctx: CheckContext) => { + const scope: Record = { + variables: ctx.variables, + credentials: ctx.credentials, + accessToken: ctx.accessToken, + connectionId: ctx.connectionId, + organizationId: ctx.organizationId, + metadata: ctx.metadata, + }; + + ctx.log('Running declarative sync'); + + for (const step of opts.definition.steps) { + await executeStep(step, scope, ctx, opts.defaultSeverity || 'medium'); + } + + const employeesPath = opts.definition.employeesPath || 'employees'; + const raw = resolvePath(scope, employeesPath); + + if (!Array.isArray(raw)) { + throw new Error( + `Sync definition did not produce an array at scope.${employeesPath}`, + ); + } + + const employees: SyncEmployee[] = []; + for (let i = 0; i < raw.length; i++) { + const parsed = SyncEmployeeSchema.safeParse(raw[i]); + if (!parsed.success) { + ctx.warn( + `Employee at index ${i} failed validation: ${parsed.error.issues.map((iss) => iss.message).join(', ')}`, + ); + continue; + } + employees.push(parsed.data); + } + + ctx.log(`Sync produced ${employees.length} validated employees`); + return employees; + }, + }; +} + /** * Execute a single DSL step. 
*/ @@ -79,6 +141,9 @@ async function executeStep( case 'emit': executeEmit(step, scope, ctx, defaultSeverity); break; + case 'code': + await executeCode(step, scope, ctx, defaultSeverity); + break; } } @@ -243,6 +308,10 @@ async function executeForEach( ctx.log(`Iterating over ${collection.length} items in ${step.collection}`); + let passCount = 0; + let failCount = 0; + let filteredCount = 0; + for (const item of collection) { // Create child scope with current item const childScope: Record = { @@ -253,6 +322,7 @@ async function executeForEach( // Apply filter — skip items that don't match if (step.filter) { if (!evaluateCondition(step.filter, childScope)) { + filteredCount++; continue; } } @@ -272,6 +342,7 @@ async function executeForEach( const resourceId = String(resolvePath(childScope, step.resourceIdPath) ?? 'unknown'); if (allPass) { + passCount++; const result = interpolateTemplate(step.onPass, childScope); ctx.pass({ title: result.title, @@ -281,6 +352,7 @@ async function executeForEach( evidence: result.evidence || { item, checkedAt: new Date().toISOString() }, }); } else { + failCount++; const result = interpolateTemplate(step.onFail, childScope); ctx.fail({ title: result.title, @@ -293,6 +365,10 @@ async function executeForEach( }); } } + + ctx.log( + `forEach complete on ${step.collection}: ${passCount} passed, ${failCount} failed${filteredCount > 0 ? `, ${filteredCount} filtered out` : ''}`, + ); } /** @@ -435,6 +511,7 @@ function executeEmit( ctx: CheckContext, defaultSeverity: FindingSeverity, ): void { + ctx.log(`Emitting ${step.result} result: ${step.template.title}`); const template = interpolateTemplate(step.template, scope); if (step.result === 'pass') { @@ -457,3 +534,42 @@ function executeEmit( }); } } + +/** + * Execute a code step — run arbitrary JavaScript with access to ctx and scope. 
+ */
+async function executeCode(
+  step: CodeStep,
+  scope: Record<string, unknown>,
+  ctx: CheckContext,
+  _defaultSeverity: FindingSeverity,
+): Promise<void> {
+  const codePreview = step.code.length > 100
+    ? step.code.slice(0, 100) + '...'
+    : step.code;
+  ctx.log(`Executing code step: ${codePreview.replace(/\n/g, ' ').trim()}`);
+
+  const scopeKeysBefore = Object.keys(scope);
+
+  try {
+    // eslint-disable-next-line @typescript-eslint/no-implied-eval
+    const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor;
+    const fn = new AsyncFunction('ctx', 'scope', step.code);
+    await fn(ctx, scope);
+
+    // Log scope changes for debugging
+    const scopeKeysAfter = Object.keys(scope);
+    const newKeys = scopeKeysAfter.filter((k) => !scopeKeysBefore.includes(k));
+    if (newKeys.length > 0) {
+      ctx.log(`Code step added scope keys: ${newKeys.join(', ')}`);
+    }
+  } catch (error) {
+    const message = error instanceof Error ? error.message : String(error);
+    const stack = error instanceof Error ? error.stack : undefined;
+    ctx.error(`Code step failed: ${message}`, {
+      code: step.code,
+      ...(stack ? { stack } : {}),
+    });
+    throw error;
+  }
+}
diff --git a/packages/integration-platform/src/dsl/types.ts b/packages/integration-platform/src/dsl/types.ts
index 12b6c0c880..15c0dc0758 100644
--- a/packages/integration-platform/src/dsl/types.ts
+++ b/packages/integration-platform/src/dsl/types.ts
@@ -208,6 +208,13 @@ export const EmitStepSchema = z.object({
 });

 export type EmitStep = z.infer<typeof EmitStepSchema>;

+export const CodeStepSchema = z.object({
+  type: z.literal('code'),
+  code: z.string().min(1),
+});
+
+export type CodeStep = z.infer<typeof CodeStepSchema>;
+
 // ============================================================================
 // Union of All Steps
 // ============================================================================
@@ -220,6 +227,7 @@ export const DSLStepSchema: z.ZodType<DSLStep> = z.lazy(() =>
     AggregateStepSchema,
     BranchStepSchema,
     EmitStepSchema,
+    CodeStepSchema,
   ]),
 );

@@ -229,7 +237,24 @@ export type DSLStep =
   | ForEachStep
   | AggregateStep
   | BranchStep
-  | EmitStep;
+  | EmitStep
+  | CodeStep;
+
+// ============================================================================
+// Shared Variable Schema (used by checks, sync, and integration definitions)
+// ============================================================================
+
+export const VariableSchema = z.object({
+  id: z.string(),
+  label: z.string(),
+  type: z.enum(['text', 'number', 'boolean', 'select', 'multi-select']),
+  required: z.boolean().optional(),
+  default: z.unknown().optional(),
+  helpText: z.string().optional(),
+  options: z
+    .array(z.object({ value: z.string(), label: z.string() }))
+    .optional(),
+});

 // ============================================================================
 // Check Definition (the top-level DSL object)
@@ -237,25 +262,37 @@ export type DSLStep =

 export const CheckDefinitionSchema = z.object({
   steps: z.array(DSLStepSchema),
-  variables: z
-    .array(
-      z.object({
-        id: z.string(),
-        label: z.string(),
-        type: z.enum(['text', 'number', 'boolean', 'select', 'multi-select']),
-        required: z.boolean().optional(),
-        default: z.unknown().optional(),
-        helpText: z.string().optional(),
-        options: z
-          .array(z.object({ value: z.string(), label: z.string() }))
-          .optional(),
-      }),
-    )
-    .optional(),
+  variables: z.array(VariableSchema).optional(),
 });

 export type CheckDefinition = z.infer<typeof CheckDefinitionSchema>;

+// ============================================================================
+// Sync Definition (for dynamic employee sync)
+// ============================================================================
+
+export const SyncEmployeeSchema = z.object({
+  email: z.string(),
+  name: z.string().optional(),
+  firstName: z.string().optional(),
+  lastName: z.string().optional(),
+  externalId: z.string().optional(),
+  status: z.enum(['active', 'inactive', 'suspended']),
+  role: z.string().optional(),
+  department: z.string().optional(),
+  metadata: z.record(z.string(), z.unknown()).optional(),
+});
+
+export type SyncEmployee = z.infer<typeof SyncEmployeeSchema>;
+
+export const SyncDefinitionSchema = z.object({
+  steps: z.array(DSLStepSchema),
+  employeesPath: z.string().default('employees'),
+  variables: z.array(VariableSchema).optional(),
+});
+
+export type SyncDefinition = z.infer<typeof SyncDefinitionSchema>;
+
 // ============================================================================
 // Dynamic Integration Definition (full manifest + checks as JSON)
 // ============================================================================
@@ -285,6 +322,7 @@ export const DynamicIntegrationDefinitionSchema = z.object({
   }),
   capabilities: z.array(z.enum(['checks', 'webhook', 'sync'])).default(['checks']),
   supportsMultipleConnections: z.boolean().optional(),
+  syncDefinition: SyncDefinitionSchema.optional(),
   checks: z.array(
     z.object({
       checkSlug: z.string().regex(/^[a-z0-9_]+$/, 'Check slug must be lowercase alphanumeric with underscores'),
@@ -293,20 +331,7 @@ export const DynamicIntegrationDefinitionSchema = z.object({
       taskMapping: z.string().optional(),
       defaultSeverity: z.enum(['info', 'low', 'medium', 'high', 'critical']).optional(),
       definition: CheckDefinitionSchema,
-      variables: z
-        .array(
-          z.object({
-            id: z.string(),
-            label: z.string(),
-            type: z.enum(['text', 'number', 'boolean', 'select', 'multi-select']),
-            required: z.boolean().optional(),
-            default: z.unknown().optional(),
-            helpText: z.string().optional(),
-            options: z
-              .array(z.object({ value: z.string(), label: z.string() }))
-              .optional(),
-          }),
-        )
+      variables: z.array(VariableSchema)
         .optional(),
       isEnabled: z.boolean().optional(),
       sortOrder: z.number().optional(),
diff --git a/packages/integration-platform/src/index.ts b/packages/integration-platform/src/index.ts
index 57435a1924..e914650883 100644
--- a/packages/integration-platform/src/index.ts
+++ b/packages/integration-platform/src/index.ts
@@ -93,9 +93,10 @@ export {
   type TaskTemplateId,
 } from './task-mappings';

-// DSL Engine (declarative check definitions)
+// DSL Engine (declarative check and sync definitions)
 export {
   interpretDeclarativeCheck,
+  interpretDeclarativeSync,
   evaluateCondition,
   evaluateOperator,
   resolvePath,
@@ -103,14 +104,20 @@ export {
   interpolateTemplate,
   validateIntegrationDefinition,
   CheckDefinitionSchema,
+  SyncEmployeeSchema,
+  SyncDefinitionSchema,
   DynamicIntegrationDefinitionSchema,
   ConditionSchema,
   DSLStepSchema,
+  CodeStepSchema,
 } from './dsl';

 export type {
   DSLStep,
+  CodeStep,
   CheckDefinition,
+  SyncEmployee,
+  SyncDefinition,
   Condition,
   DynamicIntegrationDefinition,
   ValidationResult,