Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions apps/sim/executor/execution/executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,45 @@ describe('DAGExecutor run-from-block snapshot metadata', () => {
expect(capturedContext?.blockStates.has('unreachable__obranch-0')).toBe(false)
})
})

describe('DAGExecutor createExecutionContext useDraftState', () => {
function buildMetadataUseDraftState(opts: {
metadataUseDraftState?: boolean
isDeployedContext?: boolean
}): boolean | undefined {
const executor = new DAGExecutor({
workflow: { version: '1', blocks: [], connections: [] },
contextExtensions: {
workspaceId: 'ws-1',
isDeployedContext: opts.isDeployedContext,
metadata:
opts.metadataUseDraftState === undefined
? undefined
: ({ useDraftState: opts.metadataUseDraftState } as ExecutionContext['metadata']),
},
})
const { context } = (
executor as unknown as {
createExecutionContext: (workflowId: string) => { context: ExecutionContext }
}
).createExecutionContext('wf-1')
return context.metadata.useDraftState
}

it('honors explicit useDraftState=true even when isDeployedContext is true (table dispatcher)', () => {
expect(
buildMetadataUseDraftState({ metadataUseDraftState: true, isDeployedContext: true })
).toBe(true)
})

it('honors explicit useDraftState=false even when isDeployedContext is false', () => {
expect(
buildMetadataUseDraftState({ metadataUseDraftState: false, isDeployedContext: false })
).toBe(false)
})

it('falls back to the isDeployedContext heuristic when useDraftState is not provided', () => {
expect(buildMetadataUseDraftState({ isDeployedContext: true })).toBe(false)
expect(buildMetadataUseDraftState({ isDeployedContext: false })).toBe(true)
})
})
4 changes: 3 additions & 1 deletion apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,9 @@ export class DAGExecutor {
...this.contextExtensions.metadata,
startTime: new Date().toISOString(),
duration: 0,
useDraftState: this.contextExtensions.isDeployedContext !== true,
useDraftState:
this.contextExtensions.metadata?.useDraftState ??
this.contextExtensions.isDeployedContext !== true,
Comment thread
TheodoreSpeaks marked this conversation as resolved.
},
environmentVariables: this.environmentVariables,
workflowVariables: this.workflowVariables,
Expand Down
21 changes: 21 additions & 0 deletions apps/sim/executor/execution/snapshot-serializer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,25 @@ describe('serializePauseSnapshot', () => {
stringifySpy.mockRestore()
}
})

it('preserves an explicit useDraftState=true even when the context is a deployed (server-side) context', () => {
const context = createContext({
isDeployedContext: true,
metadata: {
requestId: 'request-1',
executionId: 'execution-1',
workflowId: 'workflow-1',
workspaceId: 'workspace-1',
userId: 'user-1',
triggerType: 'manual',
useDraftState: true,
startTime: '2026-01-01T00:00:00.000Z',
},
})

const snapshot = serializePauseSnapshot(context, ['next-block'])
const serialized = JSON.parse(snapshot.snapshot)

expect(serialized.metadata.useDraftState).toBe(true)
})
})
2 changes: 1 addition & 1 deletion apps/sim/lib/api/contracts/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ export const workflowStateSchema = z.object({
metadata: z
.object({
name: z.string().optional(),
description: z.string().optional(),
description: z.string().nullable().optional(),
})
.optional(),
})
Expand Down
77 changes: 75 additions & 2 deletions apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { updateResumeOutputInAggregationBuffers } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

vi.mock('@sim/db', () => dbChainMock)

import {
PauseResumeManager,
updateResumeOutputInAggregationBuffers,
} from '@/lib/workflows/executor/human-in-the-loop-manager'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { PausePoint, SerializedSnapshot } from '@/executor/types'

function createExecutionState(): SerializableExecutionState {
return {
Expand Down Expand Up @@ -143,3 +151,68 @@ describe('updateResumeOutputInAggregationBuffers', () => {
})
})
})

describe('PauseResumeManager.persistPauseResult metadata merge on re-pause', () => {
beforeEach(() => {
vi.clearAllMocks()
resetDbChainMock()
})

it('preserves the stashed cellContext when an existing paused row re-pauses (chained waits)', async () => {
const cellContext = {
tableId: 'table-1',
rowId: 'row-1',
workspaceId: 'workspace-1',
groupId: 'group-1',
workflowId: 'workflow-1',
}
const existingRow = {
id: 'paused-exec-1',
workflowId: 'workflow-1',
executionId: 'execution-1',
status: 'partially_resumed',
pausePoints: {
'ctx-wait-1': { contextId: 'ctx-wait-1', blockId: 'wait1', resumeStatus: 'resuming' },
},
metadata: {
pauseScope: 'execution',
triggerIds: ['start'],
executorUserId: 'user-1',
cellContext,
},
}

// First `.limit(1)` resolves the select-for-update to the existing row,
// forcing persistPauseResult down the update (not insert) branch.
dbChainMockFns.limit.mockResolvedValueOnce([existingRow])

const snapshotSeed: SerializedSnapshot = { snapshot: '{}', triggerIds: [] }
const pausePoints: PausePoint[] = [
{
contextId: 'ctx-wait-2',
blockId: 'wait2',
pauseKind: 'time',
resumeAt: new Date(Date.now() + 60_000).toISOString(),
resumeStatus: 'paused',
} as PausePoint,
]

await PauseResumeManager.persistPauseResult({
workflowId: 'workflow-1',
executionId: 'execution-1',
pausePoints,
snapshotSeed,
executorUserId: 'user-1',
})

const updateSetCall = dbChainMockFns.set.mock.calls.find(
([arg]) => arg && typeof arg === 'object' && 'metadata' in (arg as Record<string, unknown>)
)
expect(updateSetCall).toBeDefined()

const updatedMetadata = (updateSetCall![0] as { metadata: Record<string, unknown> }).metadata
expect(updatedMetadata.cellContext).toEqual(cellContext)
expect(updatedMetadata.pauseScope).toBe('execution')
expect(updatedMetadata.executorUserId).toBe('user-1')
})
})
5 changes: 4 additions & 1 deletion apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ export class PauseResumeManager {
totalPauseCount,
resumedCount,
status: nextStatus,
metadata,
// Merge rather than replace: foreign keys like `cellContext` (stashed
// by the table cell task) live on the same metadata column and must
// survive a re-pause so chained-wait resumes can still write the row back.
metadata: { ...((existing.metadata as Record<string, unknown>) ?? {}), ...metadata },
updatedAt: now,
nextResumeAt: mergedNextResumeAt,
})
Expand Down
Loading