diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 9a3dabe201..001eacf29f 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -88,7 +88,7 @@ src/ - **Boundary errors** (`extends Error`) — Thrown outside step execution, at the HTTP or Runner layer (e.g. `RunNotFoundError`, `PendingDataNotFoundError`, `ConfigurationError`). Caught by the HTTP server and translated into HTTP status codes (404, 400, etc.). These intentionally do NOT extend `WorkflowExecutorError` to prevent `base-step-executor` from catching them as step failures. - **Dual error messages** — `WorkflowExecutorError` carries two messages: `message` (technical, for dev logs) and `userMessage` (human-readable, surfaced to the Forest Admin UI via `stepOutcome.error`). The mapping happens in a single place: `base-step-executor.ts` uses `error.userMessage` when building the error outcome. When adding a new error subclass, always provide a distinct `userMessage` oriented toward end-users (no collection names, field names, or AI internals). If `userMessage` is omitted in the constructor call, it falls back to `message`. - **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`). -- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` is called before `runWithActivityLog()` so neither cache hits nor uncertain-state errors emit activity log entries. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe. +- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` runs before `doExecute()` so neither cache hits nor uncertain-state errors reach the activity log emitted by `AgentWithLog`. The `executing` write-ahead marker is saved in the `beforeCall` thunk the executor passes to `AgentWithLog`'s write methods (run after `createPending`, just before the side effect) so an activity-log creation failure never leaves an orphan `executing` marker. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe. - **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getAvailableRuns()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightRuns` (keyed by `runId`, to avoid running the same run twice concurrently; the key is the run, not the step, because a chain advances the `stepId` between iterations). - **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry. - **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. diff --git a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts index 9ca49d4500..8ece667f43 100644 --- a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts +++ b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts @@ -19,6 +19,7 @@ import { AgentPortError, AgentProbeError, RecordNotFoundError, + SchemaNotCachedError, WorkflowExecutorError, extractErrorMessage, } from '../errors'; @@ -291,21 +292,9 @@ export default class AgentClientAgentPort implements AgentPort { const cached = this.schemaCache.get(collectionName); if (!cached) { - // eslint-disable-next-line no-console - console.warn( - `[workflow-executor] Schema not found in cache for collection "${collectionName}". ` + - 'Falling back to primaryKeyFields: ["id"]. Call getCollectionSchema first.', - ); + throw new SchemaNotCachedError(collectionName); } - return ( - cached ?? { - collectionName, - collectionDisplayName: collectionName, - primaryKeyFields: ['id'], - fields: [], - actions: [], - } - ); + return cached; } } diff --git a/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts b/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts index f72ed8089e..feb270f96f 100644 --- a/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts +++ b/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts @@ -5,10 +5,7 @@ import type { CreateActivityLogArgs, } from '../ports/activity-log-port'; import type { Logger } from '../ports/logger-port'; -import type { - ActivityLogAction, - ActivityLogsServiceInterface, -} from '@forestadmin/forestadmin-client'; +import type { ActivityLogsServiceInterface } from '@forestadmin/forestadmin-client'; import { serializeRecordId } from './record-id-serializer'; import withRetry from './with-retry'; @@ -30,7 +27,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort this.service.createActivityLog({ forestServerToken: this.forestServerToken, renderingId: String(args.renderingId), - action: args.action as ActivityLogAction, + action: args.action, type: args.type, // The lib writes this value verbatim into relationships.collection.data.id // (JSON:API). The Forest server audit-trail API expects the numeric collectionId. @@ -76,7 +73,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort }); } - async markFailed(handle: ActivityLogHandle, errorMessage: string): Promise { + async markFailed(handle: ActivityLogHandle): Promise { return this.drainer.track(async () => { try { await withRetry( @@ -92,7 +89,6 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort } catch (err) { this.logger.error('activity log mark-as-failed failed', { handleId: handle.id, - stepErrorMessage: errorMessage, error: extractErrorMessage(err), }); } diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 70af3e8abe..3dea5306f6 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -259,6 +259,17 @@ export class AgentPortError extends WorkflowExecutorError { } } +// Invariant guard: the agent port reads a collection's schema (for its primary keys) from the +// cache, which the executor must populate via getCollectionSchema before any record access. +export class SchemaNotCachedError extends WorkflowExecutorError { + constructor(collectionName: string) { + super( + `Collection schema for "${collectionName}" was not loaded before access — call getCollectionSchema first`, + 'An error occurred while accessing your data. Please try again.', + ); + } +} + export class WorkflowPortError extends WorkflowExecutorError { constructor(operation: string, cause: unknown) { super( diff --git a/packages/workflow-executor/src/executors/agent-with-log.ts b/packages/workflow-executor/src/executors/agent-with-log.ts new file mode 100644 index 0000000000..e50ffa4c5f --- /dev/null +++ b/packages/workflow-executor/src/executors/agent-with-log.ts @@ -0,0 +1,130 @@ +import type { ActivityLogPort, CreateActivityLogArgs } from '../ports/activity-log-port'; +import type { + AgentPort, + ExecuteActionQuery, + GetRecordQuery, + GetRelatedDataQuery, + GetSingleRelatedDataQuery, + UpdateRecordQuery, +} from '../ports/agent-port'; +import type SchemaResolver from '../schema-resolver'; +import type { StepUser } from '../types/execution-context'; +import type { RecordData } from '../types/validated/collection'; + +// The audit-log target minus renderingId, which audit() stamps centrally. +export type AuditTarget = Omit; + +type WriteOptions = { beforeCall: () => Promise }; + +export interface AgentWithLogDeps { + agentPort: AgentPort; + activityLogPort: ActivityLogPort; + schemaResolver: SchemaResolver; + user: StepUser; +} + +// Wraps AgentPort and emits an activity-log entry around each data-access call +// (pending → success/failed). The audit target is derived from the call: the numeric +// collectionId is resolved from the call's collection name, the recordId from its id. +// Idempotency stays in the executors: write methods run a `beforeCall` thunk between +// createPending and the side effect (the executor persists its write-ahead marker there), +// so AgentWithLog never reaches into run state. +export default class AgentWithLog { + private readonly agentPort: AgentPort; + private readonly activityLogPort: ActivityLogPort; + private readonly schemaResolver: SchemaResolver; + private readonly user: StepUser; + + constructor(deps: AgentWithLogDeps) { + this.agentPort = deps.agentPort; + this.activityLogPort = deps.activityLogPort; + this.schemaResolver = deps.schemaResolver; + this.user = deps.user; + } + + async getRecord(query: GetRecordQuery): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.audit({ action: 'index', type: 'read', collectionId, recordId: query.id }, () => + this.agentPort.getRecord(query, this.user), + ); + } + + async getRelatedData(query: GetRelatedDataQuery): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.audit( + { action: 'listRelatedData', type: 'read', collectionId, recordId: query.id }, + () => this.agentPort.getRelatedData(query, this.user), + ); + } + + async getSingleRelatedData(query: GetSingleRelatedDataQuery): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.audit( + { action: 'listRelatedData', type: 'read', collectionId, recordId: query.id }, + () => this.agentPort.getSingleRelatedData(query, this.user), + ); + } + + async updateRecord(query: UpdateRecordQuery, opts: WriteOptions): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.audit( + { action: 'update', type: 'write', collectionId, recordId: query.id }, + () => this.agentPort.updateRecord(query, this.user), + opts.beforeCall, + ); + } + + async executeAction(query: ExecuteActionQuery, opts: WriteOptions): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.audit( + { action: 'action', type: 'write', collectionId, recordId: query.id }, + () => this.agentPort.executeAction(query, this.user), + opts.beforeCall, + ); + } + + // For operations that are not AgentPort calls (e.g. MCP tool invocation): the caller + // supplies the full audit target since there is no collection name to resolve. + logged( + target: AuditTarget, + run: () => Promise, + opts?: { beforeCall?: () => Promise }, + ): Promise { + return this.audit(target, run, opts?.beforeCall); + } + + private async audit( + args: AuditTarget, + run: () => Promise, + beforeCall?: () => Promise, + ): Promise { + const handle = await this.activityLogPort.createPending({ + renderingId: this.user.renderingId, + ...args, + }); + + try { + if (beforeCall) await beforeCall(); + const result = await run(); + void this.activityLogPort.markSucceeded(handle); + + return result; + } catch (err) { + // The step error is logged/surfaced by base-step-executor when rethrown, so the audit + // transition only needs the handle. + void this.activityLogPort.markFailed(handle); + throw err; + } + } + + private async resolveCollectionId(collectionName: string): Promise { + const schema = await this.schemaResolver.resolve(collectionName); + + return schema.collectionId; + } +} diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index fb91e1549f..d4e6cec1a5 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { AgentPort } from '../ports/agent-port'; import type { ExecutionContext, @@ -26,6 +25,7 @@ import { WorkflowExecutorError, extractErrorMessage, } from '../errors'; +import AgentWithLog from './agent-with-log'; import patchBodySchemas from '../http/pending-data-validators'; import StepSummaryBuilder from './summary/step-summary-builder'; @@ -34,11 +34,21 @@ export default abstract class BaseStepExecutor; + // Raw port — kept only for getActionFormInfo, which is intentionally not audited. protected readonly agentPort: AgentPort; + // Audited data access — every call emits an activity-log entry. + protected readonly agent: AgentWithLog; + constructor(context: ExecutionContext) { this.context = context; this.agentPort = context.agentPort; + this.agent = new AgentWithLog({ + agentPort: context.agentPort, + activityLogPort: context.activityLogPort, + schemaResolver: context.schemaResolver, + user: context.user, + }); } async execute(): Promise { @@ -50,8 +60,9 @@ export default abstract class BaseStepExecutor { - const args = this.buildActivityLogArgs(); - if (!args) return this.runWithTimeout(); - - const handle = await this.context.activityLogPort.createPending(args); - - let result: StepExecutionResult; - - try { - result = await this.runWithTimeout(); - } catch (err) { - // Use userMessage (not the technical message) — errorMessage is rendered to end-users - // in the Forest Admin UI. Privacy: no collection/field/AI internals in the audit trail. - const errorMessage = - err instanceof WorkflowExecutorError ? err.userMessage : 'Unexpected error'; - void this.context.activityLogPort.markFailed(handle, errorMessage); - throw err; - } - - if (result.stepOutcome.status === 'error') { - void this.context.activityLogPort.markFailed( - handle, - result.stepOutcome.error ?? 'Step failed', - ); - } else { - void this.context.activityLogPort.markSucceeded(handle); - } - - return result; - } - // Promise.race doesn't abort the losing branch — it keeps running in the background. The .catch() // on execPromise must be attached BEFORE the race so a late rejection doesn't trigger // UnhandledPromiseRejection. Late resolutions are silently discarded. diff --git a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts index 232e359f38..9fce505981 100644 --- a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { LoadRelatedRecordCandidate, @@ -56,16 +55,6 @@ interface RelationTarget extends RelationRef { } export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'listRelatedData', - type: 'read', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected async doExecute(): Promise { // Branch A -- Re-entry after pending execution found in RunStore const pending = await this.patchAndReloadPendingData( @@ -276,16 +265,13 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor { - return this.agentPort.getRelatedData( - { - collection: target.selectedRecordRef.collectionName, - id: target.selectedRecordRef.recordId, - relation: target.name, - relatedSchema, - limit, - }, - this.context.user, - ); + return this.agent.getRelatedData({ + collection: target.selectedRecordRef.collectionName, + id: target.selectedRecordRef.recordId, + relation: target.name, + relatedSchema, + limit, + }); } /** Persists the loaded record ref and returns a success outcome. */ diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 8d7c157b1a..92c5d3ff04 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { ExecutionContext, StepExecutionResult } from '../types/execution-context'; import type { McpStepExecutionData, McpToolCall } from '../types/step-execution-data'; import type { McpStepDefinition } from '../types/validated/step-definition'; @@ -46,16 +45,6 @@ export default class McpStepExecutor extends BaseStepExecutor }; } - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'action', - type: 'write', - collectionId: this.context.collectionId, - label: this.context.stepDefinition.mcpServerId, - }; - } - protected buildOutcomeResult(outcome: { status: RecordStepStatus; error?: string; @@ -126,20 +115,31 @@ export default class McpStepExecutor extends BaseStepExecutor const tool = tools.find(t => t.base.name === target.name && t.sourceId === target.sourceId); if (!tool) throw new McpToolNotFoundError(target.name); - await this.context.runStore.saveStepExecution(this.context.runId, { - ...existingExecution, - type: 'mcp', - stepIndex: this.context.stepIndex, - idempotencyPhase: 'executing', - }); - - let toolResult: unknown; - - try { - toolResult = await tool.base.invoke(target.input); - } catch (cause) { - throw new McpToolInvocationError(target.name, cause); - } + const toolResult = await this.agent.logged( + { + action: 'action', + type: 'write', + label: this.context.stepDefinition.mcpServerId, + collectionId: this.context.collectionId, + recordId: this.context.baseRecordRef.recordId, + }, + async () => { + try { + return await tool.base.invoke(target.input); + } catch (cause) { + throw new McpToolInvocationError(target.name, cause); + } + }, + { + beforeCall: () => + this.context.runStore.saveStepExecution(this.context.runId, { + ...existingExecution, + type: 'mcp', + stepIndex: this.context.stepIndex, + idempotencyPhase: 'executing', + }), + }, + ); // 1. Persist raw result immediately — safe state before any further network calls const baseExecutionResult = { success: true as const, toolResult }; @@ -160,12 +160,15 @@ export default class McpStepExecutor extends BaseStepExecutor try { formattedResponse = await this.formatToolResult(target, toolResult); } catch (cause) { - this.context.logger.error('Failed to format MCP tool result, using generic fallback', { - runId: this.context.runId, - stepIndex: this.context.stepIndex, - toolName: target.name, - cause: cause instanceof Error ? cause.message : String(cause), - }); + this.context.logger.error( + 'Failed to format MCP tool result, persisting raw result without summary', + { + runId: this.context.runId, + stepIndex: this.context.stepIndex, + toolName: target.name, + cause: cause instanceof Error ? cause.message : String(cause), + }, + ); } if (formattedResponse) { diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index b6c761632d..2c9c495ee3 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { FieldReadResult } from '../types/step-execution-data'; import type { CollectionSchema } from '../types/validated/collection'; @@ -19,16 +18,6 @@ Important rules: - Do not refer to yourself as "I" in the response, use a passive formulation instead.`; export default class ReadRecordStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'index', - type: 'read', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected async doExecute(): Promise { const { stepDefinition: step } = this.context; const { preRecordedArgs } = step; @@ -50,14 +39,11 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor { - const cached = this.context.schemaCache.get(collectionName); - if (cached) return cached; - - const schema = await this.context.workflowPort.getCollectionSchema( - collectionName, - this.context.runId, - ); - this.context.schemaCache.set(collectionName, schema); - - return schema; + protected getCollectionSchema(collectionName: string): Promise { + return this.context.schemaResolver.resolve(collectionName); } protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined { diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index 3fd1533df9..75e360d55b 100644 --- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -23,6 +23,7 @@ import type { } from '../types/validated/step-definition'; import { StepStateError, causeMessage, extractErrorMessage } from '../errors'; +import SchemaResolver from '../schema-resolver'; import ConditionStepExecutor from './condition-step-executor'; import GuidanceStepExecutor from './guidance-step-executor'; import LoadRelatedRecordStepExecutor from './load-related-record-step-executor'; @@ -137,7 +138,7 @@ export default class StepExecutorFactory { agentPort: cfg.agentPort, workflowPort: cfg.workflowPort, runStore: cfg.runStore, - schemaCache: cfg.schemaCache, + schemaResolver: new SchemaResolver(cfg.schemaCache, cfg.workflowPort, step.runId), logger: cfg.logger, incomingPendingData, stepTimeoutMs: cfg.stepTimeoutMs, diff --git a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts index 3f8d09e504..b65d1a678e 100644 --- a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts +++ b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { ActionRef, TriggerRecordActionStepExecutionData } from '../types/step-execution-data'; import type { CollectionSchema, RecordRef } from '../types/validated/collection'; @@ -29,22 +28,6 @@ interface ActionTarget extends ActionRef { } export default class TriggerRecordActionStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - // Skip when the frontend executes the action itself (non fully-automated mode). - // The front logs on its side via the standard agent activity flow. - if (this.context.stepDefinition.executionType !== StepExecutionMode.FullyAutomated) { - return null; - } - - return { - renderingId: this.context.user.renderingId, - action: 'action', - type: 'write', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected override async checkIdempotency(): Promise { const existing = await this.findPendingExecution( 'trigger-action', @@ -146,20 +129,21 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< private async executeOnExecutor(target: ActionTarget): Promise { const { selectedRecordRef, displayName, name } = target; - await this.context.runStore.saveStepExecution(this.context.runId, { - type: 'trigger-action', - stepIndex: this.context.stepIndex, - selectedRecordRef, - idempotencyPhase: 'executing', - }); - - const actionResult = await this.agentPort.executeAction( + const actionResult = await this.agent.executeAction( { collection: selectedRecordRef.collectionName, action: name, id: selectedRecordRef.recordId, }, - this.context.user, + { + beforeCall: () => + this.context.runStore.saveStepExecution(this.context.runId, { + type: 'trigger-action', + stepIndex: this.context.stepIndex, + selectedRecordRef, + idempotencyPhase: 'executing', + }), + }, ); await this.context.runStore.saveStepExecution(this.context.runId, { diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index f978e16d04..fa3fa5d540 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { FieldWithValue, UpdateRecordStepExecutionData } from '../types/step-execution-data'; import type { CollectionSchema, FieldSchema, RecordRef } from '../types/validated/collection'; @@ -127,16 +126,6 @@ interface UpdateTarget extends FieldWithValue { } export default class UpdateRecordStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'update', - type: 'write', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected override async checkIdempotency(): Promise { const existing = await this.findPendingExecution( 'update-record', @@ -259,21 +248,22 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor { const { selectedRecordRef, displayName, name, value } = target; - await this.context.runStore.saveStepExecution(this.context.runId, { - ...existingExecution, - type: 'update-record', - stepIndex: this.context.stepIndex, - selectedRecordRef, - idempotencyPhase: 'executing', - }); - - const updated = await this.agentPort.updateRecord( + const updated = await this.agent.updateRecord( { collection: selectedRecordRef.collectionName, id: selectedRecordRef.recordId, values: { [name]: value }, }, - this.context.user, + { + beforeCall: () => + this.context.runStore.saveStepExecution(this.context.runId, { + ...existingExecution, + type: 'update-record', + stepIndex: this.context.stepIndex, + selectedRecordRef, + idempotencyPhase: 'executing', + }), + }, ); await this.context.runStore.saveStepExecution(this.context.runId, { diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 14435b4850..7fa349a083 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -122,6 +122,7 @@ export { default as Runner } from './runner'; export type { RunnerConfig, RunnerState } from './runner'; export { default as validateSecrets } from './validate-secrets'; export { default as SchemaCache } from './schema-cache'; +export { default as SchemaResolver } from './schema-resolver'; export { default as InMemoryStore } from './stores/in-memory-store'; export { default as DatabaseStore } from './stores/database-store'; export type { DatabaseStoreOptions } from './stores/database-store'; diff --git a/packages/workflow-executor/src/ports/activity-log-port.ts b/packages/workflow-executor/src/ports/activity-log-port.ts index d12ab9ce63..51320571e6 100644 --- a/packages/workflow-executor/src/ports/activity-log-port.ts +++ b/packages/workflow-executor/src/ports/activity-log-port.ts @@ -1,10 +1,12 @@ import type { RecordId } from '../types/validated/collection'; +import type { ActivityLogAction, ActivityLogType } from '@forestadmin/forestadmin-client'; export interface CreateActivityLogArgs { renderingId: number; - action: string; - type: 'read' | 'write'; - collectionId?: string; + action: ActivityLogAction; + type: ActivityLogType; + // Numeric Forest collection id; the adapter forwards it to the lib's `collectionName` param. + collectionId: string; recordId?: RecordId; label?: string; } @@ -15,11 +17,14 @@ export interface ActivityLogHandle { } // Per-run scoped port: token baked into the adapter's constructor. markSucceeded/markFailed -// retry transient failures internally and are invoked with `void` from base-step-executor. +// retry transient failures internally and are invoked with `void` from AgentWithLog. export interface ActivityLogPort { createPending(args: CreateActivityLogArgs): Promise; markSucceeded(handle: ActivityLogHandle): Promise; - markFailed(handle: ActivityLogHandle, errorMessage: string): Promise; + // The server status endpoint accepts just { status }; the step error itself is logged by + // base-step-executor when the failure is rethrown (as the sole caller, AgentWithLog.audit, + // does), so no error message is threaded here. Only call this on a rethrow path. + markFailed(handle: ActivityLogHandle): Promise; } // Produces per-run ActivityLogPort instances and exposes drain() at the process level so the diff --git a/packages/workflow-executor/src/schema-resolver.ts b/packages/workflow-executor/src/schema-resolver.ts new file mode 100644 index 0000000000..61212bd383 --- /dev/null +++ b/packages/workflow-executor/src/schema-resolver.ts @@ -0,0 +1,30 @@ +import type { WorkflowPort } from './ports/workflow-port'; +import type SchemaCache from './schema-cache'; +import type { CollectionSchema } from './types/validated/collection'; + +// Per-run schema resolution: binds the shared SchemaCache, the orchestrator port and the +// current runId once, so callers never thread a loader. Writes into the SAME SchemaCache +// instance AgentClientAgentPort reads from (get/iterate): the resolver always populates an +// entry before the agent-port reads it, so the agent-port's SchemaNotCachedError guard does +// not fire under normal TTLs (a TTL shorter than a single step's round-trip could still evict). +export default class SchemaResolver { + private readonly cache: SchemaCache; + private readonly workflowPort: WorkflowPort; + private readonly runId: string; + + constructor(cache: SchemaCache, workflowPort: WorkflowPort, runId: string) { + this.cache = cache; + this.workflowPort = workflowPort; + this.runId = runId; + } + + async resolve(collectionName: string): Promise { + const cached = this.cache.get(collectionName); + if (cached) return cached; + + const schema = await this.workflowPort.getCollectionSchema(collectionName, this.runId); + this.cache.set(collectionName, schema); + + return schema; + } +} diff --git a/packages/workflow-executor/src/types/execution-context.ts b/packages/workflow-executor/src/types/execution-context.ts index 4a1ca06128..fcf2f2c02c 100644 --- a/packages/workflow-executor/src/types/execution-context.ts +++ b/packages/workflow-executor/src/types/execution-context.ts @@ -5,7 +5,7 @@ import type { AgentPort } from '../ports/agent-port'; import type { Logger } from '../ports/logger-port'; import type { RunStore } from '../ports/run-store'; import type { WorkflowPort } from '../ports/workflow-port'; -import type SchemaCache from '../schema-cache'; +import type SchemaResolver from '../schema-resolver'; import type { RecordRef } from './validated/collection'; import type { AvailableStepExecution, Step, StepUser } from './validated/execution'; import type { StepDefinition } from './validated/step-definition'; @@ -36,7 +36,7 @@ export interface ExecutionContext readonly workflowPort: WorkflowPort; readonly runStore: RunStore; readonly user: StepUser; - readonly schemaCache: SchemaCache; + readonly schemaResolver: SchemaResolver; readonly previousSteps: ReadonlyArray>; readonly logger: Logger; readonly incomingPendingData?: unknown; diff --git a/packages/workflow-executor/src/types/validated/collection.ts b/packages/workflow-executor/src/types/validated/collection.ts index 5ca727fc8a..27ddda751c 100644 --- a/packages/workflow-executor/src/types/validated/collection.ts +++ b/packages/workflow-executor/src/types/validated/collection.ts @@ -74,6 +74,7 @@ export type ActionSchema = z.infer; export const CollectionSchemaSchema = z .object({ collectionName: z.string().min(1), + collectionId: z.string().min(1), // null when the rendering has no explicit displayName configured — normalized to collectionName. collectionDisplayName: z.string().nullable(), primaryKeyFields: z.array(z.string().min(1)).min(1), diff --git a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts index cf03360cc1..219fe3fa4d 100644 --- a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts +++ b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts @@ -4,7 +4,7 @@ import { createRemoteAgentClient } from '@forestadmin/agent-client'; import jsonwebtoken from 'jsonwebtoken'; import AgentClientAgentPort from '../../src/adapters/agent-client-agent-port'; -import { AgentProbeError, RecordNotFoundError } from '../../src/errors'; +import { AgentProbeError, RecordNotFoundError, SchemaNotCachedError } from '../../src/errors'; import SchemaCache from '../../src/schema-cache'; jest.mock('@forestadmin/agent-client', () => ({ @@ -49,6 +49,7 @@ describe('AgentClientAgentPort', () => { const schemaCache = new SchemaCache(); schemaCache.set('users', { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -62,6 +63,7 @@ describe('AgentClientAgentPort', () => { }); schemaCache.set('orders', { collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', primaryKeyFields: ['tenantId', 'orderId'], fields: [ @@ -72,6 +74,7 @@ describe('AgentClientAgentPort', () => { }); schemaCache.set('posts', { collectionName: 'posts', + collectionId: 'col-posts', collectionDisplayName: 'Posts', primaryKeyFields: ['id'], fields: [ @@ -189,17 +192,11 @@ describe('AgentClientAgentPort', () => { }); }); - it('should fallback to pk field "id" when collection is unknown', async () => { - mockCollection.list.mockResolvedValue([{ id: 1 }]); - - const result = await port.getRecord({ collection: 'unknown', id: [1] }, user); - - expect(mockCollection.list).toHaveBeenCalledWith( - expect.objectContaining({ - filters: { field: 'id', operator: 'Equal', value: 1 }, - }), + it('throws SchemaNotCachedError when the collection schema was not loaded first', async () => { + await expect(port.getRecord({ collection: 'unknown', id: [1] }, user)).rejects.toBeInstanceOf( + SchemaNotCachedError, ); - expect(result.collectionName).toBe('unknown'); + expect(mockCollection.list).not.toHaveBeenCalled(); }); }); @@ -273,6 +270,7 @@ describe('AgentClientAgentPort', () => { describe('getRelatedData', () => { const postsSchema = { collectionName: 'posts', + collectionId: 'col-posts', collectionDisplayName: 'Posts', primaryKeyFields: ['id'], fields: [ @@ -480,6 +478,7 @@ describe('AgentClientAgentPort', () => { // that jsonapi-serializer emits as a nested object on the parent. const ordersSchema = { collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', primaryKeyFields: ['id'], fields: [ @@ -758,6 +757,7 @@ describe('AgentClientAgentPort', () => { const schemaCache = new SchemaCache(); schemaCache.set('users', { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [{ fieldName: 'id', displayName: 'id', isRelationship: false }], diff --git a/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts b/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts index 30c8a4eaca..56cd44385f 100644 --- a/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts +++ b/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts @@ -591,6 +591,7 @@ describe('ForestServerWorkflowPort', () => { describe('getCollectionSchema', () => { const collectionSchema: CollectionSchema = { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [], @@ -626,6 +627,7 @@ describe('ForestServerWorkflowPort', () => { // Shape invalide : fields[0] manque fieldName (violation FieldSchema.fieldName.min(1)). mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [{ displayName: 'Email', isRelationship: false }], @@ -638,6 +640,7 @@ describe('ForestServerWorkflowPort', () => { it('strips unknown extra fields on the wire (orchestrator drift tolerance)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], referenceField: 'name', @@ -671,6 +674,7 @@ describe('ForestServerWorkflowPort', () => { it('defaults actions to [] when the orchestrator omits it', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [], @@ -684,6 +688,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts a field without type (omitted by the orchestrator)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [{ fieldName: 'email', displayName: 'Email', isRelationship: false }], @@ -700,6 +705,7 @@ describe('ForestServerWorkflowPort', () => { async displayName => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: displayName, primaryKeyFields: ['id'], fields: [], @@ -715,6 +721,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts relationType BelongsToMany (many-to-many relation)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -738,6 +745,7 @@ describe('ForestServerWorkflowPort', () => { it("strips the target key from relatedCollectionName (Forest 'collection.key' reference)", async () => { mockQuery.mockResolvedValue({ collectionName: 'accounts', + collectionId: 'col-accounts', collectionDisplayName: 'Accounts', primaryKeyFields: ['id'], fields: [ @@ -761,6 +769,7 @@ describe('ForestServerWorkflowPort', () => { it('leaves relatedCollectionName unchanged when it carries no target key (no dot)', async () => { mockQuery.mockResolvedValue({ collectionName: 'accounts', + collectionId: 'col-accounts', collectionDisplayName: 'Accounts', primaryKeyFields: ['id'], fields: [ @@ -784,6 +793,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts type File (Forest Admin extension)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -800,6 +810,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts type [File] (array of files)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -821,6 +832,7 @@ describe('ForestServerWorkflowPort', () => { it('rejects enumValues: [] (empty enum is invalid)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -1033,6 +1045,7 @@ describe('ForestServerWorkflowPort', () => { it('getCollectionSchema retries on HTTP 408 (timeout)', async () => { const validSchema: CollectionSchema = { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ diff --git a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts index fd5a89e465..3ad407023f 100644 --- a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts +++ b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts @@ -21,7 +21,12 @@ describe('ForestadminClientActivityLogPortFactory', () => { const factory = new ForestadminClientActivityLogPortFactory(service, makeLogger()); const port = factory.forRun('token-42'); - await port.createPending({ renderingId: 1, action: 'update', type: 'write' }); + await port.createPending({ + renderingId: 1, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); expect(port).toBeInstanceOf(ForestadminClientActivityLogPort); expect(service.createActivityLog).toHaveBeenCalledWith( diff --git a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts index 428206a2d9..dd11300c76 100644 --- a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts +++ b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts @@ -54,7 +54,12 @@ describe('ForestadminClientActivityLogPort', () => { }); const port = makePort(service); - const handle = await port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const handle = await port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); expect(handle).toEqual({ id: 'log-1', index: '0' }); expect(service.createActivityLog).toHaveBeenCalledWith( @@ -89,7 +94,12 @@ describe('ForestadminClientActivityLogPort', () => { const logger = makeLogger(); const port = makePort(service, { logger }); - const promise = port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const promise = port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); await jest.advanceTimersByTimeAsync(100); const handle = await promise; @@ -106,7 +116,12 @@ describe('ForestadminClientActivityLogPort', () => { service.createActivityLog.mockRejectedValue(makeHttpError(502)); const port = makePort(service); - const promise = port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const promise = port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); const settled = promise.catch(err => err); await jest.advanceTimersByTimeAsync(2_600); const err = await settled; @@ -121,7 +136,12 @@ describe('ForestadminClientActivityLogPort', () => { const port = makePort(service); await expect( - port.createPending({ renderingId: 5, action: 'update', type: 'write' }), + port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }), ).rejects.toBeInstanceOf(ActivityLogCreationError); expect(service.createActivityLog).toHaveBeenCalledTimes(1); }); @@ -132,7 +152,12 @@ describe('ForestadminClientActivityLogPort', () => { const port = makePort(service); await expect( - port.createPending({ renderingId: 5, action: 'update', type: 'write' }), + port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }), ).rejects.toBeInstanceOf(ActivityLogCreationError); expect(service.createActivityLog).toHaveBeenCalledTimes(1); }); @@ -145,7 +170,12 @@ describe('ForestadminClientActivityLogPort', () => { .mockResolvedValueOnce({ id: 'log-3', attributes: { index: '2' } }); const port = makePort(service); - const promise = port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const promise = port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); await jest.advanceTimersByTimeAsync(100); await expect(promise).resolves.toEqual({ id: 'log-3', index: '2' }); }); @@ -158,7 +188,12 @@ describe('ForestadminClientActivityLogPort', () => { }); const port = makePort(service); - await port.createPending({ renderingId: 42, action: 'update', type: 'write' }); + await port.createPending({ + renderingId: 42, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); expect(service.createActivityLog).toHaveBeenCalledWith( expect.objectContaining({ renderingId: '42' }), @@ -232,14 +267,14 @@ describe('ForestadminClientActivityLogPort', () => { }); describe('markFailed', () => { - it('sends status: failed (no errorMessage — server schema rejects unknown fields) and retries on 503', async () => { + it('sends status: failed to the server without the local errorMessage, and retries on 503', async () => { const service = makeService(); service.updateActivityLogStatus .mockRejectedValueOnce(makeHttpError(503)) .mockResolvedValueOnce(undefined); const port = makePort(service); - const promise = port.markFailed({ id: 'log-1', index: '0' }, 'boom'); + const promise = port.markFailed({ id: 'log-1', index: '0' }); await jest.advanceTimersByTimeAsync(100); await promise; @@ -254,21 +289,18 @@ describe('ForestadminClientActivityLogPort', () => { ); }); - it('swallows errors after retries are exhausted (fire-and-forget) and logs with stepErrorMessage', async () => { + it('swallows errors after retries are exhausted (fire-and-forget) and logs the failure', async () => { const service = makeService(); service.updateActivityLogStatus.mockRejectedValue(makeHttpError(503)); const logger = makeLogger(); const port = makePort(service, { logger }); - const promise = port.markFailed({ id: 'log-1', index: '0' }, 'step-error-msg'); + const promise = port.markFailed({ id: 'log-1', index: '0' }); await jest.advanceTimersByTimeAsync(2_600); await expect(promise).resolves.toBeUndefined(); expect(logger.error).toHaveBeenCalledWith( 'activity log mark-as-failed failed', - expect.objectContaining({ - handleId: 'log-1', - stepErrorMessage: 'step-error-msg', - }), + expect.objectContaining({ handleId: 'log-1' }), ); }); @@ -279,7 +311,7 @@ describe('ForestadminClientActivityLogPort', () => { .mockResolvedValueOnce(undefined); const port = makePort(service); - const promise = port.markFailed({ id: 'log-1', index: '0' }, 'boom'); + const promise = port.markFailed({ id: 'log-1', index: '0' }); await jest.advanceTimersByTimeAsync(100); await expect(promise).resolves.toBeUndefined(); expect(service.updateActivityLogStatus).toHaveBeenCalledTimes(2); @@ -327,7 +359,7 @@ describe('ForestadminClientActivityLogPort', () => { const drainer = new ActivityLogDrainer(); const port = makePort(service, { drainer }); - const markPromise = port.markFailed({ id: 'log-1', index: '0' }, 'boom'); + const markPromise = port.markFailed({ id: 'log-1', index: '0' }); let drainResolved = false; const drainPromise = drainer.drain().then(() => { diff --git a/packages/workflow-executor/test/executors/agent-with-log.test.ts b/packages/workflow-executor/test/executors/agent-with-log.test.ts new file mode 100644 index 0000000000..f42e193a10 --- /dev/null +++ b/packages/workflow-executor/test/executors/agent-with-log.test.ts @@ -0,0 +1,256 @@ +import type { AgentWithLogDeps } from '../../src/executors/agent-with-log'; +import type { AgentPort } from '../../src/ports/agent-port'; +import type SchemaResolver from '../../src/schema-resolver'; +import type { StepUser } from '../../src/types/execution-context'; +import type { CollectionSchema } from '../../src/types/validated/collection'; + +import { NoRecordsError } from '../../src/errors'; +import AgentWithLog from '../../src/executors/agent-with-log'; + +function makeUser(): StepUser { + return { + id: 1, + email: 'test@example.com', + firstName: 'Test', + lastName: 'User', + team: 'admin', + renderingId: 1, + role: 'admin', + permissionLevel: 'admin', + tags: {}, + } as StepUser; +} + +function makeSchema(collectionId = 'col-customers'): CollectionSchema { + return { + collectionName: 'customers', + collectionId, + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + referenceField: null, + fields: [], + actions: [], + }; +} + +function makeActivityLogPort() { + return { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; +} + +function makeDeps(overrides: Partial = {}) { + const activityLogPort = makeActivityLogPort(); + const agentPort = { + getRecord: jest + .fn() + .mockResolvedValue({ collectionName: 'customers', recordId: [42], values: {} }), + updateRecord: jest + .fn() + .mockResolvedValue({ collectionName: 'customers', recordId: [42], values: {} }), + getRelatedData: jest.fn().mockResolvedValue([]), + getSingleRelatedData: jest.fn().mockResolvedValue(null), + executeAction: jest.fn().mockResolvedValue({ ok: true }), + } as unknown as AgentPort; + const schemaResolver = { + resolve: jest.fn().mockResolvedValue(makeSchema()), + } as unknown as SchemaResolver; + + const deps = { + agentPort, + activityLogPort, + schemaResolver, + user: makeUser(), + ...overrides, + }; + + return { deps, agentPort, activityLogPort, schemaResolver }; +} + +describe('AgentWithLog', () => { + describe('read methods', () => { + it('logs getRecord as index/read against the call target and returns the data', async () => { + const { deps, agentPort, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + const result = await agent.getRecord({ collection: 'customers', id: [42], fields: ['name'] }); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'index', + type: 'read', + collectionId: 'col-customers', + recordId: [42], + }); + expect(activityLogPort.markSucceeded).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(agentPort.getRecord).toHaveBeenCalledWith( + { collection: 'customers', id: [42], fields: ['name'] }, + expect.objectContaining({ id: 1 }), + ); + expect(result).toEqual({ collectionName: 'customers', recordId: [42], values: {} }); + }); + + it('logs getRelatedData as listRelatedData/read', async () => { + const { deps, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + await agent.getRelatedData({ + collection: 'customers', + id: [42], + relation: 'orders', + relatedSchema: makeSchema('col-orders'), + limit: 50, + }); + + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ action: 'listRelatedData', type: 'read', recordId: [42] }), + ); + }); + + it('logs getSingleRelatedData as listRelatedData/read (xToOne)', async () => { + const { deps, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + await agent.getSingleRelatedData({ + collection: 'customers', + id: [42], + relation: 'order', + relatedSchema: makeSchema('col-orders'), + }); + + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ action: 'listRelatedData', type: 'read', recordId: [42] }), + ); + }); + }); + + describe('write methods', () => { + it('runs beforeCall between createPending and the agent call (audit precedes the side effect)', async () => { + const order: string[] = []; + const { deps, agentPort, activityLogPort } = makeDeps(); + (activityLogPort.createPending as jest.Mock).mockImplementation(async () => { + order.push('createPending'); + + return { id: 'log-1', index: '0' }; + }); + (agentPort.updateRecord as jest.Mock).mockImplementation(async () => { + order.push('updateRecord'); + + return { collectionName: 'customers', recordId: [42], values: {} }; + }); + const agent = new AgentWithLog(deps); + + await agent.updateRecord( + { collection: 'customers', id: [42], values: { name: 'X' } }, + { + beforeCall: async () => { + order.push('beforeCall'); + }, + }, + ); + + expect(order).toEqual(['createPending', 'beforeCall', 'updateRecord']); + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ action: 'update', type: 'write', recordId: [42] }), + ); + }); + + it('does NOT run beforeCall or the agent call when createPending throws', async () => { + const { deps, agentPort, activityLogPort } = makeDeps(); + (activityLogPort.createPending as jest.Mock).mockRejectedValue(new Error('audit down')); + const beforeCall = jest.fn().mockResolvedValue(undefined); + const agent = new AgentWithLog(deps); + + await expect( + agent.updateRecord( + { collection: 'customers', id: [42], values: { name: 'X' } }, + { beforeCall }, + ), + ).rejects.toThrow('audit down'); + expect(beforeCall).not.toHaveBeenCalled(); + expect(agentPort.updateRecord).not.toHaveBeenCalled(); + }); + + it('marks failed and rethrows when beforeCall throws — the side effect never runs', async () => { + // beforeCall persists the write-ahead "executing" marker. If that save fails, the datasource + // write must not run, and the already-created pending entry is resolved to failed (never left + // orphan-pending). The record is genuinely untouched, which "failed" correctly conveys. + const { deps, agentPort, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + await expect( + agent.updateRecord( + { collection: 'customers', id: [42], values: { name: 'X' } }, + { + beforeCall: async () => { + throw new Error('marker save failed'); + }, + }, + ), + ).rejects.toThrow('marker save failed'); + + expect(agentPort.updateRecord).not.toHaveBeenCalled(); + expect(activityLogPort.markFailed).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(activityLogPort.markSucceeded).not.toHaveBeenCalled(); + }); + }); + + describe('failure marking', () => { + it('marks failed (not succeeded) and rethrows the original WorkflowExecutorError', async () => { + const { deps, agentPort, activityLogPort } = makeDeps(); + (agentPort.updateRecord as jest.Mock).mockRejectedValue(new NoRecordsError()); + const agent = new AgentWithLog(deps); + + await expect( + agent.updateRecord( + { collection: 'customers', id: [42], values: { name: 'X' } }, + { beforeCall: async () => undefined }, + ), + ).rejects.toBeInstanceOf(NoRecordsError); + + expect(activityLogPort.markFailed).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(activityLogPort.markSucceeded).not.toHaveBeenCalled(); + }); + + it('marks failed and rethrows a non-WorkflowExecutorError unchanged', async () => { + const { deps, agentPort, activityLogPort } = makeDeps(); + (agentPort.getRecord as jest.Mock).mockRejectedValue(new Error('boom')); + const agent = new AgentWithLog(deps); + + await expect(agent.getRecord({ collection: 'customers', id: [42] })).rejects.toThrow('boom'); + + expect(activityLogPort.markFailed).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + }); + }); + + describe('logged (generic, non-AgentPort operations)', () => { + it('audits an arbitrary operation against the provided target with a label', async () => { + const { deps, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + const result = await agent.logged( + { + action: 'action', + type: 'write', + label: 'my-mcp-server', + collectionId: 'col-1', + recordId: [7], + }, + async () => 'done', + { beforeCall: async () => undefined }, + ); + + expect(result).toBe('done'); + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'action', + type: 'write', + label: 'my-mcp-server', + collectionId: 'col-1', + recordId: [7], + }); + }); + }); +}); diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index bef89c9760..c31dd1f71d 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -18,10 +18,10 @@ import { NoRecordsError, RunStorePortError, StepStateError, - WorkflowExecutorError, } from '../../src/errors'; import BaseStepExecutor from '../../src/executors/base-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; /** Concrete subclass that exposes protected methods for testing. */ @@ -104,8 +104,12 @@ function makeMockActivityLogPort(): ExecutionContext['activityLogPort'] { } function makeContext(overrides: Partial = {}): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? ({} as ExecutionContext['workflowPort']); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'step-0', stepIndex: 0, collectionId: 'col-1', @@ -122,7 +126,7 @@ function makeContext(overrides: Partial = {}): ExecutionContex }, model: {} as ExecutionContext['model'], agentPort: {} as ExecutionContext['agentPort'], - workflowPort: {} as ExecutionContext['workflowPort'], + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -135,7 +139,7 @@ function makeContext(overrides: Partial = {}): ExecutionContex permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: makeMockLogger(), @@ -530,181 +534,6 @@ describe('BaseStepExecutor', () => { }); }); - describe('activity log lifecycle', () => { - class LoggedExecutor extends BaseStepExecutor { - constructor(context: ExecutionContext, private readonly errorToThrow?: unknown) { - super(context); - } - - protected override buildActivityLogArgs() { - return { - renderingId: 1, - action: 'update', - type: 'write' as const, - collectionId: 'col-1', - recordId: [42], - }; - } - - protected async doExecute(): Promise { - if (this.errorToThrow !== undefined) throw this.errorToThrow; - - return this.buildOutcomeResult({ status: 'success' }); - } - - protected buildOutcomeResult(outcome: { - status: BaseStepStatus; - error?: string; - }): StepExecutionResult { - return { - stepOutcome: { - type: 'record', - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - status: outcome.status, - ...(outcome.error !== undefined && { error: outcome.error }), - }, - }; - } - } - - it('creates pending log, runs doExecute, then marks succeeded on success', async () => { - const context = makeContext(); - const executor = new LoggedExecutor(context); - - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('success'); - expect(context.activityLogPort.createPending).toHaveBeenCalledWith( - expect.objectContaining({ - action: 'update', - type: 'write', - collectionId: 'col-1', - }), - ); - expect(context.activityLogPort.markSucceeded).toHaveBeenCalledWith({ - id: 'log-1', - index: '0', - }); - expect(context.activityLogPort.markFailed).not.toHaveBeenCalled(); - }); - - it('marks failed when doExecute throws a WorkflowExecutorError', async () => { - const context = makeContext(); - const executor = new LoggedExecutor(context, new NoRecordsError()); - - await executor.execute(); - - expect(context.activityLogPort.markFailed).toHaveBeenCalledWith( - { id: 'log-1', index: '0' }, - 'No records available', - ); - expect(context.activityLogPort.markSucceeded).not.toHaveBeenCalled(); - }); - - it('fails the step and does NOT run doExecute when createPending throws ActivityLogCreationError', async () => { - // eslint-disable-next-line @typescript-eslint/no-var-requires, global-require - const { ActivityLogCreationError } = require('../../src/errors'); - const context = makeContext(); - (context.activityLogPort.createPending as jest.Mock).mockRejectedValue( - new ActivityLogCreationError(new Error('net')), - ); - const doExecuteSpy = jest.fn().mockResolvedValue({ - stepOutcome: { type: 'record', stepId: 'x', stepIndex: 0, status: 'success' }, - }); - - class NeverRunExecutor extends LoggedExecutor { - protected override async doExecute(): Promise { - return doExecuteSpy(); - } - } - - const executor = new NeverRunExecutor(context); - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe( - 'Could not record this step in the audit log. Please try again, or contact your administrator if the problem persists.', - ); - expect(doExecuteSpy).not.toHaveBeenCalled(); - }); - - it('does NOT create pending log when buildActivityLogArgs returns null (default)', async () => { - const context = makeContext(); - const executor = new TestableExecutor(context); - - await executor.execute(); - - expect(context.activityLogPort.createPending).not.toHaveBeenCalled(); - expect(context.activityLogPort.markSucceeded).not.toHaveBeenCalled(); - }); - - it('calls markFailed with userMessage (not the technical message) on WorkflowExecutorError', async () => { - class DualMessageError extends WorkflowExecutorError { - constructor() { - super( - 'Internal: datasource "customers" returned no record for pk=42', - 'The record no longer exists.', - ); - } - } - const context = makeContext(); - const executor = new LoggedExecutor(context, new DualMessageError()); - - await executor.execute(); - - expect(context.activityLogPort.markFailed).toHaveBeenCalledWith( - { id: 'log-1', index: '0' }, - 'The record no longer exists.', - ); - }); - - it('marks failed when doExecute returns an error outcome without throwing', async () => { - class ErrorOutcomeExecutor extends BaseStepExecutor { - protected override buildActivityLogArgs() { - return { - renderingId: 1, - action: 'update', - type: 'write' as const, - collectionName: 'customers', - recordId: [42], - }; - } - - protected async doExecute(): Promise { - return this.buildOutcomeResult({ status: 'error', error: 'soft failure' }); - } - - protected buildOutcomeResult(outcome: { - status: BaseStepStatus; - error?: string; - }): StepExecutionResult { - return { - stepOutcome: { - type: 'record', - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - status: outcome.status, - ...(outcome.error !== undefined && { error: outcome.error }), - }, - }; - } - } - - const context = makeContext(); - const executor = new ErrorOutcomeExecutor(context); - - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(context.activityLogPort.markFailed).toHaveBeenCalledWith( - { id: 'log-1', index: '0' }, - 'soft failure', - ); - expect(context.activityLogPort.markSucceeded).not.toHaveBeenCalled(); - }); - }); - describe('invokeWithTool', () => { function makeMockModel(response: unknown) { const invoke = jest.fn().mockResolvedValue(response); diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index 83135f3378..12aa578102 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -7,6 +7,7 @@ import type { ConditionStepOutcome } from '../../src/types/validated/step-outcom import { RunStorePortError } from '../../src/errors'; import ConditionStepExecutor from '../../src/executors/condition-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep(overrides: Partial = {}): ConditionStepDefinition { @@ -44,8 +45,12 @@ function makeMockModel(toolCallArgs?: Record) { function makeContext( overrides: Partial> = {}, ): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? ({} as ExecutionContext['workflowPort']); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'cond-1', stepIndex: 0, collectionId: 'col-1', @@ -57,7 +62,7 @@ function makeContext( stepDefinition: makeStep(), model: makeMockModel().model, agentPort: {} as ExecutionContext['agentPort'], - workflowPort: {} as ExecutionContext['workflowPort'], + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -70,7 +75,7 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, diff --git a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts index aa5d0b09ee..ec7ca80f43 100644 --- a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts @@ -6,6 +6,7 @@ import type { GuidanceStepOutcome } from '../../src/types/validated/step-outcome import GuidanceStepExecutor from '../../src/executors/guidance-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeMockRunStore(overrides: Partial = {}): RunStore { @@ -21,8 +22,12 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { function makeContext( overrides: Partial> = {}, ): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? ({} as ExecutionContext['workflowPort']); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'guidance-1', stepIndex: 0, collectionId: 'col-1', @@ -34,7 +39,7 @@ function makeContext( stepDefinition: { type: StepType.Guidance, executionType: StepExecutionMode.Manual }, model: {} as ExecutionContext['model'], agentPort: {} as ExecutionContext['agentPort'], - workflowPort: {} as ExecutionContext['workflowPort'], + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -47,7 +52,7 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index 5ffe45226c..dab0dc43ad 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -9,6 +9,7 @@ import type { LoadRelatedRecordStepDefinition } from '../../src/types/validated/ import { AgentPortError, RunStorePortError } from '../../src/errors'; import LoadRelatedRecordStepExecutor from '../../src/executors/load-related-record-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep( @@ -72,6 +73,7 @@ function makeMockAgentPort(relatedData: RecordData[] = [makeRelatedRecordData()] function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -140,8 +142,12 @@ function makeMockModel(toolCallArgs?: Record, toolName = 'selec function makeContext( overrides: Partial> = {}, ): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'load-1', stepIndex: 0, collectionId: 'col-1', @@ -149,7 +155,7 @@ function makeContext( stepDefinition: makeStep(), model: makeMockModel({ relationName: 'Order', reasoning: 'User requested order' }).model, agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -162,7 +168,7 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, @@ -724,6 +730,58 @@ describe('LoadRelatedRecordStepExecutor', () => { }); }); + describe('operation activity log (PRD-442 #1)', () => { + it('logs listRelatedData against the source record and its collection, not the trigger', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const { model } = makeMockModel({ relationName: 'Order', reasoning: 'r' }); + const context = makeContext({ + model, + runStore, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new LoadRelatedRecordStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'listRelatedData', + type: 'read', + collectionId: 'col-customers', + recordId: [42], + }); + }); + + it('logs the relation read once on the awaiting-input (Branch C) path', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const { model } = makeMockModel({ relationName: 'Order', reasoning: 'r' }); + const context = makeContext({ model, runStore, activityLogPort }); + + const result = await new LoadRelatedRecordStepExecutor(context).execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(activityLogPort.createPending).toHaveBeenCalledTimes(1); + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ + action: 'listRelatedData', + type: 'read', + collectionId: 'col-customers', + recordId: [42], + }), + ); + }); + }); + describe('without executionType=FullyAutomated: awaiting-input (Branch C)', () => { it('saves AI suggestion in pendingData and returns awaiting-input (single record — no field/record AI calls)', async () => { const agentPort = makeMockAgentPort(); // returns 1 record: orders #99 diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index b057b6394f..238731250b 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -9,6 +9,7 @@ import RemoteTool from '@forestadmin/ai-proxy/src/remote-tool'; import { RunStorePortError, StepStateError } from '../../src/errors'; import McpStepExecutor from '../../src/executors/mcp-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; // --------------------------------------------------------------------------- @@ -87,8 +88,12 @@ function makeMockModel(toolName: string, toolArgs: Record) { function makeContext( overrides: Partial> = {}, ): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'mcp-1', stepIndex: 0, collectionId: 'col-1', @@ -101,7 +106,7 @@ function makeContext( getRelatedData: jest.fn(), executeAction: jest.fn(), } as unknown as ExecutionContext['agentPort'], - workflowPort: makeMockWorkflowPort(), + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -114,7 +119,7 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, @@ -267,7 +272,7 @@ describe('McpStepExecutor', () => { }), ); expect(logger.error).toHaveBeenCalledWith( - 'Failed to format MCP tool result, using generic fallback', + 'Failed to format MCP tool result, persisting raw result without summary', expect.objectContaining({ toolName: 'send_notification' }), ); }); @@ -951,7 +956,7 @@ describe('McpStepExecutor', () => { }); describe('activity log', () => { - it('creates activity log with collectionId, renderingId, action, type and mcpServerId as label', async () => { + it('logs against the run base record with collectionId, renderingId, action, type and mcpServerId as label', async () => { const tool = new MockRemoteTool({ name: 'send_notification', sourceId: 'mcp-server-1' }); const { model } = makeMockModel('send_notification', { message: 'Hello' }); const activityLogPort = { @@ -977,6 +982,7 @@ describe('McpStepExecutor', () => { action: 'action', type: 'write', collectionId: 'col-1', + recordId: [42], label: 'my-mcp-server', }); }); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index d483ce2d37..789f178d9e 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -8,6 +8,7 @@ import type { ReadRecordStepDefinition } from '../../src/types/validated/step-de import { AgentPortError, NoRecordsError, RecordNotFoundError } from '../../src/errors'; import ReadRecordStepExecutor from '../../src/executors/read-record-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep(overrides: Partial = {}): ReadRecordStepDefinition { @@ -48,6 +49,7 @@ function makeMockAgentPort( function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -107,8 +109,12 @@ function makeMockModel( function makeContext( overrides: Partial> = {}, ): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'read-1', stepIndex: 0, collectionId: 'col-1', @@ -116,7 +122,7 @@ function makeContext( stepDefinition: makeStep(), model: makeMockModel({ fieldNames: ['email'] }).model, agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -129,7 +135,7 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, @@ -454,6 +460,7 @@ describe('ReadRecordStepExecutor', () => { const ordersSchema = makeCollectionSchema({ collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], }); @@ -501,7 +508,19 @@ describe('ReadRecordStepExecutor', () => { const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } }, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + agentPort, + activityLogPort, + }); const executor = new ReadRecordStepExecutor(context); const result = await executor.execute(); @@ -519,6 +538,13 @@ describe('ReadRecordStepExecutor', () => { }), }), ); + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'index', + type: 'read', + collectionId: 'col-orders', + recordId: [99], + }); }); it('includes step index in select-record tool schema when records have stepIndex', async () => { diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index 59648d2751..7375e12f30 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -9,6 +9,7 @@ import type { TriggerActionStepDefinition } from '../../src/types/validated/step import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; import TriggerRecordActionStepExecutor from '../../src/executors/trigger-record-action-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep( @@ -44,6 +45,7 @@ function makeMockAgentPort(): AgentPort { function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -106,8 +108,12 @@ function makeMockModel(toolCallArgs?: Record, toolName = 'selec function makeContext( overrides: Partial> = {}, ): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'trigger-1', stepIndex: 0, collectionId: 'col-1', @@ -118,7 +124,7 @@ function makeContext( reasoning: 'User requested welcome email', }).model, agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -131,7 +137,7 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, @@ -188,6 +194,117 @@ describe('TriggerRecordActionStepExecutor', () => { }); }); + describe('operation activity log (PRD-442 #1)', () => { + it('logs the action against the acted record and its collection, not the trigger', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.executeAction as jest.Mock).mockResolvedValue({ ok: true }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + agentPort, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new TriggerRecordActionStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'action', + type: 'write', + collectionId: 'col-customers', + recordId: [42], + }); + }); + + it('logs against a related record in another collection (cross-collection)', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionId: 'col-orders', + collectionDisplayName: 'Orders', + actions: [ + { + name: 'cancel-order', + displayName: 'Cancel Order', + endpoint: '/forest/actions/cancel-order', + }, + ], + }); + + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 2 - Orders #99' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'select-action', + args: { actionName: 'Cancel Order', reasoning: 'r' }, + id: 'c2', + }, + ], + }); + const model = { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'load-related-record', + stepIndex: 2, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: relatedRecord, + }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const agentPort = makeMockAgentPort(); + (agentPort.executeAction as jest.Mock).mockResolvedValue({ ok: true }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + baseRecordRef, + model, + runStore, + agentPort, + activityLogPort, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }), + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new TriggerRecordActionStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'action', + type: 'write', + collectionId: 'col-orders', + recordId: [99], + }); + }); + }); + describe('without executionType=FullyAutomated: awaiting-input (Branch C)', () => { it('saves pendingAction and returns awaiting-input', async () => { const mockModel = makeMockModel({ diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 48becbae9f..f9ad2ac4bc 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -6,9 +6,15 @@ import type { UpdateRecordStepExecutionData } from '../../src/types/step-executi import type { CollectionSchema, RecordRef } from '../../src/types/validated/collection'; import type { UpdateRecordStepDefinition } from '../../src/types/validated/step-definition'; -import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; +import { + ActivityLogCreationError, + AgentPortError, + RunStorePortError, + StepStateError, +} from '../../src/errors'; import UpdateRecordStepExecutor from '../../src/executors/update-record-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep(overrides: Partial = {}): UpdateRecordStepDefinition { @@ -47,6 +53,7 @@ function makeMockAgentPort( function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -104,8 +111,12 @@ function makeMockModel(toolCallArgs?: Record, toolName = 'updat function makeContext( overrides: Partial> = {}, ): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + return { - runId: 'run-1', + runId, stepId: 'update-1', stepIndex: 0, collectionId: 'col-1', @@ -115,7 +126,7 @@ function makeContext( input: { fieldName: 'Status', value: 'active', reasoning: 'User requested status change' }, }).model, agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), + workflowPort, runStore: makeMockRunStore(), user: { id: 1, @@ -128,7 +139,7 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, @@ -181,6 +192,158 @@ describe('UpdateRecordStepExecutor', () => { }); }); + describe('operation activity log (PRD-442 #1)', () => { + it('logs the update against the acted record and its collection, not the trigger', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + model: makeMockModel({ input: { fieldName: 'Status', value: 'active', reasoning: 'r' } }) + .model, + runStore, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new UpdateRecordStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'update', + type: 'write', + collectionId: 'col-customers', + recordId: [42], + }); + }); + + it('does not log the update while only awaiting confirmation', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + model: makeMockModel({ input: { fieldName: 'Status', value: 'active', reasoning: 'r' } }) + .model, + runStore, + activityLogPort, + }); + + const result = await new UpdateRecordStepExecutor(context).execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(activityLogPort.createPending).not.toHaveBeenCalled(); + }); + + it('logs against a related record in another collection (cross-collection)', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionId: 'col-orders', + collectionDisplayName: 'Orders', + fields: [ + { fieldName: 'total', displayName: 'Total', isRelationship: false, type: 'Number' }, + ], + }); + + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 2 - Orders #99' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'update-record-field', + args: { input: { fieldName: 'Total', value: 200, reasoning: 'r' } }, + id: 'c2', + }, + ], + }); + const model = { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'load-related-record', + stepIndex: 2, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: relatedRecord, + }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }), + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new UpdateRecordStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'update', + type: 'write', + collectionId: 'col-orders', + recordId: [99], + }); + }); + + it('does not persist the executing marker when the activity log cannot be created', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest + .fn() + .mockRejectedValue(new ActivityLogCreationError(new Error('audit down'))), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + runStore, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + const result = await new UpdateRecordStepExecutor(context).execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'Could not record this step in the audit log. Please try again, or contact your administrator if the problem persists.', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalledWith( + 'run-1', + expect.objectContaining({ idempotencyPhase: 'executing' }), + ); + }); + }); + describe('without executionType=FullyAutomated: awaiting-input (Branch C)', () => { it('saves execution and returns awaiting-input', async () => { const mockModel = makeMockModel({ @@ -395,13 +558,20 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([execution]), }); - const context = makeContext({ agentPort, runStore }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ agentPort, runStore, activityLogPort }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); expect(result.stepOutcome.status).toBe('success'); expect(agentPort.updateRecord).not.toHaveBeenCalled(); + // No side effect happened → no audit-log entry (PRD-442 #2: no premature/duplicate log). + expect(activityLogPort.createPending).not.toHaveBeenCalled(); expect(runStore.saveStepExecution).toHaveBeenCalledWith( 'run-1', expect.objectContaining({ diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 2089b18bb4..08f8b99514 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -36,6 +36,7 @@ const STEP_USER: StepUser = { const COLLECTION_SCHEMA: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -48,6 +49,7 @@ const COLLECTION_SCHEMA: CollectionSchema = { const COLLECTION_SCHEMA_WITH_STATUS: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -59,6 +61,7 @@ const COLLECTION_SCHEMA_WITH_STATUS: CollectionSchema = { const COLLECTION_SCHEMA_WITH_ACTIONS: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [{ fieldName: 'id', displayName: 'Id', isRelationship: false, type: 'Number' }], @@ -69,6 +72,7 @@ const COLLECTION_SCHEMA_WITH_ACTIONS: CollectionSchema = { const COLLECTION_SCHEMA_WITH_RELATION: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -86,6 +90,7 @@ const COLLECTION_SCHEMA_WITH_RELATION: CollectionSchema = { const ORDERS_SCHEMA: CollectionSchema = { collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', primaryKeyFields: ['id'], fields: [ diff --git a/packages/workflow-executor/test/schema-cache.test.ts b/packages/workflow-executor/test/schema-cache.test.ts index 216721640d..a65c152176 100644 --- a/packages/workflow-executor/test/schema-cache.test.ts +++ b/packages/workflow-executor/test/schema-cache.test.ts @@ -5,6 +5,7 @@ import SchemaCache from '../src/schema-cache'; function makeSchema(collectionName: string): CollectionSchema { return { collectionName, + collectionId: `col-${collectionName}`, collectionDisplayName: collectionName, primaryKeyFields: ['id'], fields: [], diff --git a/packages/workflow-executor/test/schema-resolver.test.ts b/packages/workflow-executor/test/schema-resolver.test.ts new file mode 100644 index 0000000000..dd338d766a --- /dev/null +++ b/packages/workflow-executor/test/schema-resolver.test.ts @@ -0,0 +1,65 @@ +import type { WorkflowPort } from '../src/ports/workflow-port'; +import type { CollectionSchema } from '../src/types/validated/collection'; + +import SchemaCache from '../src/schema-cache'; +import SchemaResolver from '../src/schema-resolver'; + +function makeSchema(collectionName: string): CollectionSchema { + return { + collectionName, + collectionId: `col-${collectionName}`, + collectionDisplayName: collectionName, + primaryKeyFields: ['id'], + fields: [], + actions: [], + }; +} + +function makeWorkflowPort(schema: CollectionSchema) { + return { + getCollectionSchema: jest.fn().mockResolvedValue(schema), + } as unknown as WorkflowPort & { getCollectionSchema: jest.Mock }; +} + +describe('SchemaResolver', () => { + it('returns the cached schema without calling the orchestrator on a hit', async () => { + const cache = new SchemaCache(); + const schema = makeSchema('customers'); + cache.set('customers', schema); + const workflowPort = makeWorkflowPort(makeSchema('other')); + const resolver = new SchemaResolver(cache, workflowPort, 'run-1'); + + const result = await resolver.resolve('customers'); + + expect(result).toBe(schema); + expect(workflowPort.getCollectionSchema).not.toHaveBeenCalled(); + }); + + it('fetches with the bound runId, caches, and skips the fetch on a subsequent hit', async () => { + const cache = new SchemaCache(); + const schema = makeSchema('orders'); + const workflowPort = makeWorkflowPort(schema); + const resolver = new SchemaResolver(cache, workflowPort, 'run-42'); + + const result = await resolver.resolve('orders'); + + expect(result).toBe(schema); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('orders', 'run-42'); + + // second call hits the cache — orchestrator not queried again + await resolver.resolve('orders'); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + }); + + it('writes the fetched schema into the shared cache (read back by other consumers)', async () => { + const cache = new SchemaCache(); + const schema = makeSchema('products'); + const resolver = new SchemaResolver(cache, makeWorkflowPort(schema), 'run-1'); + + await resolver.resolve('products'); + + // The same shared SchemaCache instance is what AgentClientAgentPort reads via .get(). + expect(cache.get('products')).toBe(schema); + }); +});