diff --git a/forge/comms/aclManager.js b/forge/comms/aclManager.js index d760fb7db9..49f105769e 100644 --- a/forge/comms/aclManager.js +++ b/forge/comms/aclManager.js @@ -140,6 +140,81 @@ module.exports = function (app) { return false } }, + checkExpertPlatformTopic: async function (topicParts, usernameParts, acl) { + // topicParts = [ fullTopic , , , ] + // usernameParts = [ 'forge_platform' | 'expert-agent', [, ] ] + + const ValidationError = function (message) { + const error = new Error(message) + error.name = 'ACLValidationError' + return error + } + + try { + const [, userId, sessionId, command] = topicParts + if (!userId || !sessionId || !command) { + throw ValidationError('invalid topic format') + } + + if (command === '+') { + if (!acl.allowWildcard?.command) { + throw ValidationError('invalid command wildcard') + } + } else { + // validate command format is [forge:]|[insights:] + const commandParts = command.split(':', 2) + if (commandParts.length !== 2) { + throw ValidationError('invalid command format') + } + const [commandAgent, commandName] = commandParts + switch (commandAgent) { + // case 'forge': + // if (['mcp-get-features', 'mcp-call-tool'].indexOf(commandName) === -1) { + // throw ValidationError('invalid platform command for platform api') + // } + // break + case 'insights': + if (['mcp-call-tool', 'mcp-read-resource'].includes(commandName) === false) { + throw ValidationError('invalid platform command for insights') + } + break + default: + throw ValidationError('invalid platform command') + } + } + + // at minimum, ensure session is present and either a wildcard or 8 or more chars + if (sessionId === '+') { + if (!acl.allowWildcard?.session) { + throw ValidationError('invalid session wildcard') + } + } else if (sessionId.length < 8) { + throw ValidationError('invalid session id') + } + + if (userId === '+') { + if (!acl.allowWildcard?.user) { + throw ValidationError('invalid user wildcard') + } + } else { + const user = await app.db.models.User.byId(userId) + if (!user || user.suspended) { + throw ValidationError('invalid user') + } + // TODO: consider checking if the user has permission to operate on this channel here or in the command handler. + // For now, we just check the user exists and is not suspended. + } + + return true + } catch (err) { + if (err.name === 'ACLValidationError') { + app.log.warn(`ACL validation error for expert platform topic: ${err.message}`) + } else { + app.log.error(`Unexpected error during ACL validation for expert platform topic: ${err.message}`) + } + } + return false + }, checkExpertTopic: async function (topicParts, usernameParts, acl) { // topicParts = [ fullTopic , , , , [, ] ] // usernameParts = [ 'expert-client' | 'expert-agent', [, ] ] @@ -313,7 +388,9 @@ module.exports = function (app) { // ff/v1/platform/sync { topic: /^ff\/v1\/platform\/sync$/ }, // ff/v1/platform/leader - { topic: /^ff\/v1\/platform\/leader$/ } + { topic: /^ff\/v1\/platform\/leader$/ }, + // platform can listen for Expert Agent requests + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/request$/, verify: 'checkExpertPlatformTopic', allowWildcard: { user: true, session: true, command: true }, isPlatform: true, isSub: true, agent: 'platform' } ], pub: [ // Send commands to project launchers @@ -336,7 +413,9 @@ module.exports = function (app) { // ff/v1/platform/sync { topic: /^ff\/v1\/platform\/sync$/ }, // ff/v1/platform/leader - { topic: /^ff\/v1\/platform\/leader$/ } + { topic: /^ff\/v1\/platform\/leader$/ }, + // platform can respond to Expert Agent requests + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/response$/, verify: 'checkExpertPlatformTopic', isPlatform: true, isPub: true, agent: 'platform' } ] }, project: { @@ -420,12 +499,16 @@ module.exports = function (app) { // backend client (agent) expertAgent: { sub: [ - { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/request$/, verify: 'checkExpertTopic', channel: 'chat', allowWildcard: { user: true, session: true, entity: true }, isAgent: true, isSub: true }, - { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/response$/, verify: 'checkExpertTopic', channel: 'inflight', allowWildcard: { user: true, session: true, entity: true, inflightType: true }, isAgent: true, isSub: true } + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/request$/, verify: 'checkExpertTopic', channel: 'chat', allowWildcard: { user: true, session: true, entity: true }, isAgent: true, isSub: true, agent: 'support' }, + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/response$/, verify: 'checkExpertTopic', channel: 'inflight', allowWildcard: { user: true, session: true, entity: true, inflightType: true }, isAgent: true, isSub: true, agent: 'support' }, + // Expert agent can listen for platform responses + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/response$/, verify: 'checkExpertPlatformTopic', allowWildcard: { user: true, session: true, command: true }, isAgent: true, isSub: true, agent: 'platform' } ], pub: [ - { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/response$/, verify: 'checkExpertTopic', channel: 'chat', isAgent: true, isPub: true }, - { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/request$/, verify: 'checkExpertTopic', channel: 'inflight', isAgent: true, isPub: true } + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/response$/, verify: 'checkExpertTopic', channel: 'chat', isAgent: true, isPub: true, agent: 'support' }, + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/request$/, verify: 'checkExpertTopic', channel: 'inflight', isAgent: true, isPub: true, agent: 'support' }, + // Expert agent can respond to platform requests + { topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/request$/, verify: 'checkExpertPlatformTopic', isAgent: true, isPub: true, agent: 'platform' } ] } } diff --git a/forge/comms/commsClient.js b/forge/comms/commsClient.js index 6dc70082f4..45bbf55c51 100644 --- a/forge/comms/commsClient.js +++ b/forge/comms/commsClient.js @@ -19,7 +19,7 @@ class CommsClient extends EventEmitter { // To aid testing, we use a url of `:test:` to allow us to configure // the platform with comms enabled, but no active MQTT connection if (this.app.config.broker.url !== ':test:') { - /** @type {MQTT.IClientOptions} */ + /** @type {mqtt.IClientOptions} */ const brokerConfig = { clientId: 'forge_platform:' + randomBytes(8).toString('hex'), username: 'forge_platform', @@ -46,12 +46,103 @@ class CommsClient extends EventEmitter { this.client.on('error', (err) => { this.app.log.info(`Connection error to comms broker: ${err.toString()}`) }) - this.client.on('message', (topic, message) => { + this.client.on('message', (topic, message, packet) => { const topicParts = topic.split('/') const ownerType = topicParts[3] const ownerId = topicParts[4] const messageType = topicParts[5] - if (ownerType === 'p') { + + if (topicParts[2] === 'expert') { + const userId = topicParts[3] + const sessionId = topicParts[4] + const channel = topicParts[5] // platform + const channelCommand = topicParts[6] // dynamic value, e.g. mcp:call-tool or mcp:read-resource + const direction = topicParts[7] // request or response (only for inflight channels) + + const supportedInsightsCommands = { + 'insights:mcp-call-tool': 'mcp:call-tool', + 'insights:mcp-read-resource': 'mcp:read-resource' + } + if (supportedInsightsCommands[channelCommand] && direction === 'request') { + const isInsightsToolCall = channel === 'platform' && channelCommand === 'insights:mcp-call-tool' && direction === 'request' + const isInsightsResourceCall = channel === 'platform' && channelCommand === 'insights:mcp-read-resource' && direction === 'request' + // MCP ROUTE: step 1 (hosted/remote common) + // Called By: from the Expert Agent (via MQTT inflight request) + // Calls To : inflight request handler (./instances.js or ./devices.js) + + // When the topic is determined to be an expert insights inflight request, the acl manager has already verified the user + // has permission to access this topic. Now, we verify the payload contains the required fields to process the request. + // If OK, emit the message to the appropriate instance/device handler (handled in ./instances.js or ./devices.js) + + const payload = JSON.parse(message.toString()) + const command = supportedInsightsCommands[channelCommand] + const data = payload.data || {} + const { kind, mcpServer, toolDefinition, resourceDefinition, resourceTemplateDefinition } = payload.meta || {} + const correlationData = packet.properties?.correlationData + const userProperties = packet.properties?.userProperties + if (!correlationData || !userProperties) { + console.warn('Expert Insight tool call request missing correlationData or userProperties', payload) + return // do not respond, the agent will timeout and handle it + } + + const mqttOptions = { properties: { correlationData, userProperties } } + const responseTopic = `ff/v1/expert/${userId}/${sessionId}/${channel}/${channelCommand}/response` + + /** Callback for failed MCP request. Publishes a structured error back to the agent. */ + const onError = (content, code, error) => { + const data = { + code: code || error?.code || 'MCP_ERROR', + content: `Error: ${content}`, + isError: true + } + if (error) { + data.type = error?.name || error?.constructor?.name || 'Error' + data.message = error?.message || error?.toString() + } + this.client.publish(responseTopic, JSON.stringify(data), mqttOptions) + } + /** Callback for successful MCP request. Publishes the result back to the agent. */ + const onSuccess = (result) => { + this.client.publish(responseTopic, JSON.stringify(result), mqttOptions) + } + + // check that the mcpServer contains the required fields to process the request + if (!mcpServer || !['instance', 'device'].includes(mcpServer.instanceType) || !mcpServer.instance || !mcpServer.mcpServer) { + console.warn('Invalid Expert Insight tool call request', payload) + return // do not respond, this is not for us. + } + + // validate kind matches the topic channel parameter & that the toolDefinition/resourceDefinition/resourceTemplateDefinition is present + let definition = null + switch (true) { + case isInsightsToolCall && kind === 'mcp_tool': + definition = toolDefinition + break + case isInsightsResourceCall && kind === 'mcp_resource': + definition = resourceDefinition + break + case isInsightsResourceCall && kind === 'mcp_resource_template': + definition = resourceTemplateDefinition + break + } + if (!definition) { + onError('Invalid Expert Insight tool call request: missing or mismatched kind and definition', 'MCP_INVALID_DEFINITION') + return + } + + this.emit( + `request/${mcpServer.instanceType}/expert/insight`, // event name + userId, // ID of user making the request + command, // command + mcpServer, // mcp server details + kind, // mcp kind (mcp_tool, mcp_resource, mcp_resource_template) + definition, // mcpFeatureDefinition + data, // call data + onSuccess, // success callback + onError // failure callback + ) + } + } else if (ownerType === 'p') { this.emit('status/project', { id: ownerId, status: message.toString() @@ -125,7 +216,9 @@ class CommsClient extends EventEmitter { // Device response heartbeat 'ff/v1/+/d/+/resources/heartbeat', // Platform sync messages - 'ff/v1/platform/sync' + 'ff/v1/platform/sync', + // Listen for Expert platform requests + 'ff/v1/expert/+/+/platform/+/request' ]) } } diff --git a/forge/comms/devices.js b/forge/comms/devices.js index ab85b6d9cc..e43cc6edbb 100644 --- a/forge/comms/devices.js +++ b/forge/comms/devices.js @@ -8,6 +8,7 @@ const { v4: uuidv4 } = require('uuid') const noop = () => {} const DEFAULT_TIMEOUT = 10000 +const { filterAccessibleMCPServerFeatures } = require('../services/expert.js') // declare command and response monitor types (and freeze them) const CommandMonitorTemplate = { @@ -37,6 +38,21 @@ Object.freeze(CommandMessageTemplate) /** @typedef {typeof CommandMonitorTemplate} ResponseMonitor */ /** @typedef {typeof CommandMessageTemplate} CommandMessage */ +/** + * @typedef MCPServerDetails + * @property {any} team - team + * @property {any} application - application + * @property {any} instance - instance + * @property {any} instanceName - instanceName + * @property {any} instanceType - instanceType + * @property {any} mcpServer - mcpServer + * @property {any} mcpServerName - mcpServerName + * @property {any} mcpEndpoint - mcpEndpoint + * @property {any} mcpServerNameUnique - mcpServerNameUnique + * @property {any} mcpServerDescription - mcpServerDescription + * @property {any} mcpServerTitle - mcpServerTitle + * @property {any} mcpServerVersion - mcpServerVersion + */ /** * DeviceCommsHandler @@ -63,6 +79,141 @@ class DeviceCommsHandler { // Listen for any incoming device status events client.on('status/device', (status) => { this.handleStatus(status) }) client.on('response/device', (response) => { this.handleCommandResponse(response) }) + + // Handle expert inflight requests sent from the FF Expert Agent - intended for an MCP server on a specific remote instance + client.on('request/device/expert/insight', async (userId, command, /** @type {MCPServerDetails} */ mcpServer, mcpDefinitionKind, mcpDefinition, data, onSuccess, onError) => { + // MCP ROUTE: step 2 (remote) + // Called By: an MQTT inflight message (from the Expert Agent) + // Calls To : device agent via the established command channel (sendCommandAwaitReply) + + const { team: teamId, application: applicationId, instance: instanceId, instanceType, mcpServer: mcpServerId } = mcpServer + const isToolCall = command === 'mcp:call-tool' + const isResourceCall = command === 'mcp:read-resource' && mcpDefinitionKind === 'mcp_resource' + const isResourceTemplateCall = command === 'mcp:read-resource' && mcpDefinitionKind === 'mcp_resource_template' + const toolDefinition = isToolCall ? mcpDefinition : null + const resourceDefinition = isResourceCall ? mcpDefinition : null + const resourceTemplateDefinition = isResourceTemplateCall ? mcpDefinition : null + + try { + // Premise: + // The incoming request contains information to call an MCP tool/resource on a specific instance. + // 1. Check that the for the MCP server supplied, the user has access (application level) + // 2. Check that the for the MCP server feature being performed (tool/resource/resource_template), that the user has access to (feature level) + // 3. Re-resolve the MCP server against the team's trusted MCP registry, verify + // instance/application ownership, and get/create access tokens as needed based on the + // instance node security settings + + // first pass - basic sanity checks, picking up associated models for the user, team membership, etc + if (!teamId || !applicationId || !instanceId || !mcpServerId || instanceType !== 'device') { + return onError('Invalid MCP request - missing required fields', 'MCP_INVALID_REQUEST') + } + + const instance = await this.app.db.models.Device.byId(instanceId) + if (!instance) { + return onError('Invalid instance', 'MCP_INVALID_INSTANCE') + } + + // get associated db models for the user and team membership + // reload the trusted registration and ensure it is still valid for this team and instance + const registration = await app.db.models.MCPRegistration.byId(mcpServerId) + if (!registration) { + return onError('No MCP registration found', 'MCP_NO_REGISTRATION') + } + + const application = instance.Application + await application.reload({ attributes: ['TeamId'] }) + const team = instance.Team + + const teamOk = team.hashid === teamId && application.TeamId === team.id && registration.TeamId === team.id + const applicationOk = application.hashid === applicationId && instance.ApplicationId === application.id + const instanceOk = registration.targetId === instance.id.toString() && registration.targetType === instanceType + + if (!teamOk || !applicationOk || !instanceOk) { + return onError('Invalid team, application, or instance', 'MCP_INVALID_TEAM_APPLICATION_INSTANCE') + } + + const serverEntry = { + application, + server: { + ...mcpServer, + tools: toolDefinition ? [toolDefinition] : [], + resources: resourceDefinition ? [resourceDefinition] : [], + resourceTemplates: resourceTemplateDefinition ? [resourceTemplateDefinition] : [] + } + } + const user = await app.db.models.User.byId(userId) + if (!user || user.hashid !== userId) { + return onError('Invalid user', 'MCP_INVALID_USER') + } + const existingRole = await user.getTeamMembership(teamId) + const accessibleServers = filterAccessibleMCPServerFeatures(app, [serverEntry], team, existingRole) + const accessibleServer = accessibleServers.find(s => s.mcpServer === mcpServerId) + if (!accessibleServer) { + return onError('User does not have access to MCP server', 'MCP_NO_ACCESS') + } + + const commandData = { + kind: mcpDefinitionKind, + endpoint: null // updated below after checks and other data is appended + } + + // Prepare command data based on the type of MCP call (tool, resource, or resource template) + if (isToolCall) { + const accessibleTool = accessibleServer.tools.find(t => t.name === data.name) + if (!accessibleTool) { + return onError('User does not have access to MCP tool', 'MCP_NO_ACCESS_TOOL') + } + commandData.name = data.name + commandData.input = data.input + } else if (isResourceCall) { + const accessibleResource = accessibleServer.resources.find(r => r.uri === resourceDefinition.uri) + if (!accessibleResource) { + return onError('User does not have access to MCP resource', 'MCP_NO_ACCESS_RESOURCE') + } + commandData.uri = data.uri + } else if (isResourceTemplateCall) { + const accessibleResourceTemplate = accessibleServer.resourceTemplates.find(r => r.uriTemplate === resourceTemplateDefinition.uriTemplate) + if (!accessibleResourceTemplate) { + return onError('User does not have access to MCP resource template', 'MCP_NO_ACCESS_RESOURCE_TEMPLATE') + } + // Prepare the commandData for the resource template call, including resolving the final URI from the template and input values + // NOTE: The Expert Agent will typically unfurl the template and provide a fully resolved URI, but if it is not provided (or contains + // placeholders), we will compute it below using the template and input values + commandData.uri = data.uri + commandData.uriTemplate = data.uriTemplate + commandData.input = data.input + if ((!commandData.uri || /\{([^}]+)\}/.test(commandData.uri))) { + // compute the final URI by replacing placeholders in the template with input values + const template = data.uriTemplate || commandData.uri + const input = data.input || {} + commandData.uri = template.replace(/\{([^}]+)\}/g, (match, key) => { + const cleanKey = key.replace(/[*?]/g, '') // strip RFC6570 modifiers e.g. {var*} or {var?} to get the clean key for input lookup + return input[cleanKey] !== undefined ? encodeURIComponent(input[cleanKey]) : match + }) + } + } else { + return onError('Invalid MCP command', 'MCP_INVALID_COMMAND') + } + + // update the endpoint with the resolved access token for the instance and team + const teamType = await instance.Team.getTeamType() + const teamHttpSecurityFeature = !!teamType.properties.features?.teamHttpSecurity + commandData.endpoint = { + mcpEndpoint: mcpServer.mcpEndpoint, + headers: mcpServer.headers || {}, + accessToken: await app.expert.mcp.getOrCreateToken(instance, mcpServer.instanceType, instanceId, teamHttpSecurityFeature) || null + } + + try { + const result = await this.sendCommandAwaitReply(teamId, instanceId, command, commandData, { timeout: 30000 }) + onSuccess(result) + } catch (err) { + return onError(`An error occurred performing insight request: ${err.message}`, 'MCP_INSIGHT_REQUEST_ERROR', err) + } + } catch (err) { + return onError(`Error handling expert insights inflight request: ${err.message}`, 'MCP_INSIGHT_REQUEST_ERROR', err) + } + }) client.on('logs/heartbeat', (beat) => { this.deviceLogHeartbeats[beat.id] = beat.timestamp }) @@ -315,8 +466,8 @@ class DeviceCommsHandler { * @param {String} deviceId The device Id * @param {String} command The command to send to the device * @param {Object} payload The payload to send to the device - * @param {Object} options Options - * @param {Number} options.timeout The timeout in milliseconds to wait for a response + * @param {Object} routingOptions Options + * @param {Number} [routingOptions.timeout=DEFAULT_TIMEOUT] The timeout in milliseconds to wait for a response * @returns {Promise} The response payload * @see handleCommandResponse */ diff --git a/forge/comms/index.js b/forge/comms/index.js index 04a7c463e0..565cd9e612 100644 --- a/forge/comms/index.js +++ b/forge/comms/index.js @@ -3,6 +3,7 @@ const fp = require('fastify-plugin') const ACLManager = require('./aclManager') const { CommsClient } = require('./commsClient') const { DeviceCommsHandler } = require('./devices') +const { InstanceCommsHandler } = require('./instances') /** * This module represents the real-time comms component of the platform. @@ -30,6 +31,7 @@ module.exports = fp(async function (app, _opts) { // Create the handler for any device-related messages const deviceCommsHandler = DeviceCommsHandler(app, client) + const instanceCommsHandler = InstanceCommsHandler(app, client) // Not in the current release, but when we handle Launcher status // via MQTT, it will arrive here. Compare to the status/device handler in `devices.js` @@ -40,6 +42,7 @@ module.exports = fp(async function (app, _opts) { // Setup the platform API for the comms component app.decorate('comms', { devices: deviceCommsHandler, + instances: instanceCommsHandler, aclManager: ACLManager(app), platform: { settings: { diff --git a/forge/comms/instances.js b/forge/comms/instances.js new file mode 100644 index 0000000000..9a5e036cb4 --- /dev/null +++ b/forge/comms/instances.js @@ -0,0 +1,159 @@ +// /** +// * This module provides the handler for instance events +// */ + +const { filterAccessibleMCPServerFeatures } = require('../services/expert.js') + +/** + * InstanceCommsHandler + * @class InstanceCommsHandler + * @memberof forge.comms + */ +class InstanceCommsHandler { + /** + * New InstanceCommsHandler instance + * @param {import('../forge').ForgeApplication} app Fastify app + * @param {import('./commsClient').CommsClient} client Comms Client + */ + constructor (app, client) { + this.app = app + this.client = client + + client.on('request/instance/expert/insight', async (userId, command, /** @type {MCPServerDetails} */ mcpServer, mcpDefinitionKind, mcpDefinition, data, onSuccess, onError) => { + // MCP ROUTE: step 2 (hosted) + // Called By: an MQTT inflight message (from the Expert Agent) + // Calls To : driver helper fn callMCPTool (via the forge app container wrapper) + + const { team: teamId, application: applicationId, instance: instanceId, instanceType, mcpServer: mcpServerId } = mcpServer + const isToolCall = command === 'mcp:call-tool' + const isResourceCall = command === 'mcp:read-resource' && mcpDefinitionKind === 'mcp_resource' + const isResourceTemplateCall = command === 'mcp:read-resource' && mcpDefinitionKind === 'mcp_resource_template' + const toolDefinition = isToolCall ? mcpDefinition : null + const resourceDefinition = isResourceCall ? mcpDefinition : null + const resourceTemplateDefinition = isResourceTemplateCall ? mcpDefinition : null + + try { + // Premise: + // The incoming request contains information to call an MCP tool on a specific instance. + // 1. Check that the for the MCP server supplied, the user has access (application level) + // 2. Check that the for the MCP server feature being performed (prompt/resource/tool), that the user has access to (feature level) + // 3. Re-resolve the MCP server against the team's trusted MCP registry, verify + // instance/application ownership, and get/create access tokens as needed based on the + // instance node security settings + + // first pass - basic sanity checks, picking up associated models for the user, team membership, etc + if (!teamId || !applicationId || !instanceId || !mcpServerId || instanceType !== 'instance') { + return onError('Invalid MCP request - missing required fields', 'MCP_INVALID_REQUEST') + } + + const instance = await this.app.db.models.Project.byId(instanceId) + if (!instance) { + return onError('Invalid instance', 'MCP_INVALID_INSTANCE') + } + + // get associated db models for the user and team membership + // reload the trusted registration and ensure it is still valid for this team and instance + const registration = await app.db.models.MCPRegistration.byId(mcpServerId) + if (!registration) { + return onError('No MCP registration found', 'MCP_NO_REGISTRATION') + } + + const application = instance.Application + await application.reload({ attributes: ['TeamId'] }) + const team = instance.Team + + const teamOk = team.hashid === teamId && application.TeamId === team.id && registration.TeamId === team.id + const applicationOk = application.hashid === applicationId && instance.ApplicationId === application.id + const instanceOk = registration.targetId === instance.id.toString() && registration.targetType === instanceType + + if (!teamOk || !applicationOk || !instanceOk) { + return onError('Invalid team, application, or instance', 'MCP_INVALID_TEAM_APPLICATION_INSTANCE') + } + + const serverEntry = { + application, + server: { + ...mcpServer, + tools: toolDefinition ? [toolDefinition] : [], + resources: resourceDefinition ? [resourceDefinition] : [], + resourceTemplates: resourceTemplateDefinition ? [resourceTemplateDefinition] : [] + } + } + const user = await app.db.models.User.byId(userId) + if (!user || user.hashid !== userId) { + return onError('Invalid user', 'MCP_INVALID_USER') + } + const existingRole = await user.getTeamMembership(teamId) + const accessibleServers = filterAccessibleMCPServerFeatures(app, [serverEntry], team, existingRole) + const accessibleServer = accessibleServers.find(s => s.mcpServer === mcpServerId) + if (!accessibleServer) { + return onError('User does not have access to MCP server', 'MCP_NO_ACCESS') + } + + // Prepare command data and method based on the type of MCP call (tool, resource, or resource template) + let commandMethod + const commandData = [] + if (isToolCall) { + const accessibleTool = accessibleServer.tools.find(t => t.name === data.name) + if (!accessibleTool) { + return onError('User does not have access to MCP tool', 'MCP_NO_ACCESS_TOOL') + } + commandData.push(data.name) + commandData.push(data.input) + commandMethod = app.containers.callMCPTool + } else if (isResourceCall) { + const accessibleResource = accessibleServer.resources.find(r => r.uri === resourceDefinition.uri) + if (!accessibleResource) { + return onError('User does not have access to MCP resource', 'MCP_NO_ACCESS_RESOURCE') + } + commandData.push(data.uri) + commandMethod = app.containers.readMCPResource + } else if (isResourceTemplateCall) { + const accessibleResourceTemplate = accessibleServer.resourceTemplates.find(r => r.uriTemplate === resourceTemplateDefinition.uriTemplate) + if (!accessibleResourceTemplate) { + return onError('User does not have access to MCP resource template', 'MCP_NO_ACCESS_RESOURCE_TEMPLATE') + } + // Prepare the commandData for the resource template call, including resolving the final URI from the template and input values + // NOTE: The Expert Agent will typically unfurl the template and provide a fully resolved URI, but if it is not provided (or contains + // placeholders), we will compute it below using the template and input values + let uri = data.uri + if ((!uri || /\{([^}]+)\}/.test(uri))) { + // compute the final URI by replacing placeholders in the template with input values + const template = data.uriTemplate || uri + const input = data.input || {} + uri = template.replace(/\{([^}]+)\}/g, (match, key) => { + const cleanKey = key.replace(/[*?]/g, '') // strip RFC6570 modifiers e.g. {var*} or {var?} to get the clean key for input lookup + return input[cleanKey] !== undefined ? encodeURIComponent(input[cleanKey]) : match + }) + } + commandData.push(uri) + commandMethod = app.containers.readMCPResource + } else { + return onError('Invalid MCP command', 'MCP_INVALID_COMMAND') + } + + // update the endpoint with the resolved access token for the instance and team + const teamType = await instance.Team.getTeamType() + const teamHttpSecurityFeature = !!teamType.properties.features?.teamHttpSecurity + const endpoint = { + mcpEndpoint: mcpServer.mcpEndpoint, + headers: mcpServer.headers || {}, + accessToken: await app.expert.mcp.getOrCreateToken(instance, mcpServer.instanceType, instanceId, teamHttpSecurityFeature) || null + } + + try { + const result = await commandMethod(instance, endpoint, ...commandData) + onSuccess(result) + } catch (err) { + return onError(`An error occurred performing insight request: ${err.message}`, 'MCP_INSIGHT_REQUEST_ERROR', err) + } + } catch (err) { + return onError(`Error handling expert insights inflight request: ${err.message}`, 'MCP_INSIGHT_REQUEST_ERROR', err) + } + }) + } +} + +module.exports = { + InstanceCommsHandler: (app, client) => new InstanceCommsHandler(app, client) +} diff --git a/forge/containers/stub/index.js b/forge/containers/stub/index.js index 03401a8b2d..52dd8a25c4 100644 --- a/forge/containers/stub/index.js +++ b/forge/containers/stub/index.js @@ -220,6 +220,43 @@ module.exports = { this._app.log.info(`[stub driver] Restarting flows ${project.id}`) }, + /** + * Gets the features of the MCP endpoints generated by the mcp nodes in the running node-red instance. + * @param {Project} project - the project model instance + * @param {Array} endpoints - list of MCP endpoints to query. + * Each entry may be a bare URL/path string, or an object `{ url, headers?, mcpAccessToken? }` + * where `mcpAccessToken` is `{ scheme, token, scope }`. + * @returns {Object} MCP features + */ + getMCPFeatures: async (project, endpoints) => { + this._app.log.info(`[stub driver] Getting MCP features for ${project.id}`) + return {} + }, + + /** + * Calls a tool on the MCP endpoints generated by the mcp nodes in the running node-red instance. + * @param {Project} project - the project model instance + * @param {string|McpEndpointSpec} endpoint - MCP endpoint to call the tool on. + * @param {string} name - name of the tool to call. + * @param {Object} input - arguments to pass to the tool + * @returns {Object} result of the tool call + */ + callMCPTool: async (project, endpoint, name, input) => { + this._app.log.info(`[stub driver] Calling MCP tool ${name} on ${project.id}`) + return {} + }, + + /** + * Reads a resource from the MCP endpoints generated by the mcp nodes in the running node-red instance. + * @param {Project} project - the project model instance + * @param {string|McpEndpointSpec} endpoint - MCP endpoint to call the tool on. + * @param {string} uri - URI of the resource to read. + */ + readMCPResource: async (project, endpoint, uri) => { + this._app.log.info(`[stub driver] Reading MCP resource ${uri} on ${project.id}`) + return {} + }, + /** * Get a Project's logs * @param {Project} project - the project model instance diff --git a/forge/containers/wrapper.js b/forge/containers/wrapper.js index f368f6496e..8e9ffa5d5a 100644 --- a/forge/containers/wrapper.js +++ b/forge/containers/wrapper.js @@ -211,6 +211,32 @@ module.exports = { await this._driver.restartFlows(project, options) } }, + getMCPFeatures: async (project, endpoints) => { + let value = {} + if (this._driver.getMCPFeatures) { + value = await this._driver.getMCPFeatures(project, endpoints) + } + return value + }, + callMCPTool: async (project, endpoint, name, input) => { + // MCP ROUTE: step 3 (hosted) + // Called By: forge app (forge/comms/instances.js) inflight request handler + // Calls To : loaded driver (localfs/k8s/docker) helper fn callMCPTool + let value = {} + if (this._driver.callMCPTool) { + value = await this._driver.callMCPTool(project, endpoint, name, input) + } + return value + }, + readMCPResource: async (project, endpoint, uri) => { + // Called By: forge app (forge/comms/instances.js) inflight request handler + // Calls To : loaded driver (localfs/k8s/docker) helper fn readMCPResource + let value = {} + if (this._driver.readMCPResource) { + value = await this._driver.readMCPResource(project, endpoint, uri) + } + return value + }, revokeUserToken: async (project, token) => { // logout:nodered(step-2) if (this._driver.revokeUserToken) { if (project.state === 'suspended') { diff --git a/forge/ee/db/models/MCPRegistration.js b/forge/ee/db/models/MCPRegistration.js index 6755c80773..344bb9efcb 100644 --- a/forge/ee/db/models/MCPRegistration.js +++ b/forge/ee/db/models/MCPRegistration.js @@ -1,7 +1,7 @@ /** * Stores MCP endpoints for a Team */ -const { DataTypes, literal } = require('sequelize') +const { DataTypes, literal, Op } = require('sequelize') module.exports = { name: 'MCPRegistration', @@ -48,12 +48,22 @@ module.exports = { static: { byTeam: async (teamIdOrHash, { includeTeam = false, - includeInstance = false + includeInstance = false, + filterId = null } = {}) => { let teamId = teamIdOrHash if (typeof teamId === 'string') { teamId = M.Team.decodeHashid(teamId) } + const where = { TeamId: teamId } + if (filterId) { + // accept an array of registration ids (numeric or hashid) to filter by + const idList = (Array.isArray(filterId) ? filterId : [filterId]) + .map(id => typeof id === 'string' ? M.MCPRegistration.decodeHashid(id) : id) + .flat() + .filter(id => !!id) + where.id = { [Op.in]: idList } + } const include = [] if (includeTeam) { include.push({ model: M.Team, include: { model: M.TeamType } }) @@ -61,13 +71,13 @@ module.exports = { if (includeInstance) { include.push({ model: M.Project, - attributes: ['hashid', 'id', 'name', 'slug', 'links', 'url', 'ApplicationId', 'state'], + attributes: ['hashid', 'id', 'name', 'slug', 'links', 'url', 'ApplicationId', 'state', 'versions'], required: false, on: instanceOwnershipJoin }) include.push({ model: M.Device, - attributes: ['hashid', 'id', 'name', 'type', 'ApplicationId', 'state'], + attributes: ['hashid', 'id', 'name', 'type', 'ApplicationId', 'state', 'agentVersion'], required: false, on: deviceOwnershipJoin, include: { @@ -77,7 +87,7 @@ module.exports = { }) } return this.findAll({ - where: { TeamId: teamId }, + where, include }) }, @@ -89,6 +99,16 @@ module.exports = { nodeId } }) + }, + byId: async (idOrHash, { includeAssociations = false } = {}) => { + let id = idOrHash + if (typeof idOrHash === 'string') { + id = M.MCPRegistration.decodeHashid(idOrHash) + } + const where = { id } + return this.findOne({ + where + }) } } } diff --git a/forge/ee/lib/expert/emxq-bridge/setup.js b/forge/ee/lib/expert/emxq-bridge/setup.js index afecb12239..cd2cae0f03 100644 --- a/forge/ee/lib/expert/emxq-bridge/setup.js +++ b/forge/ee/lib/expert/emxq-bridge/setup.js @@ -14,7 +14,7 @@ const axios = require('axios') const httpAgent = new http.Agent({ keepAlive: false }) const httpsAgent = new https.Agent({ keepAlive: false }) -const { connector, actionOut, sourceChat, sourceInflight, ruleIn, ruleOut } = require('./templates.js') +const { connector, actionOut, sourceChat, sourceInflight, sourcePlatform, ruleIn, ruleOut } = require('./templates.js') // EMQX v5 IDs for connector/action/source resources are `:`. // Rule IDs are the rule's own `id` field. @@ -188,7 +188,8 @@ async function validateBridge (app, { cfg, client } = {}) { } const hasSourceChat = sources.some(s => s.name === sourceChat.name && s.type === 'mqtt') const hasSourceInflight = sources.some(s => s.name === sourceInflight.name && s.type === 'mqtt') - if (!hasSourceChat || !hasSourceInflight) { + const hasSourcePlatform = sources.some(s => s.name === sourcePlatform.name && s.type === 'mqtt') + if (!hasSourceChat || !hasSourceInflight || !hasSourcePlatform) { app.log.info('Expert bridge sources not found') return false } @@ -325,6 +326,8 @@ async function addBridge (app, { cfg, client } = {}) { await post(client, '/sources', sourceChat) app.log.info(`creating EMQX source ${sourceInflight.name}`) await post(client, '/sources', sourceInflight) + app.log.info(`creating EMQX source ${sourcePlatform.name}`) + await post(client, '/sources', sourcePlatform) app.log.info(`creating EMQX rule ${ruleOut.id}`) await post(client, '/rules', ruleOut) app.log.info(`creating EMQX rule ${ruleIn.id}`) diff --git a/forge/ee/lib/expert/emxq-bridge/templates.js b/forge/ee/lib/expert/emxq-bridge/templates.js index 6c055ca3d5..e7abca7b45 100644 --- a/forge/ee/lib/expert/emxq-bridge/templates.js +++ b/forge/ee/lib/expert/emxq-bridge/templates.js @@ -69,8 +69,25 @@ const sourceInflight = { } } +// Expert Broker → FF App Instance Broker. Platform requests initiated by the AI agent. +const sourcePlatform = { + name: 'ff-expert-to-app-platform-source', + type: 'mqtt', + connector: 'ff-expert-broker', + description: 'Subscribe to platform requests on the Expert Broker', + enable: true, + parameters: { + qos: 1, + topic: 'ff/v1/expert/+/+/platform/+/request' + }, + resource_opts: { + health_check_interval: '15s' + } +} + // Republish inbound bridge messages onto the FF App Instance Broker. // `topic` is already mountpoint-stripped by the Expert Broker, so no rewrite needed. +// This covers chat responses, inflight requests, and platform requests. // // v5 property forwarding via the inline republish action is fiddly on EMQX: // - `user_properties` is a template scalar, so `${pub_props.'User-Property'}` works. @@ -81,9 +98,9 @@ const sourceInflight = { const ruleIn = { id: 'ff-expert-to-app-rule', name: 'ff-expert-to-app-rule', - description: 'Republish chat responses and inflight requests on the FF App Instance Broker', + description: 'Republish chat responses, inflight requests and platform requests on the FF App Instance Broker', enable: true, - sql: 'SELECT\n *,\n pub_props.\'Correlation-Data\' as correlation_data,\n pub_props.\'Response-Topic\' as response_topic,\n pub_props.\'Content-Type\' as content_type,\n pub_props.\'Payload-Format-Indicator\' as payload_format_indicator,\n pub_props.\'Message-Expiry-Interval\' as message_expiry_interval\nFROM\n "$bridges/mqtt:ff-expert-to-app-chat-source",\n "$bridges/mqtt:ff-expert-to-app-inflight-source"', + sql: 'SELECT\n *,\n pub_props.\'Correlation-Data\' as correlation_data,\n pub_props.\'Response-Topic\' as response_topic,\n pub_props.\'Content-Type\' as content_type,\n pub_props.\'Payload-Format-Indicator\' as payload_format_indicator,\n pub_props.\'Message-Expiry-Interval\' as message_expiry_interval\nFROM\n "$bridges/mqtt:ff-expert-to-app-chat-source",\n "$bridges/mqtt:ff-expert-to-app-inflight-source",\n "$bridges/mqtt:ff-expert-to-app-platform-source"', actions: [ { args: { @@ -106,19 +123,20 @@ const ruleIn = { ] } -// FF App Instance Broker → Expert Broker. Forwards two patterns: +// FF App Instance Broker → Expert Broker. Forwards 3 patterns: // - ../support/chat/request // - ../support/inflight/+/response +// - ../support/platform/+/response // The Expert Broker's mountpoint applies the `/` namespace prefix on receipt. const ruleOut = { id: 'ff-app-to-expert-rule', name: 'ff-app-to-expert-rule', - description: 'Forward chat requests and inflight responses to the Expert Broker', + description: 'Forward chat requests, inflight responses and platform responses to the Expert Broker', enable: true, - sql: 'SELECT\n *\nFROM\n "ff/v1/expert/+/+/+/+/support/chat/request",\n "ff/v1/expert/+/+/+/+/support/inflight/+/response"', + sql: 'SELECT\n *\nFROM\n "ff/v1/expert/+/+/+/+/support/chat/request",\n "ff/v1/expert/+/+/+/+/support/inflight/+/response",\n "ff/v1/expert/+/+/platform/+/response"', actions: [ 'mqtt:ff-app-to-expert-action' ] } -module.exports = { connector, actionOut, sourceChat, sourceInflight, ruleIn, ruleOut } +module.exports = { connector, actionOut, sourceChat, sourceInflight, sourcePlatform, ruleIn, ruleOut } diff --git a/forge/ee/lib/expert/index.js b/forge/ee/lib/expert/index.js index 2644947f57..c46429a8bd 100644 --- a/forge/ee/lib/expert/index.js +++ b/forge/ee/lib/expert/index.js @@ -51,8 +51,14 @@ module.exports = fp(async function (app, _opts) { let mcpAccessToken = await readCachedMcpAccessToken(instanceId) if (!mcpAccessToken) { - const instanceSettings = await instance.getSetting('settings') - const httpNodeAuth = instanceSettings?.httpNodeAuth + let httpNodeAuth + if (instanceType === 'instance') { + const instanceSettings = await instance.getSetting('settings') + httpNodeAuth = instanceSettings?.httpNodeAuth + } else if (instanceType === 'device') { + const deviceSettings = await instance.getSetting('security') + httpNodeAuth = deviceSettings?.httpNodeAuth + } const tokenName = 'FlowFuse Expert MCP Access Token' const scope = ['ff-expert:mcp', instanceType] if (httpNodeAuth?.type === 'flowforge-user' && teamHttpSecurityFeatureEnabled) { diff --git a/forge/ee/routes/expert/index.js b/forge/ee/routes/expert/index.js index 99e78bb9a8..030f4f59b7 100644 --- a/forge/ee/routes/expert/index.js +++ b/forge/ee/routes/expert/index.js @@ -7,9 +7,12 @@ * @memberof forge.routes.api */ const { default: axios } = require('axios') +const semver = require('semver') const { v4: uuidv4 } = require('uuid') const { filterAccessibleMCPServerFeatures } = require('../../../services/expert.js') +/** @type {typeof import('../../../comms/devices.js').DeviceCommsHandler} */ +const getDeviceComms = (app) => { return app.comms?.devices } /** * @param {import('../../forge.js').ForgeApplication} app @@ -130,15 +133,16 @@ module.exports = async function (app) { // instance's node security settings // first pass - get associated applications for the MCP servers selected by user + const mcpServerIds = [] for (const server of selectedCapabilities || []) { - const applicationId = server.application - if (!applicationId) { continue } - + const { application: applicationId, mcpServer } = server + if (!applicationId || !mcpServer) { continue } if (!Object.hasOwnProperty.call(applicationCache, applicationId)) { applicationCache[applicationId] = await app.db.models.Application.byId(applicationId) } const application = applicationCache[applicationId] if (application) { + mcpServerIds.push(mcpServer) mcpServersList.push({ server, application }) } } @@ -148,17 +152,17 @@ module.exports = async function (app) { // Build the team's trusted MCP registry. This is the source of truth for which MCP servers // exist and their transport details (instance, endpoint, protocol). The client-supplied - // transport fields (e.g. mcpServerUrl, instanceUrl, mcpEndpoint) are NEVER trusted - they - // are re-resolved from here so a user cannot point a minted access token at an arbitrary - // URL, nor attach a token for an instance they have not been authorised against. - // (mirrors the /mcp/features route) - const registrations = await app.db.models.MCPRegistration.byTeam(request.team.id, { includeInstance: true }) || [] + // transport fields are NEVER trusted - they should always be re-resolved from the trusted registry. + const registrations = await app.db.models.MCPRegistration.byTeam(request.team.id, { includeInstance: true, filterId: mcpServerIds }) || [] const trustedRegistrations = new Map() for (const reg of registrations) { - if (reg.targetType !== 'instance' || !reg.Project) { + if (reg.targetType === 'instance' && reg.Project) { + trustedRegistrations.set(`${reg.Project.id}::${reg.hashid}`, reg) + } else if (reg.targetType === 'device' && reg.Device) { + trustedRegistrations.set(`${reg.Device.hashid}::${reg.hashid}`, reg) + } else { continue - } // devices are not yet supported for MCP servers - trustedRegistrations.set(`${reg.Project.id}::${reg.name}`, reg) + } } // final pass - re-resolve each accessible server against the trusted registry, verify @@ -172,8 +176,8 @@ module.exports = async function (app) { // re-resolve against the trusted registry - drops any selection that is not a // registered MCP server for this team - const registration = trustedRegistrations.get(`${server.instance}::${server.mcpServerName}`) - const instance = registration?.Project + const registration = trustedRegistrations.get(`${server.instance}::${server.mcpServer}`) + const instance = registration?.Project || registration?.Device if (!instance) { server._invalid = true continue @@ -190,20 +194,6 @@ module.exports = async function (app) { const instanceType = registration.targetType // trusted instance type ('instance') - // Tamper detection (audit signal only - we overwrite with trusted values regardless). - // A well-behaved client echoes back the transport details we issued via /mcp/features, - // so any disagreement indicates either a stale client or tampering. We log it rather - // than reject, to avoid failing legitimate requests in a race condition. mcpServerUrl is - // not compared - FlowFuse never issues it as an authoritative field (the agent builds - // it client-side), so it is simply dropped below. - const tamperedFields = [] - if (server.instanceUrl !== undefined && server.instanceUrl !== instance.url) { tamperedFields.push('instanceUrl') } - if (server.mcpEndpoint !== undefined && server.mcpEndpoint !== registration.endpointRoute) { tamperedFields.push('mcpEndpoint') } - if (server.mcpProtocol !== undefined && server.mcpProtocol !== registration.protocol) { tamperedFields.push('mcpProtocol') } - if (tamperedFields.length > 0) { - app.log.warn(`Expert chat: correcting client-supplied MCP transport fields [${tamperedFields.join(', ')}] that did not match the trusted registry (user=${request.user.hashid}, team=${request.team.hashid}, instance=${instance.id})`) - } - // Overwrite all transport/identity fields with trusted values - never trust the client's // copy. The agent rebuilds mcpServerUrl from instanceUrl + mcpEndpoint, as it does for // the /mcp/features response. @@ -212,6 +202,7 @@ module.exports = async function (app) { server.instanceType = instanceType server.instanceName = instance.name server.instanceUrl = instance.url + server.mcpServer = registration.hashid server.mcpServerName = registration.name server.mcpEndpoint = registration.endpointRoute server.mcpProtocol = registration.protocol @@ -241,6 +232,7 @@ module.exports = async function (app) { Origin: request.headers.origin, 'X-Chat-Session-ID': sessionId, 'X-Chat-Transaction-ID': transactionId, + 'X-Chat-Namespace-ID': app.license.get('id'), ...(app.expert.serviceToken ? { Authorization: `Bearer ${app.expert.serviceToken}` } : {}) }, timeout: app.expert.requestTimeout @@ -299,18 +291,31 @@ module.exports = async function (app) { instance: { type: 'string' }, instanceType: { type: 'string', enum: ['instance', 'device'] }, instanceName: { type: 'string' }, + mcpServer: { type: 'string' }, mcpServerName: { type: 'string' }, prompts: { type: 'array', items: { type: 'object', additionalProperties: true } }, resources: { type: 'array', items: { type: 'object', additionalProperties: true } }, resourceTemplates: { type: 'array', items: { type: 'object', additionalProperties: true } }, tools: { type: 'array', items: { type: 'object', additionalProperties: true } }, - mcpProtocol: { type: 'string', enum: ['http', 'sse'] }, - mcpServerUrl: { type: 'string' }, title: { type: 'string' }, version: { type: 'string' }, description: { type: 'string' } }, - required: ['instance', 'instanceType', 'instanceName', 'mcpServerName', 'prompts', 'resources', 'resourceTemplates', 'tools', 'mcpProtocol'] + required: ['instance', 'instanceType', 'instanceName', 'mcpServer', 'mcpServerName', 'prompts', 'resources', 'resourceTemplates', 'tools'] + } + }, + incompatibleServers: { + type: 'array', + items: { + type: 'object', + properties: { + instance: { type: 'string' }, + instanceType: { type: 'string', enum: ['instance', 'device'] }, + instanceName: { type: 'string' }, + currentVersion: { type: 'string' }, + minimumSupportedVersion: { type: 'string' } + }, + required: ['instance', 'instanceType'] } } } @@ -346,30 +351,31 @@ module.exports = async function (app) { // 7. Filter the MCP features based on user RBACs (e.g. destructive tool access) // 3. Return the filtered MCP features to the client - /** @type {MCPServerItem[]} */ - const runningInstancesWithMCPServer = [] const transactionId = request.headers['x-chat-transaction-id'] - const mcpCapabilitiesUrl = `${app.expert.expertUrl.split('/').slice(0, -1).join('/')}/mcp/features` + const deviceComms = getDeviceComms(app) // Get the MCP servers registered for this team const mcpServers = await app.db.models.MCPRegistration.byTeam(request.team.id, { includeInstance: true }) || [] - // Scan each MCP server and ensure the user has access to the associated application and that the instance is running - // then collect the MCP server info for the running instances MCP servers - // filter out any that the user doesn't have access to + // PASS 1 - group registrations by instance. + // An instance can have multiple MCP servers registered, but the expensive per-instance work + // (live-state check + access token) should only run once per instance. So here we do only the + // cheap checks (team match, expected state, application access) and group the registrations by + // instance. The live-state check / token retrieval happens once per instance in pass 2. + /** @type {Object} */ + const instanceGroups = {} const applicationCache = {} for (const server of mcpServers) { - const { name, protocol, endpointRoute, TeamId, Project, Device, title, version, description } = server + const { hashid, name, protocol, endpointRoute, TeamId, Project, Device, title, version, description } = server if (TeamId !== request.team.id) { // shouldn't happen due to byTeam filter, but just in case continue } let instance, instanceId, instanceType if (Device) { - // instanceType = 'device' - // instance = Device - // instanceId = Device.hashid - continue // Devices are not yet supported for MCP servers + instanceType = 'device' + instance = Device + instanceId = Device.hashid } else if (Project) { instanceType = 'instance' instance = Project @@ -404,78 +410,148 @@ module.exports = async function (app) { continue // user doesn't have access to this instance } - // Now we have confirmed access is allowed, check instance is actually running before offering - // MCP features (querying a non-running instance would cause timeouts) - if (instance.liveState) { - const liveState = await instance.liveState({ omitStorageFlows: true }) - if (liveState?.meta?.state !== 'running') { - continue - } + // Group the registration under its instance so per-instance work runs only once in pass 2 + if (!instanceGroups[instanceId]) { + instanceGroups[instanceId] = { instance, instanceType, application, registrations: [] } } + instanceGroups[instanceId].registrations.push({ hashid, name, protocol, endpointRoute, title, version, description }) + } - // Check instance settings for node security. If FlowFuse auth is enabled, generate a short-lived (5 mins) - // auth token for the instance with a scope limited to MCP access and cache it in memory for subsequent requests - const mcpAccessToken = await app.expert.mcp.getOrCreateToken(instance, instanceType, instanceId, request.teamHttpSecurityFeature) - - runningInstancesWithMCPServer.push({ - team: request.team.hashid, - application: application.hashid, - instance: instanceId, - instanceType, - instanceName: instance.name, - instanceUrl: instance.url, - mcpServerName: name, - mcpEndpoint: endpointRoute, - mcpProtocol: protocol, - mcpAccessToken, - title, - version, - description - }) + // PASS 2 - for each unique instance, confirm instance is actually running (live-state check) and that + // the instance's launcher/agent version is new enough to support MCP features. + const instancesWithMCPServers = [] + const incompatibleServers = [] + const MIN_HOSTED_INSTANCE_LAUNCHER_VERSION = '2.32.0' + const MIN_REMOTE_INSTANCE_AGENT_VERSION = '4.0.0' + + for (const instanceId of Object.keys(instanceGroups)) { + try { + const { instance, instanceType, application, registrations } = instanceGroups[instanceId] + // check instance is actually running before offering MCP features (querying a non-running instance would cause timeouts) + // additionally, check that the instance's launcher/agent version is new enough to support MCP features. + // note: the version check can only be done after the live-state check, because the version is only available in the live-state response. + + if (instanceType === 'instance' && instance) { + // Check that instance launcher supports the required features before attempting to get the live state. + if (!instance.versions?.launcher?.current || semver.lt(instance.versions.launcher.current, MIN_HOSTED_INSTANCE_LAUNCHER_VERSION)) { + incompatibleServers.push({ instance: instanceId, instanceType, instanceName: instance.name, currentVersion: instance.versions?.launcher?.current, minimumSupportedVersion: MIN_HOSTED_INSTANCE_LAUNCHER_VERSION }) + continue // skip - launcher version too old to support MCP features via the admin API + } + // Next check that the instance is actually running before calling MCP features (querying a non-running instance would cause timeouts) + const liveState = await instance.liveState({ omitStorageFlows: true }) + if (liveState?.meta?.state !== 'running') { + continue + } + } else if (instanceType === 'device' && instance) { + // Check that device agent version supports the required features before attempting to get the live state. + if (!instance.agentVersion || semver.lt(instance.agentVersion, MIN_REMOTE_INSTANCE_AGENT_VERSION)) { + incompatibleServers.push({ instance: instanceId, instanceType, instanceName: instance.name, currentVersion: instance.agentVersion, minimumSupportedVersion: MIN_REMOTE_INSTANCE_AGENT_VERSION }) + continue // skip - agent version too old to support MCP features + } + // Next check that the device is actually running before offering MCP features (querying a non-running device would cause timeouts) + const response = await deviceComms.sendCommandAwaitReply(request.team.hashid, instanceId, 'get-liveState', {}, { timeout: 3000 }) + if (response?.state !== 'running') { + continue + } + } else { + continue // unsupported instance type or instance not found, skip it. + } + + // Check instance settings for node security. If FlowFuse auth is enabled, generate a short-lived (5 mins) + // auth token for the instance with a scope limited to MCP access and cache it in memory for subsequent requests + const mcpAccessToken = await app.expert.mcp.getOrCreateToken(instance, instanceType, instanceId, request.teamHttpSecurityFeature) + + instancesWithMCPServers.push({ + team: request.team.hashid, + application: application.hashid, + instance, + instanceType, + mcpServers: registrations.map(reg => ({ + team: request.team.hashid, + application: application.hashid, + instance: instanceId, + instanceType, + instanceName: instance.name, + instanceUrl: instance.url, + mcpServer: reg.hashid, + mcpServerName: reg.name, + mcpEndpoint: reg.endpointRoute, + mcpProtocol: reg.protocol, + mcpAccessToken, + accessToken: mcpAccessToken, + title: reg.title, + version: reg.version, + description: reg.description + })) + }) + } catch (error) { + continue // if we get an error trying, assume instance is offline/unreachable and skip it + } } // if no running instances with MCP server, return early - if (runningInstancesWithMCPServer.length === 0) { - return reply.send({ servers: [], transactionId }) + if (instancesWithMCPServers.length === 0) { + return reply.send({ servers: [], incompatibleServers, transactionId }) } - // Call to backend to request MCP capabilities from expert service - // For reference - this POST: - // * calls the backend expert service endpoint /mcp/features - // * it connects to each MCP server registered - // * retrieves the prompts/resources/tools - // * adds them to the response along with the MCP server info - const response = await axios.post(mcpCapabilitiesUrl, { - teamId: request.team.hashid, - servers: runningInstancesWithMCPServer - }, { - headers: { - Origin: request.headers.origin, - 'X-Chat-Transaction-ID': transactionId, - ...(app.expert.serviceToken ? { Authorization: `Bearer ${app.expert.serviceToken}` } : {}) - }, - timeout: app.expert.requestTimeout + // Request the features via the launcher (remote instances via MQTT, hosted instances via direct + // call to launcher admin API), one call per instance, all in parallel. + const featurePromises = instancesWithMCPServers.map(instanceWithMCPServers => { + // for HTTP protocol, we can call the launcher admin API directly (for hosted instances) or via MQTT (for remote instances) + const instance = instanceWithMCPServers.instance + if (!instance) { + return Promise.resolve({ ...instanceWithMCPServers, error: 'Instance not found' }) + } + if (instanceWithMCPServers.instanceType === 'instance') { + // hosted instance - call launcher API + return app.containers.getMCPFeatures(instance, instanceWithMCPServers.mcpServers) + } else if (instanceWithMCPServers.instanceType === 'device') { + // remote instance - call via command await dispatcher + return deviceComms.sendCommandAwaitReply(instanceWithMCPServers.team, instance.hashid, 'mcp:get-features', { mcpEndPoints: instanceWithMCPServers.mcpServers }, { timeout: 10000 }) + } else { + return Promise.resolve({ ...instanceWithMCPServers, error: 'Unsupported instance type' }) + } }) - if (response.data.transactionId !== transactionId) { - throw new Error('Transaction ID mismatch') - } - const mcpServersResponse = response.data.servers || [] + // wait for all promises to resolve + const featuresResponses = await Promise.all(featurePromises) + const mcpServersResponse = (featuresResponses || []).flat() + const serverList = [] // load the associate application models so that we can filter features based on user access for (const serverItem of mcpServersResponse) { - const application = applicationCache[serverItem.application] + const application = applicationCache[serverItem.spec.application] if (application) { serverList.push({ - server: serverItem, + server: { ...serverItem.features, ...serverItem.spec }, application }) } } // now check tools/resources/prompts access per server based on team membership - response.data.servers = filterAccessibleMCPServerFeatures(app, serverList, request.team, request.teamMembership) - - reply.send(response.data) + // response.data.servers = filterAccessibleMCPServerFeatures(app, serverList, request.team, request.teamMembership) + const finalServers = filterAccessibleMCPServerFeatures(app, serverList, request.team, request.teamMembership) + const finalServersView = finalServers.map(s => { + return { + // ownership info + team: s.team, + application: s.application, + instance: s.instance, + instanceType: s.instanceType, + instanceName: s.instanceName, + // mcp info + mcpServer: s.mcpServer, // the MCP Servers ID - for keying the UI list and for the client to send back in the chat context to select which MCP server to use for a given query + title: s.title || s.mcpServerName || s.mcpServer, + description: s.description, + mcpServerName: s.mcpServerName, + version: s.version, + tools: s.tools, + resources: s.resources, + resourceTemplates: s.resourceTemplates, + prompts: s.prompts + } + }) + reply.send({ servers: finalServersView, incompatibleServers, transactionId }) } catch (error) { reply.code(error.response?.status || 500).send({ code: error.response?.data?.code || 'unexpected_error', error: error.response?.data?.error || error.message }) } @@ -490,6 +566,7 @@ module.exports = async function (app) { * @property {string} instanceType * @property {string} instanceName * @property {string} instanceUrl + * @property {string} mcpServer * @property {string} mcpServerName * @property {string} mcpEndpoint * @property {string} mcpProtocol diff --git a/forge/routes/auth/index.js b/forge/routes/auth/index.js index 7b9f4175ad..8a252dc103 100644 --- a/forge/routes/auth/index.js +++ b/forge/routes/auth/index.js @@ -143,8 +143,17 @@ async function init (app, opts) { } } if (accessToken.scope?.includes('ff-expert:mcp')) { + const isDeviceScope = accessToken.scope?.includes('device') + const isInstanceScope = accessToken.scope?.includes('instance') + if (!isDeviceScope && !isInstanceScope) { + reply.code(401).send({ code: 'unauthorized', error: 'unauthorized' }) + return + } // must be a http token for expert MCP access - if (accessToken.ownerType !== 'http') { + if (isInstanceScope && accessToken.ownerType !== 'http') { + reply.code(401).send({ code: 'unauthorized', error: 'unauthorized' }) + return + } else if (isDeviceScope && accessToken.ownerType !== 'http:device') { reply.code(401).send({ code: 'unauthorized', error: 'unauthorized' }) return } diff --git a/frontend/src/components/expert/components/CapabilitiesSelector.vue b/frontend/src/components/expert/components/CapabilitiesSelector.vue index 1934f25358..d69e4a7af5 100644 --- a/frontend/src/components/expert/components/CapabilitiesSelector.vue +++ b/frontend/src/components/expert/components/CapabilitiesSelector.vue @@ -7,7 +7,7 @@ return-model multiple label-key="name" - :value-key="['instance', 'mcpServerUrl']" + :value-key="['mcpServer']" placeholder="Resources" open-above :options-min-width="280" @@ -17,11 +17,11 @@
  • diff --git a/test/unit/forge/comms/devices_spec.js b/test/unit/forge/comms/devices_spec.js index df915412d7..ad876a5889 100644 --- a/test/unit/forge/comms/devices_spec.js +++ b/test/unit/forge/comms/devices_spec.js @@ -1,6 +1,8 @@ const sleep = require('util').promisify(setTimeout) const should = require('should') // eslint-disable-line +const sinon = require('sinon') + const setup = require('../routes/setup') const FF_UTIL = require('flowforge-test-utils') @@ -8,6 +10,7 @@ const { DeviceCommsHandler } = FF_UTIL.require('forge/comms/devices') describe('DeviceCommsHandler', function () { let app + let eeApp const TestObjects = {} async function setupCE () { @@ -59,6 +62,14 @@ describe('DeviceCommsHandler', function () { after(async function () { await app.close() + + // NOTE: eeApp is intentionally closed here - it shares process-global DB state with + // the CE app, and closing it mid-file tears down the connection the remaining + // describes rely on. It is closed once all describes in this file have run. in th top + // level `after` handler + if (eeApp) { + await eeApp.close() + } }) /** * Get a mocked websocket/socket object. They are 99% the same for the purposes @@ -218,6 +229,296 @@ describe('DeviceCommsHandler', function () { }) }) + describe('request/device/expert/insight', function () { + // This handler bridges FF Expert (MQTT inflight) requests to an MCP server running on a + // remote *device*. It runs a series of ownership/permission checks before forwarding the + // call to the device agent via sendCommandAwaitReply. + // The MCPRegistration model and app.expert decorator are EE-only, so this block stands up + // a dedicated enterprise-licensed app rather than reusing the CE app from setupCE(). + const license = 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJGbG93Rm9yZ2UgSW5jLiIsInN1YiI6IkZsb3dGb3JnZSBJbmMuIERldmVsb3BtZW50IiwibmJmIjoxNjYyNDIyNDAwLCJleHAiOjc5ODY5MDIzOTksIm5vdGUiOiJEZXZlbG9wbWVudC1tb2RlIE9ubHkuIE5vdCBmb3IgcHJvZHVjdGlvbiIsInVzZXJzIjoxNTAsInRlYW1zIjo1MCwicHJvamVjdHMiOjUwLCJkZXZpY2VzIjo1MCwiZGV2Ijp0cnVlLCJpYXQiOjE2NjI0ODI5ODd9.e8Jeppq4aURwWYz-rEpnXs9RY2Y7HF7LJ6rMtMZWdw2Xls6-iyaiKV1TyzQw5sUBAhdUSZxgtiFH5e_cNJgrUg' + + /** Enterprise-licensed app + test rows for this block */ + const EE = {} + let client + let commsHandler + + // A read-only, non-destructive tool - accessible to any team role (incl. our owner alice) + const READONLY_TOOL = { name: 'my_tool', annotations: { readOnlyHint: true, destructiveHint: false } } + + // Build a valid mcpServer descriptor (as sent by the Expert Agent), allowing per-test overrides + function baseMcpServer (overrides = {}) { + return { + team: EE.ATeam.hashid, + application: EE.application.hashid, + instance: EE.device.hashid, + instanceType: 'device', + mcpServer: EE.registration.hashid, + mcpEndpoint: '/mcp', + headers: {}, + ...overrides + } + } + + // Emit the inflight request and resolve once the handler calls onSuccess/onError + function invokeInsight ({ userId, command, mcpServer, mcpDefinitionKind, mcpDefinition, data }) { + return new Promise((resolve) => { + const onSuccess = (result) => resolve({ ok: true, result }) + const onError = (message, code, err) => resolve({ ok: false, message, code, err }) + client.emit('request/device/expert/insight', userId, command, mcpServer, mcpDefinitionKind, mcpDefinition, data, onSuccess, onError) + }) + } + + before(async function () { + eeApp = await setup({ + license, + expert: { + enabled: true, + insights: { enabled: true }, + service: { url: 'http://localhost:9999', token: 'test-token', requestTimeout: 1000 } + } + }) + EE.alice = await eeApp.db.models.User.byUsername('alice') // team owner (created in setup) + EE.ATeam = eeApp.team + EE.application = eeApp.application + // chris is a valid user but NOT a member of ATeam (so has no MCP access) + EE.chris = await eeApp.db.models.User.create({ username: 'chris', name: 'Chris Kenobi', email: 'chris@example.com', email_verified: true, password: 'ccPassword' }) + // an application-owned device in ATeam + EE.device = await eeApp.factory.createDevice({ name: 'expert-device', ownerType: 'application' }, EE.ATeam, null, EE.application) + // a trusted MCP registration for that device + EE.registration = await eeApp.db.models.MCPRegistration.create({ + name: 'device-mcp', + protocol: 'http', + targetType: 'device', + targetId: '' + EE.device.id, + nodeId: 'mcp:node:device', + endpointRoute: '/mcp', + TeamId: EE.ATeam.id + }) + }) + + // after(async function () {}) + // NOTE: eeApp is intentionally not closed here - it shares process-global DB state with + // the outer CE app, and closing it mid-file tears down the connection the remaining + // describes rely on. It is closed once all describes in this file have run. in th top + // level `after` handler + + beforeEach(function () { + client = mockSocket() + commsHandler = DeviceCommsHandler(eeApp, client) + // The real round-trip to the device agent is exercised elsewhere; here we assert on what + // the handler decides to forward, and drive success/failure of the device call. + sinon.stub(commsHandler, 'sendCommandAwaitReply').resolves({ ok: 'device-result' }) + }) + + afterEach(function () { + commsHandler.stopLogWatcher() + sinon.restore() + }) + + it('returns MCP_INVALID_REQUEST when the request is not for a device', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ instanceType: 'instance' }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_REQUEST') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_INVALID_REQUEST when a required field is missing', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ team: null }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_REQUEST') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_INVALID_INSTANCE when the device does not exist', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ instance: 999999 }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_INSTANCE') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_NO_REGISTRATION when no MCP registration is found', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ mcpServer: 999999 }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_NO_REGISTRATION') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_INVALID_TEAM_APPLICATION_INSTANCE when the team does not match', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ team: 'wrongTeamHashid' }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_TEAM_APPLICATION_INSTANCE') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_INVALID_TEAM_APPLICATION_INSTANCE when the application does not match', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ application: 'wrongAppHashid' }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_TEAM_APPLICATION_INSTANCE') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_INVALID_USER when the user cannot be resolved', async function () { + const res = await invokeInsight({ + userId: 999999, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_USER') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_NO_ACCESS when the user is not a member of the team', async function () { + const res = await invokeInsight({ + userId: EE.chris.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_NO_ACCESS') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('returns MCP_NO_ACCESS_TOOL when the requested tool is not in the accessible feature set', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'a_different_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_NO_ACCESS_TOOL') + commsHandler.sendCommandAwaitReply.called.should.be.false() + }) + + it('forwards a tool call to the device and resolves with the result', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: { foo: 'bar' } } + }) + res.ok.should.be.true() + res.result.should.deepEqual({ ok: 'device-result' }) + + commsHandler.sendCommandAwaitReply.calledOnce.should.be.true() + const args = commsHandler.sendCommandAwaitReply.firstCall.args + args[0].should.equal(EE.ATeam.hashid) // teamId + args[1].should.equal(EE.device.hashid) // deviceId + args[2].should.equal('mcp:call-tool') // command + const commandData = args[3] + commandData.should.have.property('kind', 'mcp_tool') + commandData.should.have.property('name', 'my_tool') + commandData.should.have.property('input').which.deepEqual({ foo: 'bar' }) + commandData.should.have.property('endpoint').which.is.an.Object() + commandData.endpoint.should.have.property('mcpEndpoint', '/mcp') + args[4].should.have.property('timeout', 30000) + }) + + it('forwards a resource read to the device', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:read-resource', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_resource', + mcpDefinition: { uri: 'file:///data.txt' }, + data: { uri: 'file:///data.txt' } + }) + res.ok.should.be.true() + res.result.should.deepEqual({ ok: 'device-result' }) + + commsHandler.sendCommandAwaitReply.calledOnce.should.be.true() + const commandData = commsHandler.sendCommandAwaitReply.firstCall.args[3] + commandData.should.have.property('kind', 'mcp_resource') + commandData.should.have.property('uri', 'file:///data.txt') + }) + + it('resolves a resource template URI from the template + input before forwarding', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:read-resource', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_resource_template', + mcpDefinition: { uriTemplate: 'file:///{name}.txt' }, + // no resolved uri provided - the handler must compute it from the template + input + data: { uriTemplate: 'file:///{name}.txt', input: { name: 'report' } } + }) + res.ok.should.be.true() + + commsHandler.sendCommandAwaitReply.calledOnce.should.be.true() + const commandData = commsHandler.sendCommandAwaitReply.firstCall.args[3] + commandData.should.have.property('kind', 'mcp_resource_template') + commandData.should.have.property('uri', 'file:///report.txt') + }) + + it('returns MCP_INSIGHT_REQUEST_ERROR when the device command fails', async function () { + commsHandler.sendCommandAwaitReply.rejects(new Error('device unreachable')) + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INSIGHT_REQUEST_ERROR') + res.message.should.match(/device unreachable/) + }) + }) + describe('sendCommandAwaitReply', async function () { let commsHandler let client diff --git a/test/unit/forge/comms/instances_spec.js b/test/unit/forge/comms/instances_spec.js new file mode 100644 index 0000000000..8b469e8310 --- /dev/null +++ b/test/unit/forge/comms/instances_spec.js @@ -0,0 +1,324 @@ +const should = require('should') // eslint-disable-line +const sinon = require('sinon') + +const setup = require('../routes/setup') + +const FF_UTIL = require('flowforge-test-utils') +const { InstanceCommsHandler } = FF_UTIL.require('forge/comms/instances') + +describe('InstanceCommsHandler', function () { + /** + * Get a mocked websocket/socket object. They are 99% the same for the purposes + * of our tests - only different being one uses 'publish' and one uses 'send' + */ + function mockSocket () { + let received = [] + const handlers = {} + return { + platformId: 'test-platform-id', + publish: (topic, payload, opts, callback) => { + received.push({ topic, payload }) + if (callback) { + setImmediate(() => callback()) + } + }, + send: (data) => { + received.push(data) + }, + on: (event, callback) => { + handlers[event] = callback + }, + emit: function () { + const evt = arguments[0] + const args = Array.prototype.slice.call(arguments, 1) + handlers[evt].apply(null, args) + }, + received: () => received, + clearReceived: () => { received = [] } + } + } + + describe('request/instance/expert/insight', function () { + // This handler bridges FF Expert (MQTT inflight) requests to an MCP server running on a + // hosted *instance*. It runs a series of ownership/permission checks before forwarding the + // call to the instance via the container driver (app.containers.callMCPTool/readMCPResource). + // The MCPRegistration model and app.expert decorator are EE-only, so this block stands up + // an enterprise-licensed app. + const license = 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJGbG93Rm9yZ2UgSW5jLiIsInN1YiI6IkZsb3dGb3JnZSBJbmMuIERldmVsb3BtZW50IiwibmJmIjoxNjYyNDIyNDAwLCJleHAiOjc5ODY5MDIzOTksIm5vdGUiOiJEZXZlbG9wbWVudC1tb2RlIE9ubHkuIE5vdCBmb3IgcHJvZHVjdGlvbiIsInVzZXJzIjoxNTAsInRlYW1zIjo1MCwicHJvamVjdHMiOjUwLCJkZXZpY2VzIjo1MCwiZGV2Ijp0cnVlLCJpYXQiOjE2NjI0ODI5ODd9.e8Jeppq4aURwWYz-rEpnXs9RY2Y7HF7LJ6rMtMZWdw2Xls6-iyaiKV1TyzQw5sUBAhdUSZxgtiFH5e_cNJgrUg' + + /** Enterprise-licensed app + test rows for this block */ + const EE = {} + let eeApp + let client + + // A read-only, non-destructive tool - accessible to any team role (incl. our owner alice) + const READONLY_TOOL = { name: 'my_tool', annotations: { readOnlyHint: true, destructiveHint: false } } + + // Build a valid mcpServer descriptor (as sent by the Expert Agent), allowing per-test overrides + function baseMcpServer (overrides = {}) { + return { + team: EE.ATeam.hashid, + application: EE.application.hashid, + instance: EE.instance.id, // hosted instances are identified by their (uuid) id + instanceType: 'instance', + mcpServer: EE.registration.hashid, + mcpEndpoint: '/mcp', + headers: {}, + ...overrides + } + } + + // Emit the inflight request and resolve once the handler calls onSuccess/onError + function invokeInsight ({ userId, command, mcpServer, mcpDefinitionKind, mcpDefinition, data }) { + return new Promise((resolve) => { + const onSuccess = (result) => resolve({ ok: true, result }) + const onError = (message, code, err) => resolve({ ok: false, message, code, err }) + client.emit('request/instance/expert/insight', userId, command, mcpServer, mcpDefinitionKind, mcpDefinition, data, onSuccess, onError) + }) + } + + before(async function () { + eeApp = await setup({ + license, + expert: { + enabled: true, + insights: { enabled: true }, + service: { url: 'http://localhost:9999', token: 'test-token', requestTimeout: 1000 } + } + }) + EE.alice = await eeApp.db.models.User.byUsername('alice') // team owner (created in setup) + EE.ATeam = eeApp.team + EE.application = eeApp.application + EE.instance = eeApp.project // an instance owned by EE.application (created in setup) + // chris is a valid user but NOT a member of ATeam (so has no MCP access) + EE.chris = await eeApp.db.models.User.create({ username: 'chris', name: 'Chris Kenobi', email: 'chris@example.com', email_verified: true, password: 'ccPassword' }) + // a trusted MCP registration for that instance + EE.registration = await eeApp.db.models.MCPRegistration.create({ + name: 'instance-mcp', + protocol: 'http', + targetType: 'instance', + targetId: '' + EE.instance.id, + nodeId: 'mcp:node:instance', + endpointRoute: '/mcp', + TeamId: EE.ATeam.id + }) + }) + + after(async function () { + await eeApp.close() + }) + + beforeEach(function () { + client = mockSocket() + // Constructing the handler registers the 'request/instance/expert/insight' listener on the client + InstanceCommsHandler(eeApp, client) + // The real call to the instance/container driver is exercised elsewhere; here we assert on + // what the handler decides to forward, and drive success/failure of the instance call. + sinon.stub(eeApp.containers, 'callMCPTool').resolves({ ok: 'tool-result' }) + sinon.stub(eeApp.containers, 'readMCPResource').resolves({ ok: 'resource-result' }) + }) + + afterEach(function () { + sinon.restore() + }) + + it('returns MCP_INVALID_REQUEST when the request is not for an instance', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ instanceType: 'device' }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_REQUEST') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_INVALID_REQUEST when a required field is missing', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ team: null }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_REQUEST') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_INVALID_INSTANCE when the instance does not exist', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ instance: '00000000-0000-0000-0000-000000000000' }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_INSTANCE') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_NO_REGISTRATION when no MCP registration is found', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ mcpServer: 999999 }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_NO_REGISTRATION') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_INVALID_TEAM_APPLICATION_INSTANCE when the team does not match', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ team: 'wrongTeamHashid' }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_TEAM_APPLICATION_INSTANCE') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_INVALID_TEAM_APPLICATION_INSTANCE when the application does not match', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer({ application: 'wrongAppHashid' }), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_TEAM_APPLICATION_INSTANCE') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_INVALID_USER when the user cannot be resolved', async function () { + const res = await invokeInsight({ + userId: 999999, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INVALID_USER') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_NO_ACCESS when the user is not a member of the team', async function () { + const res = await invokeInsight({ + userId: EE.chris.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_NO_ACCESS') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('returns MCP_NO_ACCESS_TOOL when the requested tool is not in the accessible feature set', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'a_different_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_NO_ACCESS_TOOL') + eeApp.containers.callMCPTool.called.should.be.false() + }) + + it('forwards a tool call to the instance and resolves with the result', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: { foo: 'bar' } } + }) + res.ok.should.be.true() + res.result.should.deepEqual({ ok: 'tool-result' }) + + eeApp.containers.callMCPTool.calledOnce.should.be.true() + eeApp.containers.readMCPResource.called.should.be.false() + const args = eeApp.containers.callMCPTool.firstCall.args + args[0].should.have.property('id', EE.instance.id) // the resolved instance + args[1].should.have.property('mcpEndpoint', '/mcp') // endpoint + args[2].should.equal('my_tool') // tool name + args[3].should.deepEqual({ foo: 'bar' }) // tool input + }) + + it('forwards a resource read to the instance', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:read-resource', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_resource', + mcpDefinition: { uri: 'file:///data.txt' }, + data: { uri: 'file:///data.txt' } + }) + res.ok.should.be.true() + res.result.should.deepEqual({ ok: 'resource-result' }) + + eeApp.containers.readMCPResource.calledOnce.should.be.true() + eeApp.containers.callMCPTool.called.should.be.false() + const args = eeApp.containers.readMCPResource.firstCall.args + args[0].should.have.property('id', EE.instance.id) + args[1].should.have.property('mcpEndpoint', '/mcp') + args[2].should.equal('file:///data.txt') // resource uri + }) + + it('resolves a resource template URI from the template + input before forwarding', async function () { + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:read-resource', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_resource_template', + mcpDefinition: { uriTemplate: 'file:///{name}.txt' }, + // no resolved uri provided - the handler must compute it from the template + input + data: { uriTemplate: 'file:///{name}.txt', input: { name: 'report' } } + }) + res.ok.should.be.true() + + eeApp.containers.readMCPResource.calledOnce.should.be.true() + const args = eeApp.containers.readMCPResource.firstCall.args + args[2].should.equal('file:///report.txt') // computed uri + }) + + it('returns MCP_INSIGHT_REQUEST_ERROR when the instance call fails', async function () { + eeApp.containers.callMCPTool.rejects(new Error('instance unreachable')) + const res = await invokeInsight({ + userId: EE.alice.hashid, + command: 'mcp:call-tool', + mcpServer: baseMcpServer(), + mcpDefinitionKind: 'mcp_tool', + mcpDefinition: READONLY_TOOL, + data: { name: 'my_tool', input: {} } + }) + res.ok.should.be.false() + res.code.should.equal('MCP_INSIGHT_REQUEST_ERROR') + res.message.should.match(/instance unreachable/) + }) + }) +}) diff --git a/test/unit/forge/ee/lib/expert/tasks/emqx-bridge/setup_spec.js b/test/unit/forge/ee/lib/expert/tasks/emqx-bridge/setup_spec.js index 36692cc4ce..3a0f93ebdf 100644 --- a/test/unit/forge/ee/lib/expert/tasks/emqx-bridge/setup_spec.js +++ b/test/unit/forge/ee/lib/expert/tasks/emqx-bridge/setup_spec.js @@ -10,7 +10,7 @@ const { getConfig, getList, makeClient, del, post } = bridge._internal -const { connector, actionOut, sourceChat, sourceInflight, ruleIn, ruleOut } = templates +const { connector, actionOut, sourceChat, sourceInflight, sourcePlatform, ruleIn, ruleOut } = templates // #region helpers @@ -62,7 +62,8 @@ function fullDiscovery (client) { status: 200, data: [ { name: sourceChat.name, type: 'mqtt', connector: connector.name }, - { name: sourceInflight.name, type: 'mqtt', connector: connector.name } + { name: sourceInflight.name, type: 'mqtt', connector: connector.name }, + { name: sourcePlatform.name, type: 'mqtt', connector: connector.name } ] }) client.get.withArgs('/actions').resolves({ @@ -97,7 +98,8 @@ function passingValidate (client, { licenceId = 'test-licence-id', server = 'exp status: 200, data: [ { name: sourceChat.name, type: 'mqtt' }, - { name: sourceInflight.name, type: 'mqtt' } + { name: sourceInflight.name, type: 'mqtt' }, + { name: sourcePlatform.name, type: 'mqtt' } ] }) client.get.withArgs('/rules?limit=100').resolves({ @@ -401,13 +403,14 @@ describe('EMQX bridge-setup', function () { await addBridge(app, { cfg, client }) - client.post.callCount.should.equal(6) // 1 connector + 1 action + 2 sources + 2 rules + client.post.callCount.should.equal(7) // 1 connector + 1 action + 3 sources + 2 rules client.post.getCall(0).args[0].should.equal('/connectors') client.post.getCall(1).args[0].should.equal('/actions') client.post.getCall(2).args[0].should.equal('/sources') client.post.getCall(3).args[0].should.equal('/sources') - client.post.getCall(4).args[0].should.equal('/rules') + client.post.getCall(4).args[0].should.equal('/sources') client.post.getCall(5).args[0].should.equal('/rules') + client.post.getCall(6).args[0].should.equal('/rules') const connectorPayload = client.post.getCall(0).args[1] connectorPayload.server.should.equal('expert.test:1884') @@ -458,15 +461,16 @@ describe('EMQX bridge-setup', function () { await removeBridge(app, { cfg, client }) - // 2 rules + 2 sources + 1 action + 1 connector = 6 deletes - client.delete.callCount.should.equal(6) + // 2 rules + 3 sources + 1 action + 1 connector = 7 deletes + client.delete.callCount.should.equal(7) const paths = client.delete.getCalls().map(c => c.args[0]) paths[0].should.match(/^\/rules\//) paths[1].should.match(/^\/rules\//) paths[2].should.match(/^\/sources\//) paths[3].should.match(/^\/sources\//) - paths[4].should.match(/^\/actions\//) - paths[5].should.equal(`/connectors/mqtt:${connector.name}`) + paths[4].should.match(/^\/sources\//) + paths[5].should.match(/^\/actions\//) + paths[6].should.equal(`/connectors/mqtt:${connector.name}`) }) it('tolerates 404s on individual deletes', async function () { @@ -553,9 +557,9 @@ describe('EMQX bridge-setup', function () { const result = await syncBridge(app, { force: true, client }) result.should.be.true() - // 1 connector delete + 6 creates (1 connector + 1 action + 2 sources + 2 rules) + // 1 connector delete + 7 creates (1 connector + 1 action + 3 sources + 2 rules) client.delete.callCount.should.equal(1) - client.post.callCount.should.equal(6) + client.post.callCount.should.equal(7) // didn't bother validating client.get.calledWith('/connectors').should.be.false() }) @@ -585,7 +589,7 @@ describe('EMQX bridge-setup', function () { const result = await syncBridge(app, { client }) result.should.be.true() client.delete.called.should.be.true() - client.post.callCount.should.equal(6) // 1 connector + 1 action + 2 sources + 2 rules + client.post.callCount.should.equal(7) // 1 connector + 1 action + 3 sources + 2 rules }) it('returns false when an underlying call throws. (force=true) (logs error)', async function () { diff --git a/test/unit/forge/ee/routes/expert/index_spec.js b/test/unit/forge/ee/routes/expert/index_spec.js index 7fa6516e2b..e93ffd1201 100644 --- a/test/unit/forge/ee/routes/expert/index_spec.js +++ b/test/unit/forge/ee/routes/expert/index_spec.js @@ -124,10 +124,12 @@ describe('Expert API', function () { }) describe('Chat Endpoint', function () { + /** The MCP registration created for the default instance in beforeEach */ + let defaultMcpRegistration beforeEach(async function () { // register an MCP server for the default instance so the /chat trusted-registry - // re-resolution recognises selectedCapabilities that reference it (mcpServerName 'mcp-server-1') - await createMcpRegistration(app, instance, { name: 'mcp-server-1', endpointRoute: '/mcp1' }) + // re-resolution recognises selectedCapabilities that reference it (mcpServer hashid) + defaultMcpRegistration = await createMcpRegistration(app, instance, { name: 'mcp-server-1', endpointRoute: '/mcp1' }) }) it('should return 401 for missing session', async function () { @@ -291,17 +293,18 @@ describe('Expert API', function () { // register a (trusted) MCP server for each instance so the /chat route's registry // re-resolution recognises the selectedCapabilities referencing them - await createMcpRegistration(app, instanceAlice2, { name: 'alice2' }) - await createMcpRegistration(app, instanceBob2, { name: 'bob2' }) - await createMcpRegistration(app, instanceChris2, { name: 'chris2' }) + const regAlice2 = await createMcpRegistration(app, instanceAlice2, { name: 'alice2' }) + const regBob2 = await createMcpRegistration(app, instanceBob2, { name: 'bob2' }) + const regChris2 = await createMcpRegistration(app, instanceChris2, { name: 'chris2' }) - const buildMcpServerFeaturesResponse = (name, applicationHashid, instance, instanceType) => ({ + const buildMcpServerFeaturesResponse = (name, applicationHashid, instance, instanceType, mcpServer) => ({ team: team.hashid, application: applicationHashid, instance: instanceType === 'instance' ? instance.id : instance.hashid, instanceType, instanceName: instance.name, mcpProtocol: 'http', + mcpServer, // the MCP registration hashid - re-resolved against the trusted registry mcpServerName: name, mcpServerUrl: `http://${name}/mcp`, prompts: [], @@ -364,9 +367,9 @@ describe('Expert API', function () { pageName: 'instance-editor-expert', scope: 'immersive', selectedCapabilities: [ - buildMcpServerFeaturesResponse('alice2', applicationAlice2.hashid, instanceAlice2, 'instance'), // an mcp server on alice2 instance - buildMcpServerFeaturesResponse('bob2', applicationBob2.hashid, instanceBob2, 'instance'), // an mcp server on bob2 instance - buildMcpServerFeaturesResponse('chris2', applicationChris2.hashid, instanceChris2, 'instance') // an mcp server on chris2 instance + buildMcpServerFeaturesResponse('alice2', applicationAlice2.hashid, instanceAlice2, 'instance', regAlice2.hashid), // an mcp server on alice2 instance + buildMcpServerFeaturesResponse('bob2', applicationBob2.hashid, instanceBob2, 'instance', regBob2.hashid), // an mcp server on bob2 instance + buildMcpServerFeaturesResponse('chris2', applicationChris2.hashid, instanceChris2, 'instance', regChris2.hashid) // an mcp server on chris2 instance ] } }) @@ -503,6 +506,7 @@ describe('Expert API', function () { instance: instance.id, instanceType: 'instance', instanceName: instance.name, + mcpServer: defaultMcpRegistration.hashid, mcpServerName: 'mcp-server-1', mcpServerUrl: 'http://instance-url/mcp1', mcpProtocol: 'http', @@ -563,6 +567,7 @@ describe('Expert API', function () { instance: instance.id, instanceType: 'instance', instanceName: instance.name, + mcpServer: defaultMcpRegistration.hashid, mcpServerName: 'mcp-server-1', mcpServerUrl: 'http://instance-url/mcp1', mcpProtocol: 'http', @@ -591,7 +596,7 @@ describe('Expert API', function () { capturedPostData.context.selectedCapabilities[0].mcpAccessToken.should.deepEqual({ token: null, scheme: '', scope: ['ff-expert:mcp', 'instance'] }) }) - it('should generate an access token for MCP server access when feature teamHttpSecurity is enabled', async function () { + it('should generate an access token for MCP server access for an instance when feature teamHttpSecurity is enabled', async function () { const token = bobToken await setFeatureForTeam(app, 'teamHttpSecurity', true) // Stub MCP registration to return 1 online instance @@ -630,6 +635,7 @@ describe('Expert API', function () { instance: instance.id, instanceType: 'instance', instanceName: instance.name, + mcpServer: defaultMcpRegistration.hashid, mcpServerName: 'mcp-server-1', mcpServerUrl: 'http://instance-url/mcp1', mcpProtocol: 'http', @@ -684,6 +690,111 @@ describe('Expert API', function () { }) }) + it('should generate an access token for MCP server access for a device when feature teamHttpSecurity is enabled', async function () { + const token = bobToken + await setFeatureForTeam(app, 'teamHttpSecurity', true) + + // register a (trusted) MCP server for the device so the /chat registry re-resolution recognises it + const deviceRegistration = await app.db.models.MCPRegistration.create({ + name: 'mcp-device-1', + protocol: 'http', + targetType: 'device', + targetId: '' + device.id, + nodeId: 'mcp:node:device-1', + endpointRoute: '/mcp', + TeamId: team.id + }) + + // a device's httpNodeAuth lives in DeviceSettings under the 'security' key (not ProjectSettings 'settings') + sinon.stub(app.db.models.DeviceSettings, 'findOne').callsFake(async (options) => { + if (options.where.DeviceId === device.id && options.where.key === 'security') { + return { value: { httpNodeAuth: { type: 'flowforge-user' } } } + } + return this.wrappedMethod.apply(this, arguments) + }) + + // fake the axios post response - capture post data and return resolved promise + let capturedPostData = null + sinon.stub(axios, 'post').callsFake((url, data) => { + capturedPostData = data + return Promise.resolve({ + data: { + transactionId: 'abc', + context: { } + } + }) + }) + const response = await app.inject({ + method: 'POST', + url: '/api/v1/expert/chat', + cookies: { sid: token }, + headers: { 'x-chat-transaction-id': 'abc' }, + payload: { + context: { + teamId: team.hashid, + query: 'test', + selectedCapabilities: [ + { + team: team.hashid, + application: application.hashid, + instance: device.hashid, // device hashid - re-resolved against the trusted registry + instanceType: 'device', + instanceName: device.name, + mcpServer: deviceRegistration.hashid, + mcpServerName: 'mcp-device-1', + mcpServerUrl: 'http://device-url/mcp', + mcpProtocol: 'http', + prompts: [{}], + resources: [{}], + resourceTemplates: [{}], + tools: [{}], + title: 'the device title', + version: '1.0.0-beta', + description: 'the device description' + } + ] + } + } + }) + response.statusCode.should.equal(200) + + // read AccessToken from DB and check it is valid - a device token has ownerType 'http:device' + const tokens = await app.db.models.AccessToken.findAll({ where: { ownerType: 'http:device', ownerId: '' + device.id } }) + tokens.should.be.an.Array() + tokens.should.have.length(1) + const dbToken = /* get newest token */ tokens.reduce((a, b) => (a.createdAt > b.createdAt ? a : b)) + dbToken.should.have.property('scope').which.is.an.Array().and.have.length(2) + dbToken.scope.should.containEql('ff-expert:mcp') + dbToken.scope.should.containEql('device') + dbToken.should.have.property('ownerType', 'http:device') + dbToken.should.have.property('ownerId', '' + device.id) + dbToken.should.have.property('expiresAt').which.is.a.Date() + const fiveMinsFromNow = Date.now() + (5 * 60 * 1000) + dbToken.expiresAt.getTime().should.be.approximately(fiveMinsFromNow, 2000) // check expiry (with grace period) + + // get the cached token and check it matches DB token (cache is keyed by the device id) + const cachedToken = await app.expert.mcp.getCachedToken(device.id) + should.exist(cachedToken) + cachedToken.should.have.property('token').and.be.a.String() + cachedToken.should.have.property('scheme', 'Bearer') + cachedToken.should.have.property('scope').which.is.an.Array().and.have.length(2) + cachedToken.scope.should.containEql('ff-expert:mcp') + cachedToken.scope.should.containEql('device') + + // db token should be a hash of the cached token + const hash = sha256(cachedToken.token) + hash.should.equal(dbToken.token) + + // Now assert the axios post payload (captured async) + capturedPostData.should.be.an.Object() + capturedPostData.context.selectedCapabilities[0].should.have.property('mcpAccessToken').and.be.an.Object() + capturedPostData.context.selectedCapabilities[0].mcpAccessToken.should.deepEqual({ + token: cachedToken.token, + scheme: 'Bearer', + scope: ['ff-expert:mcp', 'device'] + }) + }) + it('should NOT attach a cached token when the claimed application does not own the selected instance', async function () { // A cached MCP token must not be attached unless the selected instance genuinely belongs to the claimed application const token = bobToken // bob is a team owner (so passes the claimed-application permission check) @@ -704,7 +815,7 @@ describe('Expert API', function () { // Register the victim instance's MCP server (under the attacker-chosen name) so the // capability survives the trusted-registry re-resolution and the test specifically // exercises the instance/application OWNERSHIP check rather than the registry-miss path. - await createMcpRegistration(app, victimInstance, { name: 'attacker-controlled' }) + const victimRegistration = await createMcpRegistration(app, victimInstance, { name: 'attacker-controlled' }) // The victim instance uses FlowFuse http auth, so a real Bearer token would be minted/cached sinon.stub(app.db.models.ProjectSettings, 'findOne').callsFake(async (options) => { @@ -744,6 +855,7 @@ describe('Expert API', function () { instance: victimInstance.id, // victim instance (belongs to a different application) instanceType: 'instance', instanceName: 'target-instance', + mcpServer: victimRegistration.hashid, // resolves in the trusted registry, but fails the ownership check mcpServerName: 'attacker-controlled', mcpServerUrl: 'https://attacker.example/mcp', mcpProtocol: 'http', @@ -769,7 +881,6 @@ describe('Expert API', function () { it('should overwrite client-supplied transport fields with trusted registry values', async function () { const token = bobToken // team owner - const warnSpy = sinon.spy(app.log, 'warn') let capturedPostData = null sinon.stub(axios, 'post').callsFake((url, data) => { capturedPostData = data @@ -792,7 +903,8 @@ describe('Expert API', function () { instance: instance.id, instanceType: 'instance', instanceName: 'spoofed-name', - mcpServerName: 'mcp-server-1', // matches the registration created in beforeEach + mcpServer: defaultMcpRegistration.hashid, // matches the registration created in beforeEach + mcpServerName: 'mcp-server-1', mcpServerUrl: 'https://attacker.example/mcp', // client provided - must be dropped instanceUrl: 'https://attacker.example', // client provided - must be overwritten mcpEndpoint: '/evil', // client provided - must be overwritten @@ -821,14 +933,8 @@ describe('Expert API', function () { forwarded[0].should.have.property('instanceName', instance.name) forwarded[0].should.have.property('team', team.hashid) forwarded[0].should.have.property('application', application.hashid) - - // the mismatch between client-supplied and trusted transport fields should be logged (audit signal) - warnSpy.called.should.be.true() - const warnMessage = warnSpy.getCalls().map(c => c.args[0]).find(m => typeof m === 'string' && m.includes('transport fields')) - should.exist(warnMessage) - warnMessage.should.match(/instanceUrl/) - warnMessage.should.match(/mcpEndpoint/) - warnMessage.should.match(/mcpProtocol/) + // mcpServer is re-resolved to (and forwarded as) the trusted registration hashid + forwarded[0].should.have.property('mcpServer', defaultMcpRegistration.hashid) }) it('should clear cached MCP server access token when project setting httpNodeAuth is changed', async function () { @@ -864,12 +970,16 @@ describe('Expert API', function () { }) describe('MCP features Endpoint', function () { - let mockMcpRegistration1, mockMcpResponseServer1 + let mockMcpRegistration1 beforeEach(async function () { await setFeatureForTeam(app, 'teamHttpSecurity', true) + // The MCP features endpoint now gates on the instance launcher version - the default + // instance must advertise a launcher new enough to support MCP features. + instance.versions = { launcher: { current: '2.31.4' } } // create an common reusable MCP registration mockMcpRegistration1 = { id: 1, + hashid: 'mcpreg00001', name: 'mcp-server-1', protocol: 'http', targetType: 'instance', @@ -883,26 +993,22 @@ describe('Expert API', function () { description: 'the description 1' } - mockMcpResponseServer1 = { - team: team.hashid, - application: application.hashid, - instance: instance.id, - instanceType: 'instance', - instanceName: instance.name, - mcpServerName: 'mcp-server-1', - mcpServerUrl: 'http://instance-url/mcp1', - prompts: [{}], - resources: [{}], - resourceTemplates: [{}], - tools: [{}], - mcpProtocol: 'http', - title: 'the title 1', - version: '1.0.0-beta', - description: 'the description 1', - notInSchema: 'should not cause error or be included in response due to swagger schema' - } }) + // Stub app.containers.getMCPFeatures to behave like the launcher admin API: echo each + // requested server spec back (the spec carries ownership/transport details + the minted + // mcpAccessToken) and attach the supplied MCP feature set. Returns a handle whose + // `mcpServers` field captures the specs the route passed in (one entry per registration). + function stubGetMCPFeatures (features = { prompts: [{}], resources: [{}], resourceTemplates: [{}], tools: [{}] }) { + const captured = { mcpServers: null, calls: [] } + sinon.stub(app.containers, 'getMCPFeatures').callsFake(async (inst, mcpServers) => { + captured.mcpServers = mcpServers + captured.calls.push({ instance: inst, mcpServers }) + return mcpServers.map(spec => ({ spec, features: typeof features === 'function' ? features(spec) : features })) + }) + return captured + } + it('should return 401 for instance token', async function () { const token = instanceToken const response = await app.inject({ @@ -976,7 +1082,7 @@ describe('Expert API', function () { it('should return early with status 200 and empty servers array when there are no MCP registrations', async function () { const token = bobToken sinon.stub(app.db.models.MCPRegistration, 'byTeam').resolves([]) - const post = sinon.stub(axios, 'post') // should not be called + const getFeatures = sinon.stub(app.containers, 'getMCPFeatures') // should not be called const response = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', @@ -986,8 +1092,8 @@ describe('Expert API', function () { }) response.statusCode.should.equal(200) const json = response.json() - json.should.deepEqual({ servers: [], transactionId: 'abc' }) - post.called.should.be.false() + json.should.deepEqual({ servers: [], incompatibleServers: [], transactionId: 'abc' }) + getFeatures.called.should.be.false() }) it('should get mcp features for team member', async function () { @@ -997,6 +1103,7 @@ describe('Expert API', function () { sinon.stub(app.db.models.MCPRegistration, 'byTeam').resolves([ { id: 1, + hashid: 'mcpreg00001', name: 'mcp-server-1', protocol: 'http', targetType: 'instance', @@ -1050,36 +1157,34 @@ describe('Expert API', function () { title: 'the title 3', version: '3.0.0-beta', description: 'the description 3' + }, { + id: 4, // should be excluded due to being old launcher version + name: 'mcp-server-4', + protocol: 'http', + targetType: 'instance', + targetId: 'acbd-1234', + nodeId: 'mcp:node:1', + endpointRoute: '/mcp2', + TeamId: team.id, + Project: { + id: 'dddd4444', + name: 'old-instance', + ApplicationId: application.id, + state: 'running', + liveState: () => ({ meta: { state: 'running' } }), + getSetting: sinon.stub().resolves({}), // no special settings + versions: { launcher: { current: '2.0.0' } } // old launcher version + }, + title: 'the title 4', + version: '4.0.0-beta', + description: 'the description 4' } ]) // fake online status by stubbing liveState sinon.stub(instance, 'liveState').returns({ meta: { state: 'running' } }) - sinon.stub(axios, 'post').resolves({ - data: { - transactionId: 'abc', - servers: [ - { - team: team.hashid, - application: application.hashid, - instance: instance.id, - instanceType: 'instance', - instanceName: instance.name, - mcpServerName: 'mcp-server-1', - mcpServerUrl: 'http://instance-url/mcp1', - prompts: [{}], - resources: [{}], - resourceTemplates: [{}], - tools: [{}], - mcpProtocol: 'http', - title: 'the title 1', - version: '1.0.0-beta', - description: 'the description 1', - notInSchema: 'should not cause error or be included in response due to swagger schema' - } - ] - } - }) + // the launcher (via app.containers.getMCPFeatures) returns the MCP features per instance + const captured = stubGetMCPFeatures() const response = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', @@ -1089,22 +1194,22 @@ describe('Expert API', function () { }) response.statusCode.should.equal(200) - // check that the post data to expert service was correct - const axiosPost = axios.post.getCall(0).args[1] - axiosPost.should.have.property('teamId', team.hashid) - axiosPost.should.have.property('servers').which.is.an.Array().and.has.length(1) - // since only 1 instance was correct and online, get index 0 and check its properties - const reg = axiosPost.servers[0] - reg.should.only.have.keys('team', 'application', 'instance', 'instanceType', 'instanceName', 'instanceUrl', 'mcpAccessToken', 'mcpServerName', 'mcpEndpoint', 'mcpProtocol', 'title', 'version', 'description') + // only the single online, correct-team instance should have been queried + app.containers.getMCPFeatures.calledOnce.should.be.true() + // check the per-server spec passed to the launcher was correct + captured.mcpServers.should.be.an.Array().and.has.length(1) + const reg = captured.mcpServers[0] reg.should.have.property('team', team.hashid) reg.should.have.property('application', application.hashid) reg.should.have.property('instance', instance.id) reg.should.have.property('instanceType', 'instance') reg.should.have.property('instanceName', instance.name) reg.should.have.property('instanceUrl', instance.url) + reg.should.have.property('mcpServer', 'mcpreg00001') reg.should.have.property('mcpServerName', 'mcp-server-1') reg.should.have.property('mcpEndpoint', '/mcp1') reg.should.have.property('mcpProtocol', 'http') + reg.should.have.property('mcpAccessToken') reg.should.have.property('title', 'the title 1') reg.should.have.property('version', '1.0.0-beta') reg.should.have.property('description', 'the description 1') @@ -1113,24 +1218,29 @@ describe('Expert API', function () { const result = response.json() result.should.have.property('transactionId', 'abc') result.should.have.property('servers').which.is.an.Array().and.has.length(1) - result.servers[0].should.only.have.keys('team', 'application', 'instance', 'instanceType', 'instanceName', 'mcpServerName', 'prompts', 'resources', 'resourceTemplates', 'tools', 'mcpProtocol', 'mcpServerUrl', 'title', 'version', 'description') + result.servers[0].should.only.have.keys('team', 'application', 'instance', 'instanceType', 'instanceName', 'mcpServer', 'mcpServerName', 'prompts', 'resources', 'resourceTemplates', 'tools', 'title', 'version', 'description') result.servers[0].should.have.property('team', team.hashid) result.servers[0].should.have.property('application', application.hashid) result.servers[0].should.have.property('instance', instance.id) result.servers[0].should.have.property('instanceType', 'instance') result.servers[0].should.have.property('instanceName', instance.name) + result.servers[0].should.have.property('mcpServer', 'mcpreg00001') result.servers[0].should.have.property('mcpServerName', 'mcp-server-1') result.servers[0].should.have.property('title', 'the title 1') result.servers[0].should.have.property('version', '1.0.0-beta') result.servers[0].should.have.property('description', 'the description 1') + // should not contain the transport fields (instanceUrl, mcpEndpoint, mcpProtocol) since those are not needed by the expert backend + result.should.not.have.property('instanceUrl') + result.should.not.have.property('mcpEndpoint') + result.should.not.have.property('mcpProtocol') }) - it('should return 500 if transactionId mismatches', async function () { + it('should report instances whose launcher version is too old as incompatible', async function () { const token = bobToken - sinon.stub(axios, 'post').resolves({ data: { transactionId: 'wrong' } }) - // fake 1 registration to avoid early return + // a single registration whose instance launcher is older than the minimum supported version sinon.stub(app.db.models.MCPRegistration, 'byTeam').resolves([{ id: 1, + hashid: 'mcpreg00001', name: 'mcp-server-1', protocol: 'http', targetType: 'instance', @@ -1140,16 +1250,30 @@ describe('Expert API', function () { TeamId: team.id, Project: instance }]) - sinon.stub(instance, 'liveState').returns({ meta: { state: 'running' } }) + // launcher version below MIN_HOSTED_INSTANCE_LAUNCHER_VERSION (2.31.4) + instance.versions = { launcher: { current: '2.0.0' } } + const liveState = sinon.stub(instance, 'liveState').returns({ meta: { state: 'running' } }) + const getFeatures = sinon.stub(app.containers, 'getMCPFeatures') const response = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', cookies: { sid: token }, - headers: { 'x-chat-transaction-id': 'right' }, + headers: { 'x-chat-transaction-id': 'abc' }, payload: { context: { teamId: team.hashid } } }) - response.statusCode.should.equal(500) + response.statusCode.should.equal(200) + + const result = response.json() + result.should.have.property('servers').which.is.an.Array().and.has.length(0) + result.should.have.property('incompatibleServers').which.is.an.Array().and.has.length(1) + result.incompatibleServers[0].should.have.property('instance', instance.id) + result.incompatibleServers[0].should.have.property('instanceType', 'instance') + result.incompatibleServers[0].should.have.property('currentVersion', '2.0.0') + result.incompatibleServers[0].should.have.property('minimumSupportedVersion', '2.31.4') + // an incompatible instance must never be queried for features (or have its live state checked past the version gate) + getFeatures.called.should.be.false() + liveState.called.should.be.false() }) it('should only get permitted mcp features when granular RBACs is enabled', async function () { @@ -1207,6 +1331,7 @@ describe('Expert API', function () { sinon.stub(app.db.models.MCPRegistration, 'byTeam').resolves([ { id: 1, + hashid: 'mcpreg-alice', name: 'mcp-server-alice', protocol: 'http', targetType: 'instance', @@ -1219,6 +1344,7 @@ describe('Expert API', function () { name: 'alice', state: 'running', ApplicationId: applicationAlice2.id, + versions: { launcher: { current: '2.31.4' } }, liveState: () => ({ meta: { state: 'running' } }), getSetting: sinon.stub().resolves({}) // no special settings }, @@ -1227,7 +1353,8 @@ describe('Expert API', function () { description: 'Alices MCP Server' }, { - id: 2, // should be excluded since it is offline + id: 2, + hashid: 'mcpreg-bob', name: 'mcp-server-bob', protocol: 'http', targetType: 'instance', @@ -1240,6 +1367,7 @@ describe('Expert API', function () { name: 'bob', state: 'running', ApplicationId: applicationBob2.id, + versions: { launcher: { current: '2.31.4' } }, liveState: () => ({ meta: { state: 'running' } }), getSetting: sinon.stub().resolves({}) // no special settings }, @@ -1248,7 +1376,8 @@ describe('Expert API', function () { description: 'Bobs MCP Server' }, { - id: 3, // should be excluded since it is for other team + id: 3, + hashid: 'mcpreg-chris', name: 'mcp-server-chris', protocol: 'http', targetType: 'instance', @@ -1261,6 +1390,7 @@ describe('Expert API', function () { name: 'chris', state: 'running', ApplicationId: applicationChris2.id, + versions: { launcher: { current: '2.31.4' } }, liveState: () => ({ meta: { state: 'running' } }), getSetting: sinon.stub().resolves({}) // no special settings }, @@ -1270,76 +1400,51 @@ describe('Expert API', function () { } ]) - const buildMcpServerFeaturesResponse = (name, applicationHashid) => ({ - team: team.hashid, - application: applicationHashid, - instance: name, - instanceType: 'instance', - instanceName: name, - mcpServerName: name, - mcpServerUrl: `http://${name}/mcp`, - prompts: [], - resources: [], - resourceTemplates: [], - tools: [ - { - name: 'destructive_tool', - annotations: { - destructiveHint: true, - readOnlyHint: false, - openWorldHint: false, - idempotentHint: false - } - }, - { - name: 'write_tool', - annotations: { - destructiveHint: false, - readOnlyHint: false, - openWorldHint: false, - idempotentHint: false - } - }, - { - name: 'read_tool', - annotations: { - destructiveHint: false, - readOnlyHint: true, - openWorldHint: false, - idempotentHint: false - } - }, - { - name: 'openworld_tool', - description: 'An openworld tool', - type: 'tool', - annotations: { - destructiveHint: false, - readOnlyHint: false, - openWorldHint: true, - idempotentHint: false - } + // every instance's MCP server advertises the same four tools; the route applies + // per-application RBAC filtering to each based on the requesting user's permissions + const toolSet = [ + { + name: 'destructive_tool', + annotations: { + destructiveHint: true, + readOnlyHint: false, + openWorldHint: false, + idempotentHint: false } - ], - mcpProtocol: 'http', - title: `${name} MCP Server`, - version: '1.0.0-beta', - description: `${name} MCP Server` - }) - - // Stub axios to return servers - sinon.stub(axios, 'post').callsFake((url, data) => { - return Promise.resolve({ - data: { - transactionId: 'right', - servers: [ - { ...buildMcpServerFeaturesResponse('alice', applicationAlice2.hashid) }, - { ...buildMcpServerFeaturesResponse('bob', applicationBob2.hashid) }, - { ...buildMcpServerFeaturesResponse('chris', applicationChris2.hashid) } - ] + }, + { + name: 'write_tool', + annotations: { + destructiveHint: false, + readOnlyHint: false, + openWorldHint: false, + idempotentHint: false } - }) - }) + }, + { + name: 'read_tool', + annotations: { + destructiveHint: false, + readOnlyHint: true, + openWorldHint: false, + idempotentHint: false + } + }, + { + name: 'openworld_tool', + description: 'An openworld tool', + type: 'tool', + annotations: { + destructiveHint: false, + readOnlyHint: false, + openWorldHint: true, + idempotentHint: false + } + } + ] + + // Stub the launcher feature fetch to return the toolset for every requested server + stubGetMCPFeatures({ prompts: [], resources: [], resourceTemplates: [], tools: toolSet }) // Helper function to check that the returned tools match expected tool names const checkTools = (serverResult, expectedToolNames) => { @@ -1409,17 +1514,8 @@ describe('Expert API', function () { sinon.stub(instance, 'liveState').returns({ meta: { state: 'running' } }) sinon.stub(instance, 'getSetting').resolves({ httpNodeAuth: { type: 'flowforge-user' } }) - // fake the axios post response - capture post data and return resolved promise - let capturedPostData = null - sinon.stub(axios, 'post').callsFake((url, data) => { - capturedPostData = data - return Promise.resolve({ - data: { - transactionId: 'abc', - servers: [mockMcpResponseServer1] - } - }) - }) + // capture the per-server specs the route hands to the launcher feature fetch + const captured = stubGetMCPFeatures() const response = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', @@ -1434,10 +1530,10 @@ describe('Expert API', function () { tokens.should.be.an.Array() tokens.should.have.length(0) - // Now assert the axios post payload (captured async) - capturedPostData.should.be.an.Object() - capturedPostData.servers[0].should.have.property('mcpAccessToken').and.be.an.Object() - capturedPostData.servers[0].mcpAccessToken.should.deepEqual({ token: null, scheme: '', scope: ['ff-expert:mcp', 'instance'] }) + // Now assert the per-server spec passed to the launcher feature fetch + captured.mcpServers.should.be.an.Array().and.have.length(1) + captured.mcpServers[0].should.have.property('mcpAccessToken').and.be.an.Object() + captured.mcpServers[0].mcpAccessToken.should.deepEqual({ token: null, scheme: '', scope: ['ff-expert:mcp', 'instance'] }) }) it('should not generate an access token for MCP server when instance setting httpNodeAuth is not set', async function () { @@ -1447,17 +1543,8 @@ describe('Expert API', function () { sinon.stub(app.db.models.MCPRegistration, 'byTeam').resolves([mockMcpRegistration1]) sinon.stub(instance, 'liveState').returns({ meta: { state: 'running' } }) sinon.stub(instance, 'getSetting').resolves({}) // no httpNodeAuth settings - // fake the axios post response - capture post data and return resolved promise - let capturedPostData = null - sinon.stub(axios, 'post').callsFake((url, data) => { - capturedPostData = data - return Promise.resolve({ - data: { - transactionId: 'abc', - servers: [mockMcpResponseServer1] - } - }) - }) + // capture the per-server specs the route hands to the launcher feature fetch + const captured = stubGetMCPFeatures() const response = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', @@ -1471,10 +1558,10 @@ describe('Expert API', function () { const tokens = await app.db.models.AccessToken.findAll({ where: { ownerType: 'http', ownerId: instance.id } }) tokens.should.be.an.Array() tokens.should.have.length(0) - // Now assert the axios post payload (captured async) - capturedPostData.should.be.an.Object() - capturedPostData.servers[0].should.have.property('mcpAccessToken').and.be.an.Object() - capturedPostData.servers[0].mcpAccessToken.should.deepEqual({ token: null, scheme: '', scope: ['ff-expert:mcp', 'instance'] }) + // Now assert the per-server spec passed to the launcher feature fetch + captured.mcpServers.should.be.an.Array().and.have.length(1) + captured.mcpServers[0].should.have.property('mcpAccessToken').and.be.an.Object() + captured.mcpServers[0].mcpAccessToken.should.deepEqual({ token: null, scheme: '', scope: ['ff-expert:mcp', 'instance'] }) }) it('should generate an access token for MCP server access when feature teamHttpSecurity is enabled', async function () { @@ -1484,17 +1571,8 @@ describe('Expert API', function () { sinon.stub(instance, 'liveState').returns({ meta: { state: 'running' } }) sinon.stub(instance, 'getSetting').resolves({ httpNodeAuth: { type: 'flowforge-user' } }) - // fake the axios post response - capture post data and return resolved promise - let capturedPostData = null - sinon.stub(axios, 'post').callsFake((url, data) => { - capturedPostData = data - return Promise.resolve({ - data: { - transactionId: 'abc', - servers: [mockMcpResponseServer1] - } - }) - }) + // capture the per-server specs the route hands to the launcher feature fetch + const captured = stubGetMCPFeatures() const response = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', @@ -1531,10 +1609,10 @@ describe('Expert API', function () { const hash = sha256(cachedToken.token) hash.should.equal(dbToken.token) - // Now assert the axios post payload (captured async) - capturedPostData.should.be.an.Object() - capturedPostData.servers[0].should.have.property('mcpAccessToken').and.be.an.Object() - capturedPostData.servers[0].mcpAccessToken.should.deepEqual({ + // Now assert the per-server spec passed to the launcher feature fetch + captured.mcpServers.should.be.an.Array().and.have.length(1) + captured.mcpServers[0].should.have.property('mcpAccessToken').and.be.an.Object() + captured.mcpServers[0].mcpAccessToken.should.deepEqual({ token: cachedToken.token, scheme: 'Bearer', scope: ['ff-expert:mcp', 'instance'] @@ -1549,15 +1627,8 @@ describe('Expert API', function () { sinon.stub(instance, 'getSetting').resolves({ httpNodeAuth: { type: 'flowforge-user' } }) const createHTTPNodeTokenSpy = sinon.spy(app.db.controllers.AccessToken, 'createHTTPNodeToken') - // fake the axios post response - check that the access token is included in the post data - sinon.stub(axios, 'post').callsFake((url, data) => { - return Promise.resolve({ - data: { - transactionId: 'abc', - servers: [mockMcpResponseServer1] - } - }) - }) + // stub the launcher feature fetch - the minted access token is included in the spec it receives + stubGetMCPFeatures() const response1 = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', @@ -1621,17 +1692,8 @@ describe('Expert API', function () { sinon.stub(instance, 'liveState').returns({ meta: { state: 'running' } }) sinon.stub(instance, 'getSetting').resolves({ httpNodeAuth: { type: 'basic', user: 'nodeUser', pass: 'nodePass' } }) - // fake the axios post response - capture post data and return resolved promise - let capturedPostData = null - sinon.stub(axios, 'post').callsFake((url, data) => { - capturedPostData = data - return Promise.resolve({ - data: { - transactionId: 'abc', - servers: [mockMcpResponseServer1] - } - }) - }) + // capture the per-server specs the route hands to the launcher feature fetch + const captured = stubGetMCPFeatures() const response = await app.inject({ method: 'POST', url: '/api/v1/expert/mcp/features', @@ -1640,19 +1702,130 @@ describe('Expert API', function () { payload: { context: { teamId: team.hashid } } }) response.statusCode.should.equal(200) - // Now assert the axios post payload (captured async) - capturedPostData.should.be.an.Object() - capturedPostData.should.have.property('servers').which.is.an.Array() - capturedPostData.servers[0].should.have.property('mcpAccessToken') - capturedPostData.servers[0].mcpAccessToken.should.be.an.Object() + // Now assert the per-server spec passed to the launcher feature fetch + captured.mcpServers.should.be.an.Array().and.have.length(1) + captured.mcpServers[0].should.have.property('mcpAccessToken').and.be.an.Object() // For now, there no support for basic auth. The password is not available. // Instead, we send an empty token with scheme 'Basic' to permit the backend to // ignore basic auth entries (they are still sent so that they can be returned and listed for user awareness) - capturedPostData.servers[0].mcpAccessToken.should.have.property('token', '') - capturedPostData.servers[0].mcpAccessToken.should.have.property('scheme', 'Basic') - capturedPostData.servers[0].mcpAccessToken.should.have.property('scope').and.be.an.Array().and.have.length(2) - capturedPostData.servers[0].mcpAccessToken.scope.should.containEql('ff-expert:mcp') - capturedPostData.servers[0].mcpAccessToken.scope.should.containEql('instance') + captured.mcpServers[0].mcpAccessToken.should.have.property('token', '') + captured.mcpServers[0].mcpAccessToken.should.have.property('scheme', 'Basic') + captured.mcpServers[0].mcpAccessToken.should.have.property('scope').and.be.an.Array().and.have.length(2) + captured.mcpServers[0].mcpAccessToken.scope.should.containEql('ff-expert:mcp') + captured.mcpServers[0].mcpAccessToken.scope.should.containEql('instance') + }) + + // Build a byTeam stub registration whose target is a remote instance (device). The MCP + // features for devices are fetched over MQTT via deviceComms.sendCommandAwaitReply rather + // than the launcher admin API used for hosted instances. + const buildDeviceRegistration = (agentVersion = '3.9.1') => ({ + id: 1, + hashid: 'mcpregdev001', + name: 'mcp-server-device', + protocol: 'http', + targetType: 'device', + targetId: '999', + nodeId: 'mcp:node:1', + endpointRoute: '/mcp', + TeamId: team.id, + Device: { + hashid: 'devicehash001', + id: 999, + name: 'device-1', + url: 'http://device-1', + state: 'running', + ApplicationId: application.id, + agentVersion, + getSetting: sinon.stub().resolves({}) // no special settings + }, + title: 'Device MCP Server', + version: '1.0.0-beta', + description: 'Device MCP Server' + }) + + it('should get mcp features for a device via deviceComms (MQTT proxy)', async function () { + const token = bobToken + sinon.stub(app.db.models.MCPRegistration, 'byTeam').resolves([buildDeviceRegistration('3.9.1')]) + + // wire up a fake device comms MQTT proxy that answers the live-state and feature requests + const sendCommandAwaitReply = sinon.stub().callsFake(async (teamHashid, deviceHashid, command, payload) => { + if (command === 'get-liveState') { + return { state: 'running' } + } + if (command === 'mcp:get-features') { + return payload.mcpEndPoints.map(spec => ({ spec, features: { prompts: [{}], resources: [{}], resourceTemplates: [{}], tools: [{}] } })) + } + return {} + }) + app.comms = { devices: { sendCommandAwaitReply } } + // the launcher admin API (used for hosted instances) must NOT be used for devices + const getFeatures = sinon.stub(app.containers, 'getMCPFeatures') + + try { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/expert/mcp/features', + cookies: { sid: token }, + headers: { 'x-chat-transaction-id': 'abc' }, + payload: { context: { teamId: team.hashid } } + }) + response.statusCode.should.equal(200) + + // hosted-instance path must not be used for a device + getFeatures.called.should.be.false() + + // device live-state checked via the MQTT proxy (team hashid + device hashid) + sendCommandAwaitReply.calledWith(team.hashid, 'devicehash001', 'get-liveState').should.be.true() + + // device features fetched via the MQTT proxy, carrying the per-server specs (incl. minted token) + const featuresCall = sendCommandAwaitReply.getCalls().find(c => c.args[2] === 'mcp:get-features') + should.exist(featuresCall) + featuresCall.args[0].should.equal(team.hashid) + featuresCall.args[1].should.equal('devicehash001') + featuresCall.args[3].should.have.property('mcpEndPoints').which.is.an.Array().and.has.length(1) + featuresCall.args[3].mcpEndPoints[0].should.have.property('mcpServer', 'mcpregdev001') + featuresCall.args[3].mcpEndPoints[0].should.have.property('mcpAccessToken').and.be.an.Object() + + const result = response.json() + result.should.have.property('servers').which.is.an.Array().and.has.length(1) + result.servers[0].should.have.property('instanceType', 'device') + result.servers[0].should.have.property('instance', 'devicehash001') + result.servers[0].should.have.property('mcpServer', 'mcpregdev001') + } finally { + app.comms = null + } + }) + + it('should report a device whose agent version is too old as incompatible', async function () { + const token = bobToken + // agent version older than MIN_REMOTE_INSTANCE_AGENT_VERSION (3.9.1) + sinon.stub(app.db.models.MCPRegistration, 'byTeam').resolves([buildDeviceRegistration('3.0.0')]) + + const sendCommandAwaitReply = sinon.stub().resolves({ state: 'running' }) + app.comms = { devices: { sendCommandAwaitReply } } + + try { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/expert/mcp/features', + cookies: { sid: token }, + headers: { 'x-chat-transaction-id': 'abc' }, + payload: { context: { teamId: team.hashid } } + }) + response.statusCode.should.equal(200) + + const result = response.json() + result.should.have.property('servers').which.is.an.Array().and.has.length(0) + result.should.have.property('incompatibleServers').which.is.an.Array().and.has.length(1) + result.incompatibleServers[0].should.have.property('instance', 'devicehash001') + result.incompatibleServers[0].should.have.property('instanceType', 'device') + result.incompatibleServers[0].should.have.property('currentVersion', '3.0.0') + result.incompatibleServers[0].should.have.property('minimumSupportedVersion', '3.9.1') + // the version gate happens before any MQTT round-trip - no live-state or feature request should be made + sendCommandAwaitReply.called.should.be.false() + } finally { + app.comms = null + } }) }) })