Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ src/
- **Boundary errors** (`extends Error`) — Thrown outside step execution, at the HTTP or Runner layer (e.g. `RunNotFoundError`, `PendingDataNotFoundError`, `ConfigurationError`). Caught by the HTTP server and translated into HTTP status codes (404, 400, etc.). These intentionally do NOT extend `WorkflowExecutorError` to prevent `base-step-executor` from catching them as step failures.
- **Dual error messages** — `WorkflowExecutorError` carries two messages: `message` (technical, for dev logs) and `userMessage` (human-readable, surfaced to the Forest Admin UI via `stepOutcome.error`). The mapping happens in a single place: `base-step-executor.ts` uses `error.userMessage` when building the error outcome. When adding a new error subclass, always provide a distinct `userMessage` oriented toward end-users (no collection names, field names, or AI internals). If `userMessage` is omitted in the constructor call, it falls back to `message`.
- **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` is called before `runWithActivityLog()` so neither cache hits nor uncertain-state errors emit activity log entries. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe.
- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` runs before `doExecute()` so neither cache hits nor uncertain-state errors reach the activity log emitted by `AgentWithLog`. The `executing` write-ahead marker is saved in the `beforeCall` thunk the executor passes to `AgentWithLog`'s write methods (run after `createPending`, just before the side effect) so an activity-log creation failure never leaves an orphan `executing` marker. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe.
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getAvailableRuns()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightRuns` (keyed by `runId`, to avoid running the same run twice concurrently; the key is the run, not the step, because a chain advances the `stepId` between iterations).
- **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry.
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
Expand Down
17 changes: 3 additions & 14 deletions packages/workflow-executor/src/adapters/agent-client-agent-port.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
AgentPortError,
AgentProbeError,
RecordNotFoundError,
SchemaNotCachedError,
WorkflowExecutorError,
extractErrorMessage,
} from '../errors';
Expand Down Expand Up @@ -291,21 +292,9 @@ export default class AgentClientAgentPort implements AgentPort {
const cached = this.schemaCache.get(collectionName);

if (!cached) {
// eslint-disable-next-line no-console
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should never occur because schema is already loaded at this point

console.warn(
`[workflow-executor] Schema not found in cache for collection "${collectionName}". ` +
'Falling back to primaryKeyFields: ["id"]. Call getCollectionSchema first.',
);
throw new SchemaNotCachedError(collectionName);
}

return (
cached ?? {
collectionName,
collectionDisplayName: collectionName,
primaryKeyFields: ['id'],
fields: [],
actions: [],
}
);
return cached;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import type {
CreateActivityLogArgs,
} from '../ports/activity-log-port';
import type { Logger } from '../ports/logger-port';
import type {
ActivityLogAction,
ActivityLogsServiceInterface,
} from '@forestadmin/forestadmin-client';
import type { ActivityLogsServiceInterface } from '@forestadmin/forestadmin-client';

import { serializeRecordId } from './record-id-serializer';
import withRetry from './with-retry';
Expand All @@ -30,7 +27,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
this.service.createActivityLog({
forestServerToken: this.forestServerToken,
renderingId: String(args.renderingId),
action: args.action as ActivityLogAction,
action: args.action,
type: args.type,
// The lib writes this value verbatim into relationships.collection.data.id
// (JSON:API). The Forest server audit-trail API expects the numeric collectionId.
Expand Down Expand Up @@ -76,7 +73,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
});
}

async markFailed(handle: ActivityLogHandle, errorMessage: string): Promise<void> {
async markFailed(handle: ActivityLogHandle): Promise<void> {
return this.drainer.track(async () => {
try {
await withRetry(
Expand All @@ -92,7 +89,6 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
} catch (err) {
this.logger.error('activity log mark-as-failed failed', {
handleId: handle.id,
stepErrorMessage: errorMessage,
error: extractErrorMessage(err),
});
}
Expand Down
11 changes: 11 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ export class AgentPortError extends WorkflowExecutorError {
}
}

// Invariant guard: the agent port reads a collection's schema (for its primary keys) from the
// cache, which the executor must populate via getCollectionSchema before any record access.
export class SchemaNotCachedError extends WorkflowExecutorError {
constructor(collectionName: string) {
super(
`Collection schema for "${collectionName}" was not loaded before access — call getCollectionSchema first`,
'An error occurred while accessing your data. Please try again.',
);
}
}

export class WorkflowPortError extends WorkflowExecutorError {
constructor(operation: string, cause: unknown) {
super(
Expand Down
130 changes: 130 additions & 0 deletions packages/workflow-executor/src/executors/agent-with-log.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import type { ActivityLogPort, CreateActivityLogArgs } from '../ports/activity-log-port';
import type {
AgentPort,
ExecuteActionQuery,
GetRecordQuery,
GetRelatedDataQuery,
GetSingleRelatedDataQuery,
UpdateRecordQuery,
} from '../ports/agent-port';
import type SchemaResolver from '../schema-resolver';
import type { StepUser } from '../types/execution-context';
import type { RecordData } from '../types/validated/collection';

// The audit-log target minus renderingId, which audit() stamps centrally.
export type AuditTarget = Omit<CreateActivityLogArgs, 'renderingId'>;

type WriteOptions = { beforeCall: () => Promise<void> };

export interface AgentWithLogDeps {
agentPort: AgentPort;
activityLogPort: ActivityLogPort;
schemaResolver: SchemaResolver;
user: StepUser;
}

// Wraps AgentPort and emits an activity-log entry around each data-access call
// (pending → success/failed). The audit target is derived from the call: the numeric
// collectionId is resolved from the call's collection name, the recordId from its id.
// Idempotency stays in the executors: write methods run a `beforeCall` thunk between
// createPending and the side effect (the executor persists its write-ahead marker there),
// so AgentWithLog never reaches into run state.
export default class AgentWithLog {
private readonly agentPort: AgentPort;
private readonly activityLogPort: ActivityLogPort;
private readonly schemaResolver: SchemaResolver;
private readonly user: StepUser;

constructor(deps: AgentWithLogDeps) {
this.agentPort = deps.agentPort;
this.activityLogPort = deps.activityLogPort;
this.schemaResolver = deps.schemaResolver;
this.user = deps.user;
}

async getRecord(query: GetRecordQuery): Promise<RecordData> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.audit({ action: 'index', type: 'read', collectionId, recordId: query.id }, () =>
this.agentPort.getRecord(query, this.user),
);
}

async getRelatedData(query: GetRelatedDataQuery): Promise<RecordData[]> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.audit(
{ action: 'listRelatedData', type: 'read', collectionId, recordId: query.id },
() => this.agentPort.getRelatedData(query, this.user),
);
}

async getSingleRelatedData(query: GetSingleRelatedDataQuery): Promise<RecordData | null> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.audit(
{ action: 'listRelatedData', type: 'read', collectionId, recordId: query.id },
() => this.agentPort.getSingleRelatedData(query, this.user),
);
}

async updateRecord(query: UpdateRecordQuery, opts: WriteOptions): Promise<RecordData> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.audit(
{ action: 'update', type: 'write', collectionId, recordId: query.id },
() => this.agentPort.updateRecord(query, this.user),
opts.beforeCall,
);
}

async executeAction(query: ExecuteActionQuery, opts: WriteOptions): Promise<unknown> {
const collectionId = await this.resolveCollectionId(query.collection);

return this.audit(
{ action: 'action', type: 'write', collectionId, recordId: query.id },
() => this.agentPort.executeAction(query, this.user),
opts.beforeCall,
);
}

// For operations that are not AgentPort calls (e.g. MCP tool invocation): the caller
// supplies the full audit target since there is no collection name to resolve.
logged<T>(
target: AuditTarget,
run: () => Promise<T>,
opts?: { beforeCall?: () => Promise<void> },
): Promise<T> {
return this.audit(target, run, opts?.beforeCall);
}

private async audit<T>(
args: AuditTarget,
run: () => Promise<T>,
beforeCall?: () => Promise<void>,
): Promise<T> {
const handle = await this.activityLogPort.createPending({
renderingId: this.user.renderingId,
...args,
});

try {
if (beforeCall) await beforeCall();
const result = await run();
void this.activityLogPort.markSucceeded(handle);

return result;
} catch (err) {
// The step error is logged/surfaced by base-step-executor when rethrown, so the audit
// transition only needs the handle.
void this.activityLogPort.markFailed(handle);
throw err;
}
}

private async resolveCollectionId(collectionName: string): Promise<string> {
const schema = await this.schemaResolver.resolve(collectionName);

return schema.collectionId;
}
}
57 changes: 15 additions & 42 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { CreateActivityLogArgs } from '../ports/activity-log-port';
import type { AgentPort } from '../ports/agent-port';
import type {
ExecutionContext,
Expand Down Expand Up @@ -26,6 +25,7 @@ import {
WorkflowExecutorError,
extractErrorMessage,
} from '../errors';
import AgentWithLog from './agent-with-log';
import patchBodySchemas from '../http/pending-data-validators';
import StepSummaryBuilder from './summary/step-summary-builder';

Expand All @@ -34,11 +34,21 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
{
protected readonly context: ExecutionContext<TStep>;

// Raw port — kept only for getActionFormInfo, which is intentionally not audited.
protected readonly agentPort: AgentPort;

// Audited data access — every call emits an activity-log entry.
protected readonly agent: AgentWithLog;

constructor(context: ExecutionContext<TStep>) {
this.context = context;
this.agentPort = context.agentPort;
this.agent = new AgentWithLog({
agentPort: context.agentPort,
activityLogPort: context.activityLogPort,
schemaResolver: context.schemaResolver,
user: context.user,
});
}

async execute(): Promise<StepExecutionResult> {
Expand All @@ -50,8 +60,9 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
});

try {
// Idempotency guard — mutating executors override this. Called before runWithActivityLog
// so that cache hits and uncertain-state errors never emit an activity log entry.
// Idempotency guard — mutating executors override this. Runs before doExecute so a cache
// hit or uncertain-state error short-circuits before any side effect, including the
// activity log emitted by AgentWithLog.
const cached = await this.checkIdempotency();

if (cached) {
Expand All @@ -63,7 +74,7 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
return cached;
}

const result = await this.runWithActivityLog();
const result = await this.runWithTimeout();

this.context.logger.info('Step execution completed', {
...this.logCtx,
Expand Down Expand Up @@ -112,44 +123,6 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
return Promise.resolve(null);
}

// Return null when the frontend performs the action (e.g. TriggerAction without
// executionType=FullyAutomated) — the front logs on its side. Override when the
// executor itself calls the agent.
protected buildActivityLogArgs(): CreateActivityLogArgs | null {
return null;
}

private async runWithActivityLog(): Promise<StepExecutionResult> {
const args = this.buildActivityLogArgs();
if (!args) return this.runWithTimeout();

const handle = await this.context.activityLogPort.createPending(args);

let result: StepExecutionResult;

try {
result = await this.runWithTimeout();
} catch (err) {
// Use userMessage (not the technical message) — errorMessage is rendered to end-users
// in the Forest Admin UI. Privacy: no collection/field/AI internals in the audit trail.
const errorMessage =
err instanceof WorkflowExecutorError ? err.userMessage : 'Unexpected error';
void this.context.activityLogPort.markFailed(handle, errorMessage);
throw err;
}

if (result.stepOutcome.status === 'error') {
void this.context.activityLogPort.markFailed(
handle,
result.stepOutcome.error ?? 'Step failed',
);
} else {
void this.context.activityLogPort.markSucceeded(handle);
}

return result;
}

// Promise.race doesn't abort the losing branch — it keeps running in the background. The .catch()
// on execPromise must be attached BEFORE the race so a late rejection doesn't trigger
// UnhandledPromiseRejection. Late resolutions are silently discarded.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { CreateActivityLogArgs } from '../ports/activity-log-port';
import type { StepExecutionResult } from '../types/execution-context';
import type {
LoadRelatedRecordCandidate,
Expand Down Expand Up @@ -56,16 +55,6 @@ interface RelationTarget extends RelationRef {
}

export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<LoadRelatedRecordStepDefinition> {
protected override buildActivityLogArgs(): CreateActivityLogArgs | null {
return {
renderingId: this.context.user.renderingId,
action: 'listRelatedData',
type: 'read',
collectionId: this.context.collectionId,
recordId: this.context.baseRecordRef.recordId,
};
}

protected async doExecute(): Promise<StepExecutionResult> {
// Branch A -- Re-entry after pending execution found in RunStore
const pending = await this.patchAndReloadPendingData<LoadRelatedRecordStepExecutionData>(
Expand Down Expand Up @@ -276,16 +265,13 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
const relatedSchema = await this.getCollectionSchema(target.relatedCollectionName);
const referenceField = relatedSchema.referenceField ?? null;

const candidate = await this.agentPort.getSingleRelatedData(
{
collection: target.selectedRecordRef.collectionName,
id: target.selectedRecordRef.recordId,
relation: target.name,
relatedSchema,
...(referenceField && { fields: [referenceField] }),
},
this.context.user,
);
const candidate = await this.agent.getSingleRelatedData({
collection: target.selectedRecordRef.collectionName,
id: target.selectedRecordRef.recordId,
relation: target.name,
relatedSchema,
...(referenceField && { fields: [referenceField] }),
});

if (!candidate) return null;

Expand Down Expand Up @@ -434,16 +420,13 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
relatedSchema: CollectionSchema,
limit: number,
): Promise<RecordData[]> {
return this.agentPort.getRelatedData(
{
collection: target.selectedRecordRef.collectionName,
id: target.selectedRecordRef.recordId,
relation: target.name,
relatedSchema,
limit,
},
this.context.user,
);
return this.agent.getRelatedData({
collection: target.selectedRecordRef.collectionName,
id: target.selectedRecordRef.recordId,
relation: target.name,
relatedSchema,
limit,
});
}

/** Persists the loaded record ref and returns a success outcome. */
Expand Down
Loading
Loading