diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 333bfdee1a..f0318c97bd 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -42,7 +42,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ── ``` src/ -├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError +├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError ├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring) ├── types/ # Core type definitions (@draft) │ ├── step-definition.ts # StepType enum + step definition interfaces @@ -60,7 +60,8 @@ src/ ├── executors/ # Step executor implementations │ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers) │ ├── condition-step-executor.ts # AI-powered condition step (chooses among options) -│ └── read-record-step-executor.ts # AI-powered record field reading step +│ ├── read-record-step-executor.ts # AI-powered record field reading step +│ └── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow) ├── http/ # HTTP server (optional, for frontend data access) │ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger └── index.ts # Barrel exports diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index b835c391fa..8b872a8932 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -45,3 +45,9 @@ export class NoResolvedFieldsError extends WorkflowExecutorError { super(`None of the requested fields could be resolved: ${fieldNames.join(', ')}`); } } + +export class NoWritableFieldsError extends WorkflowExecutorError { + constructor(collectionName: string) { + super(`No writable fields on record from collection "${collectionName}"`); + } +} diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 2197843be8..bf677691a3 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -1,30 +1,70 @@ import type { ExecutionContext, StepExecutionResult } from '../types/execution'; +import type { CollectionSchema, FieldSchema, RecordRef } from '../types/record'; import type { StepDefinition } from '../types/step-definition'; -import type { StepExecutionData } from '../types/step-execution-data'; +import type { + LoadRelatedRecordStepExecutionData, + StepExecutionData, +} from '../types/step-execution-data'; import type { StepOutcome } from '../types/step-outcome'; import type { AIMessage, BaseMessage } from '@langchain/core/messages'; -import type { DynamicStructuredTool } from '@langchain/core/tools'; -import { SystemMessage } from '@langchain/core/messages'; +import { HumanMessage, SystemMessage } from '@langchain/core/messages'; +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { z } from 'zod'; -import { MalformedToolCallError, MissingToolCallError } from '../errors'; +import { + MalformedToolCallError, + MissingToolCallError, + NoRecordsError, + WorkflowExecutorError, +} from '../errors'; import { isExecutedStepOnExecutor } from '../types/step-execution-data'; export default abstract class BaseStepExecutor { protected readonly context: ExecutionContext; + protected readonly schemaCache = new Map(); + constructor(context: ExecutionContext) { this.context = context; } abstract execute(): Promise; + /** Find a field by displayName first, then fallback to fieldName. */ + protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined { + return ( + schema.fields.find(f => f.displayName === name) ?? + schema.fields.find(f => f.fieldName === name) + ); + } + + /** + * Builds a StepExecutionResult with the given status and optional error. + * Only for record-task executors — hardcodes type: 'record-task'. + * ConditionStepExecutor and future non-record-task executors must NOT call this method. + */ + protected buildOutcomeResult( + status: 'success' | 'error' | 'awaiting-input', + error?: string, + ): StepExecutionResult { + return { + stepOutcome: { + type: 'record-task', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status, + ...(error !== undefined && { error }), + }, + }; + } + /** * Returns a SystemMessage array summarizing previously executed steps. * Empty array when there is no history. Ready to spread into a messages array. */ protected async buildPreviousStepsMessages(): Promise { - if (!this.context.history.length) return []; + if (!this.context.previousSteps.length) return []; const summary = await this.summarizePreviousSteps(); @@ -40,7 +80,7 @@ export default abstract class BaseStepExecutor { const allStepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); - return this.context.history + return this.context.previousSteps .map(({ stepDefinition, stepOutcome }) => { const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); @@ -61,6 +101,8 @@ export default abstract class BaseStepExecutor { + const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); + const relatedRecords = stepExecutions + .filter((e): e is LoadRelatedRecordStepExecutionData => e.type === 'load-related-record') + .map(e => e.record); + + return [this.context.baseRecordRef, ...relatedRecords]; + } + + /** Selects a record ref via AI when multiple are available, returns directly when only one. */ + protected async selectRecordRef( + records: RecordRef[], + prompt: string | undefined, + ): Promise { + if (records.length === 0) throw new NoRecordsError(); + if (records.length === 1) return records[0]; + + const identifiers = await Promise.all(records.map(r => this.toRecordIdentifier(r))); + const identifierTuple = identifiers as [string, ...string[]]; + + const tool = new DynamicStructuredTool({ + name: 'select-record', + description: 'Select the most relevant record for this workflow step.', + schema: z.object({ + recordIdentifier: z.enum(identifierTuple), + }), + func: undefined, + }); + + const messages = [ + ...(await this.buildPreviousStepsMessages()), + new SystemMessage( + 'You are an AI agent selecting the most relevant record for a workflow step.\n' + + 'Choose the record whose collection best matches the user request.\n' + + 'Pay attention to the collection name of each record.', + ), + new HumanMessage(prompt ?? 'Select the most relevant record.'), + ]; + + const { recordIdentifier } = await this.invokeWithTool<{ recordIdentifier: string }>( + messages, + tool, + ); + + const selectedIndex = identifiers.indexOf(recordIdentifier); + + if (selectedIndex === -1) { + throw new WorkflowExecutorError( + `AI selected record "${recordIdentifier}" which does not match any available record`, + ); + } + + return records[selectedIndex]; + } + + /** Fetches a collection schema from WorkflowPort, with caching. */ + protected async getCollectionSchema(collectionName: string): Promise { + const cached = this.schemaCache.get(collectionName); + if (cached) return cached; + + const schema = await this.context.workflowPort.getCollectionSchema(collectionName); + this.schemaCache.set(collectionName, schema); + + return schema; + } + + /** Formats a record ref as "Step X - CollectionDisplayName #id". */ + protected async toRecordIdentifier(record: RecordRef): Promise { + const schema = await this.getCollectionSchema(record.collectionName); + + return `Step ${record.stepIndex} - ${schema.collectionDisplayName} #${record.recordId}`; + } } diff --git a/packages/workflow-executor/src/executors/condition-step-executor.ts b/packages/workflow-executor/src/executors/condition-step-executor.ts index 217abdcff4..de6409b76b 100644 --- a/packages/workflow-executor/src/executors/condition-step-executor.ts +++ b/packages/workflow-executor/src/executors/condition-step-executor.ts @@ -5,6 +5,7 @@ import { HumanMessage, SystemMessage } from '@langchain/core/messages'; import { DynamicStructuredTool } from '@langchain/core/tools'; import { z } from 'zod'; +import { WorkflowExecutorError } from '../errors'; import BaseStepExecutor from './base-step-executor'; interface GatewayToolArgs { @@ -66,16 +67,20 @@ export default class ConditionStepExecutor extends BaseStepExecutor(messages, tool); - } catch (error: unknown) { - return { - stepOutcome: { - type: 'condition', - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - status: 'error', - error: (error as Error).message, - }, - }; + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return { + stepOutcome: { + type: 'condition', + stepId: this.context.stepId, + stepIndex: this.context.stepIndex, + status: 'error', + error: error.message, + }, + }; + } + + throw error; } const { option: selectedOption, reasoning } = args; diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index 6f7248c3c4..23eb651b9f 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -1,21 +1,13 @@ import type { StepExecutionResult } from '../types/execution'; import type { CollectionSchema, RecordRef } from '../types/record'; -import type { AiTaskStepDefinition } from '../types/step-definition'; -import type { - FieldReadResult, - LoadRelatedRecordStepExecutionData, -} from '../types/step-execution-data'; +import type { RecordTaskStepDefinition } from '../types/step-definition'; +import type { FieldReadResult } from '../types/step-execution-data'; import { HumanMessage, SystemMessage } from '@langchain/core/messages'; import { DynamicStructuredTool } from '@langchain/core/tools'; import { z } from 'zod'; -import { - NoReadableFieldsError, - NoRecordsError, - NoResolvedFieldsError, - WorkflowExecutorError, -} from '../errors'; +import { NoReadableFieldsError, NoResolvedFieldsError, WorkflowExecutorError } from '../errors'; import BaseStepExecutor from './base-step-executor'; const READ_RECORD_SYSTEM_PROMPT = `You are an AI agent reading fields from a record to answer a user request. @@ -26,9 +18,7 @@ Important rules: - Final answer is definitive, you won't receive any other input from the user. - Do not refer to yourself as "I" in the response, use a passive formulation instead.`; -export default class ReadRecordStepExecutor extends BaseStepExecutor { - private readonly schemaCache = new Map(); - +export default class ReadRecordStepExecutor extends BaseStepExecutor { async execute(): Promise { const { stepDefinition: step } = this.context; const records = await this.getAvailableRecordRefs(); @@ -42,10 +32,7 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor - schema.fields.find(f => f.fieldName === name || f.displayName === name)?.fieldName, - ) + .map(name => this.findField(schema, name)?.fieldName) .filter((name): name is string => name !== undefined); if (resolvedFieldNames.length === 0) { @@ -60,15 +47,7 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor { - if (records.length === 0) throw new NoRecordsError(); - if (records.length === 1) return records[0]; - - const identifiers = await Promise.all(records.map(r => this.toRecordIdentifier(r))); - const identifierTuple = identifiers as [string, ...string[]]; - - const tool = new DynamicStructuredTool({ - name: 'select-record', - description: 'Select the most relevant record for this workflow step.', - schema: z.object({ - recordIdentifier: z.enum(identifierTuple), - }), - func: undefined, - }); - - const messages = [ - ...(await this.buildPreviousStepsMessages()), - new SystemMessage( - 'You are an AI agent selecting the most relevant record for a workflow step.\n' + - 'Choose the record whose collection best matches the user request.\n' + - 'Pay attention to the collection name of each record.', - ), - new HumanMessage(prompt ?? 'Select the most relevant record.'), - ]; - - const { recordIdentifier } = await this.invokeWithTool<{ recordIdentifier: string }>( - messages, - tool, - ); - - const selectedIndex = identifiers.indexOf(recordIdentifier); - - if (selectedIndex === -1) { - throw new WorkflowExecutorError( - `AI selected record "${recordIdentifier}" which does not match any available record`, - ); - } - - return records[selectedIndex]; - } - private buildReadFieldTool(schema: CollectionSchema): DynamicStructuredTool { const nonRelationFields = schema.fields.filter(f => !f.isRelationship); @@ -190,7 +117,7 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor { - const field = schema.fields.find(f => f.fieldName === name || f.displayName === name); + const field = this.findField(schema, name); if (!field) return { error: `Field not found: ${name}`, fieldName: name, displayName: name }; @@ -201,29 +128,4 @@ export default class ReadRecordStepExecutor extends BaseStepExecutor { - const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); - const relatedRecords = stepExecutions - .filter((e): e is LoadRelatedRecordStepExecutionData => e.type === 'load-related-record') - .map(e => e.record); - - return [this.context.baseRecordRef, ...relatedRecords]; - } - - private async getCollectionSchema(collectionName: string): Promise { - const cached = this.schemaCache.get(collectionName); - if (cached) return cached; - - const schema = await this.context.workflowPort.getCollectionSchema(collectionName); - this.schemaCache.set(collectionName, schema); - - return schema; - } - - private async toRecordIdentifier(record: RecordRef): Promise { - const schema = await this.getCollectionSchema(record.collectionName); - - return `Step ${record.stepIndex} - ${schema.collectionDisplayName} #${record.recordId}`; - } } diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts new file mode 100644 index 0000000000..5d210b8596 --- /dev/null +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -0,0 +1,197 @@ +import type { StepExecutionResult } from '../types/execution'; +import type { CollectionSchema, RecordRef } from '../types/record'; +import type { RecordTaskStepDefinition } from '../types/step-definition'; +import type { UpdateRecordStepExecutionData } from '../types/step-execution-data'; + +import { HumanMessage, SystemMessage } from '@langchain/core/messages'; +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { z } from 'zod'; + +import { NoWritableFieldsError, WorkflowExecutorError } from '../errors'; +import BaseStepExecutor from './base-step-executor'; + +const UPDATE_RECORD_SYSTEM_PROMPT = `You are an AI agent updating a field on a record based on a user request. +Select the field to update and provide the new value. + +Important rules: +- Be precise: only update the field that is directly relevant to the request. +- Final answer is definitive, you won't receive any other input from the user. +- Do not refer to yourself as "I" in the response, use a passive formulation instead.`; + +interface UpdateTarget { + selectedRecordRef: RecordRef; + fieldDisplayName: string; + value: string; +} + +export default class UpdateRecordStepExecutor extends BaseStepExecutor { + async execute(): Promise { + // Branch A -- Re-entry with user confirmation + if (this.context.userConfirmed !== undefined) { + return this.handleConfirmation(); + } + + // Branches B & C -- First call + return this.handleFirstCall(); + } + + private async handleConfirmation(): Promise { + const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); + const execution = stepExecutions.find( + (e): e is UpdateRecordStepExecutionData => + e.type === 'update-record' && e.stepIndex === this.context.stepIndex, + ); + + if (!execution?.pendingUpdate) { + throw new WorkflowExecutorError('No pending update found for this step'); + } + + if (!this.context.userConfirmed) { + await this.context.runStore.saveStepExecution(this.context.runId, { + ...execution, + executionResult: { skipped: true }, + }); + + return this.buildOutcomeResult('success'); + } + + const { selectedRecordRef, pendingUpdate } = execution; + const target: UpdateTarget = { + selectedRecordRef, + fieldDisplayName: pendingUpdate.fieldDisplayName, + value: pendingUpdate.value, + }; + + return this.resolveAndUpdate(target, execution); + } + + private async handleFirstCall(): Promise { + const { stepDefinition: step } = this.context; + const records = await this.getAvailableRecordRefs(); + + let target: UpdateTarget; + + try { + const selectedRecordRef = await this.selectRecordRef(records, step.prompt); + const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); + const args = await this.selectFieldAndValue(schema, step.prompt); + target = { selectedRecordRef, fieldDisplayName: args.fieldName, value: args.value }; + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return this.buildOutcomeResult('error', error.message); + } + + throw error; + } + + // Branch B -- automaticExecution + if (step.automaticExecution) { + return this.resolveAndUpdate(target); + } + + // Branch C -- Awaiting confirmation + await this.context.runStore.saveStepExecution(this.context.runId, { + type: 'update-record', + stepIndex: this.context.stepIndex, + pendingUpdate: { fieldDisplayName: target.fieldDisplayName, value: target.value }, + selectedRecordRef: target.selectedRecordRef, + }); + + return this.buildOutcomeResult('awaiting-input'); + } + + /** + * Resolves the field name, calls updateRecord, and persists execution data. + * When `existingExecution` is provided (confirmation flow), it is spread into the + * saved execution to preserve pendingUpdate for traceability. + */ + private async resolveAndUpdate( + target: UpdateTarget, + existingExecution?: UpdateRecordStepExecutionData, + ): Promise { + const { selectedRecordRef, fieldDisplayName, value } = target; + let updated: { values: Record }; + + try { + const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); + const fieldName = this.resolveFieldName(schema, fieldDisplayName); + updated = await this.context.agentPort.updateRecord( + selectedRecordRef.collectionName, + selectedRecordRef.recordId, + { [fieldName]: value }, + ); + } catch (error) { + if (error instanceof WorkflowExecutorError) { + return this.buildOutcomeResult('error', error.message); + } + + throw error; + } + + await this.context.runStore.saveStepExecution(this.context.runId, { + ...existingExecution, + type: 'update-record', + stepIndex: this.context.stepIndex, + executionParams: { fieldDisplayName, value }, + executionResult: { updatedValues: updated.values }, + selectedRecordRef, + }); + + return this.buildOutcomeResult('success'); + } + + private async selectFieldAndValue( + schema: CollectionSchema, + prompt: string | undefined, + ): Promise<{ fieldName: string; value: string; reasoning: string }> { + const tool = this.buildUpdateFieldTool(schema); + const messages = [ + ...(await this.buildPreviousStepsMessages()), + new SystemMessage(UPDATE_RECORD_SYSTEM_PROMPT), + new SystemMessage( + `The selected record belongs to the "${schema.collectionDisplayName}" collection.`, + ), + new HumanMessage(`**Request**: ${prompt ?? 'Update the relevant field.'}`), + ]; + + return this.invokeWithTool<{ fieldName: string; value: string; reasoning: string }>( + messages, + tool, + ); + } + + private buildUpdateFieldTool(schema: CollectionSchema): DynamicStructuredTool { + const nonRelationFields = schema.fields.filter(f => !f.isRelationship); + + if (nonRelationFields.length === 0) { + throw new NoWritableFieldsError(schema.collectionName); + } + + const displayNames = nonRelationFields.map(f => f.displayName) as [string, ...string[]]; + + return new DynamicStructuredTool({ + name: 'update-record-field', + description: 'Update a field on the selected record.', + schema: z.object({ + fieldName: z.enum(displayNames), + // z.string() intentionally: the value is always transmitted as string + // to updateRecord; data typing is handled by the agent/datasource layer. + value: z.string().describe('The new value for the field'), + reasoning: z.string().describe('Why this field and value were chosen'), + }), + func: undefined, + }); + } + + private resolveFieldName(schema: CollectionSchema, displayName: string): string { + const field = this.findField(schema, displayName); + + if (!field) { + throw new WorkflowExecutorError( + `Field "${displayName}" not found in collection "${schema.collectionName}"`, + ); + } + + return field.fieldName; + } +} diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 916bbc0751..2a1b3df0a8 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -1,14 +1,14 @@ export { StepType } from './types/step-definition'; export type { ConditionStepDefinition, - AiTaskStepDefinition, + RecordTaskStepDefinition, StepDefinition, } from './types/step-definition'; export type { StepStatus, ConditionStepOutcome, - AiTaskStepOutcome, + RecordTaskStepOutcome, StepOutcome, } from './types/step-outcome'; @@ -18,7 +18,8 @@ export type { FieldReadResult, ConditionStepExecutionData, ReadRecordStepExecutionData, - AiTaskStepExecutionData, + UpdateRecordStepExecutionData, + RecordTaskStepExecutionData, LoadRelatedRecordStepExecutionData, ExecutedStepExecutionData, StepExecutionData, @@ -36,7 +37,6 @@ export type { export type { Step, - UserInput, PendingStepExecution, StepExecutionResult, ExecutionContext, @@ -54,10 +54,12 @@ export { NoRecordsError, NoReadableFieldsError, NoResolvedFieldsError, + NoWritableFieldsError, } from './errors'; export { default as BaseStepExecutor } from './executors/base-step-executor'; export { default as ConditionStepExecutor } from './executors/condition-step-executor'; export { default as ReadRecordStepExecutor } from './executors/read-record-step-executor'; +export { default as UpdateRecordStepExecutor } from './executors/update-record-step-executor'; export { default as AgentClientAgentPort } from './adapters/agent-client-agent-port'; export { default as ForestServerWorkflowPort } from './adapters/forest-server-workflow-port'; export { default as ExecutorHttpServer } from './http/executor-http-server'; diff --git a/packages/workflow-executor/src/types/execution.ts b/packages/workflow-executor/src/types/execution.ts index 406d1e4f0f..25f71fe448 100644 --- a/packages/workflow-executor/src/types/execution.ts +++ b/packages/workflow-executor/src/types/execution.ts @@ -13,8 +13,6 @@ export interface Step { stepOutcome: StepOutcome; } -export type UserInput = { type: 'confirmation'; confirmed: boolean }; - export interface PendingStepExecution { readonly runId: string; readonly stepId: string; @@ -22,7 +20,7 @@ export interface PendingStepExecution { readonly baseRecordRef: RecordRef; readonly stepDefinition: StepDefinition; readonly previousSteps: ReadonlyArray; - readonly userInput?: UserInput; + readonly userConfirmed?: boolean; } export interface StepExecutionResult { @@ -39,6 +37,7 @@ export interface ExecutionContext readonly agentPort: AgentPort; readonly workflowPort: WorkflowPort; readonly runStore: RunStore; - readonly history: ReadonlyArray>; + readonly previousSteps: ReadonlyArray>; readonly remoteTools: readonly unknown[]; + readonly userConfirmed?: boolean; } diff --git a/packages/workflow-executor/src/types/step-definition.ts b/packages/workflow-executor/src/types/step-definition.ts index ca23e5b413..e2a324618a 100644 --- a/packages/workflow-executor/src/types/step-definition.ts +++ b/packages/workflow-executor/src/types/step-definition.ts @@ -6,6 +6,7 @@ export enum StepType { UpdateRecord = 'update-record', TriggerAction = 'trigger-action', LoadRelatedRecord = 'load-related-record', + McpTask = 'mcp-task', } interface BaseStepDefinition { @@ -19,12 +20,18 @@ export interface ConditionStepDefinition extends BaseStepDefinition { options: [string, ...string[]]; } -export interface AiTaskStepDefinition extends BaseStepDefinition { - type: Exclude; - recordSourceStepId?: string; - automaticCompletion?: boolean; - allowedTools?: string[]; - remoteToolsSourceId?: string; +export interface RecordTaskStepDefinition extends BaseStepDefinition { + type: Exclude; + automaticExecution?: boolean; } -export type StepDefinition = ConditionStepDefinition | AiTaskStepDefinition; +export interface McpTaskStepDefinition extends BaseStepDefinition { + type: StepType.McpTask; + mcpServerId?: string; + automaticExecution?: boolean; +} + +export type StepDefinition = + | ConditionStepDefinition + | RecordTaskStepDefinition + | McpTaskStepDefinition; diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index eb022a273c..f3be7d5e4a 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -40,10 +40,25 @@ export interface ReadRecordStepExecutionData extends BaseStepExecutionData { selectedRecordRef: RecordRef; } +// -- Update Record -- + +export interface UpdateRecordStepExecutionData extends BaseStepExecutionData { + type: 'update-record'; + executionParams?: { fieldDisplayName: string; value: string }; + /** User confirmed → values returned by updateRecord. User rejected → skipped. */ + executionResult?: { updatedValues: Record } | { skipped: true }; + /** AI-selected field and value awaiting user confirmation. Used in the confirmation flow only. */ + pendingUpdate?: { + fieldDisplayName: string; + value: string; + }; + selectedRecordRef: RecordRef; +} + // -- Generic AI Task (fallback for untyped steps) -- -export interface AiTaskStepExecutionData extends BaseStepExecutionData { - type: 'ai-task'; +export interface RecordTaskStepExecutionData extends BaseStepExecutionData { + type: 'record-task'; executionParams?: Record; executionResult?: Record; toolConfirmationInterruption?: Record; @@ -61,13 +76,15 @@ export interface LoadRelatedRecordStepExecutionData extends BaseStepExecutionDat export type StepExecutionData = | ConditionStepExecutionData | ReadRecordStepExecutionData - | AiTaskStepExecutionData + | UpdateRecordStepExecutionData + | RecordTaskStepExecutionData | LoadRelatedRecordStepExecutionData; export type ExecutedStepExecutionData = | ConditionStepExecutionData | ReadRecordStepExecutionData - | AiTaskStepExecutionData; + | UpdateRecordStepExecutionData + | RecordTaskStepExecutionData; // TODO: this condition should change when load-related-record gets its own executor // and produces executionParams/executionResult like other steps. diff --git a/packages/workflow-executor/src/types/step-outcome.ts b/packages/workflow-executor/src/types/step-outcome.ts index 9a564748eb..37f53afa00 100644 --- a/packages/workflow-executor/src/types/step-outcome.ts +++ b/packages/workflow-executor/src/types/step-outcome.ts @@ -6,10 +6,10 @@ type BaseStepStatus = 'success' | 'error'; export type ConditionStepStatus = BaseStepStatus | 'manual-decision'; /** AI task steps can pause mid-execution to await user input (e.g. tool confirmation). */ -export type AiTaskStepStatus = BaseStepStatus | 'awaiting-input'; +export type RecordTaskStepStatus = BaseStepStatus | 'awaiting-input'; /** Union of all step statuses. */ -export type StepStatus = ConditionStepStatus | AiTaskStepStatus; +export type StepStatus = ConditionStepStatus | RecordTaskStepStatus; /** * StepOutcome is sent to the orchestrator — it must NEVER contain client data. @@ -30,9 +30,9 @@ export interface ConditionStepOutcome extends BaseStepOutcome { selectedOption?: string; } -export interface AiTaskStepOutcome extends BaseStepOutcome { - type: 'ai-task'; - status: AiTaskStepStatus; +export interface RecordTaskStepOutcome extends BaseStepOutcome { + type: 'record-task'; + status: RecordTaskStepStatus; } -export type StepOutcome = ConditionStepOutcome | AiTaskStepOutcome; +export type StepOutcome = ConditionStepOutcome | RecordTaskStepOutcome; diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 86491fbb8f..45f5bc9c42 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -73,7 +73,7 @@ function makeContext(overrides: Partial = {}): ExecutionContex agentPort: {} as ExecutionContext['agentPort'], workflowPort: {} as ExecutionContext['workflowPort'], runStore: makeMockRunStore(), - history: [], + previousSteps: [], remoteTools: [], ...overrides, }; @@ -98,7 +98,7 @@ describe('BaseStepExecutor', () => { ]); const executor = new TestableExecutor( makeContext({ - history: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Approve?' })], + previousSteps: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Approve?' })], runStore, }), ); @@ -117,7 +117,7 @@ describe('BaseStepExecutor', () => { it('uses Input for matched steps and History for unmatched steps', async () => { const executor = new TestableExecutor( makeContext({ - history: [ + previousSteps: [ makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0 }), makeHistoryEntry({ stepId: 'cond-2', stepIndex: 1, prompt: 'Second?' }), ], @@ -147,7 +147,7 @@ describe('BaseStepExecutor', () => { it('falls back to History when no matching step execution in RunStore', async () => { const executor = new TestableExecutor( makeContext({ - history: [ + previousSteps: [ makeHistoryEntry({ stepId: 'orphan', stepIndex: 5, prompt: 'Orphan step' }), makeHistoryEntry({ stepId: 'matched', stepIndex: 1, prompt: 'Matched step' }), ], @@ -183,7 +183,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([]), }), ); @@ -207,7 +207,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([]), }), ); @@ -220,15 +220,15 @@ describe('BaseStepExecutor', () => { expect(result).toContain('"error":"AI could not match an option"'); }); - it('includes status in History for ai-task steps without RunStore data', async () => { + it('includes status in History for record-task steps without RunStore data', async () => { const entry: { stepDefinition: StepDefinition; stepOutcome: StepOutcome } = { stepDefinition: { type: StepType.ReadRecord, prompt: 'Run task', }, stepOutcome: { - type: 'ai-task', - stepId: 'ai-step', + type: 'record-task', + stepId: 'read-record-1', stepIndex: 0, status: 'awaiting-input', }, @@ -236,7 +236,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([]), }), ); @@ -245,7 +245,7 @@ describe('BaseStepExecutor', () => { .buildPreviousStepsMessages() .then(msgs => msgs[0]?.content ?? ''); - expect(result).toContain('Step "ai-step"'); + expect(result).toContain('Step "read-record-1"'); expect(result).toContain('History: {"status":"awaiting-input"}'); }); @@ -263,7 +263,7 @@ describe('BaseStepExecutor', () => { prompt: 'Read name', }, stepOutcome: { - type: 'ai-task', + type: 'record-task', stepId: 'read-customer', stepIndex: 1, status: 'success', @@ -272,10 +272,10 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [condEntry, aiEntry], + previousSteps: [condEntry, aiEntry], runStore: makeMockRunStore([ { - type: 'ai-task', + type: 'record-task', stepIndex: 1, executionParams: { answer: 'John Doe' }, }, @@ -299,7 +299,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([ { type: 'condition', @@ -327,8 +327,8 @@ describe('BaseStepExecutor', () => { prompt: 'Do something', }, stepOutcome: { - type: 'ai-task', - stepId: 'ai-step', + type: 'record-task', + stepId: 'read-record-1', stepIndex: 0, status: 'success', }, @@ -336,10 +336,10 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([ { - type: 'ai-task', + type: 'record-task', stepIndex: 0, }, ]), @@ -350,18 +350,53 @@ describe('BaseStepExecutor', () => { .buildPreviousStepsMessages() .then(msgs => msgs[0]?.content ?? ''); - expect(result).toContain('Step "ai-step"'); + expect(result).toContain('Step "read-record-1"'); expect(result).toContain('Prompt: Do something'); expect(result).not.toContain('Input:'); }); + it('uses Pending when update-record step has pendingUpdate but no executionParams', async () => { + const executor = new TestableExecutor( + makeContext({ + previousSteps: [ + { + stepDefinition: { type: StepType.UpdateRecord, prompt: 'Set status to active' }, + stepOutcome: { + type: 'record-task', + stepId: 'update-1', + stepIndex: 0, + status: 'awaiting-input', + }, + }, + ], + runStore: makeMockRunStore([ + { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: { collectionName: 'customers', recordId: [1], stepIndex: 0 }, + }, + ]), + }), + ); + + const result = await executor + .buildPreviousStepsMessages() + .then(msgs => msgs[0]?.content ?? ''); + + expect(result).toContain('Pending:'); + expect(result).toContain('"fieldDisplayName":"Status"'); + expect(result).toContain('"value":"active"'); + expect(result).not.toContain('Input:'); + }); + it('shows "(no prompt)" when step has no prompt', async () => { const entry = makeHistoryEntry({ stepIndex: 0 }); entry.stepDefinition.prompt = undefined; const executor = new TestableExecutor( makeContext({ - history: [entry], + previousSteps: [entry], runStore: makeMockRunStore([ { type: 'condition', diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index 23eb6c8365..c439042448 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -53,7 +53,7 @@ function makeContext( agentPort: {} as ExecutionContext['agentPort'], workflowPort: {} as ExecutionContext['workflowPort'], runStore: makeMockRunStore(), - history: [], + previousSteps: [], remoteTools: [], ...overrides, }; @@ -175,7 +175,7 @@ describe('ConditionStepExecutor', () => { const context = makeContext({ model: mockModel.model, runStore, - history: [ + previousSteps: [ { stepDefinition: { type: StepType.Condition, @@ -261,7 +261,7 @@ describe('ConditionStepExecutor', () => { }); describe('error propagation', () => { - it('returns error status when model invocation fails', async () => { + it('lets infrastructure errors propagate', async () => { const invoke = jest.fn().mockRejectedValue(new Error('API timeout')); const bindTools = jest.fn().mockReturnValue({ invoke }); const context = makeContext({ @@ -269,10 +269,7 @@ describe('ConditionStepExecutor', () => { }); const executor = new ConditionStepExecutor(context); - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe('API timeout'); + await expect(executor.execute()).rejects.toThrow('API timeout'); }); it('lets run store errors propagate', async () => { diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index eb9c3bc5de..35ae85925b 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -3,13 +3,13 @@ import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution'; import type { CollectionSchema, RecordRef } from '../../src/types/record'; -import type { AiTaskStepDefinition } from '../../src/types/step-definition'; +import type { RecordTaskStepDefinition } from '../../src/types/step-definition'; import { NoRecordsError, RecordNotFoundError } from '../../src/errors'; import ReadRecordStepExecutor from '../../src/executors/read-record-step-executor'; import { StepType } from '../../src/types/step-definition'; -function makeStep(overrides: Partial = {}): AiTaskStepDefinition { +function makeStep(overrides: Partial = {}): RecordTaskStepDefinition { return { type: StepType.ReadRecord, prompt: 'Read the customer email', @@ -99,8 +99,8 @@ function makeMockModel( } function makeContext( - overrides: Partial> = {}, -): ExecutionContext { + overrides: Partial> = {}, +): ExecutionContext { return { runId: 'run-1', stepId: 'read-1', @@ -111,7 +111,7 @@ function makeContext( agentPort: makeMockAgentPort(), workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), - history: [], + previousSteps: [], remoteTools: [], ...overrides, }; @@ -700,7 +700,7 @@ describe('ReadRecordStepExecutor', () => { const context = makeContext({ model: mockModel.model, runStore, - history: [ + previousSteps: [ { stepDefinition: { type: StepType.Condition, diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts new file mode 100644 index 0000000000..1d2ace3713 --- /dev/null +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -0,0 +1,801 @@ +import type { AgentPort } from '../../src/ports/agent-port'; +import type { RunStore } from '../../src/ports/run-store'; +import type { WorkflowPort } from '../../src/ports/workflow-port'; +import type { ExecutionContext } from '../../src/types/execution'; +import type { CollectionSchema, RecordRef } from '../../src/types/record'; +import type { RecordTaskStepDefinition } from '../../src/types/step-definition'; +import type { UpdateRecordStepExecutionData } from '../../src/types/step-execution-data'; + +import { WorkflowExecutorError } from '../../src/errors'; +import UpdateRecordStepExecutor from '../../src/executors/update-record-step-executor'; +import { StepType } from '../../src/types/step-definition'; + +function makeStep(overrides: Partial = {}): RecordTaskStepDefinition { + return { + type: StepType.UpdateRecord, + prompt: 'Set the customer status to active', + ...overrides, + }; +} + +function makeRecordRef(overrides: Partial = {}): RecordRef { + return { + collectionName: 'customers', + recordId: [42], + stepIndex: 0, + ...overrides, + }; +} + +function makeMockAgentPort( + updatedValues: Record = { status: 'active', name: 'John Doe' }, +): AgentPort { + return { + getRecord: jest.fn().mockResolvedValue({ values: updatedValues }), + updateRecord: jest.fn().mockResolvedValue({ + collectionName: 'customers', + recordId: [42], + values: updatedValues, + }), + getRelatedData: jest.fn(), + executeAction: jest.fn(), + } as unknown as AgentPort; +} + +function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { + return { + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'email', displayName: 'Email', isRelationship: false }, + { fieldName: 'status', displayName: 'Status', isRelationship: false }, + { fieldName: 'name', displayName: 'Full Name', isRelationship: false }, + { fieldName: 'orders', displayName: 'Orders', isRelationship: true }, + ], + actions: [], + ...overrides, + }; +} + +function makeMockRunStore(overrides: Partial = {}): RunStore { + return { + getStepExecutions: jest.fn().mockResolvedValue([]), + saveStepExecution: jest.fn().mockResolvedValue(undefined), + ...overrides, + }; +} + +function makeMockWorkflowPort( + schemasByCollection: Record = { + customers: makeCollectionSchema(), + }, +): WorkflowPort { + return { + getPendingStepExecutions: jest.fn().mockResolvedValue([]), + updateStepExecution: jest.fn().mockResolvedValue(undefined), + getCollectionSchema: jest + .fn() + .mockImplementation((name: string) => + Promise.resolve( + schemasByCollection[name] ?? makeCollectionSchema({ collectionName: name }), + ), + ), + getMcpServerConfigs: jest.fn().mockResolvedValue([]), + }; +} + +function makeMockModel(toolCallArgs?: Record, toolName = 'update-record-field') { + const invoke = jest.fn().mockResolvedValue({ + tool_calls: toolCallArgs ? [{ name: toolName, args: toolCallArgs, id: 'call_1' }] : undefined, + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + return { model, bindTools, invoke }; +} + +function makeContext( + overrides: Partial> = {}, +): ExecutionContext { + return { + runId: 'run-1', + stepId: 'update-1', + stepIndex: 0, + baseRecordRef: makeRecordRef(), + stepDefinition: makeStep(), + model: makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'User requested status change', + }).model, + agentPort: makeMockAgentPort(), + workflowPort: makeMockWorkflowPort(), + runStore: makeMockRunStore(), + previousSteps: [], + remoteTools: [], + ...overrides, + }; +} + +describe('UpdateRecordStepExecutor', () => { + describe('automaticExecution: update direct (Branch B)', () => { + it('updates the record and returns success', async () => { + const updatedValues = { status: 'active', name: 'John Doe' }; + const agentPort = makeMockAgentPort(updatedValues); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'User requested status change', + }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: mockModel.model, + agentPort, + runStore, + stepDefinition: makeStep({ automaticExecution: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', + expect.objectContaining({ + type: 'update-record', + stepIndex: 0, + executionParams: { fieldDisplayName: 'Status', value: 'active' }, + executionResult: { updatedValues }, + selectedRecordRef: expect.objectContaining({ + collectionName: 'customers', + recordId: [42], + }), + }), + ); + }); + }); + + describe('without automaticExecution: awaiting-input (Branch C)', () => { + it('saves execution and returns awaiting-input', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'User requested status change', + }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: mockModel.model, + runStore, + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', + expect.objectContaining({ + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: expect.objectContaining({ + collectionName: 'customers', + recordId: [42], + }), + }), + ); + }); + }); + + describe('confirmation accepted (Branch A)', () => { + it('updates the record when user confirms', async () => { + const updatedValues = { status: 'active', name: 'John Doe' }; + const agentPort = makeMockAgentPort(updatedValues); + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const userConfirmed = true; + const context = makeContext({ agentPort, runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', + expect.objectContaining({ + type: 'update-record', + executionParams: { fieldDisplayName: 'Status', value: 'active' }, + executionResult: { updatedValues }, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + }), + ); + }); + }); + + describe('confirmation rejected (Branch A)', () => { + it('skips the update when user rejects', async () => { + const agentPort = makeMockAgentPort(); + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const userConfirmed = false; + const context = makeContext({ agentPort, runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).not.toHaveBeenCalled(); + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', + expect.objectContaining({ + executionResult: { skipped: true }, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + }), + ); + }); + }); + + describe('no pending update in phase 2 (Branch A)', () => { + it('throws when no pending update is found', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([]), + }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); + }); + + it('throws when execution exists but stepIndex does not match', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'update-record', + stepIndex: 5, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); + }); + + it('throws when execution exists but pendingUpdate is absent', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'update-record', + stepIndex: 0, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('No pending update found for this step'); + }); + }); + + describe('multi-record AI selection', () => { + it('uses AI to select among multiple records then selects field', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [ + { fieldName: 'total', displayName: 'Total', isRelationship: false }, + { fieldName: 'status', displayName: 'Order Status', isRelationship: false }, + ], + }); + + // First call: select-record, second call: update-record-field + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'select-record', + args: { recordIdentifier: 'Step 2 - Orders #99' }, + id: 'call_1', + }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'update-record-field', + args: { fieldName: 'Order Status', value: 'shipped', reasoning: 'Mark as shipped' }, + id: 'call_2', + }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([ + { type: 'load-related-record', stepIndex: 2, record: relatedRecord }, + ]), + }); + const workflowPort = makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }); + const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(bindTools).toHaveBeenCalledTimes(2); + + const selectTool = bindTools.mock.calls[0][0][0]; + expect(selectTool.name).toBe('select-record'); + + const updateTool = bindTools.mock.calls[1][0][0]; + expect(updateTool.name).toBe('update-record-field'); + + expect(runStore.saveStepExecution).toHaveBeenCalledWith( + 'run-1', + expect.objectContaining({ + pendingUpdate: { fieldDisplayName: 'Order Status', value: 'shipped' }, + selectedRecordRef: expect.objectContaining({ + recordId: [99], + collectionName: 'orders', + }), + }), + ); + }); + }); + + describe('NoWritableFieldsError', () => { + it('returns error when all fields are relationships', async () => { + const schema = makeCollectionSchema({ + fields: [{ fieldName: 'orders', displayName: 'Orders', isRelationship: true }], + }); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const runStore = makeMockRunStore(); + const workflowPort = makeMockWorkflowPort({ customers: schema }); + const context = makeContext({ model: mockModel.model, runStore, workflowPort }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'No writable fields on record from collection "customers"', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('resolveFieldName failure', () => { + it('returns error when field is not found during confirmation (Branch A)', async () => { + const schema = makeCollectionSchema({ + fields: [{ fieldName: 'email', displayName: 'Email', isRelationship: false }], + }); + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'NonExistentField', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const workflowPort = makeMockWorkflowPort({ customers: schema }); + const userConfirmed = true; + const context = makeContext({ runStore, workflowPort, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'Field "NonExistentField" not found in collection "customers"', + ); + }); + + it('returns error when field is not found during automaticExecution (Branch B)', async () => { + // AI returns a display name that doesn't match any field in the schema + const mockModel = makeMockModel({ + fieldName: 'NonExistentField', + value: 'test', + reasoning: 'test', + }); + const context = makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ automaticExecution: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'Field "NonExistentField" not found in collection "customers"', + ); + }); + }); + + describe('relationship fields excluded from update tool', () => { + it('excludes relationship fields from the tool schema', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const context = makeContext({ model: mockModel.model }); + const executor = new UpdateRecordStepExecutor(context); + + await executor.execute(); + + // Second bindTools call is for update-record-field (first may be select-record) + const lastCall = mockModel.bindTools.mock.calls[mockModel.bindTools.mock.calls.length - 1]; + const tool = lastCall[0][0]; + expect(tool.name).toBe('update-record-field'); + + // Non-relationship display names should be accepted + expect(tool.schema.parse({ fieldName: 'Email', value: 'x', reasoning: 'r' })).toBeTruthy(); + expect(tool.schema.parse({ fieldName: 'Status', value: 'x', reasoning: 'r' })).toBeTruthy(); + expect( + tool.schema.parse({ fieldName: 'Full Name', value: 'x', reasoning: 'r' }), + ).toBeTruthy(); + + // Relationship display name should be rejected + expect(() => + tool.schema.parse({ fieldName: 'Orders', value: 'x', reasoning: 'r' }), + ).toThrow(); + }); + }); + + describe('AI malformed/missing tool call', () => { + it('returns error on malformed tool call', async () => { + const invoke = jest.fn().mockResolvedValue({ + tool_calls: [], + invalid_tool_calls: [ + { name: 'update-record-field', args: '{bad json', error: 'JSON parse error' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: { bindTools } as unknown as ExecutionContext['model'], + runStore, + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'AI returned a malformed tool call for "update-record-field": JSON parse error', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + + it('returns error when AI returns no tool call', async () => { + const invoke = jest.fn().mockResolvedValue({ tool_calls: [] }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: { bindTools } as unknown as ExecutionContext['model'], + runStore, + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('AI did not return a tool call'); + expect(runStore.saveStepExecution).not.toHaveBeenCalled(); + }); + }); + + describe('agentPort.updateRecord WorkflowExecutorError (Branch B)', () => { + it('returns error when updateRecord throws WorkflowExecutorError', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue( + new WorkflowExecutorError('Record locked'), + ); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const runStore = makeMockRunStore(); + const context = makeContext({ + model: mockModel.model, + agentPort, + runStore, + stepDefinition: makeStep({ automaticExecution: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('Record locked'); + }); + }); + + describe('agentPort.updateRecord WorkflowExecutorError (Branch A)', () => { + it('returns error when updateRecord throws WorkflowExecutorError during confirmation', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue( + new WorkflowExecutorError('Record locked'), + ); + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const userConfirmed = true; + const context = makeContext({ agentPort, runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('Record locked'); + }); + }); + + describe('agentPort.updateRecord infra error', () => { + it('lets infrastructure errors propagate (Branch B)', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue(new Error('Connection refused')); + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const context = makeContext({ + model: mockModel.model, + agentPort, + stepDefinition: makeStep({ automaticExecution: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Connection refused'); + }); + + it('lets infrastructure errors propagate (Branch A)', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue(new Error('Connection refused')); + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const userConfirmed = true; + const context = makeContext({ agentPort, runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Connection refused'); + }); + }); + + describe('stepOutcome shape', () => { + it('emits correct type, stepId and stepIndex in the outcome', async () => { + const context = makeContext({ stepDefinition: makeStep({ automaticExecution: true }) }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome).toMatchObject({ + type: 'record-task', + stepId: 'update-1', + stepIndex: 0, + status: 'success', + }); + }); + }); + + describe('findField fieldName fallback', () => { + it('resolves update when AI returns raw fieldName instead of displayName', async () => { + const agentPort = makeMockAgentPort(); + // AI returns 'status' (fieldName) instead of 'Status' (displayName) + const mockModel = makeMockModel({ fieldName: 'status', value: 'active', reasoning: 'test' }); + const context = makeContext({ + model: mockModel.model, + agentPort, + stepDefinition: makeStep({ automaticExecution: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.updateRecord).toHaveBeenCalledWith('customers', [42], { status: 'active' }); + }); + }); + + describe('schema caching', () => { + it('fetches getCollectionSchema once per collection even when called twice (Branch B)', async () => { + const workflowPort = makeMockWorkflowPort(); + const context = makeContext({ + workflowPort, + stepDefinition: makeStep({ automaticExecution: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await executor.execute(); + + // Branch B calls getCollectionSchema in handleFirstCall and again in resolveAndUpdate + // but the cache should prevent the second network call + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + }); + }); + + describe('RunStore error propagation', () => { + it('lets getStepExecutions errors propagate (Branch A)', async () => { + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockRejectedValue(new Error('DB timeout')), + }); + const userConfirmed = true; + const context = makeContext({ runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('DB timeout'); + }); + + it('lets saveStepExecution errors propagate when user rejects (Branch A)', async () => { + const execution: UpdateRecordStepExecutionData = { + type: 'update-record', + stepIndex: 0, + pendingUpdate: { fieldDisplayName: 'Status', value: 'active' }, + selectedRecordRef: makeRecordRef(), + }; + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + saveStepExecution: jest.fn().mockRejectedValue(new Error('Disk full')), + }); + const userConfirmed = false; + const context = makeContext({ runStore, userConfirmed }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Disk full'); + }); + + it('lets saveStepExecution errors propagate when saving awaiting-input (Branch C)', async () => { + const runStore = makeMockRunStore({ + saveStepExecution: jest.fn().mockRejectedValue(new Error('Disk full')), + }); + const context = makeContext({ runStore }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Disk full'); + }); + + it('lets saveStepExecution errors propagate after successful updateRecord (Branch B)', async () => { + const runStore = makeMockRunStore({ + saveStepExecution: jest.fn().mockRejectedValue(new Error('Disk full')), + }); + const context = makeContext({ + runStore, + stepDefinition: makeStep({ automaticExecution: true }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await expect(executor.execute()).rejects.toThrow('Disk full'); + }); + }); + + describe('default prompt', () => { + it('uses default prompt when step.prompt is undefined', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const context = makeContext({ + model: mockModel.model, + stepDefinition: makeStep({ prompt: undefined }), + }); + const executor = new UpdateRecordStepExecutor(context); + + await executor.execute(); + + const messages = mockModel.invoke.mock.calls[0][0]; + const humanMessage = messages[messages.length - 1]; + expect(humanMessage.content).toBe('**Request**: Update the relevant field.'); + }); + }); + + describe('previous steps context', () => { + it('includes previous steps summary in update-field messages', async () => { + const mockModel = makeMockModel({ + fieldName: 'Status', + value: 'active', + reasoning: 'test', + }); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'Yes', reasoning: 'Approved' }, + }, + ]), + }); + const context = makeContext({ + model: mockModel.model, + runStore, + previousSteps: [ + { + stepDefinition: { + type: StepType.Condition, + options: ['Yes', 'No'], + prompt: 'Should we proceed?', + }, + stepOutcome: { + type: 'condition', + stepId: 'prev-step', + stepIndex: 0, + status: 'success', + }, + }, + ], + }); + const executor = new UpdateRecordStepExecutor({ + ...context, + stepId: 'update-2', + stepIndex: 1, + }); + + await executor.execute(); + + const messages = mockModel.invoke.mock.calls[0][0]; + // previous steps summary + system prompt + collection info + human message = 4 + expect(messages).toHaveLength(4); + expect(messages[0].content).toContain('Should we proceed?'); + expect(messages[0].content).toContain('"answer":"Yes"'); + expect(messages[1].content).toContain('updating a field on a record'); + }); + }); +}); diff --git a/packages/workflow-executor/test/index.test.ts b/packages/workflow-executor/test/index.test.ts index 05affa035c..1267b1cbbd 100644 --- a/packages/workflow-executor/test/index.test.ts +++ b/packages/workflow-executor/test/index.test.ts @@ -1,9 +1,9 @@ import { StepType } from '../src/index'; describe('StepType', () => { - it('should expose exactly 5 step types', () => { + it('should expose exactly 6 step types', () => { const values = Object.values(StepType); - expect(values).toHaveLength(5); + expect(values).toHaveLength(6); }); it.each([ @@ -12,6 +12,7 @@ describe('StepType', () => { ['UpdateRecord', 'update-record'], ['TriggerAction', 'trigger-action'], ['LoadRelatedRecord', 'load-related-record'], + ['McpTask', 'mcp-task'], ] as const)('should have %s = "%s"', (key, value) => { expect(StepType[key]).toBe(value); });