Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 89 additions & 6 deletions forge/comms/aclManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,81 @@ module.exports = function (app) {
return false
}
},
checkExpertPlatformTopic: async function (topicParts, usernameParts, acl) {
// topicParts = [ fullTopic , <userid>, <sessionid>, <command> ]
// usernameParts = [ 'forge_platform' | 'expert-agent', <userid> [, <sessionid>] ]

const ValidationError = function (message) {
const error = new Error(message)
error.name = 'ACLValidationError'
return error
}

try {
const [, userId, sessionId, command] = topicParts
if (!userId || !sessionId || !command) {
throw ValidationError('invalid topic format')
}

if (command === '+') {
if (!acl.allowWildcard?.command) {
throw ValidationError('invalid command wildcard')
}
} else {
// validate command format is [forge:<command>]|[insights:<command>]
const commandParts = command.split(':', 2)
if (commandParts.length !== 2) {
throw ValidationError('invalid command format')
}
const [commandAgent, commandName] = commandParts
switch (commandAgent) {
// case 'forge':
// if (['mcp-get-features', 'mcp-call-tool'].indexOf(commandName) === -1) {
// throw ValidationError('invalid platform command for platform api')
// }
// break
case 'insights':
if (['mcp-call-tool', 'mcp-read-resource'].includes(commandName) === false) {
throw ValidationError('invalid platform command for insights')
}
break
default:
throw ValidationError('invalid platform command')
}
}

// at minimum, ensure session is present and either a wildcard or 8 or more chars
if (sessionId === '+') {
if (!acl.allowWildcard?.session) {
throw ValidationError('invalid session wildcard')
}
} else if (sessionId.length < 8) {
throw ValidationError('invalid session id')
}

if (userId === '+') {
if (!acl.allowWildcard?.user) {
throw ValidationError('invalid user wildcard')
}
} else {
const user = await app.db.models.User.byId(userId)
if (!user || user.suspended) {
throw ValidationError('invalid user')
}
// TODO: consider checking if the user has permission to operate on this channel here or in the command handler.
// For now, we just check the user exists and is not suspended.
}

return true
} catch (err) {
if (err.name === 'ACLValidationError') {
app.log.warn(`ACL validation error for expert platform topic: ${err.message}`)
} else {
app.log.error(`Unexpected error during ACL validation for expert platform topic: ${err.message}`)
}
}
return false
},
checkExpertTopic: async function (topicParts, usernameParts, acl) {
// topicParts = [ fullTopic , <userid>, <sessionid>, <entityType>, <entityId> [, <inflightType>] ]
// usernameParts = [ 'expert-client' | 'expert-agent', <userid> [, <sessionid>] ]
Expand Down Expand Up @@ -313,7 +388,9 @@ module.exports = function (app) {
// ff/v1/platform/sync
{ topic: /^ff\/v1\/platform\/sync$/ },
// ff/v1/platform/leader
{ topic: /^ff\/v1\/platform\/leader$/ }
{ topic: /^ff\/v1\/platform\/leader$/ },
// platform can listen for Expert Agent requests
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/request$/, verify: 'checkExpertPlatformTopic', allowWildcard: { user: true, session: true, command: true }, isPlatform: true, isSub: true, agent: 'platform' }
],
pub: [
// Send commands to project launchers
Expand All @@ -336,7 +413,9 @@ module.exports = function (app) {
// ff/v1/platform/sync
{ topic: /^ff\/v1\/platform\/sync$/ },
// ff/v1/platform/leader
{ topic: /^ff\/v1\/platform\/leader$/ }
{ topic: /^ff\/v1\/platform\/leader$/ },
// platform can respond to Expert Agent requests
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/response$/, verify: 'checkExpertPlatformTopic', isPlatform: true, isPub: true, agent: 'platform' }
]
},
project: {
Expand Down Expand Up @@ -420,12 +499,16 @@ module.exports = function (app) {
// backend client (agent)
expertAgent: {
sub: [
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/request$/, verify: 'checkExpertTopic', channel: 'chat', allowWildcard: { user: true, session: true, entity: true }, isAgent: true, isSub: true },
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/response$/, verify: 'checkExpertTopic', channel: 'inflight', allowWildcard: { user: true, session: true, entity: true, inflightType: true }, isAgent: true, isSub: true }
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/request$/, verify: 'checkExpertTopic', channel: 'chat', allowWildcard: { user: true, session: true, entity: true }, isAgent: true, isSub: true, agent: 'support' },
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/response$/, verify: 'checkExpertTopic', channel: 'inflight', allowWildcard: { user: true, session: true, entity: true, inflightType: true }, isAgent: true, isSub: true, agent: 'support' },
// Expert agent can listen for platform responses
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/response$/, verify: 'checkExpertPlatformTopic', allowWildcard: { user: true, session: true, command: true }, isAgent: true, isSub: true, agent: 'platform' }
],
pub: [
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/response$/, verify: 'checkExpertTopic', channel: 'chat', isAgent: true, isPub: true },
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/request$/, verify: 'checkExpertTopic', channel: 'inflight', isAgent: true, isPub: true }
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/chat\/response$/, verify: 'checkExpertTopic', channel: 'chat', isAgent: true, isPub: true, agent: 'support' },
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/([^/]+)\/([^/]+)\/support\/inflight\/([^/]+)\/request$/, verify: 'checkExpertTopic', channel: 'inflight', isAgent: true, isPub: true, agent: 'support' },
// Expert agent can respond to platform requests
{ topic: /^ff\/v1\/expert\/([^/]+)\/([^/]+)\/platform\/([^/]+)\/request$/, verify: 'checkExpertPlatformTopic', isAgent: true, isPub: true, agent: 'platform' }
]
}
}
Expand Down
101 changes: 97 additions & 4 deletions forge/comms/commsClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class CommsClient extends EventEmitter {
// To aid testing, we use a url of `:test:` to allow us to configure
// the platform with comms enabled, but no active MQTT connection
if (this.app.config.broker.url !== ':test:') {
/** @type {MQTT.IClientOptions} */
/** @type {mqtt.IClientOptions} */
const brokerConfig = {
clientId: 'forge_platform:' + randomBytes(8).toString('hex'),
username: 'forge_platform',
Expand All @@ -46,12 +46,103 @@ class CommsClient extends EventEmitter {
this.client.on('error', (err) => {
this.app.log.info(`Connection error to comms broker: ${err.toString()}`)
})
this.client.on('message', (topic, message) => {
this.client.on('message', (topic, message, packet) => {
const topicParts = topic.split('/')
const ownerType = topicParts[3]
const ownerId = topicParts[4]
const messageType = topicParts[5]
if (ownerType === 'p') {

if (topicParts[2] === 'expert') {
const userId = topicParts[3]
const sessionId = topicParts[4]
const channel = topicParts[5] // platform
const channelCommand = topicParts[6] // dynamic value, e.g. mcp:call-tool or mcp:read-resource
const direction = topicParts[7] // request or response (only for inflight channels)

const supportedInsightsCommands = {
'insights:mcp-call-tool': 'mcp:call-tool',
'insights:mcp-read-resource': 'mcp:read-resource'
}
if (supportedInsightsCommands[channelCommand] && direction === 'request') {
const isInsightsToolCall = channel === 'platform' && channelCommand === 'insights:mcp-call-tool' && direction === 'request'
const isInsightsResourceCall = channel === 'platform' && channelCommand === 'insights:mcp-read-resource' && direction === 'request'
// MCP ROUTE: step 1 (hosted/remote common)
// Called By: from the Expert Agent (via MQTT inflight request)
// Calls To : inflight request handler (./instances.js or ./devices.js)

// When the topic is determined to be an expert insights inflight request, the acl manager has already verified the user
// has permission to access this topic. Now, we verify the payload contains the required fields to process the request.
// If OK, emit the message to the appropriate instance/device handler (handled in ./instances.js or ./devices.js)

const payload = JSON.parse(message.toString())
const command = supportedInsightsCommands[channelCommand]
const data = payload.data || {}
const { kind, mcpServer, toolDefinition, resourceDefinition, resourceTemplateDefinition } = payload.meta || {}
const correlationData = packet.properties?.correlationData
const userProperties = packet.properties?.userProperties
if (!correlationData || !userProperties) {
console.warn('Expert Insight tool call request missing correlationData or userProperties', payload)
return // do not respond, the agent will timeout and handle it
}

const mqttOptions = { properties: { correlationData, userProperties } }
const responseTopic = `ff/v1/expert/${userId}/${sessionId}/${channel}/${channelCommand}/response`

/** Callback for failed MCP request. Publishes a structured error back to the agent. */
const onError = (content, code, error) => {
const data = {
code: code || error?.code || 'MCP_ERROR',
content: `Error: ${content}`,
isError: true
}
if (error) {
data.type = error?.name || error?.constructor?.name || 'Error'
data.message = error?.message || error?.toString()
}
this.client.publish(responseTopic, JSON.stringify(data), mqttOptions)
}
/** Callback for successful MCP request. Publishes the result back to the agent. */
const onSuccess = (result) => {
this.client.publish(responseTopic, JSON.stringify(result), mqttOptions)
}

// check that the mcpServer contains the required fields to process the request
if (!mcpServer || !['instance', 'device'].includes(mcpServer.instanceType) || !mcpServer.instance || !mcpServer.mcpServer) {
console.warn('Invalid Expert Insight tool call request', payload)
return // do not respond, this is not for us.
}

// validate kind matches the topic channel parameter & that the toolDefinition/resourceDefinition/resourceTemplateDefinition is present
let definition = null
switch (true) {
case isInsightsToolCall && kind === 'mcp_tool':
definition = toolDefinition
break
case isInsightsResourceCall && kind === 'mcp_resource':
definition = resourceDefinition
break
case isInsightsResourceCall && kind === 'mcp_resource_template':
definition = resourceTemplateDefinition
break
}
if (!definition) {
onError('Invalid Expert Insight tool call request: missing or mismatched kind and definition', 'MCP_INVALID_DEFINITION')
return
}

this.emit(
`request/${mcpServer.instanceType}/expert/insight`, // event name
userId, // ID of user making the request
command, // command
mcpServer, // mcp server details
kind, // mcp kind (mcp_tool, mcp_resource, mcp_resource_template)
definition, // mcpFeatureDefinition
data, // call data
onSuccess, // success callback
onError // failure callback
)
}
} else if (ownerType === 'p') {
this.emit('status/project', {
id: ownerId,
status: message.toString()
Expand Down Expand Up @@ -125,7 +216,9 @@ class CommsClient extends EventEmitter {
// Device response heartbeat
'ff/v1/+/d/+/resources/heartbeat',
// Platform sync messages
'ff/v1/platform/sync'
'ff/v1/platform/sync',
// Listen for Expert platform requests
'ff/v1/expert/+/+/platform/+/request'
])
}
}
Expand Down
Loading
Loading