From 9432bb1ac7f5491576e4e1969c0f7e5685e89574 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 11:41:24 +0100 Subject: [PATCH 01/22] feat(workflow-executor): add UpdateRecordStepExecutor with confirmation flow Implements PRD-219. Adds an executor for update-record steps with three execution branches: automatic completion, awaiting user confirmation, and re-entry after confirmation. Extracts shared record selection helpers from ReadRecordStepExecutor into BaseStepExecutor for reuse. Co-Authored-By: Claude Opus 4.6 --- packages/workflow-executor/CLAUDE.md | 5 +- packages/workflow-executor/src/errors.ts | 6 + .../src/executors/base-step-executor.ts | 94 ++- .../executors/read-record-step-executor.ts | 84 +-- .../executors/update-record-step-executor.ts | 281 +++++++++ packages/workflow-executor/src/index.ts | 3 + .../workflow-executor/src/types/execution.ts | 1 + .../src/types/step-execution-data.ts | 15 + .../update-record-step-executor.test.ts | 578 ++++++++++++++++++ 9 files changed, 979 insertions(+), 88 deletions(-) create mode 100644 packages/workflow-executor/src/executors/update-record-step-executor.ts create mode 100644 packages/workflow-executor/test/executors/update-record-step-executor.test.ts diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 333bfdee1a..f0318c97bd 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -42,7 +42,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ── ``` src/ -├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError +├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError ├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring) ├── types/ # Core type definitions (@draft) │ ├── step-definition.ts # StepType enum + step definition interfaces @@ -60,7 +60,8 @@ src/ ├── executors/ # Step executor implementations │ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers) │ ├── condition-step-executor.ts # AI-powered condition step (chooses among options) -│ └── read-record-step-executor.ts # AI-powered record field reading step +│ ├── read-record-step-executor.ts # AI-powered record field reading step +│ └── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow) ├── http/ # HTTP server (optional, for frontend data access) │ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger └── index.ts # Barrel exports diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index b835c391fa..8b872a8932 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -45,3 +45,9 @@ export class NoResolvedFieldsError extends WorkflowExecutorError { super(`None of the requested fields could be resolved: ${fieldNames.join(', ')}`); } } + +export class NoWritableFieldsError extends WorkflowExecutorError { + constructor(collectionName: string) { + super(`No writable fields on record from collection "${collectionName}"`); + } +} diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 2197843be8..b06ab9223c 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -1,18 +1,30 @@ import type { ExecutionContext, StepExecutionResult } from '../types/execution'; +import type { CollectionSchema, RecordRef } from '../types/record'; import type { StepDefinition } from '../types/step-definition'; -import type { StepExecutionData } from '../types/step-execution-data'; +import type { + LoadRelatedRecordStepExecutionData, + StepExecutionData, +} from '../types/step-execution-data'; import type { StepOutcome } from '../types/step-outcome'; import type { AIMessage, BaseMessage } from '@langchain/core/messages'; -import type { DynamicStructuredTool } from '@langchain/core/tools'; -import { SystemMessage } from '@langchain/core/messages'; +import { HumanMessage, SystemMessage } from '@langchain/core/messages'; +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { z } from 'zod'; -import { MalformedToolCallError, MissingToolCallError } from '../errors'; +import { + MalformedToolCallError, + MissingToolCallError, + NoRecordsError, + WorkflowExecutorError, +} from '../errors'; import { isExecutedStepOnExecutor } from '../types/step-execution-data'; export default abstract class BaseStepExecutor { protected readonly context: ExecutionContext; + protected readonly schemaCache = new Map(); + constructor(context: ExecutionContext) { this.context = context; } @@ -107,4 +119,78 @@ export default abstract class BaseStepExecutor { + const stepExecutions = await this.context.runStore.getStepExecutions(); + const relatedRecords = stepExecutions + .filter((e): e is LoadRelatedRecordStepExecutionData => e.type === 'load-related-record') + .map(e => e.record); + + return [this.context.baseRecordRef, ...relatedRecords]; + } + + /** Selects a record ref via AI when multiple are available, returns directly when only one. */ + protected async selectRecordRef( + records: RecordRef[], + prompt: string | undefined, + ): Promise { + if (records.length === 0) throw new NoRecordsError(); + if (records.length === 1) return records[0]; + + const identifiers = await Promise.all(records.map(r => this.toRecordIdentifier(r))); + const identifierTuple = identifiers as [string, ...string[]]; + + const tool = new DynamicStructuredTool({ + name: 'select-record', + description: 'Select the most relevant record for this workflow step.', + schema: z.object({ + recordIdentifier: z.enum(identifierTuple), + }), + func: undefined, + }); + + const messages = [ + ...(await this.buildPreviousStepsMessages()), + new SystemMessage( + 'You are an AI agent selecting the most relevant record for a workflow step.\n' + + 'Choose the record whose collection best matches the user request.\n' + + 'Pay attention to the collection name of each record.', + ), + new HumanMessage(prompt ?? 'Select the most relevant record.'), + ]; + + const { recordIdentifier } = await this.invokeWithTool<{ recordIdentifier: string }>( + messages, + tool, + ); + + const selectedIndex = identifiers.indexOf(recordIdentifier); + + if (selectedIndex === -1) { + throw new WorkflowExecutorError( + `AI selected record "${recordIdentifier}" which does not match any available record`, + ); + } + + return records[selectedIndex]; + } + + /** Fetches a collection schema from WorkflowPort, with caching. */ + protected async getCollectionSchema(collectionName: string): Promise { + const cached = this.schemaCache.get(collectionName); + if (cached) return cached; + + const schema = await this.context.workflowPort.getCollectionSchema(collectionName); + this.schemaCache.set(collectionName, schema); + + return schema; + } + + /** Formats a record ref as "Step X - CollectionDisplayName #id". */ + protected async toRecordIdentifier(record: RecordRef): Promise { + const schema = await this.getCollectionSchema(record.collectionName); + + return `Step ${record.stepIndex} - ${schema.collectionDisplayName} #${record.recordId}`; + } } diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index 6f7248c3c4..abff85a7e7 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -1,21 +1,13 @@ import type { StepExecutionResult } from '../types/execution'; import type { CollectionSchema, RecordRef } from '../types/record'; import type { AiTaskStepDefinition } from '../types/step-definition'; -import type { - FieldReadResult, - LoadRelatedRecordStepExecutionData, -} from '../types/step-execution-data'; +import type { FieldReadResult } from '../types/step-execution-data'; import { HumanMessage, SystemMessage } from '@langchain/core/messages'; import { DynamicStructuredTool } from '@langchain/core/tools'; import { z } from 'zod'; -import { - NoReadableFieldsError, - NoRecordsError, - NoResolvedFieldsError, - WorkflowExecutorError, -} from '../errors'; +import { NoReadableFieldsError, NoResolvedFieldsError, WorkflowExecutorError } from '../errors'; import BaseStepExecutor from './base-step-executor'; const READ_RECORD_SYSTEM_PROMPT = `You are an AI agent reading fields from a record to answer a user request. @@ -27,8 +19,6 @@ Important rules: - Do not refer to yourself as "I" in the response, use a passive formulation instead.`; export default class ReadRecordStepExecutor extends BaseStepExecutor { - private readonly schemaCache = new Map(); - async execute(): Promise { const { stepDefinition: step } = this.context; const records = await this.getAvailableRecordRefs(); @@ -111,51 +101,6 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor { - if (records.length === 0) throw new NoRecordsError(); - if (records.length === 1) return records[0]; - - const identifiers = await Promise.all(records.map(r => this.toRecordIdentifier(r))); - const identifierTuple = identifiers as [string, ...string[]]; - - const tool = new DynamicStructuredTool({ - name: 'select-record', - description: 'Select the most relevant record for this workflow step.', - schema: z.object({ - recordIdentifier: z.enum(identifierTuple), - }), - func: undefined, - }); - - const messages = [ - ...(await this.buildPreviousStepsMessages()), - new SystemMessage( - 'You are an AI agent selecting the most relevant record for a workflow step.\n' + - 'Choose the record whose collection best matches the user request.\n' + - 'Pay attention to the collection name of each record.', - ), - new HumanMessage(prompt ?? 'Select the most relevant record.'), - ]; - - const { recordIdentifier } = await this.invokeWithTool<{ recordIdentifier: string }>( - messages, - tool, - ); - - const selectedIndex = identifiers.indexOf(recordIdentifier); - - if (selectedIndex === -1) { - throw new WorkflowExecutorError( - `AI selected record "${recordIdentifier}" which does not match any available record`, - ); - } - - return records[selectedIndex]; - } - private buildReadFieldTool(schema: CollectionSchema): DynamicStructuredTool { const nonRelationFields = schema.fields.filter(f => !f.isRelationship); @@ -201,29 +146,4 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor { - const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); - const relatedRecords = stepExecutions - .filter((e): e is LoadRelatedRecordStepExecutionData => e.type === 'load-related-record') - .map(e => e.record); - - return [this.context.baseRecordRef, ...relatedRecords]; - } - - private async getCollectionSchema(collectionName: string): Promise { - const cached = this.schemaCache.get(collectionName); - if (cached) return cached; - - const schema = await this.context.workflowPort.getCollectionSchema(collectionName); - this.schemaCache.set(collectionName, schema); - - return schema; - } - - private async toRecordIdentifier(record: RecordRef): Promise { - const schema = await this.getCollectionSchema(record.collectionName); - - return `Step ${record.stepIndex} - ${schema.collectionDisplayName} #${record.recordId}`; - } } diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts new file mode 100644 index 0000000000..796fc83203 --- /dev/null +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -0,0 +1,281 @@ +import type { StepExecutionResult } from '../types/execution'; +import type { CollectionSchema, RecordRef } from '../types/record'; +import type { AiTaskStepDefinition } from '../types/step-definition'; +import type { UpdateRecordStepExecutionData } from '../types/step-execution-data'; + +import { HumanMessage, SystemMessage } from '@langchain/core/messages'; +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { z } from 'zod'; + +import { NoWritableFieldsError, WorkflowExecutorError } from '../errors'; +import BaseStepExecutor from './base-step-executor'; + +const UPDATE_RECORD_SYSTEM_PROMPT = `You are an AI agent updating a field on a record based on a user request. +Select the field to update and provide the new value. + +Important rules: +- Be precise: only update the field that is directly relevant to the request. +- Final answer is definitive, you won't receive any other input from the user. +- Do not refer to yourself as "I" in the response, use a passive formulation instead.`; + +export default class UpdateRecordStepExecutor extends BaseStepExecutor { + async execute(): Promise { + // Branch A — Re-entry with user confirmation + if (this.context.userInput) { + return this.handleConfirmation(); + } + + // Branches B & C — First call + return this.handleFirstCall(); + } + + private async handleConfirmation(): Promise { + const stepExecutions = await this.context.runStore.getStepExecutions(); + const interruption = stepExecutions.find( + (e): e is UpdateRecordStepExecutionData => + e.type === 'update-record' && + e.stepIndex === this.context.stepIndex && + !!e.toolConfirmationInterruption, + ); + + if (!interruption) { + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'error', + error: 'No pending confirmation found for this step', + }, + }; + } + + const { confirmed } = this.context.userInput as { confirmed: boolean }; + + if (!confirmed) { + await this.context.runStore.saveStepExecution({ + ...interruption, + toolConfirmationInterruption: undefined, + executionResult: { + skipped: true, + } as unknown as UpdateRecordStepExecutionData['executionResult'], + }); + + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'success', + }, + }; + } + + // User confirmed — resolve and update + const { selectedRecordRef, toolConfirmationInterruption } = interruption; + const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); + const { fieldDisplayName, value } = toolConfirmationInterruption as { + fieldDisplayName: string; + value: string; + }; + + const fieldName = this.resolveFieldName(schema, fieldDisplayName); + + try { + const updated = await this.context.agentPort.updateRecord( + selectedRecordRef.collectionName, + selectedRecordRef.recordId, + { [fieldName]: value }, + ); + + await this.context.runStore.saveStepExecution({ + ...interruption, + toolConfirmationInterruption: undefined, + executionParams: { fieldName: fieldDisplayName, value }, + executionResult: { updatedValues: updated.values }, + }); + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'error', + error: error.message, + }, + }; + } + + throw error; + } + + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'success', + }, + }; + } + + private async handleFirstCall(): Promise { + const { stepDefinition: step } = this.context; + const records = await this.getAvailableRecordRefs(); + + let selectedRecordRef: RecordRef; + let schema: CollectionSchema; + let fieldDisplayName: string; + let value: string; + + try { + selectedRecordRef = await this.selectRecordRef(records, step.prompt); + schema = await this.getCollectionSchema(selectedRecordRef.collectionName); + const args = await this.selectFieldAndValue(schema, step.prompt); + fieldDisplayName = args.fieldName; + value = args.value; + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'error', + error: error.message, + }, + }; + } + + throw error; + } + + // Branch B — automaticCompletion + if (step.automaticCompletion) { + return this.executeUpdate(selectedRecordRef, schema, fieldDisplayName, value); + } + + // Branch C — Awaiting confirmation + await this.context.runStore.saveStepExecution({ + type: 'update-record', + stepIndex: this.context.stepIndex, + toolConfirmationInterruption: { fieldDisplayName, value }, + selectedRecordRef, + }); + + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'awaiting-input', + }, + }; + } + + private async executeUpdate( + selectedRecordRef: RecordRef, + schema: CollectionSchema, + fieldDisplayName: string, + value: string, + ): Promise { + const fieldName = this.resolveFieldName(schema, fieldDisplayName); + + try { + const updated = await this.context.agentPort.updateRecord( + selectedRecordRef.collectionName, + selectedRecordRef.recordId, + { [fieldName]: value }, + ); + + await this.context.runStore.saveStepExecution({ + type: 'update-record', + stepIndex: this.context.stepIndex, + executionParams: { fieldName: fieldDisplayName, value }, + executionResult: { updatedValues: updated.values }, + selectedRecordRef, + }); + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'error', + error: error.message, + }, + }; + } + + throw error; + } + + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'success', + }, + }; + } + + private async selectFieldAndValue( + schema: CollectionSchema, + prompt: string | undefined, + ): Promise<{ fieldName: string; value: string; reasoning: string }> { + const tool = this.buildUpdateFieldTool(schema); + const messages = [ + ...(await this.buildPreviousStepsMessages()), + new SystemMessage(UPDATE_RECORD_SYSTEM_PROMPT), + new SystemMessage( + `The selected record belongs to the "${schema.collectionDisplayName}" collection.`, + ), + new HumanMessage(`**Request**: ${prompt ?? 'Update the relevant field.'}`), + ]; + + return this.invokeWithTool<{ fieldName: string; value: string; reasoning: string }>( + messages, + tool, + ); + } + + private buildUpdateFieldTool(schema: CollectionSchema): DynamicStructuredTool { + const nonRelationFields = schema.fields.filter(f => !f.isRelationship); + + if (nonRelationFields.length === 0) { + throw new NoWritableFieldsError(schema.collectionName); + } + + const displayNames = nonRelationFields.map(f => f.displayName) as [string, ...string[]]; + + return new DynamicStructuredTool({ + name: 'update-record-field', + description: 'Update a field on the selected record.', + schema: z.object({ + fieldName: z.enum(displayNames), + // z.string() intentionally: the value is always transmitted as string + // to updateRecord; data typing is handled by the agent/datasource layer. + value: z.string().describe('The new value for the field'), + reasoning: z.string().describe('Why this field and value were chosen'), + }), + func: undefined, + }); + } + + private resolveFieldName(schema: CollectionSchema, displayName: string): string { + const field = schema.fields.find( + f => f.displayName === displayName || f.fieldName === displayName, + ); + + if (!field) { + throw new WorkflowExecutorError( + `Field "${displayName}" not found in collection "${schema.collectionName}"`, + ); + } + + return field.fieldName; + } +} diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 916bbc0751..1b7aeab83a 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -18,6 +18,7 @@ export type { FieldReadResult, ConditionStepExecutionData, ReadRecordStepExecutionData, + UpdateRecordStepExecutionData, AiTaskStepExecutionData, LoadRelatedRecordStepExecutionData, ExecutedStepExecutionData, @@ -54,10 +55,12 @@ export { NoRecordsError, NoReadableFieldsError, NoResolvedFieldsError, + NoWritableFieldsError, } from './errors'; export { default as BaseStepExecutor } from './executors/base-step-executor'; export { default as ConditionStepExecutor } from './executors/condition-step-executor'; export { default as ReadRecordStepExecutor } from './executors/read-record-step-executor'; +export { default as UpdateRecordStepExecutor } from './executors/update-record-step-executor'; export { default as AgentClientAgentPort } from './adapters/agent-client-agent-port'; export { default as ForestServerWorkflowPort } from './adapters/forest-server-workflow-port'; export { default as ExecutorHttpServer } from './http/executor-http-server'; diff --git a/packages/workflow-executor/src/types/execution.ts b/packages/workflow-executor/src/types/execution.ts index 406d1e4f0f..6f379d4424 100644 --- a/packages/workflow-executor/src/types/execution.ts +++ b/packages/workflow-executor/src/types/execution.ts @@ -41,4 +41,5 @@ export interface ExecutionContext readonly runStore: RunStore; readonly history: ReadonlyArray>; readonly remoteTools: readonly unknown[]; + readonly userInput?: UserInput; } diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index eb022a273c..2ddff36a15 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -40,6 +40,19 @@ export interface ReadRecordStepExecutionData extends BaseStepExecutionData { selectedRecordRef: RecordRef; } +// -- Update Record -- + +export interface UpdateRecordStepExecutionData extends BaseStepExecutionData { + type: 'update-record'; + executionParams?: { fieldName: string; value: string }; + executionResult?: { updatedValues: Record }; + toolConfirmationInterruption?: { + fieldDisplayName: string; + value: string; + }; + selectedRecordRef: RecordRef; +} + // -- Generic AI Task (fallback for untyped steps) -- export interface AiTaskStepExecutionData extends BaseStepExecutionData { @@ -61,12 +74,14 @@ export interface LoadRelatedRecordStepExecutionData extends BaseStepExecutionDat export type StepExecutionData = | ConditionStepExecutionData | ReadRecordStepExecutionData + | UpdateRecordStepExecutionData | AiTaskStepExecutionData | LoadRelatedRecordStepExecutionData; export type ExecutedStepExecutionData = | ConditionStepExecutionData | ReadRecordStepExecutionData + | UpdateRecordStepExecutionData | AiTaskStepExecutionData; // TODO: this condition should change when load-related-record gets its own executor diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts new file mode 100644 index 0000000000..fef81d313d --- /dev/null +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -0,0 +1,578 @@ +import type { AgentPort } from '../../src/ports/agent-port'; +import type { RunStore } from '../../src/ports/run-store'; +import type { WorkflowPort } from '../../src/ports/workflow-port'; +import type { ExecutionContext, UserInput } from '../../src/types/execution'; +import type { CollectionSchema, RecordRef } from '../../src/types/record'; +import type { AiTaskStepDefinition } from '../../src/types/step-definition'; +import type { UpdateRecordStepExecutionData } from '../../src/types/step-execution-data'; + +import { WorkflowExecutorError } from '../../src/errors'; +import UpdateRecordStepExecutor from '../../src/executors/update-record-step-executor'; +import { StepType } from '../../src/types/step-definition'; + +function makeStep(overrides: Partial = {}): AiTaskStepDefinition { + return { + type: StepType.UpdateRecord, + prompt: 'Set the customer status to active', + ...overrides, + }; +} + +function makeRecordRef(overrides: Partial = {}): RecordRef { + return { + collectionName: 'customers', + recordId: [42], + stepIndex: 0, + ...overrides, + }; +} + +function makeMockAgentPort( + updatedValues: Record = { status: 'active', name: 'John Doe' }, +): AgentPort { + return { + getRecord: jest.fn().mockResolvedValue({ values: updatedValues }), + updateRecord: jest.fn().mockResolvedValue({ + collectionName: 'customers', + recordId: [42], + values: updatedValues, + }), + getRelatedData: jest.fn(), + executeAction: jest.fn(), + } as unknown as AgentPort; +} + +function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { + return { + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'email', displayName: 'Email', isRelationship: false }, + { fieldName: 'status', displayName: 'Status', isRelationship: false }, + { fieldName: 'name', displayName: 'Full Name', isRelationship: false }, + { fieldName: 'orders', displayName: 'Orders', isRelationship: true }, + ], + actions: [], + ...overrides, + }; +} + +function makeMockRunStore(overrides: Partial = {}): RunStore { + return { + getStepExecutions: jest.fn().mockResolvedValue([]), + saveStepExecution: jest.fn().mockResolvedValue(undefined), + ...overrides, + }; +} + +function makeMockWorkflowPort( + schemasByCollection: Record = { + customers: makeCollectionSchema(), + }, +): WorkflowPort { + return { + getPendingStepExecutions: jest.fn().mockResolvedValue([]), + updateStepExecution: jest.fn().mockResolvedValue(undefined), + getCollectionSchema: jest + .fn() + .mockImplementation((name: string) => + Promise.resolve( + schemasByCollection[name] ?? makeCollectionSchema({ collectionName: name }), + ), + ), + getMcpServerConfigs: jest.fn().mockResolvedValue([]), + }; +} + +function makeMockModel(toolCallArgs?: Record, toolName = 'update-record-field') { + const invoke = jest.fn().mockResolvedValue({ + tool_calls: toolCallArgs ? [{ name: toolName, args: toolCallArgs, id: 'call_1' }] : undefined, + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + return { model, bindTools, invoke }; +} + +function makeContext( + overrides: Partial> = {}, +): ExecutionContext { + return { + runId: 'run-1', + stepId: 'update-1', + stepIndex: 0, + baseRecordRef: makeRecordRef(), + stepDefinition: makeStep(), + model: makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'User requested status change', + }).model, + agentPort: makeMockAgentPort(), + workflowPort: makeMockWorkflowPort(), + runStore: makeMockRunStore(), + history: [], + remoteTools: [], + ...overrides, + }; +} + +describe('UpdateRecordStepExecutor', () => { + describe('automaticCompletion: update direct (Branch B)', () => { + it('updates the record and returns success', async () => { + const updatedValues = { status: 'active', name: 'John Doe' }; + const agentPort = makeMockAgentPort(updatedValues); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'User requested status change', + }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: mockModel.model, + agentPort, + runStore, + stepDefinition: makeStep({ automaticCompletion: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'update-record', + stepIndex: 0, + executionParams: { fieldName: 'Status', value: 'active' }, + executionResult: { updatedValues }, + selectedRecordRef: expect.objectContaining({ + collectionName: 'customers', + recordId: [42], + }), + }), + ); + }); + }); + + describe('without automaticCompletion: awaiting-input (Branch C)', () => { + it('saves interruption and returns awaiting-input', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'User requested status change', + }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: mockModel.model, + runStore, + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'update-record', + stepIndex: 0, + toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: expect.objectContaining({ + collectionName: 'customers', + recordId: [42], + }), + }), + ); + }); + }); + + describe('confirmation accepted (Branch A)', () => { + it('updates the record when user confirms', async () => { + const updatedValues = { status: 'active', name: 'John Doe' }; + const agentPort = makeMockAgentPort(updatedValues); + const interruption: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([interruption]), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ agentPort, runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'update-record', + executionParams: { fieldName: 'Status', value: 'active' }, + executionResult: { updatedValues }, + toolConfirmationInterruption: undefined, + }), + ); + }); + }); + + describe('confirmation rejected (Branch A)', () => { + it('skips the update when user rejects', async () => { + const agentPort = makeMockAgentPort(); + const interruption: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([interruption]), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: false }; + const context = makeContext({ agentPort, runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).not.toHaveBeenCalled(); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + executionResult: { skipped: true }, + toolConfirmationInterruption: undefined, + }), + ); + }); + }); + + describe('no interruption in phase 2 (Branch A)', () => { + it('returns error when no pending confirmation is found', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([]), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('No pending confirmation found for this step'); + }); + }); + + describe('multi-record AI selection', () => { + it('uses AI to select among multiple records then selects field', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [ + { fieldName: 'total', displayName: 'Total', isRelationship: false }, + { fieldName: 'status', displayName: 'Order Status', isRelationship: false }, + ], + }); + + // First call: select-record, second call: update-record-field + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'select-record', + args: { recordIdentifier: 'Step 2 - Orders #99' }, + id: 'call_1', + }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'update-record-field', + args: { fieldName: 'Order Status', value: 'shipped', reasoning: 'Mark as shipped' }, + id: 'call_2', + }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([ + { type: 'load-related-record', stepIndex: 2, record: relatedRecord }, + ]), + }); + const workflowPort = makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }); + const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(bindTools).toHaveBeenCalledTimes(2); + + const selectTool = bindTools.mock.calls[0][0][0]; + expect(selectTool.name).toBe('select-record'); + + const updateTool = bindTools.mock.calls[1][0][0]; + expect(updateTool.name).toBe('update-record-field'); + + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + expect.objectContaining({ + toolConfirmationInterruption: { fieldDisplayName: 'Order Status', value: 'shipped' }, + selectedRecordRef: expect.objectContaining({ + recordId: [99], + collectionName: 'orders', + }), + }), + ); + }); + }); + + describe('NoWritableFieldsError', () => { + it('returns error when all fields are relationships', async () => { + const schema = makeCollectionSchema({ + fields: [{ fieldName: 'orders', displayName: 'Orders', isRelationship: true }], + }); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const runStore = makeMockRunStore(); + const workflowPort = makeMockWorkflowPort({ customers: schema }); + const context = makeContext({ model: mockModel.model, runStore, workflowPort }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'No writable fields on record from collection "customers"', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('AI malformed/missing tool call', () => { + it('returns error on malformed tool call', async () => { + const invoke = jest.fn().mockResolvedValue({ + tool_calls: [], + invalid_tool_calls: [ + { name: 'update-record-field', args: '{bad json', error: 'JSON parse error' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: { bindTools } as unknown as ExecutionContext['model'], + runStore, + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'AI returned a malformed tool call for "update-record-field": JSON parse error', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + + it('returns error when AI returns no tool call', async () => { + const invoke = jest.fn().mockResolvedValue({ tool_calls: [] }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: { bindTools } as unknown as ExecutionContext['model'], + runStore, + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('AI did not return a tool call'); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('agentPort.updateRecord WorkflowExecutorError (Branch B)', () => { + it('returns error when updateRecord throws WorkflowExecutorError', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue( + new WorkflowExecutorError('Record locked'), + ); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: mockModel.model, + agentPort, + runStore, + stepDefinition: makeStep({ automaticCompletion: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('Record locked'); + }); + }); + + describe('agentPort.updateRecord WorkflowExecutorError (Branch A)', () => { + it('returns error when updateRecord throws WorkflowExecutorError during confirmation', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue( + new WorkflowExecutorError('Record locked'), + ); + const interruption: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([interruption]), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ agentPort, runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('Record locked'); + }); + }); + + describe('agentPort.updateRecord infra error', () => { + it('lets infrastructure errors propagate (Branch B)', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue(new Error('Connection refused')); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const context = makeContext({ + model: mockModel.model, + agentPort, + stepDefinition: makeStep({ automaticCompletion: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Connection refused'); + }); + + it('lets infrastructure errors propagate (Branch A)', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue(new Error('Connection refused')); + const interruption: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([interruption]), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ agentPort, runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Connection refused'); + }); + }); + + describe('default prompt', () => { + it('uses default prompt when step.prompt is undefined', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const context = makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ prompt: undefined }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await executor.execute(); + + const messages = mockModel.invoke.mock.calls[0][0]; + const humanMessage = messages[messages.length - 1]; + expect(humanMessage.content).toBe('**Request**: Update the relevant field.'); + }); + }); + + describe('previous steps context', () => { + it('includes previous steps summary in update-field messages', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'Yes', reasoning: 'Approved' }, + }, + ]), + }); + const context = makeContext({ + model: mockModel.model, + runStore, + history: [ + { + stepDefinition: { + type: StepType.Condition, + options: ['Yes', 'No'], + prompt: 'Should we proceed?', + }, + stepOutcome: { + type: 'condition', + stepId: 'prev-step', + stepIndex: 0, + status: 'success', + }, + }, + ], + }); + const executor = new UpdateRecordStepExecutor({ + ...context, + stepId: 'update-2', + stepIndex: 1, + }); + + await executor.execute(); + + const messages = mockModel.invoke.mock.calls[0][0]; + // previous steps summary + system prompt + collection info + human message = 4 + expect(messages).toHaveLength(4); + expect(messages[0].content).toContain('Should we proceed?'); + expect(messages[0].content).toContain('"answer":"Yes"'); + expect(messages[1].content).toContain('updating a field on a record'); + }); + }); +}); From a70b36a0b5a05f613a911d1a7667e944ceaf66f4 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 11:47:09 +0100 Subject: [PATCH 02/22] refactor(workflow-executor): simplify interruption lookup in UpdateRecordStepExecutor Match on type + stepIndex only, then guard on toolConfirmationInterruption presence separately. Removes redundant filter predicate. Co-Authored-By: Claude Opus 4.6 --- .../src/executors/update-record-step-executor.ts | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 796fc83203..51c73a21cb 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -33,12 +33,10 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor - e.type === 'update-record' && - e.stepIndex === this.context.stepIndex && - !!e.toolConfirmationInterruption, + e.type === 'update-record' && e.stepIndex === this.context.stepIndex, ); - if (!interruption) { + if (!interruption?.toolConfirmationInterruption) { return { stepOutcome: { type: 'ai-task', @@ -74,10 +72,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor Date: Thu, 19 Mar 2026 11:57:39 +0100 Subject: [PATCH 03/22] refactor(workflow-executor): rename toolConfirmationInterruption to pendingUpdate The field stores the proposed field update (from AI or user-edited) pending confirmation. "pendingUpdate" better describes the content than "toolConfirmationInterruption" which described the mechanism. Co-Authored-By: Claude Opus 4.6 --- .../src/executors/update-record-step-executor.ts | 12 ++++++------ .../src/types/step-execution-data.ts | 2 +- .../update-record-step-executor.test.ts | 16 ++++++++-------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 51c73a21cb..ba4b8f7f7b 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -36,7 +36,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor }; - toolConfirmationInterruption?: { + pendingUpdate?: { fieldDisplayName: string; value: string; }; diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index fef81d313d..ba84ba602b 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -177,7 +177,7 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ type: 'update-record', stepIndex: 0, - toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: expect.objectContaining({ collectionName: 'customers', recordId: [42], @@ -194,7 +194,7 @@ describe('UpdateRecordStepExecutor', () => { const interruption: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ @@ -213,7 +213,7 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', executionParams: { fieldName: 'Status', value: 'active' }, executionResult: { updatedValues }, - toolConfirmationInterruption: undefined, + pendingUpdate: undefined, }), ); }); @@ -225,7 +225,7 @@ describe('UpdateRecordStepExecutor', () => { const interruption: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ @@ -242,7 +242,7 @@ describe('UpdateRecordStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( expect.objectContaining({ executionResult: { skipped: true }, - toolConfirmationInterruption: undefined, + pendingUpdate: undefined, }), ); }); @@ -333,7 +333,7 @@ describe('UpdateRecordStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( expect.objectContaining({ - toolConfirmationInterruption: { fieldDisplayName: 'Order Status', value: 'shipped' }, + pendingUpdate: { fieldDisplayName: 'Order Status', value: 'shipped' }, selectedRecordRef: expect.objectContaining({ recordId: [99], collectionName: 'orders', @@ -447,7 +447,7 @@ describe('UpdateRecordStepExecutor', () => { const interruption: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ @@ -489,7 +489,7 @@ describe('UpdateRecordStepExecutor', () => { const interruption: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - toolConfirmationInterruption: { fieldDisplayName: 'Status', value: 'active' }, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ From 545bbd0f232efd9b33888ecb99c45d7684aa5822 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 12:01:10 +0100 Subject: [PATCH 04/22] refactor(workflow-executor): throw on missing pendingUpdate instead of error outcome MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Missing pendingUpdate in Branch A is a bug (orchestrator/RunStore), not a business case — throw WorkflowExecutorError instead of returning a graceful error outcome. Also rename local variable interruption → execution. Co-Authored-By: Claude Opus 4.6 --- .../executors/update-record-step-executor.ts | 20 +++++--------- .../update-record-step-executor.test.ts | 27 +++++++++---------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index ba4b8f7f7b..7eefe93759 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -31,28 +31,20 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { const stepExecutions = await this.context.runStore.getStepExecutions(); - const interruption = stepExecutions.find( + const execution = stepExecutions.find( (e): e is UpdateRecordStepExecutionData => e.type === 'update-record' && e.stepIndex === this.context.stepIndex, ); - if (!interruption?.pendingUpdate) { - return { - stepOutcome: { - type: 'ai-task', - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - status: 'error', - error: 'No pending confirmation found for this step', - }, - }; + if (!execution?.pendingUpdate) { + throw new WorkflowExecutorError('No pending update found for this step'); } const { confirmed } = this.context.userInput as { confirmed: boolean }; if (!confirmed) { await this.context.runStore.saveStepExecution({ - ...interruption, + ...execution, pendingUpdate: undefined, executionResult: { skipped: true, @@ -70,7 +62,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { }); describe('without automaticCompletion: awaiting-input (Branch C)', () => { - it('saves interruption and returns awaiting-input', async () => { + it('saves execution and returns awaiting-input', async () => { const mockModel = makeMockModel({ fieldName: 'Status', value: 'active', @@ -191,14 +191,14 @@ describe('UpdateRecordStepExecutor', () => { it('updates the record when user confirms', async () => { const updatedValues = { status: 'active', name: 'John Doe' }; const agentPort = makeMockAgentPort(updatedValues); - const interruption: UpdateRecordStepExecutionData = { + const execution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([interruption]), + getStepExecutions: jest.fn().mockResolvedValue([execution]), }); const userInput: UserInput = { type: 'confirmation', confirmed: true }; const context = makeContext({ agentPort, runStore, userInput }); @@ -222,14 +222,14 @@ describe('UpdateRecordStepExecutor', () => { describe('confirmation rejected (Branch A)', () => { it('skips the update when user rejects', async () => { const agentPort = makeMockAgentPort(); - const interruption: UpdateRecordStepExecutionData = { + const execution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([interruption]), + getStepExecutions: jest.fn().mockResolvedValue([execution]), }); const userInput: UserInput = { type: 'confirmation', confirmed: false }; const context = makeContext({ agentPort, runStore, userInput }); @@ -248,8 +248,8 @@ describe('UpdateRecordStepExecutor', () => { }); }); - describe('no interruption in phase 2 (Branch A)', () => { - it('returns error when no pending confirmation is found', async () => { + describe('no pending update in phase 2 (Branch A)', () => { + it('throws when no pending update is found', async () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([]), }); @@ -257,10 +257,7 @@ describe('UpdateRecordStepExecutor', () => { const context = makeContext({ runStore, userInput }); const executor = new UpdateRecordStepExecutor(context); - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe('No pending confirmation found for this step'); + await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); }); }); @@ -444,14 +441,14 @@ describe('UpdateRecordStepExecutor', () => { (agentPort.updateRecord as jest.Mock).mockRejectedValue( new WorkflowExecutorError('Record locked'), ); - const interruption: UpdateRecordStepExecutionData = { + const execution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([interruption]), + getStepExecutions: jest.fn().mockResolvedValue([execution]), }); const userInput: UserInput = { type: 'confirmation', confirmed: true }; const context = makeContext({ agentPort, runStore, userInput }); @@ -486,14 +483,14 @@ describe('UpdateRecordStepExecutor', () => { it('lets infrastructure errors propagate (Branch A)', async () => { const agentPort = makeMockAgentPort(); (agentPort.updateRecord as jest.Mock).mockRejectedValue(new Error('Connection refused')); - const interruption: UpdateRecordStepExecutionData = { + const execution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const runStore = makeMockRunStore({ - getStepExecutions: jest.fn().mockResolvedValue([interruption]), + getStepExecutions: jest.fn().mockResolvedValue([execution]), }); const userInput: UserInput = { type: 'confirmation', confirmed: true }; const context = makeContext({ agentPort, runStore, userInput }); From ebf033f20ff8c11cd141aa1bcada3ec90ed94083 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 12:04:20 +0100 Subject: [PATCH 05/22] refactor(workflow-executor): type executionResult as union instead of casting Add { skipped: true } to the executionResult union type, removing the unsafe `as unknown as` cast when the user rejects the update. Co-Authored-By: Claude Opus 4.6 --- .../src/executors/update-record-step-executor.ts | 4 +--- packages/workflow-executor/src/types/step-execution-data.ts | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 7eefe93759..305e81743f 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -46,9 +46,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor }; + executionResult?: { updatedValues: Record } | { skipped: true }; pendingUpdate?: { fieldDisplayName: string; value: string; From 61e2cb0db5e42e4e21e49272b7ab9a058534d2de Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 12:06:45 +0100 Subject: [PATCH 06/22] fix(workflow-executor): preserve pendingUpdate after confirmation for traceability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Keep pendingUpdate in the saved execution data after both confirm and reject — useful for audit trail (what was proposed vs what happened). Co-Authored-By: Claude Opus 4.6 --- .../src/executors/update-record-step-executor.ts | 2 -- .../test/executors/update-record-step-executor.test.ts | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 305e81743f..bc03cdc44e 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -45,7 +45,6 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { type: 'update-record', executionParams: { fieldName: 'Status', value: 'active' }, executionResult: { updatedValues }, - pendingUpdate: undefined, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, }), ); }); @@ -242,7 +242,7 @@ describe('UpdateRecordStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( expect.objectContaining({ executionResult: { skipped: true }, - pendingUpdate: undefined, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, }), ); }); From 31a27c6c023ef351c1d18356264d426364814c06 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 12:23:21 +0100 Subject: [PATCH 07/22] refactor(workflow-executor): remove unsafe cast in UpdateRecordStepExecutor Pass userInput as parameter to handleConfirmation() where it is already narrowed, removing the `as { confirmed: boolean }` cast. Also move resolveFieldName inside try-catch blocks so WorkflowExecutorError is properly caught and returned as a status: 'error' outcome. Co-Authored-By: Claude Opus 4.6 --- .../src/executors/update-record-step-executor.ts | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index bc03cdc44e..9fb2d821a8 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -1,4 +1,4 @@ -import type { StepExecutionResult } from '../types/execution'; +import type { StepExecutionResult, UserInput } from '../types/execution'; import type { CollectionSchema, RecordRef } from '../types/record'; import type { AiTaskStepDefinition } from '../types/step-definition'; import type { UpdateRecordStepExecutionData } from '../types/step-execution-data'; @@ -22,14 +22,14 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { // Branch A — Re-entry with user confirmation if (this.context.userInput) { - return this.handleConfirmation(); + return this.handleConfirmation(this.context.userInput); } // Branches B & C — First call return this.handleFirstCall(); } - private async handleConfirmation(): Promise { + private async handleConfirmation(userInput: UserInput): Promise { const stepExecutions = await this.context.runStore.getStepExecutions(); const execution = stepExecutions.find( (e): e is UpdateRecordStepExecutionData => @@ -40,7 +40,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { - const fieldName = this.resolveFieldName(schema, fieldDisplayName); - try { + const fieldName = this.resolveFieldName(schema, fieldDisplayName); const updated = await this.context.agentPort.updateRecord( selectedRecordRef.collectionName, selectedRecordRef.recordId, From f7a3edcf3dea5b0000a8460c2e7561f1a8bf46f7 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 14:14:44 +0100 Subject: [PATCH 08/22] refactor(workflow-executor): simplify UpdateRecordStepExecutor and improve coverage - Extract resolveAndUpdate to deduplicate resolve+update+persist logic - Extract buildOutcomeResult/buildSuccessResult/buildErrorResult helpers - Move getCollectionSchema inside try-catch in confirmation flow - Rename executionParams.fieldName to fieldDisplayName for consistency - Add tests for resolveFieldName failure in both branches - Add test for relationship field exclusion from update tool schema Co-Authored-By: Claude Opus 4.6 --- .../executors/update-record-step-executor.ts | 132 ++++++------------ .../src/types/step-execution-data.ts | 2 +- .../update-record-step-executor.test.ts | 84 ++++++++++- 3 files changed, 127 insertions(+), 91 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 9fb2d821a8..751d331bfd 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -20,12 +20,12 @@ Important rules: export default class UpdateRecordStepExecutor extends BaseStepExecutor { async execute(): Promise { - // Branch A — Re-entry with user confirmation + // Branch A -- Re-entry with user confirmation if (this.context.userInput) { return this.handleConfirmation(this.context.userInput); } - // Branches B & C — First call + // Branches B & C -- First call return this.handleFirstCall(); } @@ -40,66 +40,23 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { @@ -107,38 +64,29 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { try { + const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); const fieldName = this.resolveFieldName(schema, fieldDisplayName); const updated = await this.context.agentPort.updateRecord( selectedRecordRef.collectionName, @@ -171,38 +118,47 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor } | { skipped: true }; pendingUpdate?: { fieldDisplayName: string; diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index ebd2e70f1b..68234af1f5 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -145,7 +145,7 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ type: 'update-record', stepIndex: 0, - executionParams: { fieldName: 'Status', value: 'active' }, + executionParams: { fieldDisplayName: 'Status', value: 'active' }, executionResult: { updatedValues }, selectedRecordRef: expect.objectContaining({ collectionName: 'customers', @@ -211,7 +211,7 @@ describe('UpdateRecordStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( expect.objectContaining({ type: 'update-record', - executionParams: { fieldName: 'Status', value: 'active' }, + executionParams: { fieldDisplayName: 'Status', value: 'active' }, executionResult: { updatedValues }, pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, }), @@ -365,6 +365,86 @@ describe('UpdateRecordStepExecutor', () => { }); }); + describe('resolveFieldName failure', () => { + it('returns error when field is not found during confirmation (Branch A)', async () => { + const schema = makeCollectionSchema({ + fields: [{ fieldName: 'email', displayName: 'Email', isRelationship: false }], + }); + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'NonExistentField', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const workflowPort = makeMockWorkflowPort({ customers: schema }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ runStore, workflowPort, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'Field "NonExistentField" not found in collection "customers"', + ); + }); + + it('returns error when field is not found during automaticCompletion (Branch B)', async () => { + // AI returns a display name that doesn't match any field in the schema + const mockModel = makeMockModel({ + fieldName: 'NonExistentField', + value: 'test', + reasoning: 'test', + }); + const context = makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ automaticCompletion: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'Field "NonExistentField" not found in collection "customers"', + ); + }); + }); + + describe('relationship fields excluded from update tool', () => { + it('excludes relationship fields from the tool schema', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const context = makeContext({ model: mockModel.model }); + const executor = new UpdateRecordStepExecutor(context); + + await executor.execute(); + + // Second bindTools call is for update-record-field (first may be select-record) + const lastCall = mockModel.bindTools.mock.calls[mockModel.bindTools.mock.calls.length - 1]; + const tool = lastCall[0][0]; + expect(tool.name).toBe('update-record-field'); + + // Non-relationship display names should be accepted + expect(tool.schema.parse({ fieldName: 'Email', value: 'x', reasoning: 'r' })).toBeTruthy(); + expect(tool.schema.parse({ fieldName: 'Status', value: 'x', reasoning: 'r' })).toBeTruthy(); + expect( + tool.schema.parse({ fieldName: 'Full Name', value: 'x', reasoning: 'r' }), + ).toBeTruthy(); + + // Relationship display name should be rejected + expect(() => + tool.schema.parse({ fieldName: 'Orders', value: 'x', reasoning: 'r' }), + ).toThrow(); + }); + }); + describe('AI malformed/missing tool call', () => { it('returns error on malformed tool call', async () => { const invoke = jest.fn().mockResolvedValue({ From 981991700fc2bfeaae1e553ec7841dcc8f600447 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 14:29:33 +0100 Subject: [PATCH 09/22] refactor(workflow-executor): remove redundant buildSuccessResult/buildErrorResult wrappers Co-Authored-By: Claude Opus 4.6 --- .../src/executors/update-record-step-executor.ts | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 751d331bfd..7e2dfbf578 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -46,7 +46,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor Date: Thu, 19 Mar 2026 14:46:58 +0100 Subject: [PATCH 10/22] refactor(workflow-executor): rename history to previousSteps and document executionResult Co-Authored-By: Claude Opus 4.6 --- .../src/executors/base-step-executor.ts | 4 ++-- .../workflow-executor/src/types/execution.ts | 2 +- .../src/types/step-execution-data.ts | 1 + .../test/executors/base-step-executor.test.ts | 22 +++++++++---------- .../executors/condition-step-executor.test.ts | 4 ++-- .../read-record-step-executor.test.ts | 4 ++-- .../update-record-step-executor.test.ts | 4 ++-- 7 files changed, 21 insertions(+), 20 deletions(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index b06ab9223c..82a6abea7f 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -36,7 +36,7 @@ export default abstract class BaseStepExecutor { - if (!this.context.history.length) return []; + if (!this.context.previousSteps.length) return []; const summary = await this.summarizePreviousSteps(); @@ -52,7 +52,7 @@ export default abstract class BaseStepExecutor { const allStepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); - return this.context.history + return this.context.previousSteps .map(({ stepDefinition, stepOutcome }) => { const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); diff --git a/packages/workflow-executor/src/types/execution.ts b/packages/workflow-executor/src/types/execution.ts index 6f379d4424..c69097b27b 100644 --- a/packages/workflow-executor/src/types/execution.ts +++ b/packages/workflow-executor/src/types/execution.ts @@ -39,7 +39,7 @@ export interface ExecutionContext readonly agentPort: AgentPort; readonly workflowPort: WorkflowPort; readonly runStore: RunStore; - readonly history: ReadonlyArray>; + readonly previousSteps: ReadonlyArray>; readonly remoteTools: readonly unknown[]; readonly userInput?: UserInput; } diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index b99166856c..cc81fdd783 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -45,6 +45,7 @@ export interface ReadRecordStepExecutionData extends BaseStepExecutionData { export interface UpdateRecordStepExecutionData extends BaseStepExecutionData { type: 'update-record'; executionParams?: { fieldDisplayName: string; value: string }; + /** User confirmed → values returned by updateRecord. User rejected → skipped. */ executionResult?: { updatedValues: Record } | { skipped: true }; pendingUpdate?: { fieldDisplayName: string; diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 86491fbb8f..9fff9ab99c 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -73,7 +73,7 @@ function makeContext(overrides: Partial = {}): ExecutionContex agentPort: {} as ExecutionContext['agentPort'], workflowPort: {} as ExecutionContext['workflowPort'], runStore: makeMockRunStore(), - history: [], + previousSteps: [], remoteTools: [], ...overrides, }; @@ -98,7 +98,7 @@ describe('BaseStepExecutor', () => { ]); const executor = new TestableExecutor( makeContext({ - history: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Approve?' })], + previousSteps: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Approve?' })], runStore, }), ); @@ -117,7 +117,7 @@ describe('BaseStepExecutor', () => { it('uses Input for matched steps and History for unmatched steps', async () => { const executor = new TestableExecutor( makeContext({ - history: [ + previousSteps: [ makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0 }), makeHistoryEntry({ stepId: 'cond-2', stepIndex: 1, prompt: 'Second?' }), ], @@ -147,7 +147,7 @@ describe('BaseStepExecutor', () => { it('falls back to History when no matching step execution in RunStore', async () => { const executor = new TestableExecutor( makeContext({ - history: [ + previousSteps: [ makeHistoryEntry({ stepId: 'orphan', stepIndex: 5, prompt: 'Orphan step' }), makeHistoryEntry({ stepId: 'matched', stepIndex: 1, prompt: 'Matched step' }), ], @@ -183,7 +183,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([]), }), ); @@ -207,7 +207,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([]), }), ); @@ -236,7 +236,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([]), }), ); @@ -272,7 +272,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [condEntry, aiEntry], + previousSteps: [condEntry, aiEntry], runStore: makeMockRunStore([ { type: 'ai-task', @@ -299,7 +299,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([ { type: 'condition', @@ -336,7 +336,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([ { type: 'ai-task', @@ -361,7 +361,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([ { type: 'condition', diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index 23eb6c8365..3500dcce6d 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -53,7 +53,7 @@ function makeContext( agentPort: {} as ExecutionContext['agentPort'], workflowPort: {} as ExecutionContext['workflowPort'], runStore: makeMockRunStore(), - history: [], + previousSteps: [], remoteTools: [], ...overrides, }; @@ -175,7 +175,7 @@ describe('ConditionStepExecutor', () => { const context = makeContext({ model: mockModel.model, runStore, - history: [ + previousSteps: [ { stepDefinition: { type: StepType.Condition, diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index eb9c3bc5de..0f0863dc1a 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -111,7 +111,7 @@ function makeContext( agentPort: makeMockAgentPort(), workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), - history: [], + previousSteps: [], remoteTools: [], ...overrides, }; @@ -700,7 +700,7 @@ describe('ReadRecordStepExecutor', () => { const context = makeContext({ model: mockModel.model, runStore, - history: [ + previousSteps: [ { stepDefinition: { type: StepType.Condition, diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 68234af1f5..0786f651f3 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -112,7 +112,7 @@ function makeContext( agentPort: makeMockAgentPort(), workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), - history: [], + previousSteps: [], remoteTools: [], ...overrides, }; @@ -620,7 +620,7 @@ describe('UpdateRecordStepExecutor', () => { const context = makeContext({ model: mockModel.model, runStore, - history: [ + previousSteps: [ { stepDefinition: { type: StepType.Condition, From 239ca2f3a602e00b2de6d262a30a5f69e7b50c39 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 14:48:13 +0100 Subject: [PATCH 11/22] docs(workflow-executor): document pendingUpdate field on UpdateRecordStepExecutionData Co-Authored-By: Claude Opus 4.6 --- packages/workflow-executor/src/types/step-execution-data.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index cc81fdd783..0d14594613 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -47,6 +47,7 @@ export interface UpdateRecordStepExecutionData extends BaseStepExecutionData { executionParams?: { fieldDisplayName: string; value: string }; /** User confirmed → values returned by updateRecord. User rejected → skipped. */ executionResult?: { updatedValues: Record } | { skipped: true }; + /** AI-selected field and value awaiting user confirmation. Used in the confirmation flow only. */ pendingUpdate?: { fieldDisplayName: string; value: string; From 50905bc5e6a3a521067d2b380c6efd19806ba4da Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 15:48:39 +0100 Subject: [PATCH 12/22] refactor(workflow-executor): extract findField helper to deduplicate field resolution Co-Authored-By: Claude Opus 4.6 --- .../src/executors/read-record-step-executor.ts | 8 +++----- .../src/executors/update-record-step-executor.ts | 5 ++--- packages/workflow-executor/src/index.ts | 1 + packages/workflow-executor/src/types/record.ts | 5 +++++ 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index abff85a7e7..8b25bd3631 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -9,6 +9,7 @@ import { z } from 'zod'; import { NoReadableFieldsError, NoResolvedFieldsError, WorkflowExecutorError } from '../errors'; import BaseStepExecutor from './base-step-executor'; +import { findField } from '../types/record'; const READ_RECORD_SYSTEM_PROMPT = `You are an AI agent reading fields from a record to answer a user request. Select the field(s) that best answer the request. You can read one field or multiple fields at once. @@ -32,10 +33,7 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor - schema.fields.find(f => f.fieldName === name || f.displayName === name)?.fieldName, - ) + .map(name => findField(schema, name)?.fieldName) .filter((name): name is string => name !== undefined); if (resolvedFieldNames.length === 0) { @@ -135,7 +133,7 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor { - const field = schema.fields.find(f => f.fieldName === name || f.displayName === name); + const field = findField(schema, name); if (!field) return { error: `Field not found: ${name}`, fieldName: name, displayName: name }; diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 7e2dfbf578..ef5cab565b 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -9,6 +9,7 @@ import { z } from 'zod'; import { NoWritableFieldsError, WorkflowExecutorError } from '../errors'; import BaseStepExecutor from './base-step-executor'; +import { findField } from '../types/record'; const UPDATE_RECORD_SYSTEM_PROMPT = `You are an AI agent updating a field on a record based on a user request. Select the field to update and provide the new value. @@ -195,9 +196,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor f.displayName === displayName || f.fieldName === displayName, - ); + const field = findField(schema, displayName); if (!field) { throw new WorkflowExecutorError( diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 1b7aeab83a..95aa37cd28 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -34,6 +34,7 @@ export type { RecordRef, RecordData, } from './types/record'; +export { findField } from './types/record'; export type { Step, diff --git a/packages/workflow-executor/src/types/record.ts b/packages/workflow-executor/src/types/record.ts index b5070c39f4..369a95e795 100644 --- a/packages/workflow-executor/src/types/record.ts +++ b/packages/workflow-executor/src/types/record.ts @@ -21,6 +21,11 @@ export interface CollectionSchema { actions: ActionSchema[]; } +/** Find a field by fieldName or displayName. */ +export function findField(schema: CollectionSchema, name: string): FieldSchema | undefined { + return schema.fields.find(f => f.fieldName === name || f.displayName === name); +} + // -- Record types (data — source: AgentPort/RunStore) -- /** Lightweight pointer to a specific record. */ From 129290c55d2ad9c31eba571372d73507c9c1c1c2 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 16:04:07 +0100 Subject: [PATCH 13/22] refactor(workflow-executor): apply PR review fixes across executors - Move findField to BaseStepExecutor with displayName priority over fieldName - Move buildOutcomeResult to BaseStepExecutor, use error !== undefined check - Fix resolveAndUpdate try-catch scope (saveStepExecution outside try block) - Fix ConditionStepExecutor catch-all to only catch WorkflowExecutorError - Add pendingUpdate visibility in step summary - Remove findField from types/record.ts and barrel export Co-Authored-By: Claude Opus 4.6 --- .../src/executors/base-step-executor.ts | 28 ++++++++++++- .../src/executors/condition-step-executor.ts | 25 +++++++----- .../executors/read-record-step-executor.ts | 24 ++--------- .../executors/update-record-step-executor.ts | 40 ++++++------------- packages/workflow-executor/src/index.ts | 1 - .../workflow-executor/src/types/record.ts | 5 --- .../executors/condition-step-executor.test.ts | 7 +--- 7 files changed, 61 insertions(+), 69 deletions(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 82a6abea7f..c86da75fb0 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -1,5 +1,5 @@ import type { ExecutionContext, StepExecutionResult } from '../types/execution'; -import type { CollectionSchema, RecordRef } from '../types/record'; +import type { CollectionSchema, FieldSchema, RecordRef } from '../types/record'; import type { StepDefinition } from '../types/step-definition'; import type { LoadRelatedRecordStepExecutionData, @@ -31,6 +31,30 @@ export default abstract class BaseStepExecutor; + /** Find a field by displayName first, then fallback to fieldName. */ + protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined { + return ( + schema.fields.find(f => f.displayName === name) ?? + schema.fields.find(f => f.fieldName === name) + ); + } + + /** Builds a StepExecutionResult with the given status and optional error. */ + protected buildOutcomeResult( + status: 'success' | 'error' | 'awaiting-input', + error?: string, + ): StepExecutionResult { + return { + stepOutcome: { + type: 'ai-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status, + ...(error !== undefined && { error }), + }, + }; + } + /** * Returns a SystemMessage array summarizing previously executed steps. * Empty array when there is no history. Ready to spread into a messages array. @@ -73,6 +97,8 @@ export default abstract class BaseStepExecutor(messages, tool); - } catch (error: unknown) { - return { - stepOutcome: { - type: 'condition', - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - status: 'error', - error: (error as Error).message, - }, - }; + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return { + stepOutcome: { + type: 'condition', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'error', + error: error.message, + }, + }; + } + + throw error; } const { option: selectedOption, reasoning } = args; diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index 8b25bd3631..3f8415cfae 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -9,7 +9,6 @@ import { z } from 'zod'; import { NoReadableFieldsError, NoResolvedFieldsError, WorkflowExecutorError } from '../errors'; import BaseStepExecutor from './base-step-executor'; -import { findField } from '../types/record'; const READ_RECORD_SYSTEM_PROMPT = `You are an AI agent reading fields from a record to answer a user request. Select the field(s) that best answer the request. You can read one field or multiple fields at once. @@ -33,7 +32,7 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor findField(schema, name)?.fieldName) + .map(name => this.findField(schema, name)?.fieldName) .filter((name): name is string => name !== undefined); if (resolvedFieldNames.length === 0) { @@ -48,15 +47,7 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor { - const field = findField(schema, name); + const field = this.findField(schema, name); if (!field) return { error: `Field not found: ${name}`, fieldName: name, displayName: name }; diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index ef5cab565b..ec57891735 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -9,7 +9,6 @@ import { z } from 'zod'; import { NoWritableFieldsError, WorkflowExecutorError } from '../errors'; import BaseStepExecutor from './base-step-executor'; -import { findField } from '../types/record'; const UPDATE_RECORD_SYSTEM_PROMPT = `You are an AI agent updating a field on a record based on a user request. Select the field to update and provide the new value. @@ -109,23 +108,16 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { + let updated: { values: Record }; + try { const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); const fieldName = this.resolveFieldName(schema, fieldDisplayName); - const updated = await this.context.agentPort.updateRecord( + updated = await this.context.agentPort.updateRecord( selectedRecordRef.collectionName, selectedRecordRef.recordId, { [fieldName]: value }, ); - - await this.context.runStore.saveStepExecution({ - ...existingExecution, - type: 'update-record', - stepIndex: this.context.stepIndex, - executionParams: { fieldDisplayName, value }, - executionResult: { updatedValues: updated.values }, - selectedRecordRef, - }); } catch (error) { if (error instanceof WorkflowExecutorError) { return this.buildOutcomeResult('error', error.message); @@ -134,22 +126,16 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor f.fieldName === name || f.displayName === name); -} - // -- Record types (data — source: AgentPort/RunStore) -- /** Lightweight pointer to a specific record. */ diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index 3500dcce6d..c439042448 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -261,7 +261,7 @@ describe('ConditionStepExecutor', () => { }); describe('error propagation', () => { - it('returns error status when model invocation fails', async () => { + it('lets infrastructure errors propagate', async () => { const invoke = jest.fn().mockRejectedValue(new Error('API timeout')); const bindTools = jest.fn().mockReturnValue({ invoke }); const context = makeContext({ @@ -269,10 +269,7 @@ describe('ConditionStepExecutor', () => { }); const executor = new ConditionStepExecutor(context); - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe('API timeout'); + await expect(executor.execute()).rejects.toThrow('API timeout'); }); it('lets run store errors propagate', async () => { From bc3d0c65c7f4d147cc0453d7020fe7e699625855 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 16:37:42 +0100 Subject: [PATCH 14/22] refactor(workflow-executor): rename AiTask to RecordTask for clarity Condition steps handle BPMN routing, record-task steps operate on client data. The previous "ai-task" name was misleading since both step types use AI. Rename to "record-task" to reflect the actual distinction. Co-Authored-By: Claude Opus 4.6 --- .../src/executors/base-step-executor.ts | 2 +- .../src/executors/read-record-step-executor.ts | 4 ++-- .../src/executors/update-record-step-executor.ts | 4 ++-- packages/workflow-executor/src/index.ts | 6 +++--- .../workflow-executor/src/types/step-definition.ts | 4 ++-- .../src/types/step-execution-data.ts | 8 ++++---- packages/workflow-executor/src/types/step-outcome.ts | 12 ++++++------ .../test/executors/base-step-executor.test.ts | 12 ++++++------ .../test/executors/read-record-step-executor.test.ts | 8 ++++---- .../executors/update-record-step-executor.test.ts | 8 ++++---- 10 files changed, 34 insertions(+), 34 deletions(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index c86da75fb0..f04361bee6 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -46,7 +46,7 @@ export default abstract class BaseStepExecutor { +export default class ReadRecordStepExecutor extends BaseStepExecutor { async execute(): Promise { const { stepDefinition: step } = this.context; const records = await this.getAvailableRecordRefs(); diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index ec57891735..17377fcf2f 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -1,6 +1,6 @@ import type { StepExecutionResult, UserInput } from '../types/execution'; import type { CollectionSchema, RecordRef } from '../types/record'; -import type { AiTaskStepDefinition } from '../types/step-definition'; +import type { RecordTaskStepDefinition } from '../types/step-definition'; import type { UpdateRecordStepExecutionData } from '../types/step-execution-data'; import { HumanMessage, SystemMessage } from '@langchain/core/messages'; @@ -18,7 +18,7 @@ Important rules: - Final answer is definitive, you won't receive any other input from the user. - Do not refer to yourself as "I" in the response, use a passive formulation instead.`; -export default class UpdateRecordStepExecutor extends BaseStepExecutor { +export default class UpdateRecordStepExecutor extends BaseStepExecutor { async execute(): Promise { // Branch A -- Re-entry with user confirmation if (this.context.userInput) { diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 1b7aeab83a..e85a545492 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -1,14 +1,14 @@ export { StepType } from './types/step-definition'; export type { ConditionStepDefinition, - AiTaskStepDefinition, + RecordTaskStepDefinition, StepDefinition, } from './types/step-definition'; export type { StepStatus, ConditionStepOutcome, - AiTaskStepOutcome, + RecordTaskStepOutcome, StepOutcome, } from './types/step-outcome'; @@ -19,7 +19,7 @@ export type { ConditionStepExecutionData, ReadRecordStepExecutionData, UpdateRecordStepExecutionData, - AiTaskStepExecutionData, + RecordTaskStepExecutionData, LoadRelatedRecordStepExecutionData, ExecutedStepExecutionData, StepExecutionData, diff --git a/packages/workflow-executor/src/types/step-definition.ts b/packages/workflow-executor/src/types/step-definition.ts index ca23e5b413..fb3a5f0b28 100644 --- a/packages/workflow-executor/src/types/step-definition.ts +++ b/packages/workflow-executor/src/types/step-definition.ts @@ -19,7 +19,7 @@ export interface ConditionStepDefinition extends BaseStepDefinition { options: [string, ...string[]]; } -export interface AiTaskStepDefinition extends BaseStepDefinition { +export interface RecordTaskStepDefinition extends BaseStepDefinition { type: Exclude; recordSourceStepId?: string; automaticCompletion?: boolean; @@ -27,4 +27,4 @@ export interface AiTaskStepDefinition extends BaseStepDefinition { remoteToolsSourceId?: string; } -export type StepDefinition = ConditionStepDefinition | AiTaskStepDefinition; +export type StepDefinition = ConditionStepDefinition | RecordTaskStepDefinition; diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index 0d14594613..f3be7d5e4a 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -57,8 +57,8 @@ export interface UpdateRecordStepExecutionData extends BaseStepExecutionData { // -- Generic AI Task (fallback for untyped steps) -- -export interface AiTaskStepExecutionData extends BaseStepExecutionData { - type: 'ai-task'; +export interface RecordTaskStepExecutionData extends BaseStepExecutionData { + type: 'record-task'; executionParams?: Record; executionResult?: Record; toolConfirmationInterruption?: Record; @@ -77,14 +77,14 @@ export type StepExecutionData = | ConditionStepExecutionData | ReadRecordStepExecutionData | UpdateRecordStepExecutionData - | AiTaskStepExecutionData + | RecordTaskStepExecutionData | LoadRelatedRecordStepExecutionData; export type ExecutedStepExecutionData = | ConditionStepExecutionData | ReadRecordStepExecutionData | UpdateRecordStepExecutionData - | AiTaskStepExecutionData; + | RecordTaskStepExecutionData; // TODO: this condition should change when load-related-record gets its own executor // and produces executionParams/executionResult like other steps. diff --git a/packages/workflow-executor/src/types/step-outcome.ts b/packages/workflow-executor/src/types/step-outcome.ts index 9a564748eb..37f53afa00 100644 --- a/packages/workflow-executor/src/types/step-outcome.ts +++ b/packages/workflow-executor/src/types/step-outcome.ts @@ -6,10 +6,10 @@ type BaseStepStatus = 'success' | 'error'; export type ConditionStepStatus = BaseStepStatus | 'manual-decision'; /** AI task steps can pause mid-execution to await user input (e.g. tool confirmation). */ -export type AiTaskStepStatus = BaseStepStatus | 'awaiting-input'; +export type RecordTaskStepStatus = BaseStepStatus | 'awaiting-input'; /** Union of all step statuses. */ -export type StepStatus = ConditionStepStatus | AiTaskStepStatus; +export type StepStatus = ConditionStepStatus | RecordTaskStepStatus; /** * StepOutcome is sent to the orchestrator — it must NEVER contain client data. @@ -30,9 +30,9 @@ export interface ConditionStepOutcome extends BaseStepOutcome { selectedOption?: string; } -export interface AiTaskStepOutcome extends BaseStepOutcome { - type: 'ai-task'; - status: AiTaskStepStatus; +export interface RecordTaskStepOutcome extends BaseStepOutcome { + type: 'record-task'; + status: RecordTaskStepStatus; } -export type StepOutcome = ConditionStepOutcome | AiTaskStepOutcome; +export type StepOutcome = ConditionStepOutcome | RecordTaskStepOutcome; diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 9fff9ab99c..22dbb56680 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -220,14 +220,14 @@ describe('BaseStepExecutor', () => { expect(result).toContain('"error":"AI could not match an option"'); }); - it('includes status in History for ai-task steps without RunStore data', async () => { + it('includes status in History for record-task steps without RunStore data', async () => { const entry: { stepDefinition: StepDefinition; stepOutcome: StepOutcome } = { stepDefinition: { type: StepType.ReadRecord, prompt: 'Run task', }, stepOutcome: { - type: 'ai-task', + type: 'record-task', stepId: 'ai-step', stepIndex: 0, status: 'awaiting-input', @@ -263,7 +263,7 @@ describe('BaseStepExecutor', () => { prompt: 'Read name', }, stepOutcome: { - type: 'ai-task', + type: 'record-task', stepId: 'read-customer', stepIndex: 1, status: 'success', @@ -275,7 +275,7 @@ describe('BaseStepExecutor', () => { previousSteps: [condEntry, aiEntry], runStore: makeMockRunStore([ { - type: 'ai-task', + type: 'record-task', stepIndex: 1, executionParams: { answer: 'John Doe' }, }, @@ -327,7 +327,7 @@ describe('BaseStepExecutor', () => { prompt: 'Do something', }, stepOutcome: { - type: 'ai-task', + type: 'record-task', stepId: 'ai-step', stepIndex: 0, status: 'success', @@ -339,7 +339,7 @@ describe('BaseStepExecutor', () => { previousSteps: [entry], runStore: makeMockRunStore([ { - type: 'ai-task', + type: 'record-task', stepIndex: 0, }, ]), diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index 0f0863dc1a..35ae85925b 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -3,13 +3,13 @@ import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution'; import type { CollectionSchema, RecordRef } from '../../src/types/record'; -import type { AiTaskStepDefinition } from '../../src/types/step-definition'; +import type { RecordTaskStepDefinition } from '../../src/types/step-definition'; import { NoRecordsError, RecordNotFoundError } from '../../src/errors'; import ReadRecordStepExecutor from '../../src/executors/read-record-step-executor'; import { StepType } from '../../src/types/step-definition'; -function makeStep(overrides: Partial = {}): AiTaskStepDefinition { +function makeStep(overrides: Partial = {}): RecordTaskStepDefinition { return { type: StepType.ReadRecord, prompt: 'Read the customer email', @@ -99,8 +99,8 @@ function makeMockModel( } function makeContext( - overrides: Partial> = {}, -): ExecutionContext { + overrides: Partial> = {}, +): ExecutionContext { return { runId: 'run-1', stepId: 'read-1', diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 0786f651f3..8815a2cb8a 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -3,14 +3,14 @@ import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext, UserInput } from '../../src/types/execution'; import type { CollectionSchema, RecordRef } from '../../src/types/record'; -import type { AiTaskStepDefinition } from '../../src/types/step-definition'; +import type { RecordTaskStepDefinition } from '../../src/types/step-definition'; import type { UpdateRecordStepExecutionData } from '../../src/types/step-execution-data'; import { WorkflowExecutorError } from '../../src/errors'; import UpdateRecordStepExecutor from '../../src/executors/update-record-step-executor'; import { StepType } from '../../src/types/step-definition'; -function makeStep(overrides: Partial = {}): AiTaskStepDefinition { +function makeStep(overrides: Partial = {}): RecordTaskStepDefinition { return { type: StepType.UpdateRecord, prompt: 'Set the customer status to active', @@ -96,8 +96,8 @@ function makeMockModel(toolCallArgs?: Record, toolName = 'updat } function makeContext( - overrides: Partial> = {}, -): ExecutionContext { + overrides: Partial> = {}, +): ExecutionContext { return { runId: 'run-1', stepId: 'update-1', From d895154372e444515e02eadee3b8eec2c3bbf19b Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 17:06:00 +0100 Subject: [PATCH 15/22] refactor(workflow-executor): remove unused fields from RecordTaskStepDefinition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop recordSourceStepId and remoteToolsSourceId — both were declared but never read. allowedTools is kept as the orchestrator-provided tool filter. Co-Authored-By: Claude Sonnet 4.6 --- packages/workflow-executor/src/types/step-definition.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/workflow-executor/src/types/step-definition.ts b/packages/workflow-executor/src/types/step-definition.ts index fb3a5f0b28..44c93a2af2 100644 --- a/packages/workflow-executor/src/types/step-definition.ts +++ b/packages/workflow-executor/src/types/step-definition.ts @@ -21,10 +21,8 @@ export interface ConditionStepDefinition extends BaseStepDefinition { export interface RecordTaskStepDefinition extends BaseStepDefinition { type: Exclude; - recordSourceStepId?: string; automaticCompletion?: boolean; allowedTools?: string[]; - remoteToolsSourceId?: string; } export type StepDefinition = ConditionStepDefinition | RecordTaskStepDefinition; From d1651dbed8e28f79e97ecf8fe02a15b5e948dc88 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 17:11:32 +0100 Subject: [PATCH 16/22] feat(workflow-executor): add ToolTaskStepDefinition for MCP tool steps Introduces StepType.ToolTask ('tool-task') and ToolTaskStepDefinition for steps where the AI freely selects and executes a tool from allowedTools. Moves allowedTools out of RecordTaskStepDefinition where it didn't belong. Co-Authored-By: Claude Sonnet 4.6 --- packages/workflow-executor/src/index.ts | 1 + .../workflow-executor/src/types/step-definition.ts | 10 ++++++++-- packages/workflow-executor/test/index.test.ts | 5 +++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index e85a545492..d41c2663bd 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -2,6 +2,7 @@ export { StepType } from './types/step-definition'; export type { ConditionStepDefinition, RecordTaskStepDefinition, + ToolTaskStepDefinition, StepDefinition, } from './types/step-definition'; diff --git a/packages/workflow-executor/src/types/step-definition.ts b/packages/workflow-executor/src/types/step-definition.ts index 44c93a2af2..4a82626d5e 100644 --- a/packages/workflow-executor/src/types/step-definition.ts +++ b/packages/workflow-executor/src/types/step-definition.ts @@ -6,6 +6,7 @@ export enum StepType { UpdateRecord = 'update-record', TriggerAction = 'trigger-action', LoadRelatedRecord = 'load-related-record', + ToolTask = 'tool-task', } interface BaseStepDefinition { @@ -20,9 +21,14 @@ export interface ConditionStepDefinition extends BaseStepDefinition { } export interface RecordTaskStepDefinition extends BaseStepDefinition { - type: Exclude; + type: Exclude; automaticCompletion?: boolean; +} + +export interface ToolTaskStepDefinition extends BaseStepDefinition { + type: StepType.ToolTask; allowedTools?: string[]; + automaticCompletion?: boolean; } -export type StepDefinition = ConditionStepDefinition | RecordTaskStepDefinition; +export type StepDefinition = ConditionStepDefinition | RecordTaskStepDefinition | ToolTaskStepDefinition; diff --git a/packages/workflow-executor/test/index.test.ts b/packages/workflow-executor/test/index.test.ts index 05affa035c..ff302f7a98 100644 --- a/packages/workflow-executor/test/index.test.ts +++ b/packages/workflow-executor/test/index.test.ts @@ -1,9 +1,9 @@ import { StepType } from '../src/index'; describe('StepType', () => { - it('should expose exactly 5 step types', () => { + it('should expose exactly 6 step types', () => { const values = Object.values(StepType); - expect(values).toHaveLength(5); + expect(values).toHaveLength(6); }); it.each([ @@ -12,6 +12,7 @@ describe('StepType', () => { ['UpdateRecord', 'update-record'], ['TriggerAction', 'trigger-action'], ['LoadRelatedRecord', 'load-related-record'], + ['ToolTask', 'tool-task'], ] as const)('should have %s = "%s"', (key, value) => { expect(StepType[key]).toBe(value); }); From 6a49657f3a80534c72467072a89ee0ac2143105c Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Thu, 19 Mar 2026 19:08:04 +0100 Subject: [PATCH 17/22] fix(workflow-executor): fix prettier formatting on StepDefinition union type Co-Authored-By: Claude Sonnet 4.6 --- packages/workflow-executor/src/types/step-definition.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/workflow-executor/src/types/step-definition.ts b/packages/workflow-executor/src/types/step-definition.ts index 4a82626d5e..a5c23f4250 100644 --- a/packages/workflow-executor/src/types/step-definition.ts +++ b/packages/workflow-executor/src/types/step-definition.ts @@ -31,4 +31,7 @@ export interface ToolTaskStepDefinition extends BaseStepDefinition { automaticCompletion?: boolean; } -export type StepDefinition = ConditionStepDefinition | RecordTaskStepDefinition | ToolTaskStepDefinition; +export type StepDefinition = + | ConditionStepDefinition + | RecordTaskStepDefinition + | ToolTaskStepDefinition; From 949d77ad2df11f47310241877473d9dea4a3a75e Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 20 Mar 2026 09:51:32 +0100 Subject: [PATCH 18/22] refactor(workflow-executor): introduce UpdateTarget to reduce resolveAndUpdate arity - Extract UpdateTarget interface {selectedRecordRef, fieldDisplayName, value} so resolveAndUpdate takes 2 params instead of 4 (addresses qlty reviewer comment) - Collapse 3 uninitialized let declarations in handleFirstCall into one target object - Fix missing runId in getAvailableRecordRefs and update-record saveStepExecution calls - Strengthen test assertions to include runId as first arg Co-Authored-By: Claude Sonnet 4.6 --- .../src/executors/base-step-executor.ts | 2 +- .../executors/update-record-step-executor.ts | 50 ++++++++++--------- .../update-record-step-executor.test.ts | 5 ++ 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index f04361bee6..e83950c6df 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -148,7 +148,7 @@ export default abstract class BaseStepExecutor { - const stepExecutions = await this.context.runStore.getStepExecutions(); + const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); const relatedRecords = stepExecutions .filter((e): e is LoadRelatedRecordStepExecutionData => e.type === 'load-related-record') .map(e => e.record); diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 17377fcf2f..56e603f104 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -18,6 +18,12 @@ Important rules: - Final answer is definitive, you won't receive any other input from the user. - Do not refer to yourself as "I" in the response, use a passive formulation instead.`; +interface UpdateTarget { + selectedRecordRef: RecordRef; + fieldDisplayName: string; + value: string; +} + export default class UpdateRecordStepExecutor extends BaseStepExecutor { async execute(): Promise { // Branch A -- Re-entry with user confirmation @@ -30,7 +36,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { - const stepExecutions = await this.context.runStore.getStepExecutions(); + const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); const execution = stepExecutions.find( (e): e is UpdateRecordStepExecutionData => e.type === 'update-record' && e.stepIndex === this.context.stepIndex, @@ -41,7 +47,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { const { stepDefinition: step } = this.context; const records = await this.getAvailableRecordRefs(); - let selectedRecordRef: RecordRef; - let fieldDisplayName: string; - let value: string; + let target: UpdateTarget; try { - selectedRecordRef = await this.selectRecordRef(records, step.prompt); + const selectedRecordRef = await this.selectRecordRef(records, step.prompt); const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); const args = await this.selectFieldAndValue(schema, step.prompt); - fieldDisplayName = args.fieldName; - value = args.value; + target = { selectedRecordRef, fieldDisplayName: args.fieldName, value: args.value }; } catch (error) { if (error instanceof WorkflowExecutorError) { return this.buildOutcomeResult('error', error.message); @@ -83,15 +86,15 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { + const { selectedRecordRef, fieldDisplayName, value } = target; let updated: { values: Record }; try { @@ -126,7 +128,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { expect(result.stepOutcome.status).toBe('success'); expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', expect.objectContaining({ type: 'update-record', stepIndex: 0, @@ -174,6 +175,7 @@ describe('UpdateRecordStepExecutor', () => { expect(result.stepOutcome.status).toBe('awaiting-input'); expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', expect.objectContaining({ type: 'update-record', stepIndex: 0, @@ -209,6 +211,7 @@ describe('UpdateRecordStepExecutor', () => { expect(result.stepOutcome.status).toBe('success'); expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', expect.objectContaining({ type: 'update-record', executionParams: { fieldDisplayName: 'Status', value: 'active' }, @@ -240,6 +243,7 @@ describe('UpdateRecordStepExecutor', () => { expect(result.stepOutcome.status).toBe('success'); expect(agentPort.updateRecord).not.toHaveBeenCalled(); expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', expect.objectContaining({ executionResult: { skipped: true }, pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, @@ -329,6 +333,7 @@ describe('UpdateRecordStepExecutor', () => { expect(updateTool.name).toBe('update-record-field'); expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', expect.objectContaining({ pendingUpdate: { fieldDisplayName: 'Order Status', value: 'shipped' }, selectedRecordRef: expect.objectContaining({ From b939ac73a782fe36b021dd83be2f17729d2866c4 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 20 Mar 2026 09:56:00 +0100 Subject: [PATCH 19/22] test(workflow-executor): address PR review gaps on UpdateRecordStepExecutor Code changes: - Add JSDoc on buildOutcomeResult warning it is for record-task executors only - Add runtime guard on userInput.type in handleConfirmation (WorkflowExecutorError if an unexpected type is received, guarding against future UserInput union extension) New tests: - buildStepSummary: pendingUpdate branch coverage (update-record step with pendingUpdate but no executionParams emits Pending: in AI context) - handleConfirmation: stepIndex mismatch and missing pendingUpdate cases - stepOutcome shape: type/stepId/stepIndex asserted on happy path - unexpected userInput type: runtime guard verified - findField fieldName fallback: update succeeds when AI returns raw fieldName - schema caching: getCollectionSchema called once per collection in Branch B - RunStore error propagation: all four saveStepExecution/getStepExecutions call sites Co-Authored-By: Claude Sonnet 4.6 --- .../src/executors/base-step-executor.ts | 6 +- .../executors/update-record-step-executor.ts | 6 + .../test/executors/base-step-executor.test.ts | 35 ++++ .../update-record-step-executor.test.ts | 153 ++++++++++++++++++ 4 files changed, 199 insertions(+), 1 deletion(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index e83950c6df..bf677691a3 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -39,7 +39,11 @@ export default abstract class BaseStepExecutor { + if (userInput.type !== 'confirmation') { + throw new WorkflowExecutorError( + `UpdateRecordStepExecutor received unexpected userInput type: "${(userInput as { type: string }).type}"`, + ); + } + const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); const execution = stepExecutions.find( (e): e is UpdateRecordStepExecutionData => diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 22dbb56680..9e7dc5ebe7 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -355,6 +355,41 @@ describe('BaseStepExecutor', () => { expect(result).not.toContain('Input:'); }); + it('uses Pending when update-record step has pendingUpdate but no executionParams', async () => { + const executor = new TestableExecutor( + makeContext({ + previousSteps: [ + { + stepDefinition: { type: StepType.UpdateRecord, prompt: 'Set status to active' }, + stepOutcome: { + type: 'record-task', + stepId: 'update-1', + stepIndex: 0, + status: 'awaiting-input', + }, + }, + ], + runStore: makeMockRunStore([ + { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: { collectionName: 'customers', recordId: [1], stepIndex: 0 }, + }, + ]), + }), + ); + + const result = await executor + .buildPreviousStepsMessages() + .then(msgs => msgs[0]?.content ?? ''); + + expect(result).toContain('Pending:'); + expect(result).toContain('"fieldDisplayName":"Status"'); + expect(result).toContain('"value":"active"'); + expect(result).not.toContain('Input:'); + }); + it('shows "(no prompt)" when step has no prompt', async () => { const entry = makeHistoryEntry({ stepIndex: 0 }); entry.stepDefinition.prompt = undefined; diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 1225badd44..3db784b8bf 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -263,6 +263,41 @@ describe('UpdateRecordStepExecutor', () => { await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); }); + + it('throws when execution exists but stepIndex does not match', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'update-record', + stepIndex: 5, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); + }); + + it('throws when execution exists but pendingUpdate is absent', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'update-record', + stepIndex: 0, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); + }); }); describe('multi-record AI selection', () => { @@ -585,6 +620,124 @@ describe('UpdateRecordStepExecutor', () => { }); }); + describe('stepOutcome shape', () => { + it('emits correct type, stepId and stepIndex in the outcome', async () => { + const context = makeContext({ stepDefinition: makeStep({ automaticCompletion: true }) }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome).toMatchObject({ + type: 'record-task', + stepId: 'update-1', + stepIndex: 0, + status: 'success', + }); + }); + }); + + describe('unexpected userInput type', () => { + it('throws when userInput has an unknown type', async () => { + const userInput = { type: 'text-input', value: 'hello' } as unknown as UserInput; + const context = makeContext({ userInput }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow( + 'UpdateRecordStepExecutor received unexpected userInput type: "text-input"', + ); + }); + }); + + describe('findField fieldName fallback', () => { + it('resolves update when AI returns raw fieldName instead of displayName', async () => { + const agentPort = makeMockAgentPort(); + // AI returns 'status' (fieldName) instead of 'Status' (displayName) + const mockModel = makeMockModel({ fieldName: 'status', value: 'active', reasoning: 'test' }); + const context = makeContext({ + model: mockModel.model, + agentPort, + stepDefinition: makeStep({ automaticCompletion: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); + }); + }); + + describe('schema caching', () => { + it('fetches getCollectionSchema once per collection even when called twice (Branch B)', async () => { + const workflowPort = makeMockWorkflowPort(); + const context = makeContext({ + workflowPort, + stepDefinition: makeStep({ automaticCompletion: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await executor.execute(); + + // Branch B calls getCollectionSchema in handleFirstCall and again in resolveAndUpdate + // but the cache should prevent the second network call + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + }); + }); + + describe('RunStore error propagation', () => { + it('lets getStepExecutions errors propagate (Branch A)', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockRejectedValue(new Error('DB timeout')), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: true }; + const context = makeContext({ runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('DB timeout'); + }); + + it('lets saveStepExecution errors propagate when user rejects (Branch A)', async () => { + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + saveStepExecution: jest.fn().mockRejectedValue(new Error('Disk full')), + }); + const userInput: UserInput = { type: 'confirmation', confirmed: false }; + const context = makeContext({ runStore, userInput }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Disk full'); + }); + + it('lets saveStepExecution errors propagate when saving awaiting-input (Branch C)', async () => { + const runStore = makeMockRunStore({ + saveStepExecution: jest.fn().mockRejectedValue(new Error('Disk full')), + }); + const context = makeContext({ runStore }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Disk full'); + }); + + it('lets saveStepExecution errors propagate after successful updateRecord (Branch B)', async () => { + const runStore = makeMockRunStore({ + saveStepExecution: jest.fn().mockRejectedValue(new Error('Disk full')), + }); + const context = makeContext({ + runStore, + stepDefinition: makeStep({ automaticCompletion: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Disk full'); + }); + }); + describe('default prompt', () => { it('uses default prompt when step.prompt is undefined', async () => { const mockModel = makeMockModel({ From 180f3fd1d797f52d386bb442a389d67b210ace59 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 20 Mar 2026 10:32:58 +0100 Subject: [PATCH 20/22] =?UTF-8?q?refactor(workflow-executor):=20rename=20T?= =?UTF-8?q?oolTask=E2=86=92McpTask=20and=20automaticCompletion=E2=86=92aut?= =?UTF-8?q?omaticExecution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - StepType.ToolTask → StepType.McpTask (wire value: 'tool-task' → 'mcp-task') - ToolTaskStepDefinition → McpTaskStepDefinition - mcpServerId: string[] → string (single ID) - automaticCompletion → automaticExecution on both RecordTaskStepDefinition and McpTaskStepDefinition - Remove TODO rename comments - Update all tests accordingly Co-Authored-By: Claude Sonnet 4.6 --- .../executors/update-record-step-executor.ts | 8 ++++--- packages/workflow-executor/src/index.ts | 1 - .../src/types/step-definition.ts | 16 +++++++------- .../update-record-step-executor.test.ts | 22 +++++++++---------- packages/workflow-executor/test/index.test.ts | 2 +- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 25748198d3..bd48bc3225 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -38,7 +38,9 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor { if (userInput.type !== 'confirmation') { throw new WorkflowExecutorError( - `UpdateRecordStepExecutor received unexpected userInput type: "${(userInput as { type: string }).type}"`, + `UpdateRecordStepExecutor received unexpected userInput type: "${ + (userInput as { type: string }).type + }"`, ); } @@ -90,8 +92,8 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor; - automaticCompletion?: boolean; + type: Exclude; + automaticExecution?: boolean; } -export interface ToolTaskStepDefinition extends BaseStepDefinition { - type: StepType.ToolTask; - allowedTools?: string[]; - automaticCompletion?: boolean; +export interface McpTaskStepDefinition extends BaseStepDefinition { + type: StepType.McpTask; + mcpServerId?: string; + automaticExecution?: boolean; } export type StepDefinition = | ConditionStepDefinition | RecordTaskStepDefinition - | ToolTaskStepDefinition; + | McpTaskStepDefinition; diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 3db784b8bf..1df654e356 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -119,7 +119,7 @@ function makeContext( } describe('UpdateRecordStepExecutor', () => { - describe('automaticCompletion: update direct (Branch B)', () => { + describe('automaticExecution: update direct (Branch B)', () => { it('updates the record and returns success', async () => { const updatedValues = { status: 'active', name: 'John Doe' }; const agentPort = makeMockAgentPort(updatedValues); @@ -133,7 +133,7 @@ describe('UpdateRecordStepExecutor', () => { model: mockModel.model, agentPort, runStore, - stepDefinition: makeStep({ automaticCompletion: true }), + stepDefinition: makeStep({ automaticExecution: true }), }); const executor = new UpdateRecordStepExecutor(context); @@ -157,7 +157,7 @@ describe('UpdateRecordStepExecutor', () => { }); }); - describe('without automaticCompletion: awaiting-input (Branch C)', () => { + describe('without automaticExecution: awaiting-input (Branch C)', () => { it('saves execution and returns awaiting-input', async () => { const mockModel = makeMockModel({ fieldName: 'Status', @@ -432,7 +432,7 @@ describe('UpdateRecordStepExecutor', () => { ); }); - it('returns error when field is not found during automaticCompletion (Branch B)', async () => { + it('returns error when field is not found during automaticExecution (Branch B)', async () => { // AI returns a display name that doesn't match any field in the schema const mockModel = makeMockModel({ fieldName: 'NonExistentField', @@ -441,7 +441,7 @@ describe('UpdateRecordStepExecutor', () => { }); const context = makeContext({ model: mockModel.model, - stepDefinition: makeStep({ automaticCompletion: true }), + stepDefinition: makeStep({ automaticExecution: true }), }); const executor = new UpdateRecordStepExecutor(context); @@ -544,7 +544,7 @@ describe('UpdateRecordStepExecutor', () => { model: mockModel.model, agentPort, runStore, - stepDefinition: makeStep({ automaticCompletion: true }), + stepDefinition: makeStep({ automaticExecution: true }), }); const executor = new UpdateRecordStepExecutor(context); @@ -593,7 +593,7 @@ describe('UpdateRecordStepExecutor', () => { const context = makeContext({ model: mockModel.model, agentPort, - stepDefinition: makeStep({ automaticCompletion: true }), + stepDefinition: makeStep({ automaticExecution: true }), }); const executor = new UpdateRecordStepExecutor(context); @@ -622,7 +622,7 @@ describe('UpdateRecordStepExecutor', () => { describe('stepOutcome shape', () => { it('emits correct type, stepId and stepIndex in the outcome', async () => { - const context = makeContext({ stepDefinition: makeStep({ automaticCompletion: true }) }); + const context = makeContext({ stepDefinition: makeStep({ automaticExecution: true }) }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); @@ -656,7 +656,7 @@ describe('UpdateRecordStepExecutor', () => { const context = makeContext({ model: mockModel.model, agentPort, - stepDefinition: makeStep({ automaticCompletion: true }), + stepDefinition: makeStep({ automaticExecution: true }), }); const executor = new UpdateRecordStepExecutor(context); @@ -672,7 +672,7 @@ describe('UpdateRecordStepExecutor', () => { const workflowPort = makeMockWorkflowPort(); const context = makeContext({ workflowPort, - stepDefinition: makeStep({ automaticCompletion: true }), + stepDefinition: makeStep({ automaticExecution: true }), }); const executor = new UpdateRecordStepExecutor(context); @@ -730,7 +730,7 @@ describe('UpdateRecordStepExecutor', () => { }); const context = makeContext({ runStore, - stepDefinition: makeStep({ automaticCompletion: true }), + stepDefinition: makeStep({ automaticExecution: true }), }); const executor = new UpdateRecordStepExecutor(context); diff --git a/packages/workflow-executor/test/index.test.ts b/packages/workflow-executor/test/index.test.ts index ff302f7a98..1267b1cbbd 100644 --- a/packages/workflow-executor/test/index.test.ts +++ b/packages/workflow-executor/test/index.test.ts @@ -12,7 +12,7 @@ describe('StepType', () => { ['UpdateRecord', 'update-record'], ['TriggerAction', 'trigger-action'], ['LoadRelatedRecord', 'load-related-record'], - ['ToolTask', 'tool-task'], + ['McpTask', 'mcp-task'], ] as const)('should have %s = "%s"', (key, value) => { expect(StepType[key]).toBe(value); }); From 1309b54086485a8f27e1309a2b540bab4ba59626 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 20 Mar 2026 10:35:54 +0100 Subject: [PATCH 21/22] test(workflow-executor): rename misleading stepId 'ai-step' to 'read-record-1' Co-Authored-By: Claude Sonnet 4.6 --- .../test/executors/base-step-executor.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 9e7dc5ebe7..45f5bc9c42 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -228,7 +228,7 @@ describe('BaseStepExecutor', () => { }, stepOutcome: { type: 'record-task', - stepId: 'ai-step', + stepId: 'read-record-1', stepIndex: 0, status: 'awaiting-input', }, @@ -245,7 +245,7 @@ describe('BaseStepExecutor', () => { .buildPreviousStepsMessages() .then(msgs => msgs[0]?.content ?? ''); - expect(result).toContain('Step "ai-step"'); + expect(result).toContain('Step "read-record-1"'); expect(result).toContain('History: {"status":"awaiting-input"}'); }); @@ -328,7 +328,7 @@ describe('BaseStepExecutor', () => { }, stepOutcome: { type: 'record-task', - stepId: 'ai-step', + stepId: 'read-record-1', stepIndex: 0, status: 'success', }, @@ -350,7 +350,7 @@ describe('BaseStepExecutor', () => { .buildPreviousStepsMessages() .then(msgs => msgs[0]?.content ?? ''); - expect(result).toContain('Step "ai-step"'); + expect(result).toContain('Step "read-record-1"'); expect(result).toContain('Prompt: Do something'); expect(result).not.toContain('Input:'); }); From d99fe27f914565683bcebd8f3c478e4e7512dd5e Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Fri, 20 Mar 2026 11:44:36 +0100 Subject: [PATCH 22/22] refactor(workflow-executor): simplify userInput to userConfirmed boolean Replaces the UserInput discriminated union with a plain boolean field userConfirmed on ExecutionContext and PendingStepExecution. Since the only user input type in practice is a boolean confirmation, the union wrapper adds complexity with no benefit. Co-Authored-By: Claude Sonnet 4.6 --- .../executors/update-record-step-executor.ts | 18 ++----- packages/workflow-executor/src/index.ts | 1 - .../workflow-executor/src/types/execution.ts | 6 +-- .../update-record-step-executor.test.ts | 54 ++++++++----------- 4 files changed, 28 insertions(+), 51 deletions(-) diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index bd48bc3225..5d210b8596 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -1,4 +1,4 @@ -import type { StepExecutionResult, UserInput } from '../types/execution'; +import type { StepExecutionResult } from '../types/execution'; import type { CollectionSchema, RecordRef } from '../types/record'; import type { RecordTaskStepDefinition } from '../types/step-definition'; import type { UpdateRecordStepExecutionData } from '../types/step-execution-data'; @@ -27,23 +27,15 @@ interface UpdateTarget { export default class UpdateRecordStepExecutor extends BaseStepExecutor { async execute(): Promise { // Branch A -- Re-entry with user confirmation - if (this.context.userInput) { - return this.handleConfirmation(this.context.userInput); + if (this.context.userConfirmed !== undefined) { + return this.handleConfirmation(); } // Branches B & C -- First call return this.handleFirstCall(); } - private async handleConfirmation(userInput: UserInput): Promise { - if (userInput.type !== 'confirmation') { - throw new WorkflowExecutorError( - `UpdateRecordStepExecutor received unexpected userInput type: "${ - (userInput as { type: string }).type - }"`, - ); - } - + private async handleConfirmation(): Promise { const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); const execution = stepExecutions.find( (e): e is UpdateRecordStepExecutionData => @@ -54,7 +46,7 @@ export default class UpdateRecordStepExecutor extends BaseStepExecutor; - readonly userInput?: UserInput; + readonly userConfirmed?: boolean; } export interface StepExecutionResult { @@ -41,5 +39,5 @@ export interface ExecutionContext readonly runStore: RunStore; readonly previousSteps: ReadonlyArray>; readonly remoteTools: readonly unknown[]; - readonly userInput?: UserInput; + readonly userConfirmed?: boolean; } diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 1df654e356..1d2ace3713 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -1,7 +1,7 @@ import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; -import type { ExecutionContext, UserInput } from '../../src/types/execution'; +import type { ExecutionContext } from '../../src/types/execution'; import type { CollectionSchema, RecordRef } from '../../src/types/record'; import type { RecordTaskStepDefinition } from '../../src/types/step-definition'; import type { UpdateRecordStepExecutionData } from '../../src/types/step-execution-data'; @@ -202,8 +202,8 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([execution]), }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ agentPort, runStore, userInput }); + const userConfirmed = true; + const context = makeContext({ agentPort, runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); @@ -234,8 +234,8 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([execution]), }); - const userInput: UserInput = { type: 'confirmation', confirmed: false }; - const context = makeContext({ agentPort, runStore, userInput }); + const userConfirmed = false; + const context = makeContext({ agentPort, runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); @@ -257,8 +257,8 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([]), }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ runStore, userInput }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); @@ -275,8 +275,8 @@ describe('UpdateRecordStepExecutor', () => { }, ]), }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ runStore, userInput }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); @@ -292,8 +292,8 @@ describe('UpdateRecordStepExecutor', () => { }, ]), }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ runStore, userInput }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); @@ -420,8 +420,8 @@ describe('UpdateRecordStepExecutor', () => { getStepExecutions: jest.fn().mockResolvedValue([execution]), }); const workflowPort = makeMockWorkflowPort({ customers: schema }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ runStore, workflowPort, userInput }); + const userConfirmed = true; + const context = makeContext({ runStore, workflowPort, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); @@ -570,8 +570,8 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([execution]), }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ agentPort, runStore, userInput }); + const userConfirmed = true; + const context = makeContext({ agentPort, runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); @@ -612,8 +612,8 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([execution]), }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ agentPort, runStore, userInput }); + const userConfirmed = true; + const context = makeContext({ agentPort, runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); await expect(executor.execute()).rejects.toThrow('Connection refused'); @@ -636,18 +636,6 @@ describe('UpdateRecordStepExecutor', () => { }); }); - describe('unexpected userInput type', () => { - it('throws when userInput has an unknown type', async () => { - const userInput = { type: 'text-input', value: 'hello' } as unknown as UserInput; - const context = makeContext({ userInput }); - const executor = new UpdateRecordStepExecutor(context); - - await expect(executor.execute()).rejects.toThrow( - 'UpdateRecordStepExecutor received unexpected userInput type: "text-input"', - ); - }); - }); - describe('findField fieldName fallback', () => { it('resolves update when AI returns raw fieldName instead of displayName', async () => { const agentPort = makeMockAgentPort(); @@ -689,8 +677,8 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockRejectedValue(new Error('DB timeout')), }); - const userInput: UserInput = { type: 'confirmation', confirmed: true }; - const context = makeContext({ runStore, userInput }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); await expect(executor.execute()).rejects.toThrow('DB timeout'); @@ -707,8 +695,8 @@ describe('UpdateRecordStepExecutor', () => { getStepExecutions: jest.fn().mockResolvedValue([execution]), saveStepExecution: jest.fn().mockRejectedValue(new Error('Disk full')), }); - const userInput: UserInput = { type: 'confirmation', confirmed: false }; - const context = makeContext({ runStore, userInput }); + const userConfirmed = false; + const context = makeContext({ runStore, userConfirmed }); const executor = new UpdateRecordStepExecutor(context); await expect(executor.execute()).rejects.toThrow('Disk full');