Skip to content

Commit e39fa94

Browse files
fix(wait): resume live/draft async waits and preserve cell context on chained waits
1 parent 7e751b8 commit e39fa94

5 files changed

Lines changed: 145 additions & 4 deletions

File tree

apps/sim/executor/execution/executor.test.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,3 +330,45 @@ describe('DAGExecutor run-from-block snapshot metadata', () => {
330330
expect(capturedContext?.blockStates.has('unreachable__obranch-0')).toBe(false)
331331
})
332332
})
333+
334+
describe('DAGExecutor createExecutionContext useDraftState', () => {
335+
function buildMetadataUseDraftState(opts: {
336+
metadataUseDraftState?: boolean
337+
isDeployedContext?: boolean
338+
}): boolean | undefined {
339+
const executor = new DAGExecutor({
340+
workflow: { version: '1', blocks: [], connections: [] },
341+
contextExtensions: {
342+
workspaceId: 'ws-1',
343+
isDeployedContext: opts.isDeployedContext,
344+
metadata:
345+
opts.metadataUseDraftState === undefined
346+
? undefined
347+
: ({ useDraftState: opts.metadataUseDraftState } as ExecutionContext['metadata']),
348+
},
349+
})
350+
const { context } = (
351+
executor as unknown as {
352+
createExecutionContext: (workflowId: string) => { context: ExecutionContext }
353+
}
354+
).createExecutionContext('wf-1')
355+
return context.metadata.useDraftState
356+
}
357+
358+
it('honors explicit useDraftState=true even when isDeployedContext is true (table dispatcher)', () => {
359+
expect(
360+
buildMetadataUseDraftState({ metadataUseDraftState: true, isDeployedContext: true })
361+
).toBe(true)
362+
})
363+
364+
it('honors explicit useDraftState=false even when isDeployedContext is false', () => {
365+
expect(
366+
buildMetadataUseDraftState({ metadataUseDraftState: false, isDeployedContext: false })
367+
).toBe(false)
368+
})
369+
370+
it('falls back to the isDeployedContext heuristic when useDraftState is not provided', () => {
371+
expect(buildMetadataUseDraftState({ isDeployedContext: true })).toBe(false)
372+
expect(buildMetadataUseDraftState({ isDeployedContext: false })).toBe(true)
373+
})
374+
})

apps/sim/executor/execution/executor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,9 @@ export class DAGExecutor {
427427
...this.contextExtensions.metadata,
428428
startTime: new Date().toISOString(),
429429
duration: 0,
430-
useDraftState: this.contextExtensions.isDeployedContext !== true,
430+
useDraftState:
431+
this.contextExtensions.metadata?.useDraftState ??
432+
this.contextExtensions.isDeployedContext !== true,
431433
},
432434
environmentVariables: this.environmentVariables,
433435
workflowVariables: this.workflowVariables,

apps/sim/executor/execution/snapshot-serializer.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,25 @@ describe('serializePauseSnapshot', () => {
142142
stringifySpy.mockRestore()
143143
}
144144
})
145+
146+
it('preserves an explicit useDraftState=true even when the context is a deployed (server-side) context', () => {
147+
const context = createContext({
148+
isDeployedContext: true,
149+
metadata: {
150+
requestId: 'request-1',
151+
executionId: 'execution-1',
152+
workflowId: 'workflow-1',
153+
workspaceId: 'workspace-1',
154+
userId: 'user-1',
155+
triggerType: 'manual',
156+
useDraftState: true,
157+
startTime: '2026-01-01T00:00:00.000Z',
158+
},
159+
})
160+
161+
const snapshot = serializePauseSnapshot(context, ['next-block'])
162+
const serialized = JSON.parse(snapshot.snapshot)
163+
164+
expect(serialized.metadata.useDraftState).toBe(true)
165+
})
145166
})

apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
/**
22
* @vitest-environment node
33
*/
4-
import { describe, expect, it } from 'vitest'
5-
import { updateResumeOutputInAggregationBuffers } from '@/lib/workflows/executor/human-in-the-loop-manager'
4+
import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing'
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
vi.mock('@sim/db', () => dbChainMock)
8+
9+
import {
10+
PauseResumeManager,
11+
updateResumeOutputInAggregationBuffers,
12+
} from '@/lib/workflows/executor/human-in-the-loop-manager'
613
import type { SerializableExecutionState } from '@/executor/execution/types'
14+
import type { PausePoint, SerializedSnapshot } from '@/executor/types'
715

816
function createExecutionState(): SerializableExecutionState {
917
return {
@@ -143,3 +151,68 @@ describe('updateResumeOutputInAggregationBuffers', () => {
143151
})
144152
})
145153
})
154+
155+
describe('PauseResumeManager.persistPauseResult metadata merge on re-pause', () => {
156+
beforeEach(() => {
157+
vi.clearAllMocks()
158+
resetDbChainMock()
159+
})
160+
161+
it('preserves the stashed cellContext when an existing paused row re-pauses (chained waits)', async () => {
162+
const cellContext = {
163+
tableId: 'table-1',
164+
rowId: 'row-1',
165+
workspaceId: 'workspace-1',
166+
groupId: 'group-1',
167+
workflowId: 'workflow-1',
168+
}
169+
const existingRow = {
170+
id: 'paused-exec-1',
171+
workflowId: 'workflow-1',
172+
executionId: 'execution-1',
173+
status: 'partially_resumed',
174+
pausePoints: {
175+
'ctx-wait-1': { contextId: 'ctx-wait-1', blockId: 'wait1', resumeStatus: 'resuming' },
176+
},
177+
metadata: {
178+
pauseScope: 'execution',
179+
triggerIds: ['start'],
180+
executorUserId: 'user-1',
181+
cellContext,
182+
},
183+
}
184+
185+
// First `.limit(1)` resolves the select-for-update to the existing row,
186+
// forcing persistPauseResult down the update (not insert) branch.
187+
dbChainMockFns.limit.mockResolvedValueOnce([existingRow])
188+
189+
const snapshotSeed: SerializedSnapshot = { snapshot: '{}', triggerIds: [] }
190+
const pausePoints: PausePoint[] = [
191+
{
192+
contextId: 'ctx-wait-2',
193+
blockId: 'wait2',
194+
pauseKind: 'time',
195+
resumeAt: new Date(Date.now() + 60_000).toISOString(),
196+
resumeStatus: 'paused',
197+
} as PausePoint,
198+
]
199+
200+
await PauseResumeManager.persistPauseResult({
201+
workflowId: 'workflow-1',
202+
executionId: 'execution-1',
203+
pausePoints,
204+
snapshotSeed,
205+
executorUserId: 'user-1',
206+
})
207+
208+
const updateSetCall = dbChainMockFns.set.mock.calls.find(
209+
([arg]) => arg && typeof arg === 'object' && 'metadata' in (arg as Record<string, unknown>)
210+
)
211+
expect(updateSetCall).toBeDefined()
212+
213+
const updatedMetadata = (updateSetCall![0] as { metadata: Record<string, unknown> }).metadata
214+
expect(updatedMetadata.cellContext).toEqual(cellContext)
215+
expect(updatedMetadata.pauseScope).toBe('execution')
216+
expect(updatedMetadata.executorUserId).toBe('user-1')
217+
})
218+
})

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,10 @@ export class PauseResumeManager {
349349
totalPauseCount,
350350
resumedCount,
351351
status: nextStatus,
352-
metadata,
352+
// Merge rather than replace: foreign keys like `cellContext` (stashed
353+
// by the table cell task) live on the same metadata column and must
354+
// survive a re-pause so chained-wait resumes can still write the row back.
355+
metadata: { ...((existing.metadata as Record<string, unknown>) ?? {}), ...metadata },
353356
updatedAt: now,
354357
nextResumeAt: mergedNextResumeAt,
355358
})

0 commit comments

Comments
 (0)