From 07bd151e699edc39244a49b9dcae21dffdc9565e Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 10 Apr 2026 18:05:08 -0700 Subject: [PATCH 1/2] improvement(sockets): workflow switching state machine --- .../components/cursors/cursors.tsx | 10 +- .../providers/socket-join-controller.test.ts | 247 ++++++++++++ .../providers/socket-join-controller.ts | 312 +++++++++++++++ .../providers/socket-join-target.test.ts | 54 +++ .../workspace/providers/socket-join-target.ts | 28 ++ .../workspace/providers/socket-provider.tsx | 357 ++++++++++++------ apps/sim/hooks/use-collaborative-workflow.ts | 14 +- apps/sim/socket/handlers/workflow.test.ts | 203 ++++++++++ apps/sim/socket/handlers/workflow.ts | 27 +- apps/sim/stores/operation-queue/store.test.ts | 84 +++++ apps/sim/stores/operation-queue/store.ts | 18 +- bun.lock | 1 - 12 files changed, 1237 insertions(+), 118 deletions(-) create mode 100644 apps/sim/app/workspace/providers/socket-join-controller.test.ts create mode 100644 apps/sim/app/workspace/providers/socket-join-controller.ts create mode 100644 apps/sim/app/workspace/providers/socket-join-target.test.ts create mode 100644 apps/sim/app/workspace/providers/socket-join-target.ts create mode 100644 apps/sim/socket/handlers/workflow.test.ts create mode 100644 apps/sim/stores/operation-queue/store.test.ts diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx index 2f45ea54b1b..e32a8303faf 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/cursors/cursors.tsx @@ -5,6 +5,7 @@ import { useViewport } from 'reactflow' import { getUserColor } from '@/lib/workspaces/colors' import { usePreventZoom } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks' import { useSocket } from '@/app/workspace/providers/socket-provider' +import { useWorkflowRegistry } from '@/stores/workflows/registry/store' interface CursorPoint { x: number @@ -19,11 +20,16 @@ interface CursorRenderData { } const CursorsComponent = () => { - const { presenceUsers, currentSocketId } = useSocket() + const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId) + const { currentWorkflowId, presenceUsers, currentSocketId } = useSocket() const viewport = useViewport() const preventZoomRef = usePreventZoom() const cursors = useMemo(() => { + if (!activeWorkflowId || currentWorkflowId !== activeWorkflowId) { + return [] + } + return presenceUsers .filter((user): user is typeof user & { cursor: CursorPoint } => Boolean(user.cursor)) .filter((user) => user.socketId !== currentSocketId) @@ -33,7 +39,7 @@ const CursorsComponent = () => { cursor: user.cursor, color: getUserColor(user.userId), })) - }, [currentSocketId, presenceUsers]) + }, [activeWorkflowId, currentSocketId, currentWorkflowId, presenceUsers]) if (!cursors.length) { return null diff --git a/apps/sim/app/workspace/providers/socket-join-controller.test.ts b/apps/sim/app/workspace/providers/socket-join-controller.test.ts new file mode 100644 index 00000000000..10850cfd3fb --- /dev/null +++ b/apps/sim/app/workspace/providers/socket-join-controller.test.ts @@ -0,0 +1,247 @@ +import { describe, expect, it } from 'vitest' +import { + SOCKET_JOIN_RETRY_BASE_DELAY_MS, + SOCKET_JOIN_RETRY_MAX_DELAY_MS, + SocketJoinController, +} from '@/app/workspace/providers/socket-join-controller' + +describe('SocketJoinController', () => { + it('blocks rejoining a deleted workflow until the desired workflow changes', () => { + const controller = new SocketJoinController() + + expect(controller.setConnected(true)).toEqual([]) + expect(controller.requestWorkflow('workflow-a')).toEqual([ + { type: 'join', workflowId: 'workflow-a' }, + ]) + expect(controller.handleJoinSuccess('workflow-a')).toMatchObject({ + apply: true, + ignored: false, + commands: [], + workflowId: 'workflow-a', + }) + + expect(controller.handleWorkflowDeleted('workflow-a')).toEqual({ + shouldClearCurrent: true, + commands: [], + }) + expect(controller.requestWorkflow('workflow-a')).toEqual([]) + expect(controller.requestWorkflow('workflow-b')).toEqual([ + { type: 'join', workflowId: 'workflow-b' }, + ]) + }) + + it('joins only the latest desired workflow after rapid A to B to C switching', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + controller.requestWorkflow('workflow-a') + controller.handleJoinSuccess('workflow-a') + + expect(controller.requestWorkflow('workflow-b')).toEqual([ + { type: 'join', workflowId: 'workflow-b' }, + ]) + expect(controller.requestWorkflow('workflow-c')).toEqual([]) + + expect(controller.handleJoinSuccess('workflow-b')).toMatchObject({ + apply: false, + ignored: true, + workflowId: 'workflow-b', + commands: [{ type: 'join', workflowId: 'workflow-c' }], + }) + expect(controller.handleJoinSuccess('workflow-c')).toMatchObject({ + apply: true, + ignored: false, + workflowId: 'workflow-c', + commands: [], + }) + }) + + it('rejoins the original workflow when a stale success lands after switching back', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + controller.requestWorkflow('workflow-a') + controller.handleJoinSuccess('workflow-a') + + expect(controller.requestWorkflow('workflow-b')).toEqual([ + { type: 'join', workflowId: 'workflow-b' }, + ]) + expect(controller.requestWorkflow('workflow-a')).toEqual([]) + + expect(controller.handleJoinSuccess('workflow-b')).toMatchObject({ + apply: false, + ignored: true, + workflowId: 'workflow-b', + commands: [{ type: 'join', workflowId: 'workflow-a' }], + }) + expect(controller.handleJoinSuccess('workflow-a')).toMatchObject({ + apply: true, + ignored: false, + workflowId: 'workflow-a', + commands: [], + }) + }) + + it('leaves the room when a late join succeeds after navigating away', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + controller.requestWorkflow('workflow-a') + controller.handleJoinSuccess('workflow-a') + + expect(controller.requestWorkflow('workflow-b')).toEqual([ + { type: 'join', workflowId: 'workflow-b' }, + ]) + expect(controller.requestWorkflow(null)).toEqual([]) + + expect(controller.handleJoinSuccess('workflow-b')).toMatchObject({ + apply: false, + ignored: true, + workflowId: 'workflow-b', + commands: [{ type: 'leave' }], + }) + }) + + it('preserves the last joined workflow during retryable switch failures', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + expect(controller.requestWorkflow('workflow-a')).toEqual([ + { type: 'join', workflowId: 'workflow-a' }, + ]) + controller.handleJoinSuccess('workflow-a') + + expect(controller.requestWorkflow('workflow-b')).toEqual([ + { type: 'join', workflowId: 'workflow-b' }, + ]) + + const errorResult = controller.handleJoinError({ + workflowId: 'workflow-b', + retryable: true, + }) + + expect(errorResult.apply).toBe(false) + expect(errorResult.retryScheduled).toBe(true) + expect(errorResult.retriesExhausted).toBe(false) + expect(errorResult.commands).toEqual([ + { + type: 'schedule-retry', + workflowId: 'workflow-b', + attempt: 1, + delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS, + }, + ]) + expect(controller.getJoinedWorkflowId()).toBe('workflow-a') + expect(controller.retryJoin('workflow-b')).toEqual([{ type: 'join', workflowId: 'workflow-b' }]) + }) + + it('uses capped exponential backoff for retryable join failures', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + controller.requestWorkflow('workflow-a') + + const first = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true }) + expect(first.commands).toEqual([ + { + type: 'schedule-retry', + workflowId: 'workflow-a', + attempt: 1, + delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS, + }, + ]) + + controller.retryJoin('workflow-a') + const second = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true }) + expect(second.commands).toEqual([ + { + type: 'schedule-retry', + workflowId: 'workflow-a', + attempt: 2, + delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS * 2, + }, + ]) + + controller.retryJoin('workflow-a') + controller.handleJoinError({ workflowId: 'workflow-a', retryable: true }) + controller.retryJoin('workflow-a') + const fourth = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true }) + expect(fourth.commands).toEqual([ + { + type: 'schedule-retry', + workflowId: 'workflow-a', + attempt: 4, + delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS * 8, + }, + ]) + + controller.retryJoin('workflow-a') + const fifth = controller.handleJoinError({ workflowId: 'workflow-a', retryable: true }) + expect(fifth.commands).toEqual([ + { + type: 'schedule-retry', + workflowId: 'workflow-a', + attempt: 5, + delayMs: SOCKET_JOIN_RETRY_MAX_DELAY_MS, + }, + ]) + }) + + it('blocks a permanently failed workflow and leaves the fallback room cleanly', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + controller.requestWorkflow('workflow-a') + controller.handleJoinSuccess('workflow-a') + + expect(controller.requestWorkflow('workflow-b')).toEqual([ + { type: 'join', workflowId: 'workflow-b' }, + ]) + + const errorResult = controller.handleJoinError({ + workflowId: 'workflow-b', + retryable: false, + }) + + expect(errorResult.apply).toBe(true) + expect(errorResult.commands).toEqual([{ type: 'leave' }]) + expect(controller.getJoinedWorkflowId()).toBeNull() + expect(controller.requestWorkflow('workflow-b')).toEqual([]) + expect(controller.requestWorkflow('workflow-c')).toEqual([ + { type: 'join', workflowId: 'workflow-c' }, + ]) + }) + + it('rejoins the desired workflow when the server session is lost', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + controller.requestWorkflow('workflow-a') + controller.handleJoinSuccess('workflow-a') + + expect(controller.forceRejoinWorkflow('workflow-a')).toEqual([ + { type: 'join', workflowId: 'workflow-a' }, + ]) + expect(controller.getJoinedWorkflowId()).toBeNull() + }) + + it('resolves retryable errors without workflowId against the pending join', () => { + const controller = new SocketJoinController() + + controller.setConnected(true) + controller.requestWorkflow('workflow-a') + + const errorResult = controller.handleJoinError({ retryable: true }) + + expect(errorResult.workflowId).toBe('workflow-a') + expect(errorResult.retryScheduled).toBe(true) + expect(errorResult.commands).toEqual([ + { + type: 'schedule-retry', + workflowId: 'workflow-a', + attempt: 1, + delayMs: SOCKET_JOIN_RETRY_BASE_DELAY_MS, + }, + ]) + }) +}) diff --git a/apps/sim/app/workspace/providers/socket-join-controller.ts b/apps/sim/app/workspace/providers/socket-join-controller.ts new file mode 100644 index 00000000000..645780f3556 --- /dev/null +++ b/apps/sim/app/workspace/providers/socket-join-controller.ts @@ -0,0 +1,312 @@ +export const SOCKET_JOIN_RETRY_BASE_DELAY_MS = 1000 +export const SOCKET_JOIN_RETRY_MAX_DELAY_MS = 10000 + +export type SocketJoinCommand = + | { type: 'cancel-retry' } + | { type: 'join'; workflowId: string } + | { type: 'leave' } + | { + type: 'schedule-retry' + workflowId: string + attempt: number + delayMs: number + } + +export interface SocketJoinSuccessResult { + apply: boolean + commands: SocketJoinCommand[] + ignored: boolean + workflowId: string +} + +export interface SocketJoinErrorResult { + apply: boolean + commands: SocketJoinCommand[] + ignored: boolean + retryScheduled: boolean + retriesExhausted: boolean + workflowId: string | null +} + +export interface SocketJoinDeleteResult { + commands: SocketJoinCommand[] + shouldClearCurrent: boolean +} + +/** + * Coordinates desired workflow room membership with async socket join results. + */ +export class SocketJoinController { + private desiredWorkflowId: string | null = null + private joinedWorkflowId: string | null = null + private pendingJoinWorkflowId: string | null = null + private blockedWorkflowId: string | null = null + private retryWorkflowId: string | null = null + private retryAttempt = 0 + private isConnected = false + + getDesiredWorkflowId(): string | null { + return this.desiredWorkflowId + } + + getPendingJoinWorkflowId(): string | null { + return this.pendingJoinWorkflowId + } + + getJoinedWorkflowId(): string | null { + return this.joinedWorkflowId + } + + setConnected(connected: boolean): SocketJoinCommand[] { + this.isConnected = connected + if (!connected) { + this.pendingJoinWorkflowId = null + this.joinedWorkflowId = null + return this.clearRetryCommands() + } + + return this.flush() + } + + requestWorkflow(workflowId: string | null): SocketJoinCommand[] { + const commands = this.takeRetryResetCommands(workflowId) + this.desiredWorkflowId = workflowId + + if (workflowId !== this.blockedWorkflowId) { + this.blockedWorkflowId = null + } + + return [...commands, ...this.flush()] + } + + forceRejoinWorkflow(workflowId: string | null): SocketJoinCommand[] { + const commands = this.requestWorkflow(workflowId) + const alreadyChangingRooms = commands.some( + (command) => command.type === 'join' || command.type === 'leave' + ) + + if ( + alreadyChangingRooms || + !this.isConnected || + !this.desiredWorkflowId || + this.pendingJoinWorkflowId === this.desiredWorkflowId || + this.blockedWorkflowId === this.desiredWorkflowId + ) { + return commands + } + + this.joinedWorkflowId = null + + return [...commands, ...this.flush()] + } + + handleWorkflowDeleted(workflowId: string): SocketJoinDeleteResult { + const commands = this.takeRetryResetCommands( + this.retryWorkflowId === workflowId ? null : this.retryWorkflowId + ) + + if (this.desiredWorkflowId === workflowId) { + this.blockedWorkflowId = workflowId + } + + if (this.pendingJoinWorkflowId === workflowId) { + this.pendingJoinWorkflowId = null + } + + const shouldClearCurrent = this.joinedWorkflowId === workflowId + if (shouldClearCurrent) { + this.joinedWorkflowId = null + } + + return { + commands: [...commands, ...this.flush()], + shouldClearCurrent, + } + } + + handleJoinSuccess(workflowId: string): SocketJoinSuccessResult { + const commands = this.clearRetryCommands(workflowId) + this.pendingJoinWorkflowId = null + this.joinedWorkflowId = workflowId + + const apply = this.desiredWorkflowId === workflowId && this.blockedWorkflowId !== workflowId + + return { + apply, + commands: [...commands, ...this.flush()], + ignored: !apply, + workflowId, + } + } + + handleJoinError({ + workflowId, + retryable, + }: { + workflowId?: string | null + retryable?: boolean + }): SocketJoinErrorResult { + const resolvedWorkflowId = workflowId ?? this.pendingJoinWorkflowId + + if (resolvedWorkflowId && this.pendingJoinWorkflowId === resolvedWorkflowId) { + this.pendingJoinWorkflowId = null + if (this.joinedWorkflowId === resolvedWorkflowId) { + this.joinedWorkflowId = null + } + } + + const isCurrentDesired = + Boolean(resolvedWorkflowId) && + this.desiredWorkflowId === resolvedWorkflowId && + this.blockedWorkflowId !== resolvedWorkflowId + + const baseCommands = + resolvedWorkflowId !== null + ? this.takeRetryResetCommands(resolvedWorkflowId) + : this.clearRetryCommands() + + if (!isCurrentDesired) { + return { + apply: false, + commands: [...baseCommands, ...this.flush()], + ignored: true, + retryScheduled: false, + retriesExhausted: false, + workflowId: resolvedWorkflowId, + } + } + + if (retryable && resolvedWorkflowId) { + const retryResult = this.scheduleRetry(resolvedWorkflowId) + + return { + apply: false, + commands: [...baseCommands, ...retryResult.commands], + ignored: false, + retryScheduled: retryResult.retryScheduled, + retriesExhausted: false, + workflowId: resolvedWorkflowId, + } + } + + const leaveCommands = this.blockWorkflow(resolvedWorkflowId) + + return { + apply: true, + commands: [...this.clearRetryCommands(), ...leaveCommands, ...this.flush()], + ignored: false, + retryScheduled: false, + retriesExhausted: false, + workflowId: resolvedWorkflowId, + } + } + + retryJoin(workflowId: string): SocketJoinCommand[] { + if ( + this.retryWorkflowId !== workflowId || + this.desiredWorkflowId !== workflowId || + this.blockedWorkflowId === workflowId + ) { + return [] + } + + return this.flush() + } + + private flush(): SocketJoinCommand[] { + if (!this.isConnected || this.pendingJoinWorkflowId) { + return [] + } + + if (!this.desiredWorkflowId) { + if (!this.joinedWorkflowId) { + return [] + } + + this.joinedWorkflowId = null + return [{ type: 'leave' }] + } + + if (this.blockedWorkflowId === this.desiredWorkflowId) { + return [] + } + + if (this.joinedWorkflowId === this.desiredWorkflowId) { + return [] + } + + this.pendingJoinWorkflowId = this.desiredWorkflowId + + return [{ type: 'join', workflowId: this.desiredWorkflowId }] + } + + private scheduleRetry(workflowId: string): { + commands: SocketJoinCommand[] + retryScheduled: boolean + } { + const nextAttempt = this.retryWorkflowId === workflowId ? this.retryAttempt + 1 : 1 + const delayMs = Math.min( + SOCKET_JOIN_RETRY_BASE_DELAY_MS * 2 ** Math.max(0, nextAttempt - 1), + SOCKET_JOIN_RETRY_MAX_DELAY_MS + ) + + this.retryWorkflowId = workflowId + this.retryAttempt = nextAttempt + + return { + commands: [ + { + type: 'schedule-retry', + workflowId, + attempt: nextAttempt, + delayMs, + }, + ], + retryScheduled: true, + } + } + + private takeRetryResetCommands(nextWorkflowId?: string | null): SocketJoinCommand[] { + const shouldClearRetry = + this.retryWorkflowId !== null && + (nextWorkflowId === undefined || this.retryWorkflowId !== nextWorkflowId) + + if (!shouldClearRetry) { + return [] + } + + this.retryWorkflowId = null + this.retryAttempt = 0 + + return [{ type: 'cancel-retry' }] + } + + private clearRetryCommands(workflowId?: string): SocketJoinCommand[] { + const shouldClear = + this.retryWorkflowId !== null && + (workflowId === undefined || this.retryWorkflowId === workflowId) + + if (!shouldClear) { + return [] + } + + this.retryWorkflowId = null + this.retryAttempt = 0 + + return [{ type: 'cancel-retry' }] + } + + private blockWorkflow(workflowId: string | null): SocketJoinCommand[] { + if (workflowId) { + this.blockedWorkflowId = workflowId + } + + if (!this.joinedWorkflowId) { + return [] + } + + this.joinedWorkflowId = null + + return [{ type: 'leave' }] + } +} diff --git a/apps/sim/app/workspace/providers/socket-join-target.test.ts b/apps/sim/app/workspace/providers/socket-join-target.test.ts new file mode 100644 index 00000000000..b93bb85098b --- /dev/null +++ b/apps/sim/app/workspace/providers/socket-join-target.test.ts @@ -0,0 +1,54 @@ +import { describe, expect, it } from 'vitest' +import { + isSocketWorkflowVisible, + resolveSocketWorkflowTarget, +} from '@/app/workspace/providers/socket-join-target' + +describe('socket join target helpers', () => { + it('uses the route workflow when there is no explicit workflow target', () => { + expect( + resolveSocketWorkflowTarget({ + routeWorkflowId: 'workflow-route', + explicitWorkflowId: null, + }) + ).toBe('workflow-route') + }) + + it('prefers the explicit workflow target for embedded workflows', () => { + expect( + resolveSocketWorkflowTarget({ + routeWorkflowId: null, + explicitWorkflowId: 'workflow-embedded', + }) + ).toBe('workflow-embedded') + }) + + it('lets an explicit workflow override the route workflow', () => { + expect( + resolveSocketWorkflowTarget({ + routeWorkflowId: 'workflow-route', + explicitWorkflowId: 'workflow-embedded', + }) + ).toBe('workflow-embedded') + }) + + it('treats the explicit embedded workflow as visible', () => { + expect( + isSocketWorkflowVisible({ + workflowId: 'workflow-embedded', + routeWorkflowId: null, + explicitWorkflowId: 'workflow-embedded', + }) + ).toBe(true) + }) + + it('rejects mismatched workflow visibility', () => { + expect( + isSocketWorkflowVisible({ + workflowId: 'workflow-other', + routeWorkflowId: 'workflow-route', + explicitWorkflowId: null, + }) + ).toBe(false) + }) +}) diff --git a/apps/sim/app/workspace/providers/socket-join-target.ts b/apps/sim/app/workspace/providers/socket-join-target.ts new file mode 100644 index 00000000000..8ecc2458e19 --- /dev/null +++ b/apps/sim/app/workspace/providers/socket-join-target.ts @@ -0,0 +1,28 @@ +interface ResolveSocketWorkflowTargetArgs { + routeWorkflowId?: string | null + explicitWorkflowId?: string | null +} + +export function resolveSocketWorkflowTarget({ + routeWorkflowId, + explicitWorkflowId, +}: ResolveSocketWorkflowTargetArgs): string | null { + return explicitWorkflowId ?? routeWorkflowId ?? null +} + +interface IsSocketWorkflowVisibleArgs extends ResolveSocketWorkflowTargetArgs { + workflowId?: string | null +} + +export function isSocketWorkflowVisible({ + workflowId, + routeWorkflowId, + explicitWorkflowId, +}: IsSocketWorkflowVisibleArgs): boolean { + const targetWorkflowId = workflowId ?? null + if (!targetWorkflowId) { + return false + } + + return targetWorkflowId === resolveSocketWorkflowTarget({ routeWorkflowId, explicitWorkflowId }) +} diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index b2257280c10..c8b8db0be16 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -15,6 +15,14 @@ import { useParams } from 'next/navigation' import type { Socket } from 'socket.io-client' import { getEnv } from '@/lib/core/config/env' import { generateId } from '@/lib/core/utils/uuid' +import { + type SocketJoinCommand, + SocketJoinController, +} from '@/app/workspace/providers/socket-join-controller' +import { + isSocketWorkflowVisible, + resolveSocketWorkflowTarget, +} from '@/app/workspace/providers/socket-join-target' import { useOperationQueueStore } from '@/stores/operation-queue/store' import { useWorkflowRegistry as useWorkflowRegistryStore } from '@/stores/workflows/registry/store' @@ -61,6 +69,7 @@ interface SocketContextType { leaveWorkflow: () => void retryConnection: () => void emitWorkflowOperation: ( + workflowId: string, operation: string, target: string, payload: any, @@ -141,14 +150,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [currentSocketId, setCurrentSocketId] = useState(null) const [presenceUsers, setPresenceUsers] = useState([]) const [authFailed, setAuthFailed] = useState(false) + const [explicitWorkflowId, setExplicitWorkflowId] = useState(null) const initializedRef = useRef(false) const socketRef = useRef(null) - const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode) + const currentWorkflowIdRef = useRef(null) + const explicitWorkflowIdRef = useRef(explicitWorkflowId) + const joinControllerRef = useRef(new SocketJoinController()) + const joinRetryTimeoutRef = useRef | null>(null) const params = useParams() const urlWorkflowId = params?.workflowId as string | undefined const urlWorkflowIdRef = useRef(urlWorkflowId) urlWorkflowIdRef.current = urlWorkflowId + explicitWorkflowIdRef.current = explicitWorkflowId const eventHandlers = useRef<{ workflowOperation?: (data: any) => void @@ -164,9 +178,121 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }>({}) const positionUpdateTimeouts = useRef>(new Map()) - const isRejoiningRef = useRef(false) const pendingPositionUpdates = useRef>(new Map()) + const setVisibleWorkflowId = useCallback((workflowId: string | null) => { + currentWorkflowIdRef.current = workflowId + setCurrentWorkflowId(workflowId) + }, []) + + const getRequestedWorkflowId = useCallback(() => { + return resolveSocketWorkflowTarget({ + routeWorkflowId: urlWorkflowIdRef.current ?? null, + explicitWorkflowId: explicitWorkflowIdRef.current, + }) + }, []) + + const isWorkflowVisible = useCallback((workflowId?: string | null) => { + return isSocketWorkflowVisible({ + workflowId: workflowId ?? currentWorkflowIdRef.current, + routeWorkflowId: urlWorkflowIdRef.current ?? null, + explicitWorkflowId: explicitWorkflowIdRef.current, + }) + }, []) + + const clearJoinRetryTimeout = useCallback(() => { + if (joinRetryTimeoutRef.current !== null) { + clearTimeout(joinRetryTimeoutRef.current) + joinRetryTimeoutRef.current = null + } + }, []) + + const resetVisibleWorkflowState = useCallback((workflowId?: string | null) => { + if (workflowId) { + useOperationQueueStore.getState().cancelOperationsForWorkflow(workflowId) + } + + positionUpdateTimeouts.current.forEach((timeoutId) => { + clearTimeout(timeoutId) + }) + positionUpdateTimeouts.current.clear() + pendingPositionUpdates.current.clear() + }, []) + + const clearJoinedWorkflowState = useCallback( + (cancelOperations = false) => { + const previousWorkflowId = currentWorkflowIdRef.current + resetVisibleWorkflowState(cancelOperations ? previousWorkflowId : null) + setPresenceUsers([]) + setVisibleWorkflowId(null) + }, + [resetVisibleWorkflowState, setVisibleWorkflowId] + ) + + const executeJoinCommands = useCallback( + (commands: SocketJoinCommand[]) => { + const socketInstance = socketRef.current + + commands.forEach((command) => { + if (command.type === 'cancel-retry') { + clearJoinRetryTimeout() + return + } + + if (command.type === 'leave') { + clearJoinedWorkflowState(true) + + if (!socketInstance) { + return + } + + logger.info('Leaving current workflow room') + socketInstance.emit('leave-workflow') + return + } + + if (command.type === 'join') { + const isWorkflowSwitch = + currentWorkflowIdRef.current !== null && + currentWorkflowIdRef.current !== command.workflowId + + if (isWorkflowSwitch) { + resetVisibleWorkflowState(currentWorkflowIdRef.current) + } else { + resetVisibleWorkflowState() + } + + if (!socketInstance) { + logger.warn('Cannot join workflow room: socket not available', { + workflowId: command.workflowId, + }) + return + } + + logger.info(`Joining workflow room: ${command.workflowId}`) + socketInstance.emit('join-workflow', { + workflowId: command.workflowId, + tabSessionId: getTabSessionId(), + }) + return + } + + clearJoinRetryTimeout() + joinRetryTimeoutRef.current = setTimeout(() => { + joinRetryTimeoutRef.current = null + executeJoinCommands(joinControllerRef.current.retryJoin(command.workflowId)) + }, command.delayMs) + + logger.warn('Realtime unavailable while joining workflow, scheduling retry', { + workflowId: command.workflowId, + attempt: command.attempt, + delayMs: command.delayMs, + }) + }) + }, + [clearJoinRetryTimeout, clearJoinedWorkflowState, resetVisibleWorkflowState] + ) + const generateSocketToken = async (): Promise => { const res = await fetch('/api/auth/socket-token', { method: 'POST', @@ -244,17 +370,16 @@ export function SocketProvider({ children, user }: SocketProviderProps) { connected: socketInstance.connected, transport: socketInstance.io.engine?.transport?.name, }) - // Note: join-workflow is handled by the useEffect watching isConnected + executeJoinCommands(joinControllerRef.current.setConnected(true)) }) socketInstance.on('disconnect', (reason) => { setIsConnected(false) setIsConnecting(false) setCurrentSocketId(null) - setCurrentWorkflowId(null) - setPresenceUsers([]) + executeJoinCommands(joinControllerRef.current.setConnected(false)) + clearJoinedWorkflowState(false) - // socket.active indicates if auto-reconnect will happen if (socketInstance.active) { setIsReconnecting(true) logger.info('Socket disconnected, will auto-reconnect', { reason }) @@ -317,6 +442,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) socketInstance.on('presence-update', (users: PresenceUser[]) => { + if (!isWorkflowVisible()) { + return + } + setPresenceUsers((prev) => { const prevMap = new Map(prev.map((u) => [u.socketId, u])) @@ -334,27 +463,50 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) }) - // Handle join workflow success - confirms room membership with presence list socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => { - isRejoiningRef.current = false - // Ignore stale success responses from previous navigation - if (urlWorkflowIdRef.current && workflowId !== urlWorkflowIdRef.current) { + const result = joinControllerRef.current.handleJoinSuccess(workflowId) + + if (result.ignored) { logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`) - return + } else { + setVisibleWorkflowId(workflowId) + setPresenceUsers(presenceUsers || []) + logger.info(`Successfully joined workflow room: ${workflowId}`, { + presenceCount: presenceUsers?.length || 0, + }) } - setCurrentWorkflowId(workflowId) - setPresenceUsers(presenceUsers || []) - logger.info(`Successfully joined workflow room: ${workflowId}`, { - presenceCount: presenceUsers?.length || 0, - }) + + executeJoinCommands(result.commands) }) - socketInstance.on('join-workflow-error', ({ error, code }) => { - isRejoiningRef.current = false - logger.error('Failed to join workflow:', { error, code }) - if (code === 'ROOM_MANAGER_UNAVAILABLE') { - triggerOfflineMode() + socketInstance.on('join-workflow-error', ({ workflowId, error, code, retryable }) => { + const result = joinControllerRef.current.handleJoinError({ workflowId, retryable }) + + if (result.ignored) { + logger.debug('Ignoring stale join-workflow-error', { + workflowId: result.workflowId, + error, + code, + }) + } else if (result.retryScheduled) { + logger.warn('Retryable workflow join failure, waiting to retry', { + workflowId: result.workflowId, + error, + code, + }) + } else if (result.apply) { + if (result.workflowId) { + useOperationQueueStore.getState().cancelOperationsForWorkflow(result.workflowId) + } + + logger.error('Failed to join workflow:', { + workflowId: result.workflowId, + error, + code, + }) } + + executeJoinCommands(result.commands) }) socketInstance.on('workflow-operation', (data) => { @@ -371,13 +523,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('workflow-deleted', (data) => { logger.warn(`Workflow ${data.workflowId} has been deleted`) - setCurrentWorkflowId((current) => { - if (current === data.workflowId) { - setPresenceUsers([]) - return null - } - return current - }) + const result = joinControllerRef.current.handleWorkflowDeleted(data.workflowId) + if (result.shouldClearCurrent) { + clearJoinedWorkflowState(true) + } + executeJoinCommands(result.commands) eventHandlers.current.workflowDeleted?.(data) }) @@ -457,6 +607,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) socketInstance.on('cursor-update', (data) => { + if (!isWorkflowVisible()) { + return + } + setPresenceUsers((prev) => { const existingIndex = prev.findIndex((user) => user.socketId === data.socketId) if (existingIndex === -1) { @@ -471,6 +625,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) socketInstance.on('selection-update', (data) => { + if (!isWorkflowVisible()) { + return + } + setPresenceUsers((prev) => { const existingIndex = prev.findIndex((user) => user.socketId === data.socketId) if (existingIndex === -1) { @@ -498,15 +656,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.warn('Operation forbidden:', error) if (error?.type === 'SESSION_ERROR') { - const workflowId = urlWorkflowIdRef.current + const workflowId = getRequestedWorkflowId() - if (workflowId && !isRejoiningRef.current) { - isRejoiningRef.current = true + if (workflowId) { logger.info(`Session expired, rejoining workflow: ${workflowId}`) - socketInstance.emit('join-workflow', { - workflowId, - tabSessionId: getTabSessionId(), - }) + executeJoinCommands(joinControllerRef.current.forceRejoinWorkflow(workflowId)) } } }) @@ -514,6 +668,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('workflow-state', async (workflowData) => { logger.info('Received workflow state from server') + if ( + !workflowData?.id || + currentWorkflowIdRef.current !== workflowData.id || + !isWorkflowVisible() + ) { + logger.info('Ignoring workflow state for inactive room', { + workflowId: workflowData?.id, + currentWorkflowId: currentWorkflowIdRef.current, + desiredWorkflowId: urlWorkflowIdRef.current, + }) + return + } + if (workflowData?.state) { try { await rehydrateWorkflowStores(workflowData.id, workflowData.state) @@ -534,6 +701,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { initializeSocket() return () => { + clearJoinRetryTimeout() positionUpdateTimeouts.current.forEach((timeoutId) => { clearTimeout(timeoutId) }) @@ -552,77 +720,34 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const hydrationPhase = useWorkflowRegistryStore((s) => s.hydration.phase) useEffect(() => { - if (!socket || !isConnected || !urlWorkflowId) return - - if (hydrationPhase === 'creating') return - - // Skip if already in the correct room - if (currentWorkflowId === urlWorkflowId) return - - logger.info( - `URL workflow changed from ${currentWorkflowId} to ${urlWorkflowId}, switching rooms` - ) - - if (currentWorkflowId) { - logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${urlWorkflowId}`) - socket.emit('leave-workflow') + if (hydrationPhase === 'creating') { + return } - logger.info(`Joining workflow room: ${urlWorkflowId}`) - socket.emit('join-workflow', { - workflowId: urlWorkflowId, - tabSessionId: getTabSessionId(), - }) - }, [socket, isConnected, urlWorkflowId, currentWorkflowId, hydrationPhase]) + executeJoinCommands(joinControllerRef.current.requestWorkflow(getRequestedWorkflowId())) + }, [ + explicitWorkflowId, + getRequestedWorkflowId, + hydrationPhase, + urlWorkflowId, + executeJoinCommands, + ]) const joinWorkflow = useCallback( (workflowId: string) => { - if (!socket || !user?.id) { - logger.warn('Cannot join workflow: socket or user not available') - return - } - - if (currentWorkflowId === workflowId) { - logger.info(`Already in workflow ${workflowId}, skipping join`) + if (!user?.id) { + logger.warn('Cannot join workflow: user not available') return } - if (currentWorkflowId) { - logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${workflowId}`) - socket.emit('leave-workflow') - } - - logger.info(`Joining workflow: ${workflowId}`) - socket.emit('join-workflow', { - workflowId, - tabSessionId: getTabSessionId(), - }) - // currentWorkflowId will be set by join-workflow-success handler + setExplicitWorkflowId(workflowId) }, - [socket, user, currentWorkflowId] + [user] ) const leaveWorkflow = useCallback(() => { - if (socket && currentWorkflowId) { - logger.info(`Leaving workflow: ${currentWorkflowId}`) - import('@/stores/operation-queue/store') - .then(({ useOperationQueueStore }) => { - useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId) - }) - .catch((error) => { - logger.warn('Failed to cancel operations for workflow:', error) - }) - socket.emit('leave-workflow') - setCurrentWorkflowId(null) - setPresenceUsers([]) - - positionUpdateTimeouts.current.forEach((timeoutId) => { - clearTimeout(timeoutId) - }) - positionUpdateTimeouts.current.clear() - pendingPositionUpdates.current.clear() - } - }, [socket, currentWorkflowId]) + setExplicitWorkflowId(null) + }, []) /** * Retry socket connection after auth failure. @@ -640,8 +765,20 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }, [authFailed]) const emitWorkflowOperation = useCallback( - (operation: string, target: string, payload: any, operationId?: string) => { - if (!socket || !currentWorkflowId) { + (workflowId: string, operation: string, target: string, payload: any, operationId?: string) => { + if ( + !socket || + !currentWorkflowId || + workflowId !== currentWorkflowId || + !isWorkflowVisible(workflowId) + ) { + logger.debug('Skipping workflow operation emit for inactive room', { + workflowId, + currentWorkflowId, + desiredWorkflowId: urlWorkflowIdRef.current, + operation, + target, + }) return } @@ -653,7 +790,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { if (commit) { socket.emit('workflow-operation', { - workflowId: currentWorkflowId, + workflowId, operation, target, payload, @@ -670,7 +807,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } pendingPositionUpdates.current.set(blockId, { - workflowId: currentWorkflowId, + workflowId, operation, target, payload, @@ -692,7 +829,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } } else { socket.emit('workflow-operation', { - workflowId: currentWorkflowId, + workflowId, operation, target, payload, @@ -701,7 +838,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }) } }, - [socket, currentWorkflowId] + [socket, currentWorkflowId, isWorkflowVisible] ) const emitSubblockUpdate = useCallback( @@ -712,7 +849,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { operationId: string | undefined, workflowId: string ) => { - if (!socket) { + if ( + !socket || + workflowId !== currentWorkflowIdRef.current || + !isWorkflowVisible(workflowId) + ) { logger.warn('Cannot emit subblock update: no socket connection', { workflowId, blockId }) return } @@ -736,7 +877,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { operationId: string | undefined, workflowId: string ) => { - if (!socket) { + if ( + !socket || + workflowId !== currentWorkflowIdRef.current || + !isWorkflowVisible(workflowId) + ) { logger.warn('Cannot emit variable update: no socket connection', { workflowId, variableId }) return } @@ -755,7 +900,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const lastCursorEmit = useRef(0) const emitCursorUpdate = useCallback( (cursor: { x: number; y: number } | null) => { - if (!socket || !currentWorkflowId) { + if (!socket || !currentWorkflowId || !isWorkflowVisible(currentWorkflowId)) { return } @@ -772,16 +917,16 @@ export function SocketProvider({ children, user }: SocketProviderProps) { lastCursorEmit.current = now } }, - [socket, currentWorkflowId] + [socket, currentWorkflowId, isWorkflowVisible] ) const emitSelectionUpdate = useCallback( (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => { - if (socket && currentWorkflowId) { + if (socket && currentWorkflowId && isWorkflowVisible(currentWorkflowId)) { socket.emit('selection-update', { selection }) } }, - [socket, currentWorkflowId] + [socket, currentWorkflowId, isWorkflowVisible] ) const onWorkflowOperation = useCallback((handler: (data: any) => void) => { diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index dada66d9c9b..aa1433f7ae6 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -152,13 +152,23 @@ export function useCollaborativeWorkflow() { // Register emit functions with operation queue store useEffect(() => { + const registeredWorkflowId = + isConnected && currentWorkflowId === activeWorkflowId ? currentWorkflowId : null + registerEmitFunctions( emitWorkflowOperation, emitSubblockUpdate, emitVariableUpdate, - currentWorkflowId + registeredWorkflowId ) - }, [emitWorkflowOperation, emitSubblockUpdate, emitVariableUpdate, currentWorkflowId]) + }, [ + activeWorkflowId, + currentWorkflowId, + emitWorkflowOperation, + emitSubblockUpdate, + emitVariableUpdate, + isConnected, + ]) useEffect(() => { const handleWorkflowOperation = (data: any) => { diff --git a/apps/sim/socket/handlers/workflow.test.ts b/apps/sim/socket/handlers/workflow.test.ts new file mode 100644 index 00000000000..dd1c300338f --- /dev/null +++ b/apps/sim/socket/handlers/workflow.test.ts @@ -0,0 +1,203 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import type { IRoomManager } from '@/socket/rooms' + +const { mockGetWorkflowState, mockVerifyWorkflowAccess } = vi.hoisted(() => ({ + mockGetWorkflowState: vi.fn(), + mockVerifyWorkflowAccess: vi.fn(), +})) + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +vi.mock('@sim/db', () => ({ + db: { select: vi.fn() }, + user: { image: 'image' }, +})) + +vi.mock('@/socket/database/operations', () => ({ + getWorkflowState: mockGetWorkflowState, +})) + +vi.mock('@/socket/middleware/permissions', () => ({ + verifyWorkflowAccess: mockVerifyWorkflowAccess, +})) + +import { setupWorkflowHandlers } from '@/socket/handlers/workflow' + +interface JoinWorkflowPayload { + workflowId: string + tabSessionId?: string +} + +function createSocket(overrides?: Partial>) { + const handlers: Record Promise | void> = {} + const socket = { + id: 'socket-1', + userId: 'user-1', + userName: 'Test User', + userImage: 'avatar.png', + on: vi.fn((event: string, handler: (payload: JoinWorkflowPayload) => Promise | void) => { + handlers[event] = handler + }), + emit: vi.fn(), + join: vi.fn(), + leave: vi.fn(), + ...overrides, + } + + return { + handlers, + socket, + } +} + +function createRoomManager(overrides?: Partial): IRoomManager { + return { + isReady: vi.fn().mockReturnValue(true), + getWorkflowIdForSocket: vi.fn().mockResolvedValue(null), + removeUserFromRoom: vi.fn().mockResolvedValue(null), + broadcastPresenceUpdate: vi.fn().mockResolvedValue(undefined), + getWorkflowUsers: vi.fn().mockResolvedValue([]), + hasWorkflowRoom: vi.fn().mockResolvedValue(false), + addUserToRoom: vi.fn().mockResolvedValue(undefined), + getUserSession: vi.fn().mockResolvedValue(null), + updateUserActivity: vi.fn().mockResolvedValue(undefined), + updateRoomLastModified: vi.fn().mockResolvedValue(undefined), + emitToWorkflow: vi.fn(), + getUniqueUserCount: vi.fn().mockResolvedValue(1), + getTotalActiveConnections: vi.fn().mockResolvedValue(0), + handleWorkflowDeletion: vi.fn().mockResolvedValue(undefined), + handleWorkflowRevert: vi.fn().mockResolvedValue(undefined), + handleWorkflowUpdate: vi.fn().mockResolvedValue(undefined), + shutdown: vi.fn().mockResolvedValue(undefined), + initialize: vi.fn().mockResolvedValue(undefined), + io: { + in: vi.fn().mockReturnValue({ + fetchSockets: vi.fn().mockResolvedValue([]), + socketsLeave: vi.fn().mockResolvedValue(undefined), + }), + }, + ...overrides, + } as unknown as IRoomManager +} + +describe('setupWorkflowHandlers', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetWorkflowState.mockResolvedValue({ id: 'workflow-1', state: {} }) + mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: true, role: 'admin' }) + }) + + it('includes workflowId when authentication is missing', async () => { + const { socket, handlers } = createSocket({ userId: undefined, userName: undefined }) + const roomManager = createRoomManager() + + setupWorkflowHandlers( + socket as unknown as Parameters[0], + roomManager + ) + + await handlers['join-workflow']({ workflowId: 'workflow-1', tabSessionId: 'tab-1' }) + + expect(socket.emit).toHaveBeenCalledWith('join-workflow-error', { + workflowId: 'workflow-1', + error: 'Authentication required', + code: 'AUTHENTICATION_REQUIRED', + retryable: false, + }) + }) + + it('includes workflowId when realtime is unavailable', async () => { + const { socket, handlers } = createSocket() + const roomManager = createRoomManager({ + isReady: vi.fn().mockReturnValue(false), + }) + + setupWorkflowHandlers( + socket as unknown as Parameters[0], + roomManager + ) + + await handlers['join-workflow']({ workflowId: 'workflow-1', tabSessionId: 'tab-1' }) + + expect(socket.emit).toHaveBeenCalledWith('join-workflow-error', { + workflowId: 'workflow-1', + error: 'Realtime unavailable', + code: 'ROOM_MANAGER_UNAVAILABLE', + retryable: true, + }) + }) + + it('includes workflowId when access is denied', async () => { + mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: false }) + + const { socket, handlers } = createSocket() + const roomManager = createRoomManager() + + setupWorkflowHandlers( + socket as unknown as Parameters[0], + roomManager + ) + + await handlers['join-workflow']({ workflowId: 'workflow-1', tabSessionId: 'tab-1' }) + + expect(socket.emit).toHaveBeenCalledWith('join-workflow-error', { + workflowId: 'workflow-1', + error: 'Access denied to workflow', + code: 'ACCESS_DENIED', + retryable: false, + }) + }) + + it('marks workflow access verification failures as retryable', async () => { + mockVerifyWorkflowAccess.mockRejectedValue(new Error('database unavailable')) + + const { socket, handlers } = createSocket() + const roomManager = createRoomManager() + + setupWorkflowHandlers( + socket as unknown as Parameters[0], + roomManager + ) + + await handlers['join-workflow']({ workflowId: 'workflow-1', tabSessionId: 'tab-1' }) + + expect(socket.emit).toHaveBeenCalledWith('join-workflow-error', { + workflowId: 'workflow-1', + error: 'Failed to verify workflow access', + code: 'VERIFY_WORKFLOW_ACCESS_FAILED', + retryable: true, + }) + }) + + it('includes workflowId when an unexpected join failure occurs', async () => { + const { socket, handlers } = createSocket() + const roomManager = createRoomManager({ + getWorkflowIdForSocket: vi.fn().mockRejectedValue(new Error('boom')), + removeUserFromRoom: vi.fn().mockResolvedValue(null), + }) + + setupWorkflowHandlers( + socket as unknown as Parameters[0], + roomManager + ) + + await handlers['join-workflow']({ workflowId: 'workflow-1', tabSessionId: 'tab-1' }) + + expect(socket.emit).toHaveBeenCalledWith('join-workflow-error', { + workflowId: 'workflow-1', + error: 'Failed to join workflow', + code: 'JOIN_WORKFLOW_FAILED', + retryable: true, + }) + }) +}) diff --git a/apps/sim/socket/handlers/workflow.ts b/apps/sim/socket/handlers/workflow.ts index 8353f0a3885..8796f2a3190 100644 --- a/apps/sim/socket/handlers/workflow.ts +++ b/apps/sim/socket/handlers/workflow.ts @@ -16,15 +16,22 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: if (!userId || !userName) { logger.warn(`Join workflow rejected: Socket ${socket.id} not authenticated`) - socket.emit('join-workflow-error', { error: 'Authentication required' }) + socket.emit('join-workflow-error', { + workflowId, + error: 'Authentication required', + code: 'AUTHENTICATION_REQUIRED', + retryable: false, + }) return } if (!roomManager.isReady()) { logger.warn(`Join workflow rejected: Room manager unavailable`) socket.emit('join-workflow-error', { + workflowId, error: 'Realtime unavailable', code: 'ROOM_MANAGER_UNAVAILABLE', + retryable: true, }) return } @@ -37,13 +44,23 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: const accessInfo = await verifyWorkflowAccess(userId, workflowId) if (!accessInfo.hasAccess) { logger.warn(`User ${userId} (${userName}) denied access to workflow ${workflowId}`) - socket.emit('join-workflow-error', { error: 'Access denied to workflow' }) + socket.emit('join-workflow-error', { + workflowId, + error: 'Access denied to workflow', + code: 'ACCESS_DENIED', + retryable: false, + }) return } userRole = accessInfo.role || 'read' } catch (error) { logger.warn(`Error verifying workflow access for ${userId}:`, error) - socket.emit('join-workflow-error', { error: 'Failed to verify workflow access' }) + socket.emit('join-workflow-error', { + workflowId, + error: 'Failed to verify workflow access', + code: 'VERIFY_WORKFLOW_ACCESS_FAILED', + retryable: true, + }) return } @@ -179,8 +196,10 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: await roomManager.removeUserFromRoom(socket.id, workflowId) const isReady = roomManager.isReady() socket.emit('join-workflow-error', { + workflowId, error: isReady ? 'Failed to join workflow' : 'Realtime unavailable', - code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE', + code: isReady ? 'JOIN_WORKFLOW_FAILED' : 'ROOM_MANAGER_UNAVAILABLE', + retryable: true, }) } }) diff --git a/apps/sim/stores/operation-queue/store.test.ts b/apps/sim/stores/operation-queue/store.test.ts new file mode 100644 index 00000000000..b37f673e68b --- /dev/null +++ b/apps/sim/stores/operation-queue/store.test.ts @@ -0,0 +1,84 @@ +/** + * @vitest-environment node + */ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }), +})) + +import { registerEmitFunctions, useOperationQueueStore } from '@/stores/operation-queue/store' + +describe('operation queue room gating', () => { + beforeEach(() => { + vi.clearAllMocks() + useOperationQueueStore.setState({ + operations: [], + isProcessing: false, + hasOperationError: false, + }) + registerEmitFunctions(vi.fn(), vi.fn(), vi.fn(), null) + }) + + afterEach(() => { + useOperationQueueStore.setState({ + operations: [], + isProcessing: false, + hasOperationError: false, + }) + registerEmitFunctions(vi.fn(), vi.fn(), vi.fn(), null) + }) + + it('does not process workflow operations while no workflow is registered', () => { + const workflowEmit = vi.fn() + registerEmitFunctions(workflowEmit, vi.fn(), vi.fn(), null) + + useOperationQueueStore.getState().addToQueue({ + id: 'op-1', + workflowId: 'workflow-a', + userId: 'user-1', + operation: { + operation: 'replace-state', + target: 'workflow', + payload: { state: {} }, + }, + }) + + expect(workflowEmit).not.toHaveBeenCalled() + }) + + it('waits until the matching workflow is registered before emitting', () => { + const workflowEmit = vi.fn() + registerEmitFunctions(workflowEmit, vi.fn(), vi.fn(), null) + + useOperationQueueStore.getState().addToQueue({ + id: 'op-1', + workflowId: 'workflow-a', + userId: 'user-1', + operation: { + operation: 'replace-state', + target: 'workflow', + payload: { state: {} }, + }, + }) + + registerEmitFunctions(workflowEmit, vi.fn(), vi.fn(), 'workflow-b') + expect(workflowEmit).not.toHaveBeenCalled() + + registerEmitFunctions(workflowEmit, vi.fn(), vi.fn(), 'workflow-a') + expect(workflowEmit).toHaveBeenCalledWith( + 'workflow-a', + 'replace-state', + 'workflow', + { state: {} }, + 'op-1' + ) + + useOperationQueueStore.getState().confirmOperation('op-1') + }) +}) diff --git a/apps/sim/stores/operation-queue/store.ts b/apps/sim/stores/operation-queue/store.ts index 1ceac4a552e..052bf9c1213 100644 --- a/apps/sim/stores/operation-queue/store.ts +++ b/apps/sim/stores/operation-queue/store.ts @@ -31,7 +31,13 @@ const retryTimeouts = new Map() const operationTimeouts = new Map() let emitWorkflowOperation: - | ((operation: string, target: string, payload: any, operationId?: string) => void) + | (( + workflowId: string, + operation: string, + target: string, + payload: any, + operationId?: string + ) => void) | null = null let emitSubblockUpdate: | (( @@ -53,7 +59,13 @@ let emitVariableUpdate: | null = null export function registerEmitFunctions( - workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void, + workflowEmit: ( + workflowId: string, + operation: string, + target: string, + payload: any, + operationId?: string + ) => void, subblockEmit: ( blockId: string, subblockId: string, @@ -375,7 +387,7 @@ export const useOperationQueueStore = create((set, get) => } } else { if (emitWorkflowOperation) { - emitWorkflowOperation(op, target, payload, nextOperation.id) + emitWorkflowOperation(nextOperation.workflowId, op, target, payload, nextOperation.id) } } diff --git a/bun.lock b/bun.lock index e05bc532f5e..f8bde9a6cf3 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 0, "workspaces": { "": { "name": "simstudio", From e5f55af465b3dcb76667f17b95a7b23aaf0f3930 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 10 Apr 2026 18:43:50 -0700 Subject: [PATCH 2/2] address comments --- .../workspace-permissions-provider.tsx | 65 ++++++++++++------- .../providers/socket-join-controller.test.ts | 1 - .../providers/socket-join-controller.ts | 48 +++++--------- .../workspace/providers/socket-provider.tsx | 39 ++++++++++- 4 files changed, 95 insertions(+), 58 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx index 6a29608b7e4..9dd8582c931 100644 --- a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx +++ b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx @@ -59,40 +59,61 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP const hasOperationError = useOperationQueueStore((state) => state.hasOperationError) const addNotification = useNotificationStore((state) => state.addNotification) const removeNotification = useNotificationStore((state) => state.removeNotification) - const { isReconnecting } = useSocket() - const reconnectingNotificationIdRef = useRef(null) + const { isReconnecting, isRetryingWorkflowJoin } = useSocket() + const realtimeStatusNotificationIdRef = useRef(null) + const realtimeStatusNotificationMessageRef = useRef(null) const isOfflineMode = hasOperationError + const realtimeStatusMessage = isReconnecting + ? 'Reconnecting...' + : isRetryingWorkflowJoin + ? 'Joining workflow...' + : null + + const clearRealtimeStatusNotification = useCallback(() => { + if (!realtimeStatusNotificationIdRef.current) { + return + } + + removeNotification(realtimeStatusNotificationIdRef.current) + realtimeStatusNotificationIdRef.current = null + realtimeStatusNotificationMessageRef.current = null + }, [removeNotification]) useEffect(() => { - if (isReconnecting && !reconnectingNotificationIdRef.current && !isOfflineMode) { - const id = addNotification({ - level: 'error', - message: 'Reconnecting...', - }) - reconnectingNotificationIdRef.current = id - } else if (!isReconnecting && reconnectingNotificationIdRef.current) { - removeNotification(reconnectingNotificationIdRef.current) - reconnectingNotificationIdRef.current = null + if (isOfflineMode || !realtimeStatusMessage) { + clearRealtimeStatusNotification() + return } - return () => { - if (reconnectingNotificationIdRef.current) { - removeNotification(reconnectingNotificationIdRef.current) - reconnectingNotificationIdRef.current = null - } + if ( + realtimeStatusNotificationIdRef.current && + realtimeStatusNotificationMessageRef.current === realtimeStatusMessage + ) { + return } - }, [isReconnecting, isOfflineMode, addNotification, removeNotification]) + + clearRealtimeStatusNotification() + + const id = addNotification({ + level: 'error', + message: realtimeStatusMessage, + }) + + realtimeStatusNotificationIdRef.current = id + realtimeStatusNotificationMessageRef.current = realtimeStatusMessage + }, [addNotification, clearRealtimeStatusNotification, isOfflineMode, realtimeStatusMessage]) + + useEffect(() => { + return clearRealtimeStatusNotification + }, [clearRealtimeStatusNotification]) useEffect(() => { if (!isOfflineMode || hasShownOfflineNotification) { return } - if (reconnectingNotificationIdRef.current) { - removeNotification(reconnectingNotificationIdRef.current) - reconnectingNotificationIdRef.current = null - } + clearRealtimeStatusNotification() try { addNotification({ @@ -107,7 +128,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP } catch (error) { logger.error('Failed to add offline notification', { error }) } - }, [addNotification, removeNotification, hasShownOfflineNotification, isOfflineMode]) + }, [addNotification, clearRealtimeStatusNotification, hasShownOfflineNotification, isOfflineMode]) const { data: workspacePermissions, diff --git a/apps/sim/app/workspace/providers/socket-join-controller.test.ts b/apps/sim/app/workspace/providers/socket-join-controller.test.ts index 10850cfd3fb..f79db4ca542 100644 --- a/apps/sim/app/workspace/providers/socket-join-controller.test.ts +++ b/apps/sim/app/workspace/providers/socket-join-controller.test.ts @@ -122,7 +122,6 @@ describe('SocketJoinController', () => { expect(errorResult.apply).toBe(false) expect(errorResult.retryScheduled).toBe(true) - expect(errorResult.retriesExhausted).toBe(false) expect(errorResult.commands).toEqual([ { type: 'schedule-retry', diff --git a/apps/sim/app/workspace/providers/socket-join-controller.ts b/apps/sim/app/workspace/providers/socket-join-controller.ts index 645780f3556..b3fb18b5e88 100644 --- a/apps/sim/app/workspace/providers/socket-join-controller.ts +++ b/apps/sim/app/workspace/providers/socket-join-controller.ts @@ -12,23 +12,22 @@ export type SocketJoinCommand = delayMs: number } -export interface SocketJoinSuccessResult { +interface SocketJoinSuccessResult { apply: boolean commands: SocketJoinCommand[] ignored: boolean workflowId: string } -export interface SocketJoinErrorResult { +interface SocketJoinErrorResult { apply: boolean commands: SocketJoinCommand[] ignored: boolean retryScheduled: boolean - retriesExhausted: boolean workflowId: string | null } -export interface SocketJoinDeleteResult { +interface SocketJoinDeleteResult { commands: SocketJoinCommand[] shouldClearCurrent: boolean } @@ -45,14 +44,6 @@ export class SocketJoinController { private retryAttempt = 0 private isConnected = false - getDesiredWorkflowId(): string | null { - return this.desiredWorkflowId - } - - getPendingJoinWorkflowId(): string | null { - return this.pendingJoinWorkflowId - } - getJoinedWorkflowId(): string | null { return this.joinedWorkflowId } @@ -171,20 +162,18 @@ export class SocketJoinController { commands: [...baseCommands, ...this.flush()], ignored: true, retryScheduled: false, - retriesExhausted: false, workflowId: resolvedWorkflowId, } } if (retryable && resolvedWorkflowId) { - const retryResult = this.scheduleRetry(resolvedWorkflowId) + const commands = this.scheduleRetry(resolvedWorkflowId) return { apply: false, - commands: [...baseCommands, ...retryResult.commands], + commands: [...baseCommands, ...commands], ignored: false, - retryScheduled: retryResult.retryScheduled, - retriesExhausted: false, + retryScheduled: true, workflowId: resolvedWorkflowId, } } @@ -196,7 +185,6 @@ export class SocketJoinController { commands: [...this.clearRetryCommands(), ...leaveCommands, ...this.flush()], ignored: false, retryScheduled: false, - retriesExhausted: false, workflowId: resolvedWorkflowId, } } @@ -240,10 +228,7 @@ export class SocketJoinController { return [{ type: 'join', workflowId: this.desiredWorkflowId }] } - private scheduleRetry(workflowId: string): { - commands: SocketJoinCommand[] - retryScheduled: boolean - } { + private scheduleRetry(workflowId: string): SocketJoinCommand[] { const nextAttempt = this.retryWorkflowId === workflowId ? this.retryAttempt + 1 : 1 const delayMs = Math.min( SOCKET_JOIN_RETRY_BASE_DELAY_MS * 2 ** Math.max(0, nextAttempt - 1), @@ -253,17 +238,14 @@ export class SocketJoinController { this.retryWorkflowId = workflowId this.retryAttempt = nextAttempt - return { - commands: [ - { - type: 'schedule-retry', - workflowId, - attempt: nextAttempt, - delayMs, - }, - ], - retryScheduled: true, - } + return [ + { + type: 'schedule-retry', + workflowId, + attempt: nextAttempt, + delayMs, + }, + ] } private takeRetryResetCommands(nextWorkflowId?: string | null): SocketJoinCommand[] { diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index c8b8db0be16..638e0cda063 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -61,6 +61,7 @@ interface SocketContextType { isConnected: boolean isConnecting: boolean isReconnecting: boolean + isRetryingWorkflowJoin: boolean authFailed: boolean currentWorkflowId: string | null currentSocketId: string | null @@ -110,6 +111,7 @@ const SocketContext = createContext({ isConnected: false, isConnecting: false, isReconnecting: false, + isRetryingWorkflowJoin: false, authFailed: false, currentWorkflowId: null, currentSocketId: null, @@ -146,6 +148,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [isConnected, setIsConnected] = useState(false) const [isConnecting, setIsConnecting] = useState(false) const [isReconnecting, setIsReconnecting] = useState(false) + const [isRetryingWorkflowJoin, setIsRetryingWorkflowJoin] = useState(false) const [currentWorkflowId, setCurrentWorkflowId] = useState(null) const [currentSocketId, setCurrentSocketId] = useState(null) const [presenceUsers, setPresenceUsers] = useState([]) @@ -236,10 +239,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) { commands.forEach((command) => { if (command.type === 'cancel-retry') { clearJoinRetryTimeout() + setIsRetryingWorkflowJoin(false) return } if (command.type === 'leave') { + setIsRetryingWorkflowJoin(false) clearJoinedWorkflowState(true) if (!socketInstance) { @@ -278,6 +283,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { } clearJoinRetryTimeout() + setIsRetryingWorkflowJoin(true) joinRetryTimeoutRef.current = setTimeout(() => { joinRetryTimeoutRef.current = null executeJoinCommands(joinControllerRef.current.retryJoin(command.workflowId)) @@ -376,6 +382,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('disconnect', (reason) => { setIsConnected(false) setIsConnecting(false) + setIsRetryingWorkflowJoin(false) setCurrentSocketId(null) executeJoinCommands(joinControllerRef.current.setConnected(false)) clearJoinedWorkflowState(false) @@ -469,6 +476,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { if (result.ignored) { logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`) } else { + setIsRetryingWorkflowJoin(false) setVisibleWorkflowId(workflowId) setPresenceUsers(presenceUsers || []) logger.info(`Successfully joined workflow room: ${workflowId}`, { @@ -495,6 +503,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { code, }) } else if (result.apply) { + setIsRetryingWorkflowJoin(false) if (result.workflowId) { useOperationQueueStore.getState().cancelOperationsForWorkflow(result.workflowId) } @@ -854,7 +863,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { workflowId !== currentWorkflowIdRef.current || !isWorkflowVisible(workflowId) ) { - logger.warn('Cannot emit subblock update: no socket connection', { workflowId, blockId }) + const reason = !socket + ? 'socket_unavailable' + : workflowId !== currentWorkflowIdRef.current + ? 'joined_workflow_mismatch' + : 'workflow_not_visible' + + logger.debug('Skipping subblock update emit', { + workflowId, + blockId, + subblockId, + reason, + currentWorkflowId: currentWorkflowIdRef.current, + }) return } socket.emit('subblock-update', { @@ -882,7 +903,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { workflowId !== currentWorkflowIdRef.current || !isWorkflowVisible(workflowId) ) { - logger.warn('Cannot emit variable update: no socket connection', { workflowId, variableId }) + const reason = !socket + ? 'socket_unavailable' + : workflowId !== currentWorkflowIdRef.current + ? 'joined_workflow_mismatch' + : 'workflow_not_visible' + + logger.debug('Skipping variable update emit', { + workflowId, + variableId, + field, + reason, + currentWorkflowId: currentWorkflowIdRef.current, + }) return } socket.emit('variable-update', { @@ -975,6 +1008,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { isConnected, isConnecting, isReconnecting, + isRetryingWorkflowJoin, authFailed, currentWorkflowId, currentSocketId, @@ -1003,6 +1037,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { isConnected, isConnecting, isReconnecting, + isRetryingWorkflowJoin, authFailed, currentWorkflowId, currentSocketId,