From d1be9ff9841684b7aefd8bb51c5ee8975c6355a7 Mon Sep 17 00:00:00 2001 From: Noley Holland Date: Tue, 26 May 2026 07:42:17 -0700 Subject: [PATCH 1/2] Push live instance and device state to subscribed team members via MQTT/WS --- forge/comms/aclManager.js | 62 ++++- forge/comms/commsClient.js | 5 +- forge/comms/devices.js | 1 + forge/comms/index.js | 41 ++- forge/routes/api/device.js | 3 + forge/routes/api/project.js | 3 + frontend/src/components/DevicesBrowser.vue | 45 +++- frontend/src/composables/MqttTeamChannel.js | 91 +++++++ frontend/src/pages/application/index.vue | 42 ++- frontend/src/pages/device/index.vue | 39 ++- frontend/src/pages/instance/index.vue | 35 ++- .../components/compact/InstanceTile.vue | 34 ++- frontend/src/pages/team/Instances.vue | 49 +++- frontend/src/services/team-channel.service.ts | 121 ++++++++- .../src/types/services/team-channel.types.ts | 11 + test/unit/forge/comms/authRoutesV2_spec.js | 93 +++++++ .../services/team-channel.service.spec.js | 239 +++++++++++++++++- 17 files changed, 882 insertions(+), 32 deletions(-) create mode 100644 frontend/src/composables/MqttTeamChannel.js diff --git a/forge/comms/aclManager.js b/forge/comms/aclManager.js index ec816e2b25..f8ffeae505 100644 --- a/forge/comms/aclManager.js +++ b/forge/comms/aclManager.js @@ -140,6 +140,58 @@ module.exports = function (app) { return false } }, + checkUserCanReadInstance: async function (requestParts, usernameParts) { + // requestParts = [ fullTopic, , ] + // usernameParts = [ 'team-frontend', , , ] + const topicTeamHash = requestParts[1] + const projectId = requestParts[2] + const usernameUserHash = usernameParts[1] + const usernameTeamHash = usernameParts[2] + if (topicTeamHash !== usernameTeamHash) { + return false + } + try { + const team = await app.db.models.Team.byId(usernameTeamHash) + if (!team) return false + const user = await app.db.models.User.byId(usernameUserHash) + if (!user) return false + const membership = await app.db.models.TeamMember.getTeamMembership(user.id, team.id, false) + if (!membership) return false + const project = await app.db.models.Project.byId(projectId) + if (!project || project.TeamId !== team.id) return false + const applicationId = app.db.models.Application.encodeHashid(project.ApplicationId) + return app.hasPermission(membership, 'project:read', { applicationId }) + } catch (error) { + app.log.error('Unexpected error during instance-state ACL check', { requestParts, usernameParts, error }) + return false + } + }, + checkUserCanReadDevice: async function (requestParts, usernameParts) { + // requestParts = [ fullTopic, , ] + // usernameParts = [ 'team-frontend', , , ] + const topicTeamHash = requestParts[1] + const deviceId = requestParts[2] + const usernameUserHash = usernameParts[1] + const usernameTeamHash = usernameParts[2] + if (topicTeamHash !== usernameTeamHash) { + return false + } + try { + const team = await app.db.models.Team.byId(usernameTeamHash) + if (!team) return false + const user = await app.db.models.User.byId(usernameUserHash) + if (!user) return false + const membership = await app.db.models.TeamMember.getTeamMembership(user.id, team.id, false) + if (!membership) return false + const device = await app.db.models.Device.byId(deviceId) + if (!device || device.TeamId !== team.id) return false + const applicationId = device.ApplicationId ? app.db.models.Application.encodeHashid(device.ApplicationId) : null + return app.hasPermission(membership, 'device:read', { applicationId }) + } catch (error) { + app.log.error('Unexpected error during device-state ACL check', { requestParts, usernameParts, error }) + return false + } + }, checkExpertTopic: async function (topicParts, usernameParts, acl) { // topicParts = [ fullTopic , , , , [, ] ] // usernameParts = [ 'expert-client' | 'expert-agent', [, ] ] @@ -320,6 +372,10 @@ module.exports = function (app) { { topic: /^ff\/v1\/[^/]+\/team\/updated$/ }, // - ff/v1//u//membership { topic: /^ff\/v1\/[^/]+\/u\/[^/]+\/membership$/ }, + // - ff/v1//p//state + { topic: /^ff\/v1\/[^/]+\/p\/[^/]+\/state$/ }, + // - ff/v1//d//state + { topic: /^ff\/v1\/[^/]+\/d\/[^/]+\/state$/ }, // ff/v1/platform/sync { topic: /^ff\/v1\/platform\/sync$/ }, // ff/v1/platform/leader @@ -383,7 +439,11 @@ module.exports = function (app) { // - ff/v1//team/updated { topic: /^ff\/v1\/([^/]+)\/team\/updated$/, verify: 'checkUserIsTeamMember' }, // - ff/v1//u//membership - { topic: /^ff\/v1\/([^/]+)\/u\/([^/]+)\/membership$/, verify: 'checkUserIsTeamMember' } + { topic: /^ff\/v1\/([^/]+)\/u\/([^/]+)\/membership$/, verify: 'checkUserIsTeamMember' }, + // - ff/v1//p//state + { topic: /^ff\/v1\/([^/]+)\/p\/([^/]+)\/state$/, verify: 'checkUserCanReadInstance' }, + // - ff/v1//d//state + { topic: /^ff\/v1\/([^/]+)\/d\/([^/]+)\/state$/, verify: 'checkUserCanReadDevice' } ], pub: [] }, diff --git a/forge/comms/commsClient.js b/forge/comms/commsClient.js index a5922cc24a..02bd6b9138 100644 --- a/forge/comms/commsClient.js +++ b/forge/comms/commsClient.js @@ -47,17 +47,20 @@ class CommsClient extends EventEmitter { }) this.client.on('message', (topic, message) => { const topicParts = topic.split('/') + const teamId = topicParts[2] const ownerType = topicParts[3] const ownerId = topicParts[4] const messageType = topicParts[5] - if (ownerType === 'p') { + if (ownerType === 'l' && messageType === 'status') { this.emit('status/project', { + teamId, id: ownerId, status: message.toString() }) } else if (ownerType === 'd') { if (messageType === 'status') { this.emit('status/device', { + teamId, id: ownerId, status: message.toString() }) diff --git a/forge/comms/devices.js b/forge/comms/devices.js index ab85b6d9cc..54867ac852 100644 --- a/forge/comms/devices.js +++ b/forge/comms/devices.js @@ -123,6 +123,7 @@ class DeviceCommsHandler { try { const payload = JSON.parse(status.status) await this.app.db.controllers.Device.updateState(device, payload) + this.app.comms?.team?.publishDeviceState(teamId, deviceId, payload) if (payload === null) { // This device is busy updating - don't interrupt it diff --git a/forge/comms/index.js b/forge/comms/index.js index 4cb7bf79c5..2df9934020 100644 --- a/forge/comms/index.js +++ b/forge/comms/index.js @@ -31,11 +31,36 @@ module.exports = fp(async function (app, _opts) { // Create the handler for any device-related messages const deviceCommsHandler = DeviceCommsHandler(app, client) - // Not in the current release, but when we handle Launcher status - // via MQTT, it will arrive here. Compare to the status/device handler in `devices.js` - // client.on('status/project', (status) => { - // // console.info(status) - // }) + function publishInstanceState (teamHash, instanceId, meta) { + if (!teamHash || !instanceId) return + const msg = { id: instanceId, meta: meta || null, ts: Date.now() } + client.publish(`ff/v1/${teamHash}/p/${instanceId}/state`, JSON.stringify(msg), { retain: true }) + } + function publishDeviceState (teamHash, deviceId, meta) { + if (!teamHash || !deviceId) return + const msg = { id: deviceId, meta: meta || null, ts: Date.now() } + client.publish(`ff/v1/${teamHash}/d/${deviceId}/state`, JSON.stringify(msg), { retain: true }) + } + // empty + retain:true clears the broker's retained message + function clearInstanceState (teamHash, instanceId) { + if (!teamHash || !instanceId) return + client.publish(`ff/v1/${teamHash}/p/${instanceId}/state`, '', { retain: true }) + } + function clearDeviceState (teamHash, deviceId) { + if (!teamHash || !deviceId) return + client.publish(`ff/v1/${teamHash}/d/${deviceId}/state`, '', { retain: true }) + } + + client.on('status/project', (status) => { + try { + const meta = status.status ? JSON.parse(status.status) : null + publishInstanceState(status.teamId, status.id, meta) + } catch (err) { + if (!(err instanceof SyntaxError)) { + app.log.error({ msg: 'Failed to relay instance state', project: status.id, team: status.teamId, err: err.message }) + } + } + }) // Setup the platform API for the comms component app.decorate('comms', { @@ -71,7 +96,11 @@ module.exports = fp(async function (app, _opts) { if (!teamHash || !userHash) return const msg = { reason: reason || null, srcId: srcId || null } client.publish(`ff/v1/${teamHash}/u/${userHash}/membership`, JSON.stringify(msg)) - } + }, + publishInstanceState, + publishDeviceState, + clearInstanceState, + clearDeviceState } }) diff --git a/forge/routes/api/device.js b/forge/routes/api/device.js index a9167e422d..3cc74c2075 100644 --- a/forge/routes/api/device.js +++ b/forge/routes/api/device.js @@ -427,7 +427,10 @@ module.exports = async function (app) { { model: app.db.models.TeamType } ] }) + const teamHash = team.hashid + const deviceHash = request.device.hashid await request.device.destroy() + app.comms?.team?.clearDeviceState(teamHash, deviceHash) await app.auditLog.Team.team.device.deleted(request.session.User, null, team, request.device) if (app.license.active() && app.billing) { await app.billing.updateTeamBillingCounts(team) diff --git a/forge/routes/api/project.js b/forge/routes/api/project.js index 6af261ce62..493ab05025 100644 --- a/forge/routes/api/project.js +++ b/forge/routes/api/project.js @@ -280,7 +280,10 @@ module.exports = async function (app) { }) } + const teamHash = request.project.Team.hashid + const instanceId = request.project.id await request.project.destroy() + app.comms?.team?.clearInstanceState(teamHash, instanceId) await app.auditLog.Team.project.deleted(request.session.User, null, request.project.Team, request.project) await app.auditLog.Project.project.deleted(request.session.User, null, request.project.Team, request.project) reply.send({ status: 'okay' }) diff --git a/frontend/src/components/DevicesBrowser.vue b/frontend/src/components/DevicesBrowser.vue index 6e44c23e60..5afdf20835 100644 --- a/frontend/src/components/DevicesBrowser.vue +++ b/frontend/src/components/DevicesBrowser.vue @@ -352,6 +352,7 @@ import { markRaw } from 'vue' import deviceApi from '../api/devices.js' import teamApi from '../api/team.js' import DropdownMenu from '../components/DropdownMenu.vue' +import { useMqttAvailability, useMqttResourceList } from '../composables/MqttTeamChannel.js' import usePermissions from '../composables/Permissions.js' import { getTeamProperty } from '../composables/TeamProperties.js' import deviceActionsMixin from '../mixins/DeviceActions.js' @@ -429,8 +430,16 @@ export default { emits: ['instance-updated'], setup () { const { hasPermission } = usePermissions() + const { mqttAvailable, resolveMqttAvailability } = useMqttAvailability() + const { syncMqttSubscriptions, teardownMqttSubscriptions } = useMqttResourceList('device') - return { hasPermission } + return { + hasPermission, + mqttAvailable, + resolveMqttAvailability, + syncMqttSubscriptions, + teardownMqttSubscriptions + } }, data () { return { @@ -607,12 +616,18 @@ export default { } } }, - mounted () { + async mounted () { + await this.resolveMqttAvailability() this.fullReloadOfData() - this.pollTimer = createPollTimer(this.pollTimerElapsed, POLL_TIME) // auto starts + if (!this.mqttAvailable) { + this.pollTimer = createPollTimer(this.pollTimerElapsed, POLL_TIME) // auto starts + } }, async unmounted () { - this.pollTimer.stop() + if (this.pollTimer) { + this.pollTimer.stop() + } + this.teardownMqttSubscriptions() if (this.deviceCountDeltaSincePageLoad !== 0) { // Trigger a refresh of team info to resync following device // changes @@ -854,6 +869,11 @@ export default { fullReloadOfData () { this.checkedDevices = [] this.loadDevices(true) + .then(() => { + this.resyncMqtt() + return undefined + }) + .catch(() => undefined) this.pollForDeviceStatuses(true) }, @@ -865,6 +885,7 @@ export default { async loadMoreDevices () { await this.fetchDevices() + this.resyncMqtt() }, async pollForDeviceStatuses (reset) { @@ -873,6 +894,22 @@ export default { } }, + resyncMqtt () { + this.syncMqttSubscriptions(this.devices.keys(), this.mqttAvailable, this.onMqttDeviceState) + }, + onMqttDeviceState (id, payload) { + const meta = (payload && payload.meta) || {} + const existing = this.allDeviceStatuses.get(id) || { id } + this.allDeviceStatuses.set(id, { + ...existing, + id, + status: meta.state || existing.status, + mode: meta.mode || existing.mode, + lastSeenAt: new Date().toISOString(), + lastSeenMs: 0 + }) + }, + async fetchDevices (resetPage = false) { if (resetPage) { this.nextCursor = null diff --git a/frontend/src/composables/MqttTeamChannel.js b/frontend/src/composables/MqttTeamChannel.js new file mode 100644 index 0000000000..9d1a4850ac --- /dev/null +++ b/frontend/src/composables/MqttTeamChannel.js @@ -0,0 +1,91 @@ +import { inject, ref } from 'vue' + +export function useMqttAvailability () { + const services = inject('$services', null) + const mqttAvailable = ref(false) + + async function resolveMqttAvailability () { + const teamChannel = services?.teamChannel + if (!teamChannel) { + mqttAvailable.value = false + return false + } + try { + mqttAvailable.value = await teamChannel.ready() + } catch { + mqttAvailable.value = false + } + return mqttAvailable.value + } + + return { mqttAvailable, resolveMqttAvailability } +} + +function subscribeFor (teamChannel, kind, id, onPayload) { + return kind === 'instance' + ? teamChannel.subscribeInstance(id, onPayload) + : teamChannel.subscribeDevice(id, onPayload) +} + +export function useMqttResourceSubscription (kind) { + const services = inject('$services', null) + let unsubscriber = null + + function setupMqttSubscription (id, onPayload) { + teardownMqttSubscription() + if (!id || typeof onPayload !== 'function') return + const teamChannel = services?.teamChannel + if (!teamChannel) return + unsubscriber = subscribeFor(teamChannel, kind, id, onPayload) + } + + function teardownMqttSubscription () { + if (!unsubscriber) return + try { unsubscriber() } catch {} + unsubscriber = null + } + + return { setupMqttSubscription, teardownMqttSubscription } +} + +export function useMqttResourceList (kind) { + const services = inject('$services', null) + const unsubscribers = new Map() + + function syncMqttSubscriptions (visibleIds, available, onPayload) { + if (!available) { + teardownMqttSubscriptions() + return + } + const teamChannel = services?.teamChannel + if (!teamChannel) return + const visible = new Set(visibleIds) + for (const [id, unsub] of unsubscribers) { + if (!visible.has(id)) { + try { unsub() } catch {} + unsubscribers.delete(id) + } + } + for (const id of visible) { + if (!unsubscribers.has(id)) { + unsubscribers.set(id, subscribeFor(teamChannel, kind, id, (payload) => onPayload(id, payload))) + } + } + } + + function dropMqttSubscription (id) { + const unsub = unsubscribers.get(id) + if (!unsub) return + try { unsub() } catch {} + unsubscribers.delete(id) + } + + function teardownMqttSubscriptions () { + for (const unsub of unsubscribers.values()) { + try { unsub() } catch {} + } + unsubscribers.clear() + } + + return { syncMqttSubscriptions, dropMqttSubscription, teardownMqttSubscriptions } +} diff --git a/frontend/src/pages/application/index.vue b/frontend/src/pages/application/index.vue index 9d63f3eb0b..3100a47b72 100644 --- a/frontend/src/pages/application/index.vue +++ b/frontend/src/pages/application/index.vue @@ -25,7 +25,9 @@ @instance-delete="instanceShowConfirmDelete" /> - + @@ -34,6 +36,7 @@ import { mapState } from 'pinia' import InstanceStatusPolling from '../../components/InstanceStatusPolling.vue' +import { useMqttAvailability, useMqttResourceList } from '../../composables/MqttTeamChannel.js' import usePermissions from '../../composables/Permissions.js' import applicationMixin from '../../mixins/Application.js' @@ -56,8 +59,17 @@ export default { mixins: [applicationMixin, instanceActionsMixin], setup () { const { hasPermission, isVisitingAdmin } = usePermissions() + const { mqttAvailable, resolveMqttAvailability } = useMqttAvailability() + const { syncMqttSubscriptions, teardownMqttSubscriptions } = useMqttResourceList('instance') - return { hasPermission, isVisitingAdmin } + return { + hasPermission, + isVisitingAdmin, + mqttAvailable, + resolveMqttAvailability, + syncMqttSubscriptions, + teardownMqttSubscriptions + } }, computed: { ...mapState(useContextStore, ['team']), @@ -133,6 +145,32 @@ export default { '$route.params': { handler: 'updateApplication', immediate: true + }, + instancesArray: 'resyncMqtt' + }, + async mounted () { + await this.resolveMqttAvailability() + this.resyncMqtt() + }, + beforeUnmount () { + this.teardownMqttSubscriptions() + }, + methods: { + resyncMqtt () { + this.syncMqttSubscriptions( + this.applicationInstances?.keys?.() || [], + this.mqttAvailable, + this.onMqttStateMessage + ) + }, + onMqttStateMessage (id, payload) { + const current = this.applicationInstances.get(id) + if (!current) return + const meta = (payload && payload.meta) || {} + this.applicationInstances.set(id, { + ...current, + meta: { ...(current.meta || {}), ...meta } + }) } } } diff --git a/frontend/src/pages/device/index.vue b/frontend/src/pages/device/index.vue index 45d549131e..6b881f15d7 100644 --- a/frontend/src/pages/device/index.vue +++ b/frontend/src/pages/device/index.vue @@ -146,6 +146,7 @@ import SectionNavigationHeader from '../../components/SectionNavigationHeader.vu import StatusBadge from '../../components/StatusBadge.vue' import SubscriptionExpiredBanner from '../../components/banners/SubscriptionExpired.vue' import TeamTrialBanner from '../../components/banners/TeamTrial.vue' +import { useMqttAvailability, useMqttResourceSubscription } from '../../composables/MqttTeamChannel.js' import { useNavigationHelper } from '../../composables/NavigationHelper.js' import usePermissions from '../../composables/Permissions.js' import deviceActionsMixin from '../../mixins/DeviceActions.js' @@ -207,8 +208,19 @@ export default { setup () { const { hasPermission, isVisitingAdmin } = usePermissions() const { navigateTo, openInANewTab } = useNavigationHelper() + const { mqttAvailable, resolveMqttAvailability } = useMqttAvailability() + const { setupMqttSubscription, teardownMqttSubscription } = useMqttResourceSubscription('device') - return { hasPermission, isVisitingAdmin, navigateTo, openInANewTab } + return { + hasPermission, + isVisitingAdmin, + navigateTo, + openInANewTab, + mqttAvailable, + resolveMqttAvailability, + setupMqttSubscription, + teardownMqttSubscription + } }, data: function () { return { @@ -363,15 +375,22 @@ export default { } }, watch: { - device: 'deviceChanged' + device: 'deviceChanged', + 'device.id': function (newId, oldId) { + if (newId === oldId) return + this.refreshMqttSubscription() + } }, async mounted () { this.mounted = true + await this.resolveMqttAvailability() await this.loadDevice() this.setContextualDevice(this.device) + this.refreshMqttSubscription() }, beforeUnmount () { this.setContextualDevice(null) + this.teardownMqttSubscription() }, unmounted () { this.pollTimer?.stop() @@ -410,7 +429,7 @@ export default { return this.$router.push({ name: 'Home' }) } } - if (!this.pollTimer) { + if (!this.pollTimer && !this.mqttAvailable) { this.pollTimer = createPollTimer(this.pollTimerElapsed, POLL_TIME) } @@ -573,6 +592,20 @@ export default { deviceChanged () { this.deviceStateMutator = new DeviceStateMutator(this.device) }, + refreshMqttSubscription () { + if (!this.mqttAvailable) return + this.setupMqttSubscription(this.device?.id, (payload) => this.onMqttDeviceState(payload)) + }, + onMqttDeviceState (payload) { + const meta = (payload && payload.meta) || {} + if (!this.device) return + const next = { ...this.device } + if (meta.state !== undefined) next.status = meta.state + if (meta.mode !== undefined) next.mode = meta.mode + next.lastSeenAt = new Date().toISOString() + this.device = next + this.deviceStateMutator?.clearState() + }, showConfirmDeleteDialog () { Dialog.show({ header: 'Delete Device', diff --git a/frontend/src/pages/instance/index.vue b/frontend/src/pages/instance/index.vue index 82e1e25491..b736bee60d 100644 --- a/frontend/src/pages/instance/index.vue +++ b/frontend/src/pages/instance/index.vue @@ -70,7 +70,7 @@ /> - + @@ -84,6 +84,7 @@ import StatusBadge from '../../components/StatusBadge.vue' import SubscriptionExpiredBanner from '../../components/banners/SubscriptionExpired.vue' import TeamTrialBanner from '../../components/banners/TeamTrial.vue' import InstanceActionsButton from '../../components/instance/ActionButton.vue' +import { useMqttAvailability, useMqttResourceSubscription } from '../../composables/MqttTeamChannel.js' import usePermissions from '../../composables/Permissions.js' import instanceMixin from '../../mixins/Instance.js' @@ -113,10 +114,16 @@ export default { mixins: [instanceMixin], setup () { const { hasPermission, isVisitingAdmin } = usePermissions() + const { mqttAvailable, resolveMqttAvailability } = useMqttAvailability() + const { setupMqttSubscription, teardownMqttSubscription } = useMqttResourceSubscription('instance') return { hasPermission, - isVisitingAdmin + isVisitingAdmin, + mqttAvailable, + resolveMqttAvailability, + setupMqttSubscription, + teardownMqttSubscription } }, data: function () { @@ -175,8 +182,30 @@ export default { return null } }, - mounted () { + watch: { + 'instance.id': function (newId, oldId) { + if (newId === oldId) return + this.refreshMqttSubscription() + } + }, + async mounted () { this.mounted = true + await this.resolveMqttAvailability() + this.refreshMqttSubscription() + }, + beforeUnmount () { + this.teardownMqttSubscription() + }, + methods: { + refreshMqttSubscription () { + if (!this.mqttAvailable) return + this.setupMqttSubscription(this.instance?.id, (payload) => this.onMqttInstanceState(payload)) + }, + onMqttInstanceState (payload) { + const meta = payload && payload.meta + if (!meta) return + this.instanceUpdated({ meta }) + } } } diff --git a/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue b/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue index 6c3ecc9e1f..3ec824f33b 100644 --- a/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue +++ b/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue @@ -82,7 +82,7 @@ - +