Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .changeset/region-step-logs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
'@objectstack/spec': patch
'@objectstack/service-automation': patch
---

feat(automation): surface structured-region body steps in run observability (#1505)

`loop` / `parallel` / `try_catch` previously ran their body, branch, and handler
regions against a region-local step log that was **discarded** — run logs
(`listRuns` / `getRun`) showed the container as a single opaque step, hiding the
per-iteration / per-branch steps that actually executed.

`AutomationEngine.runRegion()` now **returns** its body steps, and the container
node folds them into the parent run log via a new `NodeExecutionResult.childSteps`
field. Each surfaced step is tagged with its **immediate** container via three new
optional fields on `ExecutionStepLogSchema` (and the engine's `StepLogEntry`):

- `parentNodeId` — the enclosing `loop` / `parallel` / `try_catch` node
- `iteration` — zero-based loop iteration or parallel branch index
- `regionKind` — `loop-body` | `parallel-branch` | `try` | `catch`

Tagging fills only fields left undefined, so nested regions keep each step's
innermost container. A failed try-region attempt's partial steps are still not
surfaced (preserving `try_catch` retry semantics). Fully additive — existing run
logs and consumers are unaffected.
2 changes: 1 addition & 1 deletion docs/adr/0031-advanced-flow-node-executors-and-dag.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# ADR-0031: Structured control-flow for flows (loop / parallel / try-catch) — native + AI-authored, BPMN as interop

**Status**: Accepted (2026-06-02) — implemented in #1482 (spec + loop), #1489 (parallel), #1499 (try/catch/retry), #1500 (BPMN mapping); docs #1497. Follow-ups: #1504 (BPMN XML plugin), #1505 (region step logs).
**Status**: Accepted (2026-06-02) — implemented in #1482 (spec + loop), #1489 (parallel), #1499 (try/catch/retry), #1500 (BPMN mapping); docs #1497. Follow-ups: #1504 (BPMN XML plugin); ~~#1505 (region step logs)~~ **done** — `runRegion` now returns its body steps and the container folds them into the run log via `NodeExecutionResult.childSteps`, tagged with `parentNodeId` / `iteration` / `regionKind` (`ExecutionStepLogSchema`).
**Deciders**: ObjectStack Protocol Architects
**Builds on**: [ADR-0018](./0018-unified-node-action-registry.md) (open action registry — node types are an open vocabulary, executors are the source of truth), [ADR-0019](./0019-approval-as-flow-node.md) (durable-pause node via suspend/resume), [ADR-0010](./0010-nl-to-flow-authoring.md) + [ADR-0011](./0011-actions-as-ai-tools.md) (AI flow authoring — **the design center**)
**Consumers**: `@objectstack/services/service-automation` (engine + builtin executors), `@objectstack/spec` (`automation/flow.zod.ts`, `automation/bpmn-interop.zod.ts`, `studio/flow-builder.zod.ts`), `../objectui` (flow designer)
Expand Down
28 changes: 28 additions & 0 deletions packages/services/service-automation/src/builtin/loop-node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,34 @@ describe('loop container executor (ADR-0031)', () => {
]);
});

it('surfaces each iteration\'s body steps in the run log, tagged with parentNodeId + index (#1479)', async () => {
engine.registerFlow('loop_flow', loopFlow(
{
collection: '{items}',
iteratorVariable: 'item',
indexVariable: 'i',
body: { nodes: [{ id: 'b', type: 'collect', label: 'Body', config: { itemVar: 'item', idxVar: 'i' } }], edges: [] },
},
{ items: ['a', 'b', 'c'] },
));

await engine.execute('loop_flow');
const runs = await engine.listRuns('loop_flow');
const bodySteps = runs[0].steps.filter(s => s.nodeId === 'b');

// One body step per iteration, each tagged with the loop container + index.
expect(bodySteps).toHaveLength(3);
expect(bodySteps.map(s => s.iteration)).toEqual([0, 1, 2]);
expect(bodySteps.every(s => s.parentNodeId === 'loop')).toBe(true);
expect(bodySteps.every(s => s.regionKind === 'loop-body')).toBe(true);

// The container step and the after-loop step stay un-grouped (top level).
const loopStep = runs[0].steps.find(s => s.nodeId === 'loop');
const afterStep = runs[0].steps.find(s => s.nodeId === 'after');
expect(loopStep?.parentNodeId).toBeUndefined();
expect(afterStep?.parentNodeId).toBeUndefined();
});

it('runs a multi-node body region in order each iteration', async () => {
engine.registerFlow('loop_flow', loopFlow(
{
Expand Down
13 changes: 10 additions & 3 deletions packages/services/service-automation/src/builtin/loop-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { PluginContext } from '@objectstack/core';
import { defineActionDescriptor, LOOP_MAX_ITERATIONS_CEILING } from '@objectstack/spec/automation';
import type { FlowRegionParsed } from '@objectstack/spec/automation';
import type { AutomationContext } from '@objectstack/spec/contracts';
import type { AutomationEngine } from '../engine.js';
import type { AutomationEngine, StepLogEntry } from '../engine.js';
import { interpolate } from './template.js';

/**
Expand Down Expand Up @@ -110,15 +110,22 @@ export function registerLoopNode(engine: AutomationEngine, ctx: PluginContext):
}

let iterations = 0;
const childSteps: StepLogEntry[] = [];
for (let i = 0; i < collection.length; i++) {
variables.set(iteratorVariable, collection[i]);
if (indexVariable) variables.set(indexVariable, i);
// Body runs in the shared scope; iterator var + mutations are visible.
await engine.runRegion(body, variables, context ?? ({} as AutomationContext));
// #1479: collect each iteration's body steps, tagged with the index.
const iterSteps = await engine.runRegion(body, variables, context ?? ({} as AutomationContext), {
parentNodeId: node.id,
iteration: i,
regionKind: 'loop-body',
});
childSteps.push(...iterSteps);
iterations++;
}

return { success: true, output: { iterations } };
return { success: true, output: { iterations }, childSteps };
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ describe('parallel block executor (ADR-0031)', () => {
expect(order.filter(o => o === 'after')).toHaveLength(1);
});

it('surfaces each branch\'s steps in the run log, tagged with parentNodeId + branch index (#1479)', async () => {
engine.registerFlow('par_flow', parallelFlow([
{ name: 'A', nodes: [{ id: 'a', type: 'setvar', label: 'A', config: { key: 'a', value: 1 } }], edges: [] },
{ name: 'B', nodes: [{ id: 'b', type: 'setvar', label: 'B', config: { key: 'b', value: 2 } }], edges: [] },
]));

await engine.execute('par_flow');
const runs = await engine.listRuns('par_flow');

const stepA = runs[0].steps.find(s => s.nodeId === 'a');
const stepB = runs[0].steps.find(s => s.nodeId === 'b');
expect(stepA?.parentNodeId).toBe('par');
expect(stepA?.iteration).toBe(0);
expect(stepA?.regionKind).toBe('parallel-branch');
expect(stepB?.parentNodeId).toBe('par');
expect(stepB?.iteration).toBe(1);
expect(stepB?.regionKind).toBe('parallel-branch');
});

it('joins only after the slowest branch completes', async () => {
// Branch "slow" awaits several microtasks; "fast" resolves immediately.
// The join ('after') must still be last.
Expand Down
16 changes: 12 additions & 4 deletions packages/services/service-automation/src/builtin/parallel-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { PluginContext } from '@objectstack/core';
import { defineActionDescriptor } from '@objectstack/spec/automation';
import type { FlowRegionParsed } from '@objectstack/spec/automation';
import type { AutomationContext } from '@objectstack/spec/contracts';
import type { AutomationEngine } from '../engine.js';
import type { AutomationEngine, StepLogEntry } from '../engine.js';

/** One branch of a parallel block — a region plus an optional label. */
interface ParallelBranch extends FlowRegionParsed {
Expand Down Expand Up @@ -75,17 +75,25 @@ export function registerParallelNode(engine: AutomationEngine, ctx: PluginContex
};
}

let branchSteps: StepLogEntry[][];
try {
// Implicit join: continue once when ALL branches have completed.
await Promise.all(
branches.map(branch => engine.runRegion(branch, variables, context ?? ({} as AutomationContext))),
// #1479: each branch returns its body steps, tagged with the branch index.
branchSteps = await Promise.all(
branches.map((branch, i) =>
engine.runRegion(branch, variables, context ?? ({} as AutomationContext), {
parentNodeId: node.id,
iteration: i,
regionKind: 'parallel-branch',
}),
),
);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { success: false, error: `parallel '${node.id}': branch failed — ${message}` };
}

return { success: true, output: { branches: branches.length } };
return { success: true, output: { branches: branches.length }, childSteps: branchSteps.flat() };
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,31 @@ describe('try/catch/retry executor (ADR-0031)', () => {
expect(ran[ran.length - 1]).toBe('after'); // downstream continued after catch
});

it('surfaces the try region steps tagged regionKind=try (#1479)', async () => {
engine.registerFlow('tc_flow', tcFlow({
try: { nodes: [{ id: 't', type: 'ok', label: 'T', config: { tag: 'try' } }], edges: [] },
}));

await engine.execute('tc_flow');
const runs = await engine.listRuns('tc_flow');
const tryStep = runs[0].steps.find(s => s.nodeId === 't');
expect(tryStep?.parentNodeId).toBe('tc');
expect(tryStep?.regionKind).toBe('try');
});

it('surfaces the catch region steps tagged regionKind=catch when the try fails (#1479)', async () => {
engine.registerFlow('tc_flow', tcFlow({
try: { nodes: [{ id: 't', type: 'boom', label: 'T' }], edges: [] },
catch: { nodes: [{ id: 'c', type: 'handler', label: 'C' }], edges: [] },
}));

await engine.execute('tc_flow');
const runs = await engine.listRuns('tc_flow');
const catchStep = runs[0].steps.find(s => s.nodeId === 'c');
expect(catchStep?.parentNodeId).toBe('tc');
expect(catchStep?.regionKind).toBe('catch');
});

it('retries the try region with backoff and succeeds without running catch', async () => {
engine.registerFlow('tc_flow', tcFlow({
try: { nodes: [{ id: 't', type: 'flaky', label: 'T', config: { failTimes: 2 } }], edges: [] },
Expand Down
20 changes: 16 additions & 4 deletions packages/services/service-automation/src/builtin/try-catch-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ export function registerTryCatchNode(engine: AutomationEngine, ctx: PluginContex
if (delay > 0) await new Promise(r => setTimeout(r, delay));
}
try {
await engine.runRegion(tryRegion, variables, ctxOrEmpty);
return { success: true, output: { attempts: attempt + 1, caught: false } };
// #1479: surface the successful try region's steps.
const trySteps = await engine.runRegion(tryRegion, variables, ctxOrEmpty, {
parentNodeId: node.id,
regionKind: 'try',
});
return { success: true, output: { attempts: attempt + 1, caught: false }, childSteps: trySteps };
} catch (err) {
lastError = err instanceof Error ? err.message : String(err);
}
Expand All @@ -115,8 +119,16 @@ export function registerTryCatchNode(engine: AutomationEngine, ctx: PluginContex
if (catchRegion != null) {
variables.set(errorVariable, { nodeId: node.id, message: lastError });
try {
await engine.runRegion(catchRegion, variables, ctxOrEmpty);
return { success: true, output: { attempts: maxRetries + 1, caught: true, error: lastError } };
// #1479: surface the catch handler region's steps.
const catchSteps = await engine.runRegion(catchRegion, variables, ctxOrEmpty, {
parentNodeId: node.id,
regionKind: 'catch',
});
return {
success: true,
output: { attempts: maxRetries + 1, caught: true, error: lastError },
childSteps: catchSteps,
};
} catch (catchErr) {
const catchMsg = catchErr instanceof Error ? catchErr.message : String(catchErr);
return { success: false, error: `try_catch '${node.id}': catch region failed — ${catchMsg}` };
Expand Down
53 changes: 48 additions & 5 deletions packages/services/service-automation/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ export interface NodeExecutionResult {
* the form and `resume()` with the values.
*/
screen?: ScreenSpec;
/**
* #1479: step logs produced inside the node's structured region(s). A
* container node (`loop` / `parallel` / `try_catch`) collects the
* {@link AutomationEngine.runRegion} return value(s) here; {@link AutomationEngine.executeNode}
* appends them to the parent run log right after the container's own step,
* so per-iteration / per-branch body steps surface in run observability.
*/
childSteps?: StepLogEntry[];
}

// ─── Trigger Interface (Plugin Extension Point) ─────────────────────
Expand Down Expand Up @@ -202,6 +210,18 @@ export interface StepLogEntry {
completedAt?: string;
durationMs?: number;
error?: { code: string; message: string; stack?: string };
/**
* #1479: structured-region grouping. When a step ran inside a `loop` /
* `parallel` / `try_catch` body region, these tag it with its **immediate**
* container so run observability can distinguish per-iteration / per-branch
* body steps from top-level ones. Set by {@link AutomationEngine.runRegion}
* (innermost wins — never overwritten as steps bubble through nested regions).
*/
parentNodeId?: string;
/** Zero-based loop iteration or parallel branch index of the enclosing region. */
iteration?: number;
/** Which region kind the step ran in: `loop-body` | `parallel-branch` | `try` | `catch`. */
regionKind?: string;
}

/**
Expand Down Expand Up @@ -1471,6 +1491,12 @@ export class AutomationEngine implements IAutomationService {
durationMs: Date.now() - stepStart,
});

// #1479: fold a structured-region container's body/branch/handler
// steps into the run log, right after the container's own step.
if (result.childSteps?.length) {
steps.push(...result.childSteps);
}

// Write back output variables
if (result.output) {
for (const [key, value] of Object.entries(result.output)) {
Expand Down Expand Up @@ -1564,8 +1590,13 @@ export class AutomationEngine implements IAutomationService {
* nodes/edges, so the main DAG traversal (`traverseNext`) is never aware of
* scope markers — keeping the shared traversal untouched.
*
* Body step logs are kept in a region-local array (not yet merged into the
* parent run log); surfacing per-iteration steps is a follow-up.
* #1479: the executed body steps are **returned** (tagged with `grouping`)
* so the calling container node can fold them into the parent run log via
* `NodeExecutionResult.childSteps`. Tagging only fills fields left undefined,
* so when regions nest, each step keeps its **innermost** container's
* `parentNodeId` / `iteration` / `regionKind`. On failure the region throws
* as before (preserving `try_catch` retry semantics); a failed attempt's
* partial steps are not surfaced.
*
* Durable pause (`suspend`) inside a region is not supported in this
* iteration — it is converted into a clear error (mirrors the `subflow`
Expand All @@ -1575,16 +1606,15 @@ export class AutomationEngine implements IAutomationService {
region: FlowRegionParsed,
variables: Map<string, unknown>,
context: AutomationContext,
): Promise<void> {
grouping?: { parentNodeId: string; iteration?: number; regionKind?: string },
): Promise<StepLogEntry[]> {
const entryId = findRegionEntry(region);
const entry = region.nodes.find(n => n.id === entryId);
if (!entry) {
throw new Error(`region entry node '${entryId}' not found`);
}
// A synthetic flow view — executeNode/traverseNext only read `nodes`/`edges`.
const subFlow = { nodes: region.nodes, edges: region.edges ?? [] } as unknown as FlowParsed;
// TODO(#1479): merge region step logs into the parent run log so
// per-iteration body steps surface in run observability.
const regionSteps: StepLogEntry[] = [];
try {
await this.executeNode(entry, subFlow, variables, context, regionSteps);
Expand All @@ -1596,6 +1626,19 @@ export class AutomationEngine implements IAutomationService {
}
throw err;
}
// Tag this region's steps with their immediate container. Innermost wins:
// a step that already carries a `parentNodeId` (set by a nested region)
// is left untouched.
if (grouping) {
for (const step of regionSteps) {
if (step.parentNodeId === undefined) {
step.parentNodeId = grouping.parentNodeId;
if (grouping.iteration !== undefined) step.iteration = grouping.iteration;
if (grouping.regionKind !== undefined) step.regionKind = grouping.regionKind;
}
}
}
return regionSteps;
}

/**
Expand Down
7 changes: 7 additions & 0 deletions packages/spec/src/automation/execution.zod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ export const ExecutionStepLogSchema = lazySchema(() => z.object({
stack: z.string().optional().describe('Stack trace'),
}).optional().describe('Error details if step failed'),
retryAttempt: z.number().int().min(0).optional().describe('Retry attempt number (0 = first try)'),
// #1479: structured-region grouping. Tag a step that ran inside a
// `loop` / `parallel` / `try_catch` body region with its immediate container,
// so run observability can nest per-iteration / per-branch body steps under
// the container instead of showing it as a single opaque step.
parentNodeId: z.string().optional().describe('Enclosing structured-region container node ID (loop/parallel/try_catch)'),
iteration: z.number().int().min(0).optional().describe('Zero-based loop iteration or parallel branch index of the enclosing region'),
regionKind: z.string().optional().describe('Region kind the step ran in: loop-body | parallel-branch | try | catch'),
}));
export type ExecutionStepLog = z.infer<typeof ExecutionStepLogSchema>;

Expand Down