Skip to content

Commit 70d0777

Browse files
committed
address comments
1 parent 32a9a2f commit 70d0777

28 files changed

Lines changed: 1683 additions & 197 deletions

apps/sim/executor/dag/construction/edges.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,11 @@ describe('EdgeConstructor', () => {
188188
sourceHandle: 'loop_exit',
189189
targetHandle: undefined,
190190
})
191+
expect(Array.from(dag.nodes.get(loopEndId)!.outgoingEdges.values())).toContainEqual({
192+
target: afterTemplateId,
193+
sourceHandle: 'loop_exit',
194+
targetHandle: undefined,
195+
})
191196
expect(dag.nodes.get(loopEndId)!.incomingEdges).not.toContain(loopStartId)
192197
})
193198

apps/sim/executor/dag/construction/edges.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,9 @@ export class EdgeConstructor {
346346
}
347347
}
348348

349-
this.addEdge(dag, sentinelEndId, sentinelStartId, EDGE.LOOP_CONTINUE, undefined, true)
349+
this.addEdge(dag, sentinelEndId, sentinelStartId, EDGE.LOOP_CONTINUE, undefined, {
350+
registerIncoming: false,
351+
})
350352
}
351353
}
352354

@@ -385,7 +387,9 @@ export class EdgeConstructor {
385387
}
386388
}
387389

388-
this.addEdge(dag, sentinelEndId, sentinelStartId, EDGE.PARALLEL_CONTINUE, undefined, true)
390+
this.addEdge(dag, sentinelEndId, sentinelStartId, EDGE.PARALLEL_CONTINUE, undefined, {
391+
registerIncoming: false,
392+
})
389393
}
390394
}
391395

@@ -730,7 +734,9 @@ export class EdgeConstructor {
730734
const sourceId = buildParallelSentinelStartId(subflowId)
731735
const targetId = buildParallelSentinelEndId(subflowId)
732736
if (dag.nodes.has(sourceId) && dag.nodes.has(targetId)) {
733-
this.addEdge(dag, sourceId, targetId, EDGE.PARALLEL_EXIT, undefined, true, false)
737+
this.addEdge(dag, sourceId, targetId, EDGE.PARALLEL_EXIT, undefined, {
738+
registerIncoming: false,
739+
})
734740
}
735741
return
736742
}
@@ -739,7 +745,9 @@ export class EdgeConstructor {
739745
const sourceId = buildSentinelStartId(subflowId)
740746
const targetId = buildSentinelEndId(subflowId)
741747
if (dag.nodes.has(sourceId) && dag.nodes.has(targetId)) {
742-
this.addEdge(dag, sourceId, targetId, EDGE.LOOP_EXIT, undefined, true, false)
748+
this.addEdge(dag, sourceId, targetId, EDGE.LOOP_EXIT, undefined, {
749+
registerIncoming: false,
750+
})
743751
}
744752
}
745753
}
@@ -750,8 +758,7 @@ export class EdgeConstructor {
750758
targetId: string,
751759
sourceHandle?: string,
752760
targetHandle?: string,
753-
isControlBackEdge = false,
754-
registerIncoming = true
761+
options: { registerIncoming?: boolean } = {}
755762
): void {
756763
const sourceNode = dag.nodes.get(sourceId)
757764
const targetNode = dag.nodes.get(targetId)
@@ -769,7 +776,8 @@ export class EdgeConstructor {
769776
targetHandle,
770777
})
771778

772-
if (!isControlBackEdge && registerIncoming) {
779+
const { registerIncoming = true } = options
780+
if (registerIncoming) {
773781
targetNode.incomingEdges.add(sourceId)
774782
}
775783
}

apps/sim/executor/dag/construction/sentinels.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@ export function createSubflowSentinelNode(config: SubflowSentinelNodeConfig): DA
1717
block: {
1818
id: config.id,
1919
enabled: true,
20+
position: { x: 0, y: 0 },
2021
metadata: {
2122
id: config.blockType,
2223
name: config.name,
2324
},
24-
config: { params: {} },
25-
} as any,
25+
config: { tool: config.blockType, params: {} },
26+
inputs: {},
27+
outputs: {},
28+
},
2629
incomingEdges: new Set(),
2730
outgoingEdges: new Map(),
2831
metadata: {

apps/sim/executor/execution/block-executor.test.ts

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,110 @@ describe('BlockExecutor', () => {
126126
totalCount: output.result.length,
127127
})
128128
})
129+
130+
it('persists stable outer-branch aliases for completed parallel branch outputs', async () => {
131+
const block = createBlock()
132+
const workflow: SerializedWorkflow = {
133+
version: '1',
134+
blocks: [block],
135+
connections: [],
136+
loops: {},
137+
parallels: {},
138+
}
139+
const state = new ExecutionState()
140+
const resolver = new VariableResolver(workflow, {}, state)
141+
const output = { result: 'branch-2' }
142+
const handler: BlockHandler = {
143+
canHandle: () => true,
144+
execute: async () => output,
145+
}
146+
const executor = new BlockExecutor(
147+
[handler],
148+
resolver,
149+
{
150+
workspaceId: 'workspace-1',
151+
executionId: 'execution-1',
152+
userId: 'user-1',
153+
metadata: {
154+
requestId: 'request-1',
155+
executionId: 'execution-1',
156+
workflowId: 'workflow-1',
157+
workspaceId: 'workspace-1',
158+
userId: 'user-1',
159+
triggerType: 'manual',
160+
useDraftState: false,
161+
startTime: new Date().toISOString(),
162+
},
163+
},
164+
state
165+
)
166+
const node = createNode(block)
167+
node.id = 'function-block-1₍0₎'
168+
node.metadata = {
169+
isParallelBranch: true,
170+
subflowId: 'parallel-1',
171+
subflowType: 'parallel',
172+
originalBlockId: block.id,
173+
branchIndex: 2,
174+
}
175+
176+
await executor.execute(createContext(state), node, block)
177+
178+
expect(state.getBlockOutput('function-block-1__obranch-2')).toEqual(output)
179+
expect(state.getBlockOutput('function-block-1₍2₎')).toEqual(output)
180+
expect(state.getBlockOutput('function-block-1₍0₎')).toEqual(output)
181+
})
182+
183+
it('does not write global aliases for parallel branches inside cloned outer branches', async () => {
184+
const block = createBlock()
185+
const workflow: SerializedWorkflow = {
186+
version: '1',
187+
blocks: [block],
188+
connections: [],
189+
loops: {},
190+
parallels: {},
191+
}
192+
const state = new ExecutionState()
193+
const resolver = new VariableResolver(workflow, {}, state)
194+
const output = { result: 'outer-2-inner-0' }
195+
const handler: BlockHandler = {
196+
canHandle: () => true,
197+
execute: async () => output,
198+
}
199+
const executor = new BlockExecutor(
200+
[handler],
201+
resolver,
202+
{
203+
workspaceId: 'workspace-1',
204+
executionId: 'execution-1',
205+
userId: 'user-1',
206+
metadata: {
207+
requestId: 'request-1',
208+
executionId: 'execution-1',
209+
workflowId: 'workflow-1',
210+
workspaceId: 'workspace-1',
211+
userId: 'user-1',
212+
triggerType: 'manual',
213+
useDraftState: false,
214+
startTime: new Date().toISOString(),
215+
},
216+
},
217+
state
218+
)
219+
const node = createNode(block)
220+
node.id = 'function-block-1__cloneabc__obranch-2₍0₎'
221+
node.metadata = {
222+
isParallelBranch: true,
223+
subflowId: 'inner-parallel',
224+
subflowType: 'parallel',
225+
originalBlockId: block.id,
226+
branchIndex: 0,
227+
}
228+
229+
await executor.execute(createContext(state), node, block)
230+
231+
expect(state.getBlockOutput(node.id)).toEqual(output)
232+
expect(state.getBlockOutput('function-block-1__obranch-0')).toBeUndefined()
233+
expect(state.getBlockOutput('function-block-1₍0₎')).toBeUndefined()
234+
})
129235
})

apps/sim/executor/execution/block-executor.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ import {
4646
} from '@/executor/utils/iteration-context'
4747
import { isJSONString } from '@/executor/utils/json'
4848
import { filterOutputForLog } from '@/executor/utils/output-filter'
49-
import { buildBranchNodeId } from '@/executor/utils/subflow-utils'
49+
import {
50+
buildBranchNodeId,
51+
buildOuterBranchScopedId,
52+
extractOuterBranchIndex,
53+
} from '@/executor/utils/subflow-utils'
5054
import {
5155
FUNCTION_BLOCK_CONTEXT_VARS_KEY,
5256
FUNCTION_BLOCK_DISPLAY_CODE_KEY,
@@ -295,11 +299,21 @@ export class BlockExecutor {
295299

296300
const originalBlockId = node.metadata.originalBlockId
297301
const branchIndex = node.metadata.branchIndex
298-
if (node.metadata.isParallelBranch && originalBlockId && branchIndex !== undefined) {
302+
if (
303+
node.metadata.isParallelBranch &&
304+
originalBlockId &&
305+
branchIndex !== undefined &&
306+
extractOuterBranchIndex(node.id) === undefined
307+
) {
299308
const globalBranchNodeId = buildBranchNodeId(originalBlockId, branchIndex)
300309
if (globalBranchNodeId !== node.id) {
301310
this.state.setBlockOutput(globalBranchNodeId, output, duration)
302311
}
312+
this.state.setBlockOutput(
313+
buildOuterBranchScopedId(originalBlockId, branchIndex),
314+
output,
315+
duration
316+
)
303317
}
304318
}
305319

apps/sim/executor/execution/edge-manager.test.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,6 +1393,107 @@ describe('EdgeManager', () => {
13931393
expect(readyNodes).not.toContain(target2Id)
13941394
expect(readyNodes).toHaveLength(0)
13951395
})
1396+
1397+
it('should handle no matching router route without queuing downstream sentinels', () => {
1398+
const routerId = 'router-1'
1399+
const routeTargetId = 'route-target'
1400+
const downstreamSentinelId = 'loop-downstream-sentinel-end'
1401+
const afterLoopId = 'after-loop'
1402+
1403+
const routerNode = createMockNode(routerId, [
1404+
{ target: routeTargetId, sourceHandle: 'router-route1' },
1405+
])
1406+
const routeTargetNode = createMockNode(
1407+
routeTargetId,
1408+
[{ target: downstreamSentinelId }],
1409+
[routerId]
1410+
)
1411+
const downstreamSentinelNode = createMockNode(
1412+
downstreamSentinelId,
1413+
[{ target: afterLoopId, sourceHandle: EDGE.LOOP_EXIT }],
1414+
[routeTargetId]
1415+
)
1416+
downstreamSentinelNode.metadata = {
1417+
isSentinel: true,
1418+
sentinelType: 'end',
1419+
subflowId: 'downstream',
1420+
subflowType: 'loop',
1421+
}
1422+
const afterLoopNode = createMockNode(afterLoopId, [], [downstreamSentinelId])
1423+
1424+
const nodes = new Map<string, DAGNode>([
1425+
[routerId, routerNode],
1426+
[routeTargetId, routeTargetNode],
1427+
[downstreamSentinelId, downstreamSentinelNode],
1428+
[afterLoopId, afterLoopNode],
1429+
])
1430+
1431+
const dag = createMockDAG(nodes)
1432+
const edgeManager = new EdgeManager(dag)
1433+
const readyNodes = edgeManager.processOutgoingEdges(routerNode, {
1434+
selectedRoute: 'missing-route',
1435+
})
1436+
1437+
expect(readyNodes).not.toContain(routeTargetId)
1438+
expect(readyNodes).not.toContain(downstreamSentinelId)
1439+
expect(readyNodes).toHaveLength(0)
1440+
})
1441+
1442+
it('should allow no matching router route to queue an enclosing subflow sentinel', () => {
1443+
const loopId = 'loop-1'
1444+
const routerId = 'router-1'
1445+
const routeTargetId = 'route-target'
1446+
const loopEndId = `loop-${loopId}-sentinel-end`
1447+
const loopStartId = `loop-${loopId}-sentinel-start`
1448+
1449+
const routerNode = createMockNode(
1450+
routerId,
1451+
[{ target: routeTargetId, sourceHandle: 'router-route1' }],
1452+
[loopStartId]
1453+
)
1454+
routerNode.metadata = {
1455+
subflowId: loopId,
1456+
subflowType: 'loop',
1457+
}
1458+
const routeTargetNode = createMockNode(routeTargetId, [{ target: loopEndId }], [routerId])
1459+
routeTargetNode.metadata = {
1460+
subflowId: loopId,
1461+
subflowType: 'loop',
1462+
}
1463+
const loopEndNode = createMockNode(
1464+
loopEndId,
1465+
[
1466+
{ target: loopStartId, sourceHandle: EDGE.LOOP_CONTINUE },
1467+
{ target: 'after-loop', sourceHandle: EDGE.LOOP_EXIT },
1468+
],
1469+
[routeTargetId]
1470+
)
1471+
loopEndNode.metadata = {
1472+
isSentinel: true,
1473+
sentinelType: 'end',
1474+
subflowId: loopId,
1475+
subflowType: 'loop',
1476+
}
1477+
const loopStartNode = createMockNode(loopStartId, [{ target: routerId }], [loopEndId])
1478+
const afterLoopNode = createMockNode('after-loop', [], [loopEndId])
1479+
1480+
const nodes = new Map<string, DAGNode>([
1481+
[routerId, routerNode],
1482+
[routeTargetId, routeTargetNode],
1483+
[loopEndId, loopEndNode],
1484+
[loopStartId, loopStartNode],
1485+
['after-loop', afterLoopNode],
1486+
])
1487+
1488+
const dag = createMockDAG(nodes)
1489+
const edgeManager = new EdgeManager(dag)
1490+
const readyNodes = edgeManager.processOutgoingEdges(routerNode, {
1491+
selectedRoute: 'missing-route',
1492+
})
1493+
1494+
expect(readyNodes).not.toContain(routeTargetId)
1495+
expect(readyNodes).toContain(loopEndId)
1496+
})
13961497
})
13971498

13981499
describe('Condition inside loop - loop control edges should not be cascade-deactivated', () => {

0 commit comments

Comments
 (0)