diff --git a/backend/src/mcp/index.ts b/backend/src/mcp/index.ts index 39718a70..f4c784fc 100644 --- a/backend/src/mcp/index.ts +++ b/backend/src/mcp/index.ts @@ -5,4 +5,5 @@ export * from './mcp-gateway.controller'; export * from './dto/mcp-gateway.dto'; export * from './dto/mcp-discovery.dto'; export * from './mcp-discovery.controller'; -export * from './mcp-discovery.service'; +export * from './mcp-discovery-orchestrator.service'; +export * from './mcp.tokens'; diff --git a/backend/src/mcp/mcp-discovery-orchestrator.service.ts b/backend/src/mcp/mcp-discovery-orchestrator.service.ts new file mode 100644 index 00000000..21d640ef --- /dev/null +++ b/backend/src/mcp/mcp-discovery-orchestrator.service.ts @@ -0,0 +1,159 @@ +import { Injectable, Logger, BadRequestException, Inject, OnModuleDestroy } from '@nestjs/common'; +import { randomUUID } from 'node:crypto'; +import type Redis from 'ioredis'; + +import { TemporalService } from '../temporal/temporal.service'; +import type { + DiscoveryInputDto, + DiscoveryStatusDto, + DiscoveryStartResponseDto, + GroupDiscoveryInputDto, + GroupDiscoveryStartResponseDto, + GroupDiscoveryStatusDto, +} from './dto/mcp-discovery.dto'; +import { MCP_DISCOVERY_REDIS } from './mcp.tokens'; + +@Injectable() +export class McpDiscoveryOrchestratorService implements OnModuleDestroy { + private readonly logger = new Logger(McpDiscoveryOrchestratorService.name); + + constructor( + private readonly temporalService: TemporalService, + @Inject(MCP_DISCOVERY_REDIS) private readonly redis: Redis, + ) {} + + async onModuleDestroy(): Promise { + // Best-effort shutdown. + try { + await this.redis.quit(); + } catch { + // ignore + } + } + + async startDiscovery(input: DiscoveryInputDto): Promise { + const workflowId = randomUUID(); + const cacheToken = randomUUID(); + + this.logger.log( + `Starting MCP discovery workflow ${workflowId} for ${input.transport} server: ${input.name} (cache: ${cacheToken})`, + ); + + // Store cache token in Redis (worker populates final result); expire in 5 minutes. + await this.redis.setex( + `mcp-discovery:${cacheToken}`, + 300, + JSON.stringify({ status: 'pending', workflowId }), + ); + + await this.temporalService.startWorkflow({ + workflowType: 'mcpDiscoveryWorkflow', + workflowId, + taskQueue: this.temporalService.getDefaultTaskQueue(), + args: [{ ...input, cacheToken }], + }); + + return { workflowId, cacheToken, status: 'started' }; + } + + async getStatus(workflowId: string): Promise { + this.logger.debug(`Querying MCP discovery status for workflow ${workflowId}`); + + const result = await this.temporalService.queryWorkflow<{ + status: 'running' | 'completed' | 'failed'; + tools?: { name: string; description?: string; inputSchema?: Record }[]; + toolCount?: number; + error?: string; + errorCode?: string; + }>({ + workflowId, + queryType: 'getDiscoveryResult', + }); + + if (!result) { + return { workflowId, status: 'running' }; + } + + return { + workflowId, + status: result.status, + tools: result.tools, + toolCount: result.toolCount, + error: result.error, + errorCode: result.errorCode, + }; + } + + async startGroupDiscovery( + input: GroupDiscoveryInputDto, + ): Promise { + const workflowId = randomUUID(); + const cacheTokens: Record = {}; + + const serverNames = input.servers.map((server) => server.name); + const uniqueNames = new Set(serverNames); + if (uniqueNames.size !== serverNames.length) { + throw new BadRequestException('Server names must be unique for group discovery'); + } + + for (const server of input.servers) { + cacheTokens[server.name] = randomUUID(); + } + + this.logger.log( + `Starting MCP group discovery workflow ${workflowId} for ${input.servers.length} server(s)`, + ); + + await Promise.all( + Object.values(cacheTokens).map((cacheToken) => + this.redis.setex( + `mcp-discovery:${cacheToken}`, + 300, + JSON.stringify({ status: 'pending', workflowId }), + ), + ), + ); + + await this.temporalService.startWorkflow({ + workflowType: 'mcpGroupDiscoveryWorkflow', + workflowId, + taskQueue: this.temporalService.getDefaultTaskQueue(), + args: [{ ...input, cacheTokens }], + }); + + return { workflowId, cacheTokens, status: 'started' }; + } + + async getGroupStatus(workflowId: string): Promise { + this.logger.debug(`Querying MCP group discovery status for workflow ${workflowId}`); + + const result = await this.temporalService.queryWorkflow<{ + status: 'running' | 'completed' | 'failed'; + results?: { + name: string; + status: 'running' | 'completed' | 'failed'; + tools?: { name: string; description?: string; inputSchema?: Record }[]; + toolCount?: number; + error?: string; + cacheToken?: string; + }[]; + error?: string; + errorCode?: string; + }>({ + workflowId, + queryType: 'getGroupDiscoveryResult', + }); + + if (!result) { + return { workflowId, status: 'running' }; + } + + return { + workflowId, + status: result.status, + results: result.results, + error: result.error, + errorCode: result.errorCode, + }; + } +} diff --git a/backend/src/mcp/mcp-discovery.controller.ts b/backend/src/mcp/mcp-discovery.controller.ts index de2a93ea..fa74176e 100644 --- a/backend/src/mcp/mcp-discovery.controller.ts +++ b/backend/src/mcp/mcp-discovery.controller.ts @@ -1,19 +1,7 @@ -import { - BadRequestException, - Body, - Controller, - Get, - HttpCode, - HttpStatus, - Logger, - Param, - Post, -} from '@nestjs/common'; +import { Body, Controller, Get, HttpCode, HttpStatus, Param, Post } from '@nestjs/common'; import { ApiTags, ApiOperation, ApiResponse as SwaggerApiResponse } from '@nestjs/swagger'; -import { randomUUID } from 'node:crypto'; -import Redis from 'ioredis'; -import { TemporalService } from '../temporal/temporal.service'; +import { McpDiscoveryOrchestratorService } from './mcp-discovery-orchestrator.service'; import { DiscoveryInputDto, DiscoveryStatusDto, @@ -26,19 +14,7 @@ import { @ApiTags('mcp') @Controller('mcp') export class McpDiscoveryController { - private readonly logger = new Logger(McpDiscoveryController.name); - private readonly redis: Redis; - - constructor(private readonly temporalService: TemporalService) { - // Initialize Redis for caching discovery results - const redisUrl = process.env.REDIS_URL || process.env.TERMINAL_REDIS_URL; - if (redisUrl) { - this.redis = new Redis(redisUrl); - } else { - // Fallback to localhost - this.redis = new Redis('redis://localhost:6379'); - } - } + constructor(private readonly orchestrator: McpDiscoveryOrchestratorService) {} @Post('discover') @HttpCode(HttpStatus.ACCEPTED) @@ -57,43 +33,7 @@ export class McpDiscoveryController { description: 'Invalid input parameters', }) async discover(@Body() input: DiscoveryInputDto): Promise { - const workflowId = randomUUID(); - const cacheToken = randomUUID(); - - this.logger.log( - `Starting MCP discovery workflow ${workflowId} for ${input.transport} server: ${input.name} (cache: ${cacheToken})`, - ); - - try { - // Store cache token in Redis (will be populated when discovery completes) - // Expires in 5 minutes - await this.redis.setex( - `mcp-discovery:${cacheToken}`, - 300, - JSON.stringify({ status: 'pending', workflowId }), - ); - - // Start Temporal workflow for MCP discovery with cache token - await this.temporalService.startWorkflow({ - workflowType: 'mcpDiscoveryWorkflow', - workflowId, - taskQueue: process.env.TEMPORAL_TASK_QUEUE ?? 'shipsec-dev', - args: [{ ...input, cacheToken }], - }); - - this.logger.log(`MCP discovery workflow ${workflowId} started successfully`); - - return { - workflowId, - cacheToken, - status: 'started', - }; - } catch (error) { - this.logger.error( - `Failed to start MCP discovery workflow ${workflowId}: ${error instanceof Error ? error.message : String(error)}`, - ); - throw error; - } + return this.orchestrator.startDiscovery(input); } @Get('discover/:workflowId') @@ -112,49 +52,7 @@ export class McpDiscoveryController { description: 'Workflow not found', }) async getStatus(@Param('workflowId') workflowId: string): Promise { - this.logger.debug(`Querying MCP discovery status for workflow ${workflowId}`); - - try { - // Query workflow for current result - const result = await this.temporalService.queryWorkflow<{ - status: 'running' | 'completed' | 'failed'; - tools?: { name: string; description?: string; inputSchema?: Record }[]; - toolCount?: number; - error?: string; - errorCode?: string; - }>({ - workflowId, - queryType: 'getDiscoveryResult', - }); - - if (result) { - return { - workflowId, - status: result.status, - tools: result.tools, - toolCount: result.toolCount, - error: result.error, - errorCode: result.errorCode, - }; - } - - // Workflow is still running, no result yet - return { - workflowId, - status: 'running', - }; - } catch (error) { - // Workflow not found or query failed - if (error instanceof Error && error.message.includes('workflow not found')) { - this.logger.warn(`Discovery workflow ${workflowId} not found`); - throw error; - } - - this.logger.error( - `Failed to query discovery workflow ${workflowId}: ${error instanceof Error ? error.message : String(error)}`, - ); - throw error; - } + return this.orchestrator.getStatus(workflowId); } @Post('discover-group') @@ -172,54 +70,7 @@ export class McpDiscoveryController { async discoverGroup( @Body() input: GroupDiscoveryInputDto, ): Promise { - const workflowId = randomUUID(); - const cacheTokens: Record = {}; - - const serverNames = input.servers.map((server) => server.name); - const uniqueNames = new Set(serverNames); - if (uniqueNames.size !== serverNames.length) { - throw new BadRequestException('Server names must be unique for group discovery'); - } - - for (const server of input.servers) { - cacheTokens[server.name] = randomUUID(); - } - - this.logger.log( - `Starting MCP group discovery workflow ${workflowId} for ${input.servers.length} server(s)`, - ); - - try { - await Promise.all( - Object.values(cacheTokens).map((cacheToken) => - this.redis.setex( - `mcp-discovery:${cacheToken}`, - 300, - JSON.stringify({ status: 'pending', workflowId }), - ), - ), - ); - - await this.temporalService.startWorkflow({ - workflowType: 'mcpGroupDiscoveryWorkflow', - workflowId, - taskQueue: process.env.TEMPORAL_TASK_QUEUE ?? 'shipsec-dev', - args: [{ ...input, cacheTokens }], - }); - - this.logger.log(`MCP group discovery workflow ${workflowId} started successfully`); - - return { - workflowId, - cacheTokens, - status: 'started', - }; - } catch (error) { - this.logger.error( - `Failed to start MCP group discovery workflow ${workflowId}: ${error instanceof Error ? error.message : String(error)}`, - ); - throw error; - } + return this.orchestrator.startGroupDiscovery(input); } @Get('discover-group/:workflowId') @@ -234,50 +85,6 @@ export class McpDiscoveryController { type: GroupDiscoveryStatusDto, }) async getGroupStatus(@Param('workflowId') workflowId: string): Promise { - this.logger.debug(`Querying MCP group discovery status for workflow ${workflowId}`); - - try { - const result = await this.temporalService.queryWorkflow<{ - status: 'running' | 'completed' | 'failed'; - results?: { - name: string; - status: 'running' | 'completed' | 'failed'; - tools?: { name: string; description?: string; inputSchema?: Record }[]; - toolCount?: number; - error?: string; - cacheToken?: string; - }[]; - error?: string; - errorCode?: string; - }>({ - workflowId, - queryType: 'getGroupDiscoveryResult', - }); - - if (result) { - return { - workflowId, - status: result.status, - results: result.results, - error: result.error, - errorCode: result.errorCode, - }; - } - - return { - workflowId, - status: 'running', - }; - } catch (error) { - if (error instanceof Error && error.message.includes('workflow not found')) { - this.logger.warn(`Group discovery workflow ${workflowId} not found`); - throw error; - } - - this.logger.error( - `Failed to query group discovery workflow ${workflowId}: ${error instanceof Error ? error.message : String(error)}`, - ); - throw error; - } + return this.orchestrator.getGroupStatus(workflowId); } } diff --git a/backend/src/mcp/mcp-discovery.service.ts b/backend/src/mcp/mcp-discovery.service.ts deleted file mode 100644 index 150c494e..00000000 --- a/backend/src/mcp/mcp-discovery.service.ts +++ /dev/null @@ -1,461 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { spawn } from 'child_process'; -import { randomUUID } from 'crypto'; - -import { McpServersRepository } from '../mcp-servers/mcp-servers.repository'; -import { SecretsEncryptionService } from '../secrets/secrets.encryption'; -import { McpGroupsRepository } from '../mcp-groups/mcp-groups.repository'; -import type { McpServerRecord } from '../database/schema'; -import type { McpToolResponse } from '../mcp-servers/dto/mcp-servers.dto'; - -/** - * Result of MCP discovery on a single server - */ -export interface McpToolDiscoveryResult { - serverId: string; - serverName: string; - toolCount: number; - success: boolean; - error?: string; -} - -/** - * Result of MCP discovery on a group - */ -export interface GroupDiscoveryResult { - groupId: string; - totalServers: number; - successCount: number; - failureCount: number; - results: McpToolDiscoveryResult[]; -} - -/** - * Raw MCP tool from protocol - */ -export interface McpTool { - name: string; - description?: string; - inputSchema?: Record; -} - -/** - * Service for discovering MCP tools from servers. - * Handles both HTTP and stdio transport types. - */ -@Injectable() -export class McpDiscoveryService { - private readonly logger = new Logger(McpDiscoveryService.name); - private readonly DISCOVERY_TIMEOUT_MS = 30_000; - private readonly DOCKER_IMAGE = 'shipsec/mcp-stdio-proxy:latest'; - - constructor( - private readonly mcpServersRepository: McpServersRepository, - private readonly encryption: SecretsEncryptionService, - private readonly mcpGroupsRepository: McpGroupsRepository, - ) {} - - /** - * Discover tools for a single MCP server - */ - async discoverServer(serverId: string, userId: string): Promise { - this.logger.log(`Starting discovery for server ${serverId}`); - - // Fetch server configuration - const server = await this.mcpServersRepository.findById(serverId); - - if (server.transportType === 'stdio') { - return this.discoverStdioServer(server, userId); - } - - // For HTTP, use existing discovery methods - return this.discoverHttpServer(server); - } - - /** - * Discover tools for all servers in a group - */ - async discoverGroup(groupId: string, userId: string): Promise { - this.logger.log(`Starting group discovery for group ${groupId}`); - - // Fetch all servers in the group - const servers = await this.mcpGroupsRepository.findServersByGroup(groupId); - - if (servers.length === 0) { - return { - groupId, - totalServers: 0, - successCount: 0, - failureCount: 0, - results: [], - }; - } - - const results: McpToolDiscoveryResult[] = []; - - // Discover tools for each server in parallel - const discoveryPromises = servers.map(async (server) => { - try { - const tools = await this.discoverServer(server.id, userId); - return { - serverId: server.id, - serverName: server.name, - toolCount: tools.length, - success: true, - }; - } catch (error) { - this.logger.error(`Discovery failed for server ${server.id}:`, error); - return { - serverId: server.id, - serverName: server.name, - toolCount: 0, - success: false, - error: error instanceof Error ? error.message : 'Unknown error', - }; - } - }); - - const discoveryResults = await Promise.all(discoveryPromises); - results.push(...discoveryResults); - - const successCount = results.filter((r) => r.success).length; - const failureCount = results.filter((r) => !r.success).length; - - this.logger.log( - `Group discovery complete: ${successCount}/${results.length} servers succeeded`, - ); - - return { - groupId, - totalServers: results.length, - successCount, - failureCount, - results, - }; - } - - /** - * Discover tools from an HTTP MCP server - */ - private async discoverHttpServer(server: McpServerRecord): Promise { - // Decrypt headers - let headers: Record | null = null; - if (server.headers) { - const decryptedJson = await this.encryption.decrypt(server.headers); - headers = JSON.parse(decryptedJson) as Record; - } - - if (!server.endpoint) { - throw new Error(`Server ${server.id} has no endpoint configured`); - } - - // Perform health check and tool discovery - const tools = await this.performMcpDiscovery(server.endpoint, headers); - - // Upsert tools to database - const toolRecords = tools.map((tool) => ({ - toolName: tool.name, - description: tool.description ?? null, - inputSchema: tool.inputSchema ?? null, - })); - - await this.mcpServersRepository.upsertTools(server.id, toolRecords); - - // Return as response DTOs - return toolRecords.map((tool) => ({ - id: `${server.id}-${tool.toolName}`, - toolName: tool.toolName, - description: tool.description, - inputSchema: tool.inputSchema, - serverId: server.id, - serverName: server.name, - enabled: true, - discoveredAt: new Date().toISOString(), - })); - } - - /** - * Discover tools from a stdio MCP server by spawning a temporary Docker container - */ - private async discoverStdioServer( - server: McpServerRecord, - _userId: string, - ): Promise { - let containerId: string | null = null; - - try { - // Spawn container for stdio server - const { endpoint, containerId: spawnedContainerId } = - await this.spawnDiscoveryContainer(server); - containerId = spawnedContainerId; - - // Perform MCP discovery via the proxy endpoint - const tools = await this.performMcpDiscovery(endpoint, null); - - // Upsert tools to database - const toolRecords = tools.map((tool) => ({ - toolName: tool.name, - description: tool.description ?? null, - inputSchema: tool.inputSchema ?? null, - })); - - await this.mcpServersRepository.upsertTools(server.id, toolRecords); - - // Return as response DTOs - return toolRecords.map((tool) => ({ - id: `${server.id}-${tool.toolName}`, - toolName: tool.toolName, - description: tool.description, - inputSchema: tool.inputSchema, - serverId: server.id, - serverName: server.name, - enabled: true, - discoveredAt: new Date().toISOString(), - })); - } finally { - // Always clean up the container - if (containerId) { - try { - await this.cleanupContainer(containerId); - } catch (error) { - this.logger.warn(`Failed to cleanup container ${containerId}:`, error); - } - } - } - } - - /** - * Spawn a temporary Docker container for stdio MCP server discovery - * Uses the shipsec/mcp-stdio-proxy:latest image to proxy stdio to HTTP - */ - async spawnDiscoveryContainer(server: McpServerRecord): Promise<{ - endpoint: string; - containerId: string; - }> { - const containerName = `mcp-discovery-${server.id}-${Date.now()}`; - const port = 3000 + Math.floor(Math.random() * 1000); - - if (!server.command) { - throw new Error(`Server ${server.id} has no command configured`); - } - - // Build Docker command - const dockerArgs = [ - 'run', - '--rm', - '--name', - containerName, - '-p', - `${port}:8080`, - '-e', - `MCP_COMMAND=${server.command}`, - '-e', - 'PORT=8080', - '-e', - 'MCP_NAMED_SERVERS={}', - ]; - - // Add args as environment variable - if (server.args && server.args.length > 0) { - dockerArgs.push('-e', `MCP_ARGS=${JSON.stringify(server.args)}`); - } - - dockerArgs.push(this.DOCKER_IMAGE); - - this.logger.debug(`Spawning discovery container: docker ${dockerArgs.join(' ')}`); - - return new Promise((resolve, reject) => { - const dockerProcess = spawn('docker', dockerArgs, { - stdio: ['ignore', 'pipe', 'pipe'], - }); - - let stdout = ''; - let stderr = ''; - - dockerProcess.stdout?.on('data', (data) => { - stdout += data.toString(); - }); - - dockerProcess.stderr?.on('data', (data) => { - stderr += data.toString(); - }); - - // Wait for container to be fully ready (HTTP server + STDIO client) - const waitForReady = async () => { - const healthUrl = `http://localhost:${port}/health`; - const maxAttempts = 60; - const pollInterval = 1000; - - for (let attempt = 0; attempt < maxAttempts; attempt++) { - try { - const response = await fetch(healthUrl, { method: 'GET' }); - if (response.ok) { - const data = (await response.json()) as { - status?: string; - servers?: { ready: boolean }[]; - }; - if (data.status === 'ok') { - const servers = data.servers ?? []; - const allReady = servers.every((s) => s.ready); - if (servers.length > 0 && allReady) { - this.logger.log( - `Discovery container ${containerName} ready after ${attempt + 1}s`, - ); - return; - } - if (attempt % 10 === 0) { - this.logger.debug( - `Container HTTP ready, waiting for MCP server... (${servers.filter((s) => s.ready).length}/${servers.length} ready)`, - ); - } - } - } - } catch { - // Not ready yet, continue polling - } - await new Promise((res) => setTimeout(res, pollInterval)); - } - throw new Error('Container failed to become ready after 60 seconds'); - }; - - // Start waiting for readiness - waitForReady() - .then(() => { - const endpoint = `http://localhost:${port}/mcp`; - this.logger.log(`Discovery container ${containerName} ready at ${endpoint}`); - resolve({ endpoint, containerId: containerName }); - }) - .catch((error) => { - reject(new Error(`Failed to start discovery container: ${error.message}`)); - }); - - dockerProcess.on('error', (error) => { - reject(new Error(`Failed to spawn Docker: ${error.message}`)); - }); - - dockerProcess.on('exit', (code, _signal) => { - if (code !== 0 && code !== null) { - this.logger.warn(`Docker process exited with code ${code}: ${stderr || stdout}`); - } - }); - }); - } - - /** - * Perform MCP protocol discovery by calling tools/list - */ - async performMcpDiscovery( - endpoint: string, - headers?: Record | null, - ): Promise { - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), this.DISCOVERY_TIMEOUT_MS); - - try { - // MCP tools/list request - const toolsListRequest = { - jsonrpc: '2.0', - id: randomUUID(), - method: 'tools/list', - params: {}, - }; - - const response = await fetch(endpoint, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Accept: 'application/json, text/event-stream', - ...(headers ?? {}), - }, - body: JSON.stringify(toolsListRequest), - signal: controller.signal, - }); - - clearTimeout(timeoutId); - - if (!response.ok) { - throw new Error(`HTTP ${response.status}: ${response.statusText}`); - } - - const contentType = response.headers.get('content-type') ?? ''; - if (!contentType.includes('application/json')) { - throw new Error(`Unexpected content type: ${contentType}`); - } - - const data = (await response.json()) as { - error?: { message?: string }; - result?: { - tools?: McpTool[]; - }; - }; - - if (data.error) { - throw new Error(data.error.message || 'tools/list failed'); - } - - return data.result?.tools ?? []; - } catch (error) { - clearTimeout(timeoutId); - - if (error instanceof Error) { - if (error.name === 'AbortError') { - throw new Error( - `Discovery timeout (${this.DISCOVERY_TIMEOUT_MS}ms) - server did not respond`, - ); - } - - if (error.message.includes('ECONNREFUSED')) { - throw new Error('Connection refused - server may be down or unreachable'); - } - - if (error.message.includes('ENOTFOUND')) { - throw new Error('DNS lookup failed - check the server URL'); - } - - throw error; - } - - throw new Error('Unknown error during MCP discovery'); - } - } - - /** - * Stop and remove a Docker container - */ - async cleanupContainer(containerId: string): Promise { - this.logger.debug(`Cleaning up container ${containerId}`); - - return new Promise((resolve) => { - const dockerProcess = spawn('docker', ['rm', '-f', containerId], { - stdio: ['ignore', 'pipe', 'pipe'], - }); - - let stderr = ''; - - dockerProcess.stderr?.on('data', (data) => { - stderr += data.toString(); - }); - - dockerProcess.on('close', (code) => { - if (code === 0) { - this.logger.debug(`Container ${containerId} cleaned up successfully`); - resolve(); - } else { - // Container might not exist, which is fine - if (stderr.includes('No such container')) { - this.logger.debug(`Container ${containerId} was already removed`); - resolve(); - } else { - this.logger.warn(`Failed to cleanup container ${containerId}: ${stderr}`); - resolve(); // Don't fail if cleanup fails - } - } - }); - - dockerProcess.on('error', (error) => { - this.logger.warn(`Error during container cleanup: ${error.message}`); - resolve(); // Don't fail if cleanup fails - }); - }); - } -} diff --git a/backend/src/mcp/mcp.module.ts b/backend/src/mcp/mcp.module.ts index 265de6f3..a0f9bcc0 100644 --- a/backend/src/mcp/mcp.module.ts +++ b/backend/src/mcp/mcp.module.ts @@ -8,12 +8,13 @@ import { SecretsModule } from '../secrets/secrets.module'; import { InternalMcpController } from './internal-mcp.controller'; import { WorkflowsModule } from '../workflows/workflows.module'; import { ApiKeysModule } from '../api-keys/api-keys.module'; -import { McpDiscoveryService } from './mcp-discovery.service'; import { McpDiscoveryController } from './mcp-discovery.controller'; import { TemporalModule } from '../temporal/temporal.module'; import { McpGroupsModule } from '../mcp-groups/mcp-groups.module'; import { McpServersRepository } from '../mcp-servers/mcp-servers.repository'; import { DatabaseModule } from '../database/database.module'; +import { McpDiscoveryOrchestratorService } from './mcp-discovery-orchestrator.service'; +import { MCP_DISCOVERY_REDIS } from './mcp.tokens'; @Global() @Module({ @@ -27,6 +28,15 @@ import { DatabaseModule } from '../database/database.module'; ], controllers: [McpGatewayController, InternalMcpController, McpDiscoveryController], providers: [ + { + provide: MCP_DISCOVERY_REDIS, + useFactory: () => { + // Keep consistent with the worker-side caching (worker uses REDIS_URL || TERMINAL_REDIS_URL || localhost). + const redisUrl = + process.env.REDIS_URL || process.env.TERMINAL_REDIS_URL || 'redis://localhost:6379'; + return new Redis(redisUrl); + }, + }, { provide: TOOL_REGISTRY_REDIS, useFactory: () => { @@ -46,9 +56,9 @@ import { DatabaseModule } from '../database/database.module'; ToolRegistryService, McpAuthService, McpGatewayService, - McpDiscoveryService, + McpDiscoveryOrchestratorService, McpServersRepository, ], - exports: [ToolRegistryService, McpGatewayService, McpAuthService, McpDiscoveryService], + exports: [ToolRegistryService, McpGatewayService, McpAuthService], }) export class McpModule {} diff --git a/backend/src/mcp/mcp.tokens.ts b/backend/src/mcp/mcp.tokens.ts new file mode 100644 index 00000000..2e967086 --- /dev/null +++ b/backend/src/mcp/mcp.tokens.ts @@ -0,0 +1 @@ +export const MCP_DISCOVERY_REDIS = Symbol('MCP_DISCOVERY_REDIS'); diff --git a/backend/src/temporal/temporal.service.ts b/backend/src/temporal/temporal.service.ts index ad54b52a..c77b783f 100644 --- a/backend/src/temporal/temporal.service.ts +++ b/backend/src/temporal/temporal.service.ts @@ -22,6 +22,7 @@ import { scheduleTriggerWorkflow, mcpDiscoveryWorkflow, mcpGroupDiscoveryWorkflow, + webhookParsingWorkflow, } from '@shipsec/studio-worker/workflows'; import type { ExecutionTriggerMetadata, ScheduleOverlapPolicy } from '@shipsec/shared'; @@ -155,6 +156,8 @@ export class TemporalService implements OnModuleDestroy { return mcpDiscoveryWorkflow; case 'mcpGroupDiscoveryWorkflow': return mcpGroupDiscoveryWorkflow; + case 'webhookParsingWorkflow': + return webhookParsingWorkflow; default: throw new Error(`Unknown workflow type: ${workflowType}`); } diff --git a/backend/src/webhooks/__tests__/webhooks.service.spec.ts b/backend/src/webhooks/__tests__/webhooks.service.spec.ts index 96db7c4c..1c56e36f 100644 --- a/backend/src/webhooks/__tests__/webhooks.service.spec.ts +++ b/backend/src/webhooks/__tests__/webhooks.service.spec.ts @@ -1,5 +1,5 @@ import { BadRequestException, NotFoundException } from '@nestjs/common'; -import { beforeEach, describe, expect, it, mock } from 'bun:test'; +import { beforeEach, describe, expect, it } from 'bun:test'; import type { WebhookConfigurationRecord, WebhookDeliveryRecord } from '../../database/schema'; import type { AuthContext } from '../../auth/types'; import type { WorkflowDefinition } from '../../dsl/types'; @@ -7,6 +7,7 @@ import type { WebhookRepository } from '../repository/webhook.repository'; import type { WebhookDeliveryRepository } from '../repository/webhook-delivery.repository'; import { WebhooksService } from '../webhooks.service'; import type { WorkflowsService } from '../../workflows/workflows.service'; +import type { TemporalService } from '../../temporal/temporal.service'; const authContext: AuthContext = { userId: 'admin-user', @@ -291,12 +292,23 @@ describe('WebhooksService', () => { startPreparedRun, } as unknown as WorkflowsService; - // Mock the Docker-based script execution - const mockScriptExec = mock(() => ({ - exec: async () => ({ - stdout: '---RESULT_START---\n{"prTitle":"Test PR","prNumber":42}\n---RESULT_END---', - }), - })); + const temporalStartCalls: unknown[][] = []; + const temporalStartWorkflow = async (...args: unknown[]) => { + temporalStartCalls.push(args); + return { workflowId: 'webhook-parse-1', runId: 'run-1', taskQueue: 'shipsec-default' }; + }; + + const temporalResultCalls: unknown[][] = []; + const temporalGetWorkflowResult = async (...args: unknown[]) => { + temporalResultCalls.push(args); + return { prTitle: 'Test PR', prNumber: 42 }; + }; + + const temporalService = { + startWorkflow: temporalStartWorkflow, + getWorkflowResult: temporalGetWorkflowResult, + getDefaultTaskQueue: () => 'shipsec-default', + } as unknown as TemporalService; beforeEach(() => { repository = new InMemoryWebhookRepository(); @@ -305,12 +317,14 @@ describe('WebhooksService', () => { repository as unknown as WebhookRepository, deliveryRepository as unknown as WebhookDeliveryRepository, workflowsService, + temporalService, ); ensureWorkflowAdminAccessCalls.length = 0; getCompiledWorkflowContextCalls.length = 0; prepareRunPayloadCalls.length = 0; startPreparedRunCalls.length = 0; - mockScriptExec.mockClear(); + temporalStartCalls.length = 0; + temporalResultCalls.length = 0; }); describe('list', () => { diff --git a/backend/src/webhooks/webhooks.module.ts b/backend/src/webhooks/webhooks.module.ts index 677c7641..9eea93ea 100644 --- a/backend/src/webhooks/webhooks.module.ts +++ b/backend/src/webhooks/webhooks.module.ts @@ -6,12 +6,13 @@ import { WebhooksService } from './webhooks.service'; import { WebhookRepository } from './repository/webhook.repository'; import { WebhookDeliveryRepository } from './repository/webhook-delivery.repository'; import { WorkflowsModule } from '../workflows/workflows.module'; +import { TemporalModule } from '../temporal/temporal.module'; import { ApiKeysModule } from '../api-keys/api-keys.module'; import { AuthModule } from '../auth/auth.module'; import { DatabaseModule } from '../database/database.module'; @Module({ - imports: [WorkflowsModule, ApiKeysModule, AuthModule, DatabaseModule], + imports: [WorkflowsModule, TemporalModule, ApiKeysModule, AuthModule, DatabaseModule], controllers: [WebhooksController, InboundWebhookController, WebhooksAdminController], providers: [WebhooksService, WebhookRepository, WebhookDeliveryRepository], exports: [WebhooksService], diff --git a/backend/src/webhooks/webhooks.service.ts b/backend/src/webhooks/webhooks.service.ts index f0f521c3..493c1df0 100644 --- a/backend/src/webhooks/webhooks.service.ts +++ b/backend/src/webhooks/webhooks.service.ts @@ -1,5 +1,4 @@ import { randomUUID } from 'node:crypto'; -import { spawn } from 'child_process'; import { Injectable, @@ -16,6 +15,7 @@ import { } from '@shipsec/shared'; import type { AuthContext } from '../auth/types'; import { WorkflowsService } from '../workflows/workflows.service'; +import { TemporalService } from '../temporal/temporal.service'; import { WebhookRepository } from './repository/webhook.repository'; import { WebhookDeliveryRepository } from './repository/webhook-delivery.repository'; import type { WebhookConfigurationRecord, WebhookDeliveryRecord } from '../database/schema'; @@ -31,6 +31,7 @@ export class WebhooksService { private readonly repository: WebhookRepository, private readonly deliveryRepository: WebhookDeliveryRepository, private readonly workflowsService: WorkflowsService, + private readonly temporalService: TemporalService, ) {} // Management methods (auth required) @@ -390,157 +391,31 @@ export class WebhooksService { payload: Record, headers: Record, ): Promise> { - // Execute the parsing script in a Docker container with Bun - const pluginCode = Buffer.from( - ` -import { plugin } from "bun"; -const rx_any = /./; -const rx_http = /^https?:\\/\\//; -const rx_path = /^\\.*\\//; - -async function load_http_module(href) { - console.log("[http-loader] Fetching:", href); - const response = await fetch(href); - const text = await response.text(); - if (response.ok) { - return { - contents: text, - loader: href.match(/\\.(ts|tsx)$/) ? "ts" : "js", - }; - } else { - throw new Error("Failed to load module '" + href + "': " + text); - } -} + // Backend must never execute Docker. Delegate parsing to Temporal worker. + const ref = await this.temporalService.startWorkflow({ + workflowType: 'webhookParsingWorkflow', + workflowId: `webhook-parse-${randomUUID()}`, + taskQueue: this.temporalService.getDefaultTaskQueue(), + args: [ + { + parsingScript: script, + payload, + headers, + timeoutSeconds: 30, + }, + ], + }); -plugin({ - name: "http_imports", - setup(build) { - build.onResolve({ filter: rx_http }, (args) => { - const url = new URL(args.path); - return { - path: url.href.replace(/^(https?):/, ''), - namespace: url.protocol.replace(':', ''), - }; - }); - build.onResolve({ filter: rx_path }, (args) => { - if (rx_http.test(args.importer)) { - const url = new URL(args.path, args.importer); - return { - path: url.href.replace(/^(https?):/, ''), - namespace: url.protocol.replace(':', ''), - }; - } - }); - build.onLoad({ filter: rx_any, namespace: "http" }, (args) => load_http_module("http:" + args.path)); - build.onLoad({ filter: rx_any, namespace: "https" }, (args) => load_http_module("https:" + args.path)); - } -}); -`, - ).toString('base64'); - - const harnessCode = Buffer.from( - ` -import { script } from "./user_script.ts"; -const INPUT = JSON.parse(process.env.WEBHOOK_INPUT || '{}'); - -async function run() { - try { - const result = await script(INPUT); - console.log('---RESULT_START---'); - console.log(JSON.stringify(result)); - console.log('---RESULT_END---'); - } catch (err) { - console.error('Runtime Error:', err.message); - process.exit(1); - } -} + const result = await this.temporalService.getWorkflowResult({ + workflowId: ref.workflowId, + runId: ref.runId, + }); -run(); -`, - ).toString('base64'); - - // Ensure script has export keyword - let processedScript = script; - const exportRegex = /^(?!\s*export\s+)(.*?\s*(?:async\s+)?function\s+script\b)/m; - if (exportRegex.test(processedScript)) { - processedScript = processedScript.replace( - exportRegex, - (match) => `export ${match.trimStart()}`, - ); + if (!result || typeof result !== 'object') { + throw new Error('Parsing script returned invalid result (expected object)'); } - const userScriptB64 = Buffer.from(processedScript).toString('base64'); - - const shellCommand = [ - `echo "${pluginCode}" | base64 -d > plugin.ts`, - `echo "${userScriptB64}" | base64 -d > user_script.ts`, - `echo "${harnessCode}" | base64 -d > harness.ts`, - `bun run --preload ./plugin.ts harness.ts`, - ].join(' && '); - - const dockerArgs = [ - 'run', - '--rm', - '-i', - '--network', - 'bridge', - '-e', - `WEBHOOK_INPUT=${JSON.stringify({ payload, headers })}`, - 'oven/bun:alpine', - 'sh', - '-c', - shellCommand, - ]; - - return new Promise((resolve, reject) => { - const timeoutSeconds = 30; - const proc = spawn('docker', dockerArgs, { - stdio: ['pipe', 'pipe', 'pipe'], - }); - - let stdout = ''; - let stderr = ''; - - const timeout = setTimeout(() => { - proc.kill(); - reject(new Error(`Script execution timed out after ${timeoutSeconds}s`)); - }, timeoutSeconds * 1000); - proc.stdout.on('data', (data) => { - stdout += data.toString(); - }); - - proc.stderr.on('data', (data) => { - stderr += data.toString(); - }); - - proc.on('error', (error) => { - clearTimeout(timeout); - reject(new Error(`Failed to start Docker container: ${error.message}`)); - }); - - proc.on('close', (code) => { - clearTimeout(timeout); - - if (code !== 0) { - reject(new Error(`Script execution failed with exit code ${code}: ${stderr}`)); - return; - } - - // Parse output between RESULT_START and RESULT_END markers - const resultMatch = stdout.match(/---RESULT_START---\n(.*?)\n---RESULT_END---/s); - if (!resultMatch) { - reject(new Error('Script did not produce valid output')); - return; - } - - try { - const result = JSON.parse(resultMatch[1]); - resolve(result); - } catch (e) { - reject(new Error(`Failed to parse script output: ${e}`)); - } - }); - }); + return result as Record; } private validateParsedData( diff --git a/bun.lock b/bun.lock index 58e715d7..9cb8c615 100644 --- a/bun.lock +++ b/bun.lock @@ -125,6 +125,7 @@ "autoprefixer": "^10.4.21", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", + "dompurify": "^3.2.4", "lucide-react": "^0.544.0", "markdown-it": "^14.1.0", "markdown-it-html5-embed": "^1.0.0", @@ -150,6 +151,7 @@ "@tailwindcss/typography": "^0.5.19", "@testing-library/jest-dom": "^6.9.1", "@testing-library/react": "^16.3.0", + "@types/dompurify": "^3.0.5", "@types/markdown-it": "^14.1.2", "@types/markdown-it-link-attributes": "^3.0.5", "@types/node": "^24.6.2", @@ -1148,6 +1150,8 @@ "@types/debug": ["@types/debug@4.1.12", "", { "dependencies": { "@types/ms": "*" } }, "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ=="], + "@types/dompurify": ["@types/dompurify@3.2.0", "", { "dependencies": { "dompurify": "*" } }, "sha512-Fgg31wv9QbLDA0SpTOXO3MaxySc4DKGLi8sna4/Utjo4r3ZRPdCt4UQee8BWr+Q5z21yifghREPJGYaEOEIACg=="], + "@types/eslint": ["@types/eslint@9.6.1", "", { "dependencies": { "@types/estree": "*", "@types/json-schema": "*" } }, "sha512-FXx2pKgId/WyYo2jXw63kk7/+TY7u7AziEJxJAnSFzHlqTAS3Ync6SvgYAN/k4/PQpnnVuzoMuVnByKK2qp0ag=="], "@types/eslint-scope": ["@types/eslint-scope@3.7.7", "", { "dependencies": { "@types/eslint": "*", "@types/estree": "*" } }, "sha512-MzMFlSLBqNF2gcHWO0G1vP/YQyfvrxZ0bF+u7mzUdZ1/xK4A4sru+nraZz5i3iEIk1l1uyicaDVTB4QbbEkAYg=="], @@ -1640,7 +1644,7 @@ "dom-accessibility-api": ["dom-accessibility-api@0.6.3", "", {}, "sha512-7ZgogeTnjuHbo+ct10G9Ffp0mif17idi0IyWNVA/wcwcm7NPOD/WEHVP3n7n3MhXqxoIYm8d6MuZohYWIZ4T3w=="], - "dompurify": ["dompurify@3.2.7", "", { "optionalDependencies": { "@types/trusted-types": "^2.0.7" } }, "sha512-WhL/YuveyGXJaerVlMYGWhvQswa7myDG17P7Vu65EWC05o8vfeNbvNf4d/BOvH99+ZW+LlQsc1GDKMa1vNK6dw=="], + "dompurify": ["dompurify@3.3.1", "", { "optionalDependencies": { "@types/trusted-types": "^2.0.7" } }, "sha512-qkdCKzLNtrgPFP1Vo+98FRzJnBRGe4ffyCea9IwHB1fyxPOeNTHpLKYGd4Uk9xvNoH0ZoOjwZxNptyMwqrId1Q=="], "dotenv": ["dotenv@17.2.3", "", {}, "sha512-JVUnt+DUIzu87TABbhPmNfVdBDt18BLOWjMUFJMSi/Qqg7NTYtabbvSNJGOJ7afbRuv9D/lngizHtP7QyLQ+9w=="], @@ -3362,6 +3366,8 @@ "minio/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], + "monaco-editor/dompurify": ["dompurify@3.2.7", "", { "optionalDependencies": { "@types/trusted-types": "^2.0.7" } }, "sha512-WhL/YuveyGXJaerVlMYGWhvQswa7myDG17P7Vu65EWC05o8vfeNbvNf4d/BOvH99+ZW+LlQsc1GDKMa1vNK6dw=="], + "multer/mkdirp": ["mkdirp@0.5.6", "", { "dependencies": { "minimist": "^1.2.6" }, "bin": { "mkdirp": "bin/cmd.js" } }, "sha512-FP+p8RB8OWpF3YZBCrP5gtADmtXApB5AMLn+vdyA+PyxCjrCs00mjyUozssO33cwDeT3wNGdLxJ5M//YqtHAJw=="], "multer/type-is": ["type-is@1.6.18", "", { "dependencies": { "media-typer": "0.3.0", "mime-types": "~2.1.24" } }, "sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g=="], @@ -3394,8 +3400,6 @@ "postcss-nested/postcss-selector-parser": ["postcss-selector-parser@6.1.2", "", { "dependencies": { "cssesc": "^3.0.0", "util-deprecate": "^1.0.2" } }, "sha512-Q8qQfPiZ+THO/3ZrOrO0cJJKfpYCagtMUkXbnEfmgUjwXg6z/WBeOyS9APBBPCTSiDV+s4SwQGu8yFsiMRIudg=="], - "posthog-js/dompurify": ["dompurify@3.3.1", "", { "optionalDependencies": { "@types/trusted-types": "^2.0.7" } }, "sha512-qkdCKzLNtrgPFP1Vo+98FRzJnBRGe4ffyCea9IwHB1fyxPOeNTHpLKYGd4Uk9xvNoH0ZoOjwZxNptyMwqrId1Q=="], - "pretty-format/ansi-styles": ["ansi-styles@5.2.0", "", {}, "sha512-Cxwpt2SfTzTtXcfOlzGEee8O+c+MmUgGrNiBcXnuWxuFJHe6a5Hz7qwhwe5OgaSYI0IJvkLqWX1ASG+cJOkEiA=="], "pretty-format/react-is": ["react-is@17.0.2", "", {}, "sha512-w2GsyukL62IJnlaff/nRegPQR94C/XXamvMWmSHRJ4y7Ts/4ocGRmTHvOs8PSE6pB3dWOrD/nueuU5sduBsQ4w=="], diff --git a/frontend/src/pages/McpLibraryPage.tsx b/frontend/src/pages/McpLibraryPage.tsx index 89dc2c6d..685f60ce 100644 --- a/frontend/src/pages/McpLibraryPage.tsx +++ b/frontend/src/pages/McpLibraryPage.tsx @@ -48,12 +48,6 @@ import { AccordionItem, AccordionTrigger, } from '@/components/ui/accordion'; -import { - DropdownMenu, - DropdownMenuContent, - DropdownMenuItem, - DropdownMenuTrigger, -} from '@/components/ui/dropdown-menu'; import { Search, Plus, @@ -304,6 +298,8 @@ export function McpLibraryPage() { const [groupTemplates, setGroupTemplates] = useState([]); const [isLoadingTemplates, setIsLoadingTemplates] = useState(false); const [importingTemplates, setImportingTemplates] = useState>(new Set()); + const [templatesOpen, setTemplatesOpen] = useState(true); + const [templatesManuallyToggled, setTemplatesManuallyToggled] = useState(false); // Group template discovery preview state - keyed by template slug const [groupDiscoveryPreview, setGroupDiscoveryPreview] = useState< Record< @@ -394,6 +390,12 @@ export function McpLibraryPage() { } }, [groups, fetchGroupServers]); + // Default: show templates when no groups are installed; collapse when groups exist. + useEffect(() => { + if (templatesManuallyToggled) return; + setTemplatesOpen(groups.length === 0); + }, [groups.length, templatesManuallyToggled]); + // Sync JSON config to Manual form when valid single-server JSON is entered useEffect(() => { if (!jsonValue.trim() || editingServer) return; @@ -487,11 +489,6 @@ export function McpLibraryPage() { ); }, [groupTemplates, searchQuery]); - const importableTemplates = useMemo( - () => filteredTemplates.filter((template) => !importedGroupSlugs.has(template.slug)), - [filteredTemplates, importedGroupSlugs], - ); - const filteredGroups = useMemo(() => { const query = searchQuery.trim().toLowerCase(); if (!query) return groups; @@ -524,7 +521,9 @@ export function McpLibraryPage() { const getGroupServerToolCounts = (server: { serverId: string; toolCount: number }) => { const counts = toolCountsByServer[server.serverId]; - if (counts) { + // If we haven't loaded tools for this server (common for disabled servers), + // don't let a 0/0 cache override the server's known discovered toolCount. + if (counts && !(counts.total === 0 && server.toolCount > 0)) { return counts; } const fallbackTotal = server.toolCount; @@ -1687,186 +1686,202 @@ export function McpLibraryPage() { {/* Group Templates */} - {groups.length === 0 && ( -
-
- -

Available Groups

- - {filteredTemplates.length} {filteredTemplates.length === 1 ? 'group' : 'groups'} - +
+
+ +

Group templates

+ + {filteredTemplates.length} {filteredTemplates.length === 1 ? 'group' : 'groups'} + +
+
+
- {isLoadingTemplates ? ( -
- {Array.from({ length: 2 }).map((_, i) => ( - - - - - - - - - - ))} -
- ) : filteredTemplates.length === 0 ? ( - - - -

- {searchQuery ? 'No groups match your search.' : 'No group templates available.'} -

-
-
- ) : ( -
- {filteredTemplates.map((template) => { - const theme = getGroupTheme(template.slug); - const isImported = importedGroupSlugs.has(template.slug); - const isImporting = importingTemplates.has(template.slug); - - return ( - - + {isLoadingTemplates ? ( +
+ {Array.from({ length: 2 }).map((_, i) => ( + + + + + + + + + + ))} +
+ ) : filteredTemplates.length === 0 ? ( + + + +

+ {searchQuery ? 'No groups match your search.' : 'No group templates available.'} +

+
+
+ ) : ( +
+ {filteredTemplates.map((template) => { + const theme = getGroupTheme(template.slug); + const isImported = importedGroupSlugs.has(template.slug); + + return ( + -
- -
-
-
-

{template.name}

- - {template.servers.length} servers - -
- {template.description && ( -

- {template.description} -

+ - - - - {/* Discovery Preview */} - {groupDiscoveryPreview[template.slug] && ( - { - setGroupDiscoveryPreview((prev) => { - const { [template.slug]: _, ...rest } = prev; - return rest; - }); - }} - /> - )} - - {/* Discovery summary when available */} - {groupDiscoveryPreview[template.slug] && ( -
- - { - groupDiscoveryPreview[template.slug].filter( - (r) => r.status === 'completed', - ).length - }{' '} - of {template.servers.length} servers ready - +
+
- )} - - {/* Action buttons */} -
+
+
+

{template.name}

+ + {template.servers.length} servers + +
+ {template.description && ( +

+ {template.description} +

+ )} +
+ {isImported && ( + + Imported + + )} + + + {/* Discovery Preview */} {groupDiscoveryPreview[template.slug] && ( - + /> )} - - )} - -
- - - ); - })} -
- )} -
- )} + + +
+ +
+ ); + })} +
+ )} + + )} +
{/* Imported Groups Section */}
@@ -1889,55 +1904,6 @@ export function McpLibraryPage() { - {groups.length > 0 && ( - - - - - - {importableTemplates.length === 0 - ? groupTemplates.map((template) => ( - - - {template.name} - Imported - - )) - : importableTemplates.map((template) => { - const isImporting = importingTemplates.has(template.slug); - return ( - handleImportDiscoveredTemplate(template)} - className="flex items-center gap-2" - > - - {template.name} - - {template.servers.length} - - - ); - })} - - - )}
diff --git a/frontend/src/services/mcpGroupsApi.ts b/frontend/src/services/mcpGroupsApi.ts index 65348280..65e6c214 100644 --- a/frontend/src/services/mcpGroupsApi.ts +++ b/frontend/src/services/mcpGroupsApi.ts @@ -26,6 +26,8 @@ export interface McpGroupServerResponse { enabled: boolean; healthStatus: 'healthy' | 'unhealthy' | 'unknown'; toolCount: number; + recommended?: boolean; + defaultSelected?: boolean; } export interface McpGroupTemplateResponse { @@ -118,7 +120,7 @@ export const mcpGroupsApi = { */ async getGroupServers(groupId: string): Promise { const headers = await getApiAuthHeaders(); - const response = await fetch(`${API_BASE_URL}/api/v1/mcp-servers?groupId=${groupId}`, { + const response = await fetch(`${API_BASE_URL}/api/v1/mcp-groups/${groupId}/servers`, { headers, }); diff --git a/worker/src/temporal/activities/webhook-parsing.activity.ts b/worker/src/temporal/activities/webhook-parsing.activity.ts new file mode 100644 index 00000000..5f070d46 --- /dev/null +++ b/worker/src/temporal/activities/webhook-parsing.activity.ts @@ -0,0 +1,171 @@ +import { + createExecutionContext, + runComponentWithRunner, + type DockerRunnerConfig, +} from '@shipsec/component-sdk'; + +export interface ExecuteWebhookParsingScriptActivityInput { + parsingScript: string; + payload: Record; + headers: Record; + timeoutSeconds?: number; +} + +/** + * Executes a user-supplied webhook parsing script inside a Bun Docker container. + * + * Important: This MUST run in the worker (never in the backend API), since it requires Docker access. + */ +export async function executeWebhookParsingScriptActivity( + input: ExecuteWebhookParsingScriptActivityInput, +): Promise> { + const timeoutSeconds = input.timeoutSeconds ?? 30; + + // Ensure script has an `export` on the `script` function. + let processedScript = input.parsingScript; + const exportRegex = /^(?!\s*export\s+)(.*?\s*(?:async\s+)?function\s+script\b)/m; + if (exportRegex.test(processedScript)) { + processedScript = processedScript.replace( + exportRegex, + (match) => `export ${match.trimStart()}`, + ); + } + + // Bun plugin for HTTP imports (allows importing TS/JS modules from URLs). + const pluginCode = ` +import { plugin } from "bun"; +const rx_any = /./; +const rx_http = /^https?:\\/\\//; +const rx_path = /^\\.*\\//; + +async function load_http_module(href) { + console.log("[http-loader] Fetching:", href); + const response = await fetch(href); + const text = await response.text(); + if (response.ok) { + return { + contents: text, + loader: href.match(/\\.(ts|tsx)$/) ? "ts" : "js", + }; + } + throw new Error("Failed to load module '" + href + "': " + text); +} + +plugin({ + name: "http_imports", + setup(build) { + build.onResolve({ filter: rx_http }, (args) => { + const url = new URL(args.path); + return { + path: url.href.replace(/^(https?):/, ''), + namespace: url.protocol.replace(':', ''), + }; + }); + build.onResolve({ filter: rx_path }, (args) => { + if (rx_http.test(args.importer)) { + const url = new URL(args.path, args.importer); + return { + path: url.href.replace(/^(https?):/, ''), + namespace: url.protocol.replace(':', ''), + }; + } + }); + build.onLoad({ filter: rx_any, namespace: "http" }, (args) => load_http_module("http:" + args.path)); + build.onLoad({ filter: rx_any, namespace: "https" }, (args) => load_http_module("https:" + args.path)); + } +}); +`; + + // Harness reads params from SHIPSEC_INPUT_PATH (mounted file) to avoid env/arg size limits. + const harnessCode = ` +import { readFileSync, writeFileSync, existsSync, mkdirSync } from "node:fs"; + +async function run() { + try { + const inputPath = process.env.SHIPSEC_INPUT_PATH || "/shipsec-output/input.json"; + const payload = JSON.parse(readFileSync(inputPath, "utf8")); + + if (!payload.code) { + throw new Error("No parsing script provided in payload"); + } + + // Write user script so it can be imported by Bun. + writeFileSync("./user_script.ts", payload.code); + + // @ts-ignore + const { script } = await import("./user_script.ts"); + + const input = { + payload: payload.payload || {}, + headers: payload.headers || {}, + }; + + const result = await script(input); + + const OUTPUT_PATH = process.env.SHIPSEC_OUTPUT_PATH || "/shipsec-output/result.json"; + const OUTPUT_DIR = OUTPUT_PATH.substring(0, OUTPUT_PATH.lastIndexOf("/")); + if (!existsSync(OUTPUT_DIR)) { + mkdirSync(OUTPUT_DIR, { recursive: true }); + } + writeFileSync(OUTPUT_PATH, JSON.stringify(result || {})); + } catch (err) { + const message = err && typeof err.message === "string" ? err.message : String(err); + console.error("Runtime Error:", message); + process.exit(1); + } +} + +run(); +`; + + const pluginB64 = Buffer.from(pluginCode).toString('base64'); + const harnessB64 = Buffer.from(harnessCode).toString('base64'); + + const shellCommand = [ + `echo "${pluginB64}" | base64 -d > plugin.ts`, + `echo "${harnessB64}" | base64 -d > harness.ts`, + `bun run --preload ./plugin.ts harness.ts`, + ].join(' && '); + + const runnerConfig: DockerRunnerConfig = { + kind: 'docker', + image: 'oven/bun:alpine', + entrypoint: 'sh', + command: ['-c', shellCommand], + env: {}, + network: 'bridge', + timeoutSeconds, + stdinJson: false, + }; + + const context = createExecutionContext({ + runId: `webhook-parse-${Date.now()}`, + componentRef: 'webhook.parse', + logCollector: (entry) => { + const log = + entry.level === 'error' + ? console.error + : entry.level === 'warn' + ? console.warn + : entry.level === 'debug' + ? console.debug + : console.log; + log(`[Webhook Parse] ${entry.message}`); + }, + }); + + const params = { + code: processedScript, + payload: input.payload, + headers: input.headers, + }; + + return runComponentWithRunner>( + runnerConfig, + async () => { + throw new Error('Docker runner should handle webhook parsing execution'); + }, + params, + context, + ); +} diff --git a/worker/src/temporal/workers/dev.worker.ts b/worker/src/temporal/workers/dev.worker.ts index ce78c827..4bfd5571 100644 --- a/worker/src/temporal/workers/dev.worker.ts +++ b/worker/src/temporal/workers/dev.worker.ts @@ -38,6 +38,7 @@ import { discoverMcpGroupToolsActivity, cacheDiscoveryResultActivity, } from '../activities/mcp-discovery.activity'; +import { executeWebhookParsingScriptActivity } from '../activities/webhook-parsing.activity'; // ... (existing imports) @@ -232,6 +233,7 @@ async function main() { discoverMcpToolsActivity, discoverMcpGroupToolsActivity, cacheDiscoveryResultActivity, + executeWebhookParsingScriptActivity, }).join(', ')}`, ); @@ -273,6 +275,7 @@ async function main() { discoverMcpToolsActivity, discoverMcpGroupToolsActivity, cacheDiscoveryResultActivity, + executeWebhookParsingScriptActivity, }, bundlerOptions: { ignoreModules: ['child_process'], diff --git a/worker/src/temporal/workflows/index.ts b/worker/src/temporal/workflows/index.ts index 7b1b2023..ad3e97f6 100644 --- a/worker/src/temporal/workflows/index.ts +++ b/worker/src/temporal/workflows/index.ts @@ -1119,3 +1119,6 @@ export async function scheduleTriggerWorkflow( // Export MCP discovery workflow export { mcpDiscoveryWorkflow, mcpGroupDiscoveryWorkflow } from './mcp-discovery-workflow.js'; + +// Export webhook parsing workflow (Docker execution must run in worker). +export { webhookParsingWorkflow } from './webhook-parsing-workflow.js'; diff --git a/worker/src/temporal/workflows/webhook-parsing-workflow.ts b/worker/src/temporal/workflows/webhook-parsing-workflow.ts new file mode 100644 index 00000000..6fa07c96 --- /dev/null +++ b/worker/src/temporal/workflows/webhook-parsing-workflow.ts @@ -0,0 +1,32 @@ +import { proxyActivities } from '@temporalio/workflow'; +import type { ExecuteWebhookParsingScriptActivityInput } from '../activities/webhook-parsing.activity'; + +function getWebhookParsingActivities(timeoutSeconds?: number) { + // Keep this deterministic and bounded. A user parsing script that fails permanently (syntax/runtime) + // should fail fast rather than retrying indefinitely and blocking webhook delivery processing. + const seconds = Math.max(1, Math.min(timeoutSeconds ?? 120, 10 * 60)); + + return proxyActivities<{ + executeWebhookParsingScriptActivity: ( + input: ExecuteWebhookParsingScriptActivityInput, + ) => Promise>; + }>({ + startToCloseTimeout: `${seconds} seconds`, + scheduleToCloseTimeout: `${seconds} seconds`, + retry: { maximumAttempts: 1 }, + }); +} + +export interface WebhookParsingWorkflowInput { + parsingScript: string; + payload: Record; + headers: Record; + timeoutSeconds?: number; +} + +export async function webhookParsingWorkflow( + input: WebhookParsingWorkflowInput, +): Promise> { + const { executeWebhookParsingScriptActivity } = getWebhookParsingActivities(input.timeoutSeconds); + return executeWebhookParsingScriptActivity(input); +}