From 3ddfaef188eb84de70fb1e193432f2aa9ba45366 Mon Sep 17 00:00:00 2001 From: Tofik Hasanov Date: Mon, 30 Mar 2026 10:46:46 -0400 Subject: [PATCH 1/6] feat(integration-platform): add code step and dynamic employee sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `code` step type to the DSL so dynamic integrations can execute arbitrary JavaScript alongside declarative steps — enabling 100% coverage of all possible integration patterns (AWS SDK, Promise.all, data transforms, JWT signing, etc.). Add dynamic employee sync support so integrations created via the dynamic integration system can sync employees without hand-written TypeScript. Includes a generic sync service, sync run log persistence, and frontend support for dynamic sync providers. Key changes: - New `code` DSL step with full CheckContext access (ctx.fetch, ctx.pass, etc.) - SyncEmployee schema + interpretDeclarativeSync function - GenericEmployeeSyncService extracting common import/deactivation pattern - Generic POST /dynamic/:slug/employees sync endpoint - GET /available-providers endpoint for frontend - Sync trigger routes dynamic providers automatically - Execution logs persisted to DB for both checks and syncs - Agent debugging endpoints: validate, check-runs, check-run-by-id - Frontend dynamically renders sync providers from API - All existing sync endpoints (GWS, Rippling, JumpCloud, Ramp) unchanged Co-Authored-By: Claude Opus 4.6 (1M context) --- .../controllers/checks.controller.ts | 32 +- .../dynamic-integrations.controller.ts | 158 +++++- .../controllers/sync.controller.ts | 298 ++++++++++- .../integration-platform.module.ts | 2 + .../dynamic-manifest-loader.service.ts | 6 + .../services/generic-employee-sync.service.ts | 274 ++++++++++ .../sync-employees-schedule.ts | 44 +- .../all/components/TeamMembersClient.tsx | 26 +- .../(app)/[orgId]/people/all/data/queries.ts | 58 ++- .../people/all/hooks/useEmployeeSync.ts | 92 +++- .../migration.sql | 2 + .../prisma/schema/dynamic-integration.prisma | 4 + .../src/dsl/__tests__/interpreter.test.ts | 479 ++++++++++++++++++ .../integration-platform/src/dsl/index.ts | 10 +- .../src/dsl/interpreter.ts | 116 +++++ .../integration-platform/src/dsl/types.ts | 50 +- packages/integration-platform/src/index.ts | 9 +- 17 files changed, 1603 insertions(+), 57 deletions(-) create mode 100644 apps/api/src/integration-platform/services/generic-employee-sync.service.ts create mode 100644 packages/db/prisma/migrations/20260330000000_add_dynamic_sync_definition/migration.sql diff --git a/apps/api/src/integration-platform/controllers/checks.controller.ts b/apps/api/src/integration-platform/controllers/checks.controller.ts index da27103c06..66bcfcdfd5 100644 --- a/apps/api/src/integration-platform/controllers/checks.controller.ts +++ b/apps/api/src/integration-platform/controllers/checks.controller.ts @@ -10,6 +10,7 @@ import { UseGuards, } from '@nestjs/common'; import { ApiTags, ApiSecurity } from '@nestjs/swagger'; +import type { Prisma } from '@prisma/client'; import { HybridAuthGuard } from '../../auth/hybrid-auth.guard'; import { PermissionGuard } from '../../auth/permission.guard'; import { RequirePermission } from '../../auth/require-permission.decorator'; @@ -271,7 +272,18 @@ export class ChecksController { await this.checkRunRepository.addResults(resultsToStore); } - // Update the check run status + // Collect execution logs from all check results + const allLogs = result.results.flatMap((checkResult) => + checkResult.result.logs.map((log) => ({ + check: checkResult.checkName, + level: log.level, + message: log.message, + ...(log.data ? { data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })), + ); + + // Update the check run status with logs const startTime = checkRun.startedAt?.getTime() || Date.now(); await this.checkRunRepository.complete(checkRun.id, { status: result.totalFindings > 0 ? 'failed' : 'success', @@ -279,6 +291,9 @@ export class ChecksController { totalChecked: result.results.length, passedCount: result.totalPassing, failedCount: result.totalFindings, + logs: allLogs.length > 0 + ? (allLogs as unknown as Prisma.InputJsonValue) + : undefined, }); return { @@ -288,20 +303,29 @@ export class ChecksController { ...result, }; } catch (error) { - // Mark the check run as failed + // Mark the check run as failed with error details const startTime = checkRun.startedAt?.getTime() || Date.now(); + const errorMessage = error instanceof Error ? error.message : String(error); + const errorStack = error instanceof Error ? error.stack : undefined; await this.checkRunRepository.complete(checkRun.id, { status: 'failed', durationMs: Date.now() - startTime, totalChecked: 0, passedCount: 0, failedCount: 0, - errorMessage: error instanceof Error ? error.message : String(error), + errorMessage, + logs: [{ + check: body.checkId || 'all', + level: 'error', + message: errorMessage, + ...(errorStack ? { data: { stack: errorStack } } : {}), + timestamp: new Date().toISOString(), + }] as unknown as Prisma.InputJsonValue, }); this.logger.error(`Check execution failed: ${error}`); throw new HttpException( - `Check execution failed: ${error instanceof Error ? error.message : String(error)}`, + `Check execution failed: ${errorMessage}`, HttpStatus.INTERNAL_SERVER_ERROR, ); } diff --git a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts index a7e1427bc2..2c7758e54f 100644 --- a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts +++ b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts @@ -13,12 +13,17 @@ import { UseGuards, } from '@nestjs/common'; import type { Prisma } from '@prisma/client'; +import { db } from '@db'; import { InternalTokenGuard } from '../../auth/internal-token.guard'; import { DynamicIntegrationRepository } from '../repositories/dynamic-integration.repository'; import { DynamicCheckRepository } from '../repositories/dynamic-check.repository'; import { ProviderRepository } from '../repositories/provider.repository'; +import { CheckRunRepository } from '../repositories/check-run.repository'; import { DynamicManifestLoaderService } from '../services/dynamic-manifest-loader.service'; -import { validateIntegrationDefinition } from '@trycompai/integration-platform'; +import { + validateIntegrationDefinition, + CheckDefinitionSchema, +} from '@trycompai/integration-platform'; @Controller({ path: 'internal/dynamic-integrations', version: '1' }) @UseGuards(InternalTokenGuard) @@ -29,6 +34,7 @@ export class DynamicIntegrationsController { private readonly dynamicIntegrationRepo: DynamicIntegrationRepository, private readonly dynamicCheckRepo: DynamicCheckRepository, private readonly providerRepo: ProviderRepository, + private readonly checkRunRepo: CheckRunRepository, private readonly loaderService: DynamicManifestLoaderService, ) {} @@ -365,4 +371,154 @@ export class DynamicIntegrationsController { this.logger.log(`Deactivated dynamic integration: ${integration.slug}`); return { success: true }; } + + // ==================== Agent Debugging Endpoints ==================== + + /** + * Validate a definition without saving. + * Agents use this to check syntax/structure before committing. + */ + @Post('validate') + async validate(@Body() body: Record) { + const result = validateIntegrationDefinition(body); + + if (!result.success) { + return { + valid: false, + errors: result.errors, + }; + } + + // The top-level validateIntegrationDefinition already validates + // all checks and syncDefinition via Zod. If we got here, everything is valid. + // Do additional per-check validation for detailed error reporting. + const checkErrors: Array<{ checkSlug: string; errors: Array<{ path: string; message: string }> }> = []; + const definition = result.data!; + const rawBody = body as Record; + + for (const check of definition.checks) { + const checkResult = CheckDefinitionSchema.safeParse(check.definition); + if (!checkResult.success) { + checkErrors.push({ + checkSlug: check.checkSlug, + errors: checkResult.error.issues.map((issue) => ({ + path: issue.path.join('.'), + message: issue.message, + })), + }); + } + } + + return { + valid: checkErrors.length === 0, + ...(checkErrors.length > 0 ? { checkErrors } : {}), + summary: { + slug: definition.slug, + name: definition.name, + category: definition.category, + capabilities: definition.capabilities, + checksCount: definition.checks.length, + hasSyncDefinition: !!rawBody.syncDefinition, + }, + }; + } + + /** + * Get recent check run history for a dynamic integration. + * Agents use this to debug failing checks — includes full logs and results. + */ + @Get(':id/check-runs') + async getCheckRuns(@Param('id') id: string) { + const integration = await this.dynamicIntegrationRepo.findById(id); + if (!integration) { + throw new HttpException('Dynamic integration not found', HttpStatus.NOT_FOUND); + } + + // Find all connections for this provider + const connections = await db.integrationConnection.findMany({ + where: { + provider: { slug: integration.slug }, + status: 'active', + }, + select: { id: true, organizationId: true }, + }); + + if (connections.length === 0) { + return { runs: [], total: 0 }; + } + + // Get recent runs across all connections + const runs = await db.integrationCheckRun.findMany({ + where: { + connectionId: { in: connections.map((c) => c.id) }, + }, + include: { + results: { + select: { + id: true, + passed: true, + title: true, + resourceType: true, + resourceId: true, + severity: true, + remediation: true, + }, + }, + }, + orderBy: { createdAt: 'desc' }, + take: 20, + }); + + return { + runs: runs.map((run) => ({ + id: run.id, + checkId: run.checkId, + checkName: run.checkName, + connectionId: run.connectionId, + status: run.status, + startedAt: run.startedAt, + completedAt: run.completedAt, + durationMs: run.durationMs, + totalChecked: run.totalChecked, + passedCount: run.passedCount, + failedCount: run.failedCount, + errorMessage: run.errorMessage, + logs: run.logs, + results: run.results, + })), + total: runs.length, + }; + } + + /** + * Get a single check run with full details (logs, results, error info). + * Agents use this to debug a specific failed run. + */ + @Get('check-runs/:runId') + async getCheckRunById(@Param('runId') runId: string) { + const run = await this.checkRunRepo.findById(runId); + if (!run) { + throw new HttpException('Check run not found', HttpStatus.NOT_FOUND); + } + + return { + id: run.id, + checkId: run.checkId, + checkName: run.checkName, + connectionId: run.connectionId, + status: run.status, + startedAt: run.startedAt, + completedAt: run.completedAt, + durationMs: run.durationMs, + totalChecked: run.totalChecked, + passedCount: run.passedCount, + failedCount: run.failedCount, + errorMessage: run.errorMessage, + logs: run.logs, + results: run.results, + provider: run.connection?.provider + ? { slug: run.connection.provider.slug, name: run.connection.provider.name } + : null, + }; + } } diff --git a/apps/api/src/integration-platform/controllers/sync.controller.ts b/apps/api/src/integration-platform/controllers/sync.controller.ts index d3af5fd10d..6907fdb440 100644 --- a/apps/api/src/integration-platform/controllers/sync.controller.ts +++ b/apps/api/src/integration-platform/controllers/sync.controller.ts @@ -2,6 +2,7 @@ import { Controller, Post, Get, + Param, Query, Body, HttpException, @@ -19,22 +20,30 @@ import { } from '../../auth/auth-context.decorator'; import type { AuthContext as AuthContextType } from '../../auth/types'; import { db } from '@db'; +import type { Prisma } from '@prisma/client'; import { ConnectionRepository } from '../repositories/connection.repository'; import { CredentialVaultService } from '../services/credential-vault.service'; import { OAuthCredentialsService } from '../services/oauth-credentials.service'; import { getManifest, + registry, matchesSyncFilterTerms, parseSyncFilterTerms, + interpretDeclarativeSync, type OAuthConfig, type RampUser, type RampUserStatus, type RampUsersResponse, type RoleMappingEntry, + type SyncDefinition, } from '@trycompai/integration-platform'; import { RampRoleMappingService } from '../services/ramp-role-mapping.service'; import { IntegrationSyncLoggerService } from '../services/integration-sync-logger.service'; import { RampApiService } from '../services/ramp-api.service'; +import { GenericEmployeeSyncService } from '../services/generic-employee-sync.service'; +import { DynamicIntegrationRepository } from '../repositories/dynamic-integration.repository'; +import { CheckRunRepository } from '../repositories/check-run.repository'; +import { createCheckContext } from '@trycompai/integration-platform'; import { filterUsersByOrgUnits } from './sync-ou-filter'; interface GoogleWorkspaceUser { @@ -74,6 +83,9 @@ export class SyncController { private readonly rampRoleMappingService: RampRoleMappingService, private readonly syncLoggerService: IntegrationSyncLoggerService, private readonly rampApiService: RampApiService, + private readonly genericSyncService: GenericEmployeeSyncService, + private readonly dynamicIntegrationRepo: DynamicIntegrationRepository, + private readonly checkRunRepo: CheckRunRepository, ) {} /** @@ -2067,12 +2079,10 @@ export class SyncController { // Validate provider if set if (provider) { - const validProviders = [ - 'google-workspace', - 'rippling', - 'jumpcloud', - 'ramp', - ]; + const allManifests = registry.getActiveManifests(); + const validProviders = allManifests + .filter((m) => m.capabilities?.includes('sync')) + .map((m) => m.id); if (!validProviders.includes(provider)) { throw new HttpException( `Invalid provider. Must be one of: ${validProviders.join(', ')}`, @@ -2108,4 +2118,280 @@ export class SyncController { provider, }; } + + // ============================================================================ + // Dynamic sync endpoints (for dynamic integrations with syncDefinition) + // ============================================================================ + + /** + * Get all providers that support employee sync (both code-based and dynamic). + * Used by the frontend to render the provider selector dynamically. + */ + @Get('available-providers') + @RequirePermission('integration', 'read') + async getAvailableSyncProviders( + @OrganizationId() organizationId: string, + ) { + const allManifests = registry.getActiveManifests(); + const syncProviders = allManifests.filter((m) => + m.capabilities?.includes('sync'), + ); + + const results = await Promise.all( + syncProviders.map(async (m) => { + const connection = await db.integrationConnection.findFirst({ + where: { + organizationId, + status: 'active', + provider: { slug: m.id }, + }, + select: { + id: true, + status: true, + lastSyncAt: true, + nextSyncAt: true, + }, + }); + return { + slug: m.id, + name: m.name, + logoUrl: m.logoUrl, + connected: !!connection, + connectionId: connection?.id ?? null, + lastSyncAt: connection?.lastSyncAt?.toISOString() ?? null, + nextSyncAt: connection?.nextSyncAt?.toISOString() ?? null, + }; + }), + ); + + return { providers: results }; + } + + /** + * Generic sync endpoint for dynamic integrations. + * Runs the syncDefinition (DSL/code steps) and processes the resulting employees. + * + * NOTE: This route uses :providerSlug param. NestJS matches routes in order, + * so the hardcoded routes above (google-workspace/employees, etc.) take priority. + * This only matches for slugs that don't match the 4 built-in providers. + */ + @Post('dynamic/:providerSlug/employees') + @RequirePermission('integration', 'update') + async syncDynamicProviderEmployees( + @OrganizationId() organizationId: string, + @Param('providerSlug') providerSlug: string, + @Query('connectionId') connectionId: string, + ) { + if (!connectionId) { + throw new HttpException( + 'connectionId is required', + HttpStatus.BAD_REQUEST, + ); + } + + this.logger.log( + `[DynamicSync] Starting sync for provider="${providerSlug}" connection="${connectionId}" org="${organizationId}"`, + ); + + // 1. Validate connection + const connection = await this.connectionRepository.findById(connectionId); + if (!connection || connection.organizationId !== organizationId) { + throw new HttpException('Connection not found', HttpStatus.NOT_FOUND); + } + + // 2. Get manifest from registry — must have 'sync' capability + const manifest = getManifest(providerSlug); + if (!manifest) { + throw new HttpException( + `Integration "${providerSlug}" not found`, + HttpStatus.NOT_FOUND, + ); + } + if (!manifest.capabilities?.includes('sync')) { + throw new HttpException( + `Integration "${providerSlug}" does not support sync`, + HttpStatus.BAD_REQUEST, + ); + } + + // 3. Get dynamic integration — must have syncDefinition + const dynamicIntegration = + await this.dynamicIntegrationRepo.findBySlug(providerSlug); + if (!dynamicIntegration?.syncDefinition) { + throw new HttpException( + `Integration "${providerSlug}" has no sync definition`, + HttpStatus.BAD_REQUEST, + ); + } + + // 4. Get & refresh credentials + let credentials = + await this.credentialVaultService.getDecryptedCredentials(connectionId); + if (!credentials) { + throw new HttpException( + 'No valid credentials found. Please reconnect the integration.', + HttpStatus.UNAUTHORIZED, + ); + } + + // Try to refresh OAuth token if applicable + if (manifest.auth.type === 'oauth2' && credentials.refresh_token) { + const oauthConfig = manifest.auth.config as OAuthConfig; + try { + const oauthCredentials = + await this.oauthCredentialsService.getCredentials( + providerSlug, + organizationId, + ); + if (oauthCredentials) { + const newToken = + await this.credentialVaultService.refreshOAuthTokens( + connectionId, + { + tokenUrl: oauthConfig.tokenUrl, + refreshUrl: oauthConfig.refreshUrl, + clientId: oauthCredentials.clientId, + clientSecret: oauthCredentials.clientSecret, + clientAuthMethod: oauthConfig.clientAuthMethod, + }, + ); + if (newToken) { + credentials = + await this.credentialVaultService.getDecryptedCredentials( + connectionId, + ); + } + } + } catch (refreshError) { + this.logger.warn( + `Token refresh failed for ${providerSlug}, trying with existing token: ${refreshError}`, + ); + } + } + + const accessToken = credentials?.access_token; + this.logger.log( + `[DynamicSync] Credentials ready for "${providerSlug}" (auth=${manifest.auth.type}, hasToken=${!!accessToken})`, + ); + + // 5. Create a sync run record (same table as check runs — agent reads both the same way) + const syncRun = await this.checkRunRepo.create({ + connectionId, + checkId: `sync:${providerSlug}`, + checkName: `Employee Sync: ${manifest.name}`, + }); + + // 6. Create CheckContext with logging that captures to the run + const { ctx, getResults } = createCheckContext({ + manifest, + accessToken: typeof accessToken === 'string' ? accessToken : undefined, + credentials: (credentials ?? {}) as Record, + variables: + ((connection.variables as Record) ?? {}) as Record, + connectionId, + organizationId, + metadata: + (connection.metadata as Record) ?? {}, + logger: { + info: (msg, data) => this.logger.log(msg, data), + warn: (msg, data) => this.logger.warn(msg, data), + error: (msg, data) => this.logger.error(msg, data), + }, + }); + + try { + // 7. Run sync definition → get SyncEmployee[] + const syncDefinition = + dynamicIntegration.syncDefinition as unknown as SyncDefinition; + const syncRunner = interpretDeclarativeSync({ + definition: syncDefinition, + }); + + const employees = await syncRunner.run(ctx); + + this.logger.log( + `[DynamicSync] Sync definition produced ${employees.length} employees for "${providerSlug}"`, + ); + + // 8. Process employees via generic service + const result = await this.genericSyncService.processEmployees({ + organizationId, + employees, + options: { + providerName: manifest.name, + }, + }); + + // 9. Persist execution logs + results to the run record + const executionLogs = getResults().logs.map((log) => ({ + level: log.level, + message: log.message, + ...(log.data ? { data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })); + + const startTime = syncRun.startedAt?.getTime() || Date.now(); + await this.checkRunRepo.complete(syncRun.id, { + status: result.errors > 0 ? 'failed' : 'success', + durationMs: Date.now() - startTime, + totalChecked: result.totalFound, + passedCount: result.imported + result.reactivated, + failedCount: result.errors, + logs: executionLogs.length > 0 + ? (executionLogs as unknown as Prisma.InputJsonValue) + : undefined, + }); + + this.logger.log( + `[DynamicSync] Sync complete for "${providerSlug}": imported=${result.imported} skipped=${result.skipped} deactivated=${result.deactivated} reactivated=${result.reactivated} errors=${result.errors}`, + ); + + return { + ...result, + syncRunId: syncRun.id, + }; + } catch (error) { + // Persist error + whatever logs were captured before the failure + const executionLogs = getResults().logs.map((log) => ({ + level: log.level, + message: log.message, + ...(log.data ? { data: log.data } : {}), + timestamp: log.timestamp.toISOString(), + })); + + const errorMessage = error instanceof Error ? error.message : String(error); + const errorStack = error instanceof Error ? error.stack : undefined; + + const startTime = syncRun.startedAt?.getTime() || Date.now(); + await this.checkRunRepo.complete(syncRun.id, { + status: 'failed', + durationMs: Date.now() - startTime, + totalChecked: 0, + passedCount: 0, + failedCount: 0, + errorMessage, + logs: [ + ...executionLogs, + { + level: 'error', + message: `Sync execution failed: ${errorMessage}`, + ...(errorStack ? { data: { stack: errorStack } } : {}), + timestamp: new Date().toISOString(), + }, + ] as unknown as Prisma.InputJsonValue, + }); + + this.logger.error( + `[DynamicSync] Sync failed for "${providerSlug}": ${errorMessage}`, + ); + + throw new HttpException( + { + message: `Sync execution failed: ${errorMessage}`, + syncRunId: syncRun.id, + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } } diff --git a/apps/api/src/integration-platform/integration-platform.module.ts b/apps/api/src/integration-platform/integration-platform.module.ts index 1632dbfdd6..6277efbc5d 100644 --- a/apps/api/src/integration-platform/integration-platform.module.ts +++ b/apps/api/src/integration-platform/integration-platform.module.ts @@ -30,6 +30,7 @@ import { DynamicCheckRepository } from './repositories/dynamic-check.repository' import { RampRoleMappingService } from './services/ramp-role-mapping.service'; import { IntegrationSyncLoggerService } from './services/integration-sync-logger.service'; import { RampApiService } from './services/ramp-api.service'; +import { GenericEmployeeSyncService } from './services/generic-employee-sync.service'; @Module({ imports: [AuthModule], @@ -58,6 +59,7 @@ import { RampApiService } from './services/ramp-api.service'; RampRoleMappingService, IntegrationSyncLoggerService, RampApiService, + GenericEmployeeSyncService, // Repositories ProviderRepository, ConnectionRepository, diff --git a/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts b/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts index b6f858655d..50d3f0e22f 100644 --- a/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts +++ b/apps/api/src/integration-platform/services/dynamic-manifest-loader.service.ts @@ -82,6 +82,11 @@ export class DynamicManifestLoaderService implements OnModuleInit { this.convertCheck(check, integration.slug), ); + // Collect manifest-level variables from syncDefinition (if present) + // These appear in the customer configuration UI (ManageIntegrationDialog) + const syncDef = integration.syncDefinition as Record | null; + const syncVariables = syncDef?.variables as CheckVariable[] | undefined; + return { id: integration.slug, name: integration.name, @@ -94,6 +99,7 @@ export class DynamicManifestLoaderService implements OnModuleInit { defaultHeaders: (integration.defaultHeaders as Record) ?? undefined, capabilities: (integration.capabilities as unknown as IntegrationCapability[]) ?? ['checks'], supportsMultipleConnections: integration.supportsMultipleConnections, + variables: syncVariables && syncVariables.length > 0 ? syncVariables : undefined, checks, isActive: integration.isActive, }; diff --git a/apps/api/src/integration-platform/services/generic-employee-sync.service.ts b/apps/api/src/integration-platform/services/generic-employee-sync.service.ts new file mode 100644 index 0000000000..97326a4202 --- /dev/null +++ b/apps/api/src/integration-platform/services/generic-employee-sync.service.ts @@ -0,0 +1,274 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { db } from '@trycompai/db'; +import type { SyncEmployee } from '@trycompai/integration-platform'; + +// ============================================================================ +// Types +// ============================================================================ + +export interface SyncResultDetail { + email: string; + status: + | 'imported' + | 'skipped' + | 'deactivated' + | 'reactivated' + | 'error'; + reason?: string; +} + +export interface SyncResult { + success: boolean; + totalFound: number; + imported: number; + skipped: number; + deactivated: number; + reactivated: number; + errors: number; + details: SyncResultDetail[]; +} + +export interface ProcessEmployeesOptions { + /** Default role for new members. Defaults to 'employee'. */ + defaultRole?: string; + /** Whether to reactivate previously deactivated members. Defaults to false. */ + allowReactivation?: boolean; + /** Roles that should never be auto-deactivated. Defaults to owner/admin/auditor. */ + protectedRoles?: string[]; + /** Provider slug for deactivation reason messages. */ + providerName?: string; +} + +const DEFAULT_PROTECTED_ROLES = ['owner', 'admin', 'auditor']; + +// ============================================================================ +// Service +// ============================================================================ + +/** + * Generic employee sync service that handles the platform-generic operations: + * - Creating users and members from a standardized employee list + * - Deactivating members no longer present in the provider + * - Safety guards (never deactivate privileged roles) + * + * This extracts the common pattern from SyncController's 4 provider-specific + * implementations (Google Workspace, Rippling, JumpCloud, Ramp). + * + * The provider-specific logic (fetching users, normalizing fields) is handled + * by the dynamic integration's syncDefinition (DSL/code steps). + */ +@Injectable() +export class GenericEmployeeSyncService { + private readonly logger = new Logger(GenericEmployeeSyncService.name); + + /** + * Process a standardized employee list into DB operations. + * + * Phase 1: Import active employees (create User if needed, create Member if needed) + * Phase 2: Deactivate members no longer present in the provider + */ + async processEmployees({ + organizationId, + employees, + options = {}, + }: { + organizationId: string; + employees: SyncEmployee[]; + options?: ProcessEmployeesOptions; + }): Promise { + const defaultRole = options.defaultRole ?? 'employee'; + const allowReactivation = options.allowReactivation ?? false; + const protectedRoles = options.protectedRoles ?? DEFAULT_PROTECTED_ROLES; + const providerName = options.providerName ?? 'provider'; + + const results: SyncResult = { + success: true, + totalFound: employees.length, + imported: 0, + skipped: 0, + deactivated: 0, + reactivated: 0, + errors: 0, + details: [], + }; + + this.logger.log( + `[GenericSync] Processing ${employees.length} employees for org="${organizationId}" provider="${providerName}"`, + ); + + // Separate employees by status + const activeEmployees = employees.filter((e) => e.status === 'active'); + const inactiveEmails = new Set( + employees + .filter((e) => e.status !== 'active') + .map((e) => e.email.toLowerCase()), + ); + const activeEmails = new Set( + activeEmployees.map((e) => e.email.toLowerCase()), + ); + + // Build domain set from all employees (for domain-scoped deactivation) + const providerDomains = new Set(); + for (const emp of employees) { + const domain = emp.email.toLowerCase().split('@')[1]; + if (domain) providerDomains.add(domain); + } + + this.logger.log( + `[GenericSync] Employee breakdown: active=${activeEmployees.length} inactive/suspended=${inactiveEmails.size} domains=${Array.from(providerDomains).join(',')}`, + ); + + // ==================================================================== + // Phase 1: Import active employees + // ==================================================================== + for (const employee of activeEmployees) { + const normalizedEmail = employee.email.toLowerCase(); + const displayName = + employee.name || + [employee.firstName, employee.lastName].filter(Boolean).join(' ') || + normalizedEmail.split('@')[0] || + normalizedEmail; + + try { + // Find or create User + let existingUser = await db.user.findUnique({ + where: { email: normalizedEmail }, + }); + + if (!existingUser) { + existingUser = await db.user.create({ + data: { + email: normalizedEmail, + name: displayName, + emailVerified: true, + }, + }); + } + + // Check if Member already exists in this org + const existingMember = await db.member.findFirst({ + where: { + organizationId, + userId: existingUser.id, + }, + }); + + if (existingMember) { + if (existingMember.deactivated && allowReactivation) { + // Reactivate the member + await db.member.update({ + where: { id: existingMember.id }, + data: { deactivated: false, isActive: true }, + }); + results.reactivated++; + results.details.push({ + email: normalizedEmail, + status: 'reactivated', + }); + } else { + results.skipped++; + results.details.push({ + email: normalizedEmail, + status: 'skipped', + reason: existingMember.deactivated + ? 'Member is deactivated' + : 'Already a member', + }); + } + continue; + } + + // Create new member + await db.member.create({ + data: { + organizationId, + userId: existingUser.id, + role: employee.role || defaultRole, + isActive: true, + }, + }); + + results.imported++; + results.details.push({ + email: normalizedEmail, + status: 'imported', + }); + } catch (error) { + this.logger.error( + `Error importing employee ${normalizedEmail}: ${error}`, + ); + results.errors++; + results.details.push({ + email: normalizedEmail, + status: 'error', + reason: error instanceof Error ? error.message : 'Unknown error', + }); + } + } + + this.logger.log( + `[GenericSync] Phase 1 complete: imported=${results.imported} skipped=${results.skipped} reactivated=${results.reactivated} errors=${results.errors}`, + ); + + // ==================================================================== + // Phase 2: Deactivate members no longer in provider + // ==================================================================== + const allOrgMembers = await db.member.findMany({ + where: { + organizationId, + deactivated: false, + }, + include: { + user: true, + }, + }); + + for (const member of allOrgMembers) { + const memberEmail = member.user.email.toLowerCase(); + const memberDomain = memberEmail.split('@')[1]; + + // Only check members whose email domain matches the provider's domains + if (!memberDomain || !providerDomains.has(memberDomain)) { + continue; + } + + // Safety guard: never auto-deactivate privileged members + const memberRoles = member.role + .split(',') + .map((role) => role.trim().toLowerCase()); + if (protectedRoles.some((pr) => memberRoles.includes(pr))) { + continue; + } + + // If member is not in active set AND not already accounted for, deactivate + const isSuspended = inactiveEmails.has(memberEmail); + const isRemoved = !activeEmails.has(memberEmail) && !isSuspended; + + if (isSuspended || isRemoved) { + try { + await db.member.update({ + where: { id: member.id }, + data: { deactivated: true, isActive: false }, + }); + results.deactivated++; + results.details.push({ + email: memberEmail, + status: 'deactivated', + reason: isSuspended + ? `User is suspended in ${providerName}` + : `User was removed from ${providerName}`, + }); + } catch (error) { + this.logger.error(`Error deactivating member ${memberEmail}: ${error}`); + results.errors++; + } + } + } + + this.logger.log( + `Sync complete for ${providerName}: ${results.imported} imported, ${results.reactivated} reactivated, ${results.deactivated} deactivated, ${results.skipped} skipped, ${results.errors} errors`, + ); + + return results; + } +} diff --git a/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts b/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts index 7e16b0a05d..11b9fa5e8e 100644 --- a/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts +++ b/apps/api/src/trigger/integration-platform/sync-employees-schedule.ts @@ -201,7 +201,8 @@ async function syncProvider(params: SyncProviderParams): Promise { return syncRamp({ connectionId, organizationId }); default: - throw new Error(`No sync handler for provider: ${providerSlug}`); + // Try generic dynamic sync endpoint for non-built-in providers + return syncDynamicProvider({ providerSlug, connectionId, organizationId }); } } @@ -352,3 +353,44 @@ async function syncRamp({ errors: data.errors || 0, }; } + +async function syncDynamicProvider({ + providerSlug, + connectionId, + organizationId, +}: { + providerSlug: string; + connectionId: string; + organizationId: string; +}): Promise { + const url = new URL( + `${API_BASE_URL}/v1/integrations/sync/dynamic/${providerSlug}/employees`, + ); + url.searchParams.set('connectionId', connectionId); + + const response = await fetch(url.toString(), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-service-token': process.env.SERVICE_TOKEN_TRIGGER!, + 'x-organization-id': organizationId, + }, + }); + + if (!response.ok) { + const errorBody = await response.text(); + throw new Error( + `Dynamic sync failed for ${providerSlug}: ${response.status} - ${errorBody}`, + ); + } + + const data = await response.json(); + return { + success: data.success, + imported: data.imported || 0, + reactivated: data.reactivated || 0, + deactivated: data.deactivated || 0, + skipped: data.skipped || 0, + errors: data.errors || 0, + }; +} diff --git a/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx b/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx index ff75c30a52..0436678794 100644 --- a/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx +++ b/apps/app/src/app/(app)/[orgId]/people/all/components/TeamMembersClient.tsx @@ -121,13 +121,14 @@ export function TeamMembersClient({ handleRoleMappingClose, handleRoleMappingSaved, openRoleMappingEditor, + availableProviders, } = useEmployeeSync({ organizationId, initialData: employeeSyncData }); const lastSyncAt = employeeSyncData.lastSyncAt; const nextSyncAt = employeeSyncData.nextSyncAt; const handleEmployeeSync = async ( - provider: 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp', + provider: string, ) => { const result = await syncEmployees(provider); if (result?.success) { @@ -474,6 +475,29 @@ export function TeamMembersClient({ )} + {/* Dynamic sync providers (from dynamic integrations) */} + {availableProviders + .filter((p) => p.connected && !['google-workspace', 'rippling', 'jumpcloud', 'ramp'].includes(p.slug)) + .map((provider) => ( + +
+ {provider.logoUrl && ( + {provider.name} + )} + {provider.name} + {selectedProvider === provider.slug && ( + Active + )} +
+
+ ))} diff --git a/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts b/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts index 9b12c0e2b8..a964403da2 100644 --- a/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts +++ b/apps/app/src/app/(app)/[orgId]/people/all/data/queries.ts @@ -1,13 +1,25 @@ import { serverApi } from '@/lib/server-api-client'; +export interface SyncProviderInfo { + slug: string; + name: string; + logoUrl: string; + connected: boolean; + connectionId: string | null; + lastSyncAt: string | null; + nextSyncAt: string | null; +} + export interface EmployeeSyncConnectionsData { googleWorkspaceConnectionId: string | null; ripplingConnectionId: string | null; jumpcloudConnectionId: string | null; rampConnectionId: string | null; - selectedProvider: 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp' | null | undefined; + selectedProvider: string | null | undefined; lastSyncAt: Date | null; nextSyncAt: Date | null; + /** All providers that support sync (built-in + dynamic) */ + availableProviders: SyncProviderInfo[]; } interface ConnectionStatus { @@ -20,7 +32,7 @@ interface ConnectionStatus { export async function getEmployeeSyncConnections( organizationId: string, ): Promise { - const [gwResponse, ripplingResponse, jumpcloudResponse, rampResponse, providerResponse] = + const [gwResponse, ripplingResponse, jumpcloudResponse, rampResponse, providerResponse, availableResponse] = await Promise.all([ serverApi.post( `/v1/integrations/sync/google-workspace/status?organizationId=${organizationId}`, @@ -34,23 +46,36 @@ export async function getEmployeeSyncConnections( serverApi.post( `/v1/integrations/sync/ramp/status?organizationId=${organizationId}`, ), - serverApi.get<{ provider: 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp' | null }>( + serverApi.get<{ provider: string | null }>( `/v1/integrations/sync/employee-sync-provider?organizationId=${organizationId}`, ), + serverApi.get<{ providers: SyncProviderInfo[] }>( + `/v1/integrations/sync/available-providers?organizationId=${organizationId}`, + ).catch(() => ({ data: null, error: null, status: 500 })), ]); - // Get sync times from the selected provider's connection + const availableProviders = availableResponse.data?.providers ?? []; + + // Get sync times from the selected provider + // Check built-in providers first, then fall back to available-providers data const selectedProviderSlug = providerResponse.data?.provider; - const selectedConnection = - selectedProviderSlug === 'google-workspace' - ? gwResponse.data - : selectedProviderSlug === 'rippling' - ? ripplingResponse.data - : selectedProviderSlug === 'jumpcloud' - ? jumpcloudResponse.data - : selectedProviderSlug === 'ramp' - ? rampResponse.data - : null; + let selectedSyncTimes: { lastSyncAt?: string | null; nextSyncAt?: string | null } | null = null; + + if (selectedProviderSlug === 'google-workspace') { + selectedSyncTimes = gwResponse.data ?? null; + } else if (selectedProviderSlug === 'rippling') { + selectedSyncTimes = ripplingResponse.data ?? null; + } else if (selectedProviderSlug === 'jumpcloud') { + selectedSyncTimes = jumpcloudResponse.data ?? null; + } else if (selectedProviderSlug === 'ramp') { + selectedSyncTimes = rampResponse.data ?? null; + } else if (selectedProviderSlug) { + // Dynamic provider — get sync times from available-providers data + const dynProvider = availableProviders.find((p) => p.slug === selectedProviderSlug); + if (dynProvider) { + selectedSyncTimes = { lastSyncAt: dynProvider.lastSyncAt, nextSyncAt: dynProvider.nextSyncAt }; + } + } return { googleWorkspaceConnectionId: @@ -70,7 +95,8 @@ export async function getEmployeeSyncConnections( ? rampResponse.data.connectionId : null, selectedProvider: selectedProviderSlug, - lastSyncAt: selectedConnection?.lastSyncAt ? new Date(selectedConnection.lastSyncAt) : null, - nextSyncAt: selectedConnection?.nextSyncAt ? new Date(selectedConnection.nextSyncAt) : null, + lastSyncAt: selectedSyncTimes?.lastSyncAt ? new Date(selectedSyncTimes.lastSyncAt) : null, + nextSyncAt: selectedSyncTimes?.nextSyncAt ? new Date(selectedSyncTimes.nextSyncAt) : null, + availableProviders, }; } diff --git a/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts b/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts index 8dd6254620..887971e01c 100644 --- a/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts +++ b/apps/app/src/app/(app)/[orgId]/people/all/hooks/useEmployeeSync.ts @@ -5,9 +5,10 @@ import { useState } from 'react'; import { toast } from 'sonner'; import useSWR from 'swr'; -import type { EmployeeSyncConnectionsData } from '../data/queries'; +import type { EmployeeSyncConnectionsData, SyncProviderInfo } from '../data/queries'; -type SyncProvider = 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp'; +type BuiltInSyncProvider = 'google-workspace' | 'rippling' | 'jumpcloud' | 'ramp'; +type SyncProvider = string; interface SyncResult { success: boolean; @@ -68,8 +69,17 @@ interface UseEmployeeSyncReturn { handleRoleMappingClose: () => void; handleRoleMappingSaved: () => void; openRoleMappingEditor: () => Promise; + /** All available sync providers (built-in + dynamic) */ + availableProviders: SyncProviderInfo[]; } +const BUILT_IN_PROVIDERS = new Set([ + 'google-workspace', + 'rippling', + 'jumpcloud', + 'ramp', +]); + const PROVIDER_CONFIG = { 'google-workspace': { name: 'Google Workspace', @@ -125,7 +135,10 @@ export const useEmployeeSync = ({ mutate({ ...data!, selectedProvider: provider }, false); if (provider) { - toast.success(`${PROVIDER_CONFIG[provider].name} set as your employee sync provider`); + const name = provider in PROVIDER_CONFIG + ? PROVIDER_CONFIG[provider as BuiltInSyncProvider].name + : (availableProviders.find((p) => p.slug === provider)?.name ?? provider); + toast.success(`${name} set as your employee sync provider`); } } catch (error) { toast.error('Failed to set sync provider'); @@ -133,23 +146,36 @@ export const useEmployeeSync = ({ } }; + const availableProviders = data?.availableProviders ?? []; + + const getConnectionIdForProvider = (provider: SyncProvider): string | null => { + if (provider === 'google-workspace') return googleWorkspaceConnectionId; + if (provider === 'rippling') return ripplingConnectionId; + if (provider === 'jumpcloud') return jumpcloudConnectionId; + if (provider === 'ramp') return rampConnectionId; + // Dynamic provider — look up from availableProviders + const dynProvider = availableProviders.find((p) => p.slug === provider); + return dynProvider?.connectionId ?? null; + }; + + const getSyncUrl = (provider: SyncProvider, connectionId: string): string => { + if (BUILT_IN_PROVIDERS.has(provider)) { + return `/v1/integrations/sync/${provider}/employees?organizationId=${organizationId}&connectionId=${connectionId}`; + } + return `/v1/integrations/sync/dynamic/${provider}/employees?organizationId=${organizationId}&connectionId=${connectionId}`; + }; + const syncEmployees = async (provider: SyncProvider): Promise => { - const connectionId = - provider === 'google-workspace' - ? googleWorkspaceConnectionId - : provider === 'rippling' - ? ripplingConnectionId - : provider === 'jumpcloud' - ? jumpcloudConnectionId - : rampConnectionId; + const connectionId = getConnectionIdForProvider(provider); if (!connectionId) { - toast.error(`${PROVIDER_CONFIG[provider].name} is not connected`); + const providerName = getProviderName(provider); + toast.error(`${providerName} is not connected`); return null; } setIsSyncing(true); - const config = PROVIDER_CONFIG[provider]; + const providerName = getProviderName(provider); try { // Set as sync provider if not already @@ -158,7 +184,7 @@ export const useEmployeeSync = ({ } const response = await apiClient.post( - `/v1/integrations/sync/${provider}/employees?organizationId=${organizationId}&connectionId=${connectionId}`, + getSyncUrl(provider, connectionId), ); // Handle role mapping requirement (Ramp only) @@ -205,7 +231,7 @@ export const useEmployeeSync = ({ } if (deactivated > 0) { toast.info( - `Deactivated ${deactivated} employee${deactivated > 1 ? 's' : ''} (no longer in ${config.name})`, + `Deactivated ${deactivated} employee${deactivated > 1 ? 's' : ''} (no longer in ${providerName})`, ); } if (imported === 0 && updated === 0 && reactivated === 0 && deactivated === 0 && skipped > 0) { @@ -224,15 +250,27 @@ export const useEmployeeSync = ({ return null; } catch (error) { - toast.error(`Failed to sync employees from ${config.name}`); + toast.error(`Failed to sync employees from ${providerName}`); return null; } finally { setIsSyncing(false); } }; - const getProviderName = (provider: SyncProvider) => PROVIDER_CONFIG[provider].shortName; - const getProviderLogo = (provider: SyncProvider) => PROVIDER_CONFIG[provider].logo; + const getProviderName = (provider: SyncProvider): string => { + if (provider in PROVIDER_CONFIG) { + return PROVIDER_CONFIG[provider as BuiltInSyncProvider].shortName; + } + const dynProvider = availableProviders.find((p) => p.slug === provider); + return dynProvider?.name ?? provider; + }; + const getProviderLogo = (provider: SyncProvider): string => { + if (provider in PROVIDER_CONFIG) { + return PROVIDER_CONFIG[provider as BuiltInSyncProvider].logo; + } + const dynProvider = availableProviders.find((p) => p.slug === provider); + return dynProvider?.logoUrl ?? ''; + }; const openRoleMappingEditor = async () => { if (!rampConnectionId) { @@ -285,6 +323,16 @@ export const useEmployeeSync = ({ } }; + const hasAnyBuiltInConnection = !!( + googleWorkspaceConnectionId || + ripplingConnectionId || + jumpcloudConnectionId || + rampConnectionId + ); + const hasDynamicConnection = availableProviders.some( + (p) => p.connected && !BUILT_IN_PROVIDERS.has(p.slug), + ); + return { googleWorkspaceConnectionId, ripplingConnectionId, @@ -294,12 +342,7 @@ export const useEmployeeSync = ({ isSyncing, syncEmployees, setSyncProvider, - hasAnyConnection: !!( - googleWorkspaceConnectionId || - ripplingConnectionId || - jumpcloudConnectionId || - rampConnectionId - ), + hasAnyConnection: hasAnyBuiltInConnection || hasDynamicConnection, getProviderName, getProviderLogo, showRoleMappingSheet, @@ -307,5 +350,6 @@ export const useEmployeeSync = ({ handleRoleMappingClose, handleRoleMappingSaved, openRoleMappingEditor, + availableProviders, }; }; diff --git a/packages/db/prisma/migrations/20260330000000_add_dynamic_sync_definition/migration.sql b/packages/db/prisma/migrations/20260330000000_add_dynamic_sync_definition/migration.sql new file mode 100644 index 0000000000..2dfb24aea7 --- /dev/null +++ b/packages/db/prisma/migrations/20260330000000_add_dynamic_sync_definition/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "DynamicIntegration" ADD COLUMN "syncDefinition" JSONB; diff --git a/packages/db/prisma/schema/dynamic-integration.prisma b/packages/db/prisma/schema/dynamic-integration.prisma index 04fb814ee9..3bea2bc600 100644 --- a/packages/db/prisma/schema/dynamic-integration.prisma +++ b/packages/db/prisma/schema/dynamic-integration.prisma @@ -32,6 +32,10 @@ model DynamicIntegration { /// Whether multiple connections per org are allowed supportsMultipleConnections Boolean @default(false) + /// Declarative sync definition (JSON — DSL steps that produce employee list) + /// When present and capabilities includes 'sync', enables employee sync + syncDefinition Json? + /// Whether this dynamic integration is active isActive Boolean @default(true) diff --git a/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts b/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts index 2381acd1fd..7b17d2ba8f 100644 --- a/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts +++ b/packages/integration-platform/src/dsl/__tests__/interpreter.test.ts @@ -603,4 +603,483 @@ describe('interpretDeclarativeCheck', () => { expect(ctx._fails[0]!.remediation).toContain('Microsoft 365 admin center'); }); }); + + describe('code step', () => { + it('sets scope values used by subsequent DSL steps', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/users', + as: 'rawUsers', + }, + { + type: 'code', + code: 'scope.users = scope.rawUsers.filter(u => u.active);', + }, + { + type: 'forEach', + collection: 'users', + itemAs: 'user', + resourceType: 'user', + resourceIdPath: 'user.email', + conditions: [ + { field: 'user.mfa', operator: 'eq', value: true }, + ], + onPass: { + title: 'MFA OK for {{user.email}}', + resourceType: 'user', + resourceId: '{{user.email}}', + }, + onFail: { + title: 'MFA missing for {{user.email}}', + resourceType: 'user', + resourceId: '{{user.email}}', + severity: 'high', + remediation: 'Enable MFA', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_filter', + name: 'Code Filter Test', + description: 'Tests code step filtering', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/users', [ + { email: 'alice@test.com', active: true, mfa: true }, + { email: 'bob@test.com', active: false, mfa: false }, + { email: 'carol@test.com', active: true, mfa: false }, + ]); + + await check.run(ctx); + + // Bob is filtered out by the code step (active: false) + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('MFA OK for alice@test.com'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('MFA missing for carol@test.com'); + }); + + it('calls ctx.pass() and ctx.fail() directly', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: ` + ctx.pass({ + title: 'Direct pass', + description: 'From code', + resourceType: 'test', + resourceId: 'test-1', + evidence: { source: 'code' }, + }); + ctx.fail({ + title: 'Direct fail', + description: 'From code', + resourceType: 'test', + resourceId: 'test-2', + severity: 'critical', + remediation: 'Fix it', + }); + `, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_direct', + name: 'Code Direct Test', + description: 'Tests code step direct results', + definition, + }); + + const ctx = createMockContext(); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('Direct pass'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('Direct fail'); + expect(ctx._fails[0]!.severity).toBe('critical'); + }); + + it('uses async/await with ctx.fetch()', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: ` + const data = await ctx.fetch('/api/settings'); + scope.ssoEnabled = data.sso_enabled; + `, + }, + { + type: 'branch', + condition: { field: 'ssoEnabled', operator: 'eq', value: true }, + then: [ + { + type: 'emit', + result: 'pass', + template: { + title: 'SSO is enabled', + resourceType: 'settings', + resourceId: 'sso', + }, + }, + ], + else: [ + { + type: 'emit', + result: 'fail', + template: { + title: 'SSO is not enabled', + resourceType: 'settings', + resourceId: 'sso', + severity: 'high', + remediation: 'Enable SSO', + }, + }, + ], + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_async', + name: 'Code Async Test', + description: 'Tests code step async fetch', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/settings', { sso_enabled: true }); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('SSO is enabled'); + expect(ctx._fails).toHaveLength(0); + }); + + it('supports Promise.all for parallel fetches', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: ` + const [users, roles] = await Promise.all([ + ctx.fetch('/api/users'), + ctx.fetch('/api/roles'), + ]); + scope.userCount = users.length; + scope.roleCount = roles.length; + `, + }, + { + type: 'emit', + result: 'pass', + template: { + title: 'Fetched {{userCount}} users and {{roleCount}} roles', + resourceType: 'system', + resourceId: 'parallel-fetch', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_parallel', + name: 'Code Parallel Test', + description: 'Tests Promise.all', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/users', [{ id: 1 }, { id: 2 }, { id: 3 }]); + ctx._fetchResponses.set('/api/roles', [{ id: 'admin' }, { id: 'user' }]); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('Fetched 3 users and 2 roles'); + }); + + it('reads scope from prior fetch step', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/config', + as: 'config', + }, + { + type: 'code', + code: ` + const cfg = scope.config; + scope.isCompliant = cfg.encryption && cfg.audit_logging; + `, + }, + { + type: 'branch', + condition: { field: 'isCompliant', operator: 'truthy' }, + then: [ + { + type: 'emit', + result: 'pass', + template: { + title: 'System is compliant', + resourceType: 'config', + resourceId: 'compliance', + }, + }, + ], + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_scope_read', + name: 'Code Scope Read', + description: 'Tests reading prior scope', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/config', { encryption: true, audit_logging: true }); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('System is compliant'); + }); + + it('works nested inside forEach', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/repos', + as: 'repos', + }, + { + type: 'forEach', + collection: 'repos', + itemAs: 'repo', + resourceType: 'repository', + resourceIdPath: 'repo.name', + steps: [ + { + type: 'code', + code: ` + const details = await ctx.fetch('/api/repos/' + scope.repo.name + '/settings'); + scope.repo.branchProtection = details.branch_protection; + `, + }, + ], + conditions: [ + { field: 'repo.branchProtection', operator: 'eq', value: true }, + ], + onPass: { + title: '{{repo.name}} has branch protection', + resourceType: 'repository', + resourceId: '{{repo.name}}', + }, + onFail: { + title: '{{repo.name}} missing branch protection', + resourceType: 'repository', + resourceId: '{{repo.name}}', + severity: 'high', + remediation: 'Enable branch protection', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_in_foreach', + name: 'Code in ForEach', + description: 'Tests nested code step', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/repos', [ + { name: 'frontend' }, + { name: 'backend' }, + ]); + ctx._fetchResponses.set('/api/repos/frontend/settings', { branch_protection: true }); + ctx._fetchResponses.set('/api/repos/backend/settings', { branch_protection: false }); + + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('frontend has branch protection'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('backend missing branch protection'); + }); + + it('works nested inside branch', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/org', + as: 'org', + }, + { + type: 'branch', + condition: { field: 'org.plan', operator: 'eq', value: 'enterprise' }, + then: [ + { + type: 'code', + code: ` + ctx.pass({ + title: 'Enterprise plan detected', + description: 'Advanced checks available', + resourceType: 'org', + resourceId: scope.org.id, + evidence: { plan: scope.org.plan }, + }); + `, + }, + ], + else: [ + { + type: 'emit', + result: 'fail', + template: { + title: 'Not enterprise plan', + resourceType: 'org', + resourceId: '{{org.id}}', + severity: 'info', + remediation: 'Upgrade to enterprise', + }, + }, + ], + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_in_branch', + name: 'Code in Branch', + description: 'Tests code in branch', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/org', { id: 'org-1', plan: 'enterprise' }); + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('Enterprise plan detected'); + expect(ctx._fails).toHaveLength(0); + }); + + it('propagates errors from code step', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'code', + code: 'throw new Error("Something went wrong");', + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_error', + name: 'Code Error Test', + description: 'Tests error propagation', + definition, + }); + + const ctx = createMockContext(); + await expect(check.run(ctx)).rejects.toThrow('Something went wrong'); + }); + + it('handles complex data transformation (Map, filter, reduce)', async () => { + const definition: CheckDefinition = { + steps: [ + { + type: 'fetch', + path: '/api/users', + as: 'rawUsers', + }, + { + type: 'fetch', + path: '/api/roles', + as: 'rawRoles', + }, + { + type: 'code', + code: ` + // Build lookup map (like Google Workspace employee-access check) + const roleMap = new Map(); + for (const role of scope.rawRoles) { + roleMap.set(role.id, role.name); + } + + // Enrich users with role names + scope.users = scope.rawUsers.map(u => ({ + ...u, + roleName: roleMap.get(u.roleId) || 'Unknown', + })); + `, + }, + { + type: 'forEach', + collection: 'users', + itemAs: 'user', + resourceType: 'user', + resourceIdPath: 'user.email', + conditions: [ + { field: 'user.roleName', operator: 'neq', value: 'Unknown' }, + ], + onPass: { + title: '{{user.email}} has role {{user.roleName}}', + resourceType: 'user', + resourceId: '{{user.email}}', + }, + onFail: { + title: '{{user.email}} has unknown role', + resourceType: 'user', + resourceId: '{{user.email}}', + severity: 'medium', + remediation: 'Assign a valid role', + }, + }, + ], + }; + + const check = interpretDeclarativeCheck({ + id: 'code_transform', + name: 'Code Transform Test', + description: 'Tests Map + data enrichment', + definition, + }); + + const ctx = createMockContext(); + ctx._fetchResponses.set('/api/users', [ + { email: 'alice@test.com', roleId: 'r1' }, + { email: 'bob@test.com', roleId: 'r999' }, + ]); + ctx._fetchResponses.set('/api/roles', [ + { id: 'r1', name: 'Admin' }, + { id: 'r2', name: 'User' }, + ]); + + await check.run(ctx); + + expect(ctx._passes).toHaveLength(1); + expect(ctx._passes[0]!.title).toBe('alice@test.com has role Admin'); + + expect(ctx._fails).toHaveLength(1); + expect(ctx._fails[0]!.title).toBe('bob@test.com has unknown role'); + }); + }); }); diff --git a/packages/integration-platform/src/dsl/index.ts b/packages/integration-platform/src/dsl/index.ts index c01b470523..4a3544dcbc 100644 --- a/packages/integration-platform/src/dsl/index.ts +++ b/packages/integration-platform/src/dsl/index.ts @@ -1,5 +1,5 @@ -// DSL Engine — Declarative check definitions -export { interpretDeclarativeCheck } from './interpreter'; +// DSL Engine — Declarative check and sync definitions +export { interpretDeclarativeCheck, interpretDeclarativeSync } from './interpreter'; export { evaluateCondition, evaluateOperator, resolvePath } from './expression-evaluator'; export { interpolate, interpolateTemplate } from './template-engine'; export { validateIntegrationDefinition, type ValidationResult } from './validate'; @@ -13,7 +13,10 @@ export type { AggregateStep, BranchStep, EmitStep, + CodeStep, CheckDefinition, + SyncEmployee, + SyncDefinition, Condition, FieldCondition, LogicalCondition, @@ -27,8 +30,11 @@ export type { export { DSLStepSchema, CheckDefinitionSchema, + SyncEmployeeSchema, + SyncDefinitionSchema, DynamicIntegrationDefinitionSchema, ConditionSchema, ResultTemplateSchema, PaginationConfigSchema, + CodeStepSchema, } from './types'; diff --git a/packages/integration-platform/src/dsl/interpreter.ts b/packages/integration-platform/src/dsl/interpreter.ts index 0be2210c46..0d65b0ad34 100644 --- a/packages/integration-platform/src/dsl/interpreter.ts +++ b/packages/integration-platform/src/dsl/interpreter.ts @@ -2,13 +2,17 @@ import type { CheckContext, IntegrationCheck, FindingSeverity } from '../types'; import type { DSLStep, CheckDefinition, + SyncDefinition, + SyncEmployee, FetchStep, FetchPagesStep, ForEachStep, AggregateStep, BranchStep, EmitStep, + CodeStep, } from './types'; +import { SyncEmployeeSchema } from './types'; import { evaluateCondition, evaluateOperator, resolvePath } from './expression-evaluator'; import { interpolate, interpolateTemplate } from './template-engine'; @@ -51,6 +55,64 @@ export function interpretDeclarativeCheck(opts: { }; } +/** + * Converts a declarative SyncDefinition (JSON DSL) into a function + * that produces a validated list of SyncEmployee objects. + * + * The sync definition runs the same DSL steps as checks, but instead of + * emitting pass/fail results, it produces a standardized employee list + * at `scope[employeesPath]` that the generic sync service can process. + */ +export function interpretDeclarativeSync(opts: { + definition: SyncDefinition; + defaultSeverity?: FindingSeverity; +}): { + run: (ctx: CheckContext) => Promise; +} { + return { + run: async (ctx: CheckContext) => { + const scope: Record = { + variables: ctx.variables, + credentials: ctx.credentials, + accessToken: ctx.accessToken, + connectionId: ctx.connectionId, + organizationId: ctx.organizationId, + metadata: ctx.metadata, + }; + + ctx.log('Running declarative sync'); + + for (const step of opts.definition.steps) { + await executeStep(step, scope, ctx, opts.defaultSeverity || 'medium'); + } + + const employeesPath = opts.definition.employeesPath || 'employees'; + const raw = resolvePath(scope, employeesPath); + + if (!Array.isArray(raw)) { + throw new Error( + `Sync definition did not produce an array at scope.${employeesPath}`, + ); + } + + const employees: SyncEmployee[] = []; + for (let i = 0; i < raw.length; i++) { + const parsed = SyncEmployeeSchema.safeParse(raw[i]); + if (!parsed.success) { + ctx.warn( + `Employee at index ${i} failed validation: ${parsed.error.issues.map((iss) => iss.message).join(', ')}`, + ); + continue; + } + employees.push(parsed.data); + } + + ctx.log(`Sync produced ${employees.length} validated employees`); + return employees; + }, + }; +} + /** * Execute a single DSL step. */ @@ -79,6 +141,9 @@ async function executeStep( case 'emit': executeEmit(step, scope, ctx, defaultSeverity); break; + case 'code': + await executeCode(step, scope, ctx, defaultSeverity); + break; } } @@ -243,6 +308,10 @@ async function executeForEach( ctx.log(`Iterating over ${collection.length} items in ${step.collection}`); + let passCount = 0; + let failCount = 0; + let filteredCount = 0; + for (const item of collection) { // Create child scope with current item const childScope: Record = { @@ -253,6 +322,7 @@ async function executeForEach( // Apply filter — skip items that don't match if (step.filter) { if (!evaluateCondition(step.filter, childScope)) { + filteredCount++; continue; } } @@ -272,6 +342,7 @@ async function executeForEach( const resourceId = String(resolvePath(childScope, step.resourceIdPath) ?? 'unknown'); if (allPass) { + passCount++; const result = interpolateTemplate(step.onPass, childScope); ctx.pass({ title: result.title, @@ -281,6 +352,7 @@ async function executeForEach( evidence: result.evidence || { item, checkedAt: new Date().toISOString() }, }); } else { + failCount++; const result = interpolateTemplate(step.onFail, childScope); ctx.fail({ title: result.title, @@ -293,6 +365,10 @@ async function executeForEach( }); } } + + ctx.log( + `forEach complete on ${step.collection}: ${passCount} passed, ${failCount} failed${filteredCount > 0 ? `, ${filteredCount} filtered out` : ''}`, + ); } /** @@ -435,6 +511,7 @@ function executeEmit( ctx: CheckContext, defaultSeverity: FindingSeverity, ): void { + ctx.log(`Emitting ${step.result} result: ${step.template.title}`); const template = interpolateTemplate(step.template, scope); if (step.result === 'pass') { @@ -457,3 +534,42 @@ function executeEmit( }); } } + +/** + * Execute a code step — run arbitrary JavaScript with access to ctx and scope. + */ +async function executeCode( + step: CodeStep, + scope: Record, + ctx: CheckContext, + _defaultSeverity: FindingSeverity, +): Promise { + const codePreview = step.code.length > 100 + ? step.code.slice(0, 100) + '...' + : step.code; + ctx.log(`Executing code step: ${codePreview.replace(/\n/g, ' ').trim()}`); + + const scopeKeysBefore = Object.keys(scope); + + try { + // eslint-disable-next-line @typescript-eslint/no-implied-eval + const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor; + const fn = new AsyncFunction('ctx', 'scope', step.code); + await fn(ctx, scope); + + // Log scope changes for debugging + const scopeKeysAfter = Object.keys(scope); + const newKeys = scopeKeysAfter.filter((k) => !scopeKeysBefore.includes(k)); + if (newKeys.length > 0) { + ctx.log(`Code step added scope keys: ${newKeys.join(', ')}`); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const stack = error instanceof Error ? error.stack : undefined; + ctx.error(`Code step failed: ${message}`, { + code: step.code, + ...(stack ? { stack } : {}), + }); + throw error; + } +} diff --git a/packages/integration-platform/src/dsl/types.ts b/packages/integration-platform/src/dsl/types.ts index 12b6c0c880..5c76cfb70f 100644 --- a/packages/integration-platform/src/dsl/types.ts +++ b/packages/integration-platform/src/dsl/types.ts @@ -208,6 +208,13 @@ export const EmitStepSchema = z.object({ export type EmitStep = z.infer; +export const CodeStepSchema = z.object({ + type: z.literal('code'), + code: z.string().min(1), +}); + +export type CodeStep = z.infer; + // ============================================================================ // Union of All Steps // ============================================================================ @@ -220,6 +227,7 @@ export const DSLStepSchema: z.ZodType = z.lazy(() => AggregateStepSchema, BranchStepSchema, EmitStepSchema, + CodeStepSchema, ]), ); @@ -229,7 +237,8 @@ export type DSLStep = | ForEachStep | AggregateStep | BranchStep - | EmitStep; + | EmitStep + | CodeStep; // ============================================================================ // Check Definition (the top-level DSL object) @@ -256,6 +265,44 @@ export const CheckDefinitionSchema = z.object({ export type CheckDefinition = z.infer; +// ============================================================================ +// Sync Definition (for dynamic employee sync) +// ============================================================================ + +export const VariableSchema = z.object({ + id: z.string(), + label: z.string(), + type: z.enum(['text', 'number', 'boolean', 'select', 'multi-select']), + required: z.boolean().optional(), + default: z.unknown().optional(), + helpText: z.string().optional(), + options: z + .array(z.object({ value: z.string(), label: z.string() })) + .optional(), +}); + +export const SyncEmployeeSchema = z.object({ + email: z.string(), + name: z.string().optional(), + firstName: z.string().optional(), + lastName: z.string().optional(), + externalId: z.string().optional(), + status: z.enum(['active', 'inactive', 'suspended']), + role: z.string().optional(), + department: z.string().optional(), + metadata: z.record(z.string(), z.unknown()).optional(), +}); + +export type SyncEmployee = z.infer; + +export const SyncDefinitionSchema = z.object({ + steps: z.array(DSLStepSchema), + employeesPath: z.string().default('employees'), + variables: z.array(VariableSchema).optional(), +}); + +export type SyncDefinition = z.infer; + // ============================================================================ // Dynamic Integration Definition (full manifest + checks as JSON) // ============================================================================ @@ -285,6 +332,7 @@ export const DynamicIntegrationDefinitionSchema = z.object({ }), capabilities: z.array(z.enum(['checks', 'webhook', 'sync'])).default(['checks']), supportsMultipleConnections: z.boolean().optional(), + syncDefinition: SyncDefinitionSchema.optional(), checks: z.array( z.object({ checkSlug: z.string().regex(/^[a-z0-9_]+$/, 'Check slug must be lowercase alphanumeric with underscores'), diff --git a/packages/integration-platform/src/index.ts b/packages/integration-platform/src/index.ts index 57435a1924..e914650883 100644 --- a/packages/integration-platform/src/index.ts +++ b/packages/integration-platform/src/index.ts @@ -93,9 +93,10 @@ export { type TaskTemplateId, } from './task-mappings'; -// DSL Engine (declarative check definitions) +// DSL Engine (declarative check and sync definitions) export { interpretDeclarativeCheck, + interpretDeclarativeSync, evaluateCondition, evaluateOperator, resolvePath, @@ -103,14 +104,20 @@ export { interpolateTemplate, validateIntegrationDefinition, CheckDefinitionSchema, + SyncEmployeeSchema, + SyncDefinitionSchema, DynamicIntegrationDefinitionSchema, ConditionSchema, DSLStepSchema, + CodeStepSchema, } from './dsl'; export type { DSLStep, + CodeStep, CheckDefinition, + SyncEmployee, + SyncDefinition, Condition, DynamicIntegrationDefinition, ValidationResult, From 6f08b6b5242e0eadbd5e6422f2dc3cf48d2a6c3a Mon Sep 17 00:00:00 2001 From: Tofik Hasanov Date: Mon, 30 Mar 2026 11:14:32 -0400 Subject: [PATCH 2/6] fix(integration-platform): persist syncDefinition to database syncDefinition was validated by Zod and read from the DB but never written. Both upsert and create in the controller and repository omitted the field, silently dropping it. This would cause syncDynamicProviderEmployees to always fail with "has no sync definition." Fixed in: repository create/upsertBySlug, controller upsert/create, and seed script. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../controllers/dynamic-integrations.controller.ts | 8 ++++++++ .../repositories/dynamic-integration.repository.ts | 5 +++++ apps/api/src/scripts/seed-dynamic-integration.ts | 6 ++++++ 3 files changed, 19 insertions(+) diff --git a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts index 2c7758e54f..f847b4eda3 100644 --- a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts +++ b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts @@ -55,6 +55,7 @@ export class DynamicIntegrationsController { const def = validation.data!; // Upsert the integration + const rawSyncDef = (body as Record).syncDefinition; const integration = await this.dynamicIntegrationRepo.upsertBySlug({ slug: def.slug, name: def.name, @@ -67,6 +68,9 @@ export class DynamicIntegrationsController { authConfig: def.authConfig as unknown as Prisma.InputJsonValue, capabilities: def.capabilities as unknown as Prisma.InputJsonValue, supportsMultipleConnections: def.supportsMultipleConnections, + syncDefinition: rawSyncDef + ? (rawSyncDef as Prisma.InputJsonValue) + : undefined, }); // Delete checks not in the new definition, then upsert the rest @@ -138,6 +142,7 @@ export class DynamicIntegrationsController { ); } + const rawSyncDef = (body as Record).syncDefinition; const integration = await this.dynamicIntegrationRepo.create({ slug: def.slug, name: def.name, @@ -150,6 +155,9 @@ export class DynamicIntegrationsController { authConfig: def.authConfig as unknown as Prisma.InputJsonValue, capabilities: def.capabilities as unknown as Prisma.InputJsonValue, supportsMultipleConnections: def.supportsMultipleConnections, + syncDefinition: rawSyncDef + ? (rawSyncDef as Prisma.InputJsonValue) + : undefined, }); for (const [index, check] of def.checks.entries()) { diff --git a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts index 1867afe10f..76329e58f9 100644 --- a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts +++ b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts @@ -54,6 +54,7 @@ export class DynamicIntegrationRepository { authConfig: Prisma.InputJsonValue; capabilities?: Prisma.InputJsonValue; supportsMultipleConnections?: boolean; + syncDefinition?: Prisma.InputJsonValue; }): Promise { return db.dynamicIntegration.create({ data: { @@ -68,6 +69,7 @@ export class DynamicIntegrationRepository { authConfig: data.authConfig, capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, + syncDefinition: data.syncDefinition ?? undefined, }, }); } @@ -98,6 +100,7 @@ export class DynamicIntegrationRepository { authConfig: Prisma.InputJsonValue; capabilities?: Prisma.InputJsonValue; supportsMultipleConnections?: boolean; + syncDefinition?: Prisma.InputJsonValue; }): Promise { return db.dynamicIntegration.upsert({ where: { slug: data.slug }, @@ -113,6 +116,7 @@ export class DynamicIntegrationRepository { authConfig: data.authConfig, capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, + syncDefinition: data.syncDefinition ?? undefined, }, update: { name: data.name, @@ -125,6 +129,7 @@ export class DynamicIntegrationRepository { authConfig: data.authConfig, capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, + syncDefinition: data.syncDefinition ?? undefined, }, }); } diff --git a/apps/api/src/scripts/seed-dynamic-integration.ts b/apps/api/src/scripts/seed-dynamic-integration.ts index cdc8d8419b..ec14a6168e 100644 --- a/apps/api/src/scripts/seed-dynamic-integration.ts +++ b/apps/api/src/scripts/seed-dynamic-integration.ts @@ -69,6 +69,9 @@ async function main() { authConfig: toJson(def.authConfig), capabilities: toJson(def.capabilities), supportsMultipleConnections: def.supportsMultipleConnections ?? false, + syncDefinition: (rawJson as Record).syncDefinition + ? toJson((rawJson as Record).syncDefinition) + : undefined, isActive: true, }, update: { @@ -82,6 +85,9 @@ async function main() { authConfig: toJson(def.authConfig), capabilities: toJson(def.capabilities), supportsMultipleConnections: def.supportsMultipleConnections ?? false, + syncDefinition: (rawJson as Record).syncDefinition + ? toJson((rawJson as Record).syncDefinition) + : undefined, }, }); From 4b007ff4dfb5a578720daab1d6e492479b828646 Mon Sep 17 00:00:00 2001 From: Tofik Hasanov Date: Mon, 30 Mar 2026 11:23:02 -0400 Subject: [PATCH 3/6] fix(integration-platform): validate syncDefinition before storing, fix success flag - Parse syncDefinition through SyncDefinitionSchema.parse() before persisting to DB, ensuring Zod defaults (employeesPath: 'employees') are applied. Previously stored raw unvalidated JSON. - Set results.success = false when errors > 0 in GenericEmployeeSyncService. Previously success was always true, causing the sync trigger to misreport partial failures as successes. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../dynamic-integrations.controller.ts | 21 +++++++++++++------ .../services/generic-employee-sync.service.ts | 2 ++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts index f847b4eda3..11fb27f86b 100644 --- a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts +++ b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts @@ -23,6 +23,7 @@ import { DynamicManifestLoaderService } from '../services/dynamic-manifest-loade import { validateIntegrationDefinition, CheckDefinitionSchema, + SyncDefinitionSchema, } from '@trycompai/integration-platform'; @Controller({ path: 'internal/dynamic-integrations', version: '1' }) @@ -54,8 +55,13 @@ export class DynamicIntegrationsController { const def = validation.data!; - // Upsert the integration + // Validate and store syncDefinition through Zod to apply defaults (e.g., employeesPath) const rawSyncDef = (body as Record).syncDefinition; + const validatedSyncDef = rawSyncDef + ? SyncDefinitionSchema.parse(rawSyncDef) + : undefined; + + // Upsert the integration const integration = await this.dynamicIntegrationRepo.upsertBySlug({ slug: def.slug, name: def.name, @@ -68,8 +74,8 @@ export class DynamicIntegrationsController { authConfig: def.authConfig as unknown as Prisma.InputJsonValue, capabilities: def.capabilities as unknown as Prisma.InputJsonValue, supportsMultipleConnections: def.supportsMultipleConnections, - syncDefinition: rawSyncDef - ? (rawSyncDef as Prisma.InputJsonValue) + syncDefinition: validatedSyncDef + ? (JSON.parse(JSON.stringify(validatedSyncDef)) as Prisma.InputJsonValue) : undefined, }); @@ -142,7 +148,10 @@ export class DynamicIntegrationsController { ); } - const rawSyncDef = (body as Record).syncDefinition; + const rawSyncDefCreate = (body as Record).syncDefinition; + const validatedSyncDefCreate = rawSyncDefCreate + ? SyncDefinitionSchema.parse(rawSyncDefCreate) + : undefined; const integration = await this.dynamicIntegrationRepo.create({ slug: def.slug, name: def.name, @@ -155,8 +164,8 @@ export class DynamicIntegrationsController { authConfig: def.authConfig as unknown as Prisma.InputJsonValue, capabilities: def.capabilities as unknown as Prisma.InputJsonValue, supportsMultipleConnections: def.supportsMultipleConnections, - syncDefinition: rawSyncDef - ? (rawSyncDef as Prisma.InputJsonValue) + syncDefinition: validatedSyncDefCreate + ? (JSON.parse(JSON.stringify(validatedSyncDefCreate)) as Prisma.InputJsonValue) : undefined, }); diff --git a/apps/api/src/integration-platform/services/generic-employee-sync.service.ts b/apps/api/src/integration-platform/services/generic-employee-sync.service.ts index 97326a4202..b66cbd6e8b 100644 --- a/apps/api/src/integration-platform/services/generic-employee-sync.service.ts +++ b/apps/api/src/integration-platform/services/generic-employee-sync.service.ts @@ -265,6 +265,8 @@ export class GenericEmployeeSyncService { } } + results.success = results.errors === 0; + this.logger.log( `Sync complete for ${providerName}: ${results.imported} imported, ${results.reactivated} reactivated, ${results.deactivated} deactivated, ${results.skipped} skipped, ${results.errors} errors`, ); From b116616e8f8e72aefd6b7ddcd940e1dba7aca0b4 Mon Sep 17 00:00:00 2001 From: Tofik Hasanov Date: Mon, 30 Mar 2026 11:29:08 -0400 Subject: [PATCH 4/6] fix(integration-platform): remove redundant per-check re-validation in validate endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-check CheckDefinitionSchema.safeParse loop was dead code — validateIntegrationDefinition already validates all check definitions via the nested Zod schema. Simplified to return valid: true with summary when top-level validation passes. Also added checkSlugs to the summary for agent convenience. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../dynamic-integrations.controller.ts | 28 ++++--------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts index 11fb27f86b..08e90b72b6 100644 --- a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts +++ b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts @@ -22,7 +22,6 @@ import { CheckRunRepository } from '../repositories/check-run.repository'; import { DynamicManifestLoaderService } from '../services/dynamic-manifest-loader.service'; import { validateIntegrationDefinition, - CheckDefinitionSchema, SyncDefinitionSchema, } from '@trycompai/integration-platform'; @@ -406,36 +405,21 @@ export class DynamicIntegrationsController { }; } - // The top-level validateIntegrationDefinition already validates - // all checks and syncDefinition via Zod. If we got here, everything is valid. - // Do additional per-check validation for detailed error reporting. - const checkErrors: Array<{ checkSlug: string; errors: Array<{ path: string; message: string }> }> = []; + // validateIntegrationDefinition validates everything via Zod: + // the manifest fields, all check definitions, and syncDefinition. + // If we got here, the entire definition is valid. const definition = result.data!; - const rawBody = body as Record; - - for (const check of definition.checks) { - const checkResult = CheckDefinitionSchema.safeParse(check.definition); - if (!checkResult.success) { - checkErrors.push({ - checkSlug: check.checkSlug, - errors: checkResult.error.issues.map((issue) => ({ - path: issue.path.join('.'), - message: issue.message, - })), - }); - } - } return { - valid: checkErrors.length === 0, - ...(checkErrors.length > 0 ? { checkErrors } : {}), + valid: true, summary: { slug: definition.slug, name: definition.name, category: definition.category, capabilities: definition.capabilities, checksCount: definition.checks.length, - hasSyncDefinition: !!rawBody.syncDefinition, + checkSlugs: definition.checks.map((c) => c.checkSlug), + hasSyncDefinition: !!(body as Record).syncDefinition, }, }; } From ebfa25a08be22b5a71576b78ec4a787d62e583b6 Mon Sep 17 00:00:00 2001 From: Tofik Hasanov Date: Mon, 30 Mar 2026 11:40:38 -0400 Subject: [PATCH 5/6] fix(integration-platform): clear stale syncDefinition on upsert, deduplicate VariableSchema MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Upsert now sends Prisma.DbNull when syncDefinition is removed from the definition, instead of undefined (which Prisma skips). This prevents stale sync definitions from persisting after removal. - Deduplicated VariableSchema — was defined 3 times identically. Now defined once and reused in CheckDefinitionSchema, SyncDefinitionSchema, and DynamicIntegrationDefinitionSchema. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../dynamic-integrations.controller.ts | 2 +- .../dynamic-integration.repository.ts | 6 +- .../integration-platform/src/dsl/types.ts | 57 ++++++------------- 3 files changed, 22 insertions(+), 43 deletions(-) diff --git a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts index 08e90b72b6..6869bc6800 100644 --- a/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts +++ b/apps/api/src/integration-platform/controllers/dynamic-integrations.controller.ts @@ -75,7 +75,7 @@ export class DynamicIntegrationsController { supportsMultipleConnections: def.supportsMultipleConnections, syncDefinition: validatedSyncDef ? (JSON.parse(JSON.stringify(validatedSyncDef)) as Prisma.InputJsonValue) - : undefined, + : null, }); // Delete checks not in the new definition, then upsert the rest diff --git a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts index 76329e58f9..e06ec07231 100644 --- a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts +++ b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts @@ -100,7 +100,7 @@ export class DynamicIntegrationRepository { authConfig: Prisma.InputJsonValue; capabilities?: Prisma.InputJsonValue; supportsMultipleConnections?: boolean; - syncDefinition?: Prisma.InputJsonValue; + syncDefinition?: Prisma.InputJsonValue | null; }): Promise { return db.dynamicIntegration.upsert({ where: { slug: data.slug }, @@ -129,7 +129,9 @@ export class DynamicIntegrationRepository { authConfig: data.authConfig, capabilities: data.capabilities ?? ['checks'], supportsMultipleConnections: data.supportsMultipleConnections ?? false, - syncDefinition: data.syncDefinition ?? undefined, + syncDefinition: data.syncDefinition === null + ? Prisma.DbNull + : (data.syncDefinition ?? undefined), }, }); } diff --git a/packages/integration-platform/src/dsl/types.ts b/packages/integration-platform/src/dsl/types.ts index 5c76cfb70f..15c0dc0758 100644 --- a/packages/integration-platform/src/dsl/types.ts +++ b/packages/integration-platform/src/dsl/types.ts @@ -241,32 +241,7 @@ export type DSLStep = | CodeStep; // ============================================================================ -// Check Definition (the top-level DSL object) -// ============================================================================ - -export const CheckDefinitionSchema = z.object({ - steps: z.array(DSLStepSchema), - variables: z - .array( - z.object({ - id: z.string(), - label: z.string(), - type: z.enum(['text', 'number', 'boolean', 'select', 'multi-select']), - required: z.boolean().optional(), - default: z.unknown().optional(), - helpText: z.string().optional(), - options: z - .array(z.object({ value: z.string(), label: z.string() })) - .optional(), - }), - ) - .optional(), -}); - -export type CheckDefinition = z.infer; - -// ============================================================================ -// Sync Definition (for dynamic employee sync) +// Shared Variable Schema (used by checks, sync, and integration definitions) // ============================================================================ export const VariableSchema = z.object({ @@ -281,6 +256,21 @@ export const VariableSchema = z.object({ .optional(), }); +// ============================================================================ +// Check Definition (the top-level DSL object) +// ============================================================================ + +export const CheckDefinitionSchema = z.object({ + steps: z.array(DSLStepSchema), + variables: z.array(VariableSchema).optional(), +}); + +export type CheckDefinition = z.infer; + +// ============================================================================ +// Sync Definition (for dynamic employee sync) +// ============================================================================ + export const SyncEmployeeSchema = z.object({ email: z.string(), name: z.string().optional(), @@ -341,20 +331,7 @@ export const DynamicIntegrationDefinitionSchema = z.object({ taskMapping: z.string().optional(), defaultSeverity: z.enum(['info', 'low', 'medium', 'high', 'critical']).optional(), definition: CheckDefinitionSchema, - variables: z - .array( - z.object({ - id: z.string(), - label: z.string(), - type: z.enum(['text', 'number', 'boolean', 'select', 'multi-select']), - required: z.boolean().optional(), - default: z.unknown().optional(), - helpText: z.string().optional(), - options: z - .array(z.object({ value: z.string(), label: z.string() })) - .optional(), - }), - ) + variables: z.array(VariableSchema) .optional(), isEnabled: z.boolean().optional(), sortOrder: z.number().optional(), From 73d0fa6fd72f0a486d48a6af7ca598a016f6ccff Mon Sep 17 00:00:00 2001 From: Tofik Hasanov Date: Mon, 30 Mar 2026 12:02:42 -0400 Subject: [PATCH 6/6] fix: use value import for Prisma (DbNull requires runtime access) Prisma.DbNull is a runtime value, not a type. Changed from `import type { Prisma }` to `import { Prisma }` to fix build error: TS1361: 'Prisma' cannot be used as a value because it was imported using 'import type'. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../repositories/dynamic-integration.repository.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts index e06ec07231..9e23e222c8 100644 --- a/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts +++ b/apps/api/src/integration-platform/repositories/dynamic-integration.repository.ts @@ -1,6 +1,7 @@ import { Injectable } from '@nestjs/common'; import { db } from '@db'; -import type { DynamicIntegration, DynamicCheck, Prisma } from '@prisma/client'; +import { Prisma } from '@prisma/client'; +import type { DynamicIntegration, DynamicCheck } from '@prisma/client'; export type DynamicIntegrationWithChecks = DynamicIntegration & { checks: DynamicCheck[];