Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
c884d37
feat(workflow-executor): add TriggerActionStepExecutor with confirmat…
Mar 20, 2026
b28c467
refactor(workflow-executor): propagate actionName through ActionTarge…
Mar 20, 2026
21e4bac
refactor(workflow-executor): extract ActionRef and FieldRef, align Re…
Mar 20, 2026
e4ad28f
refactor(workflow-executor): rename fieldNames→fieldDisplayNames and …
Mar 20, 2026
30687a8
refactor(workflow-executor): use FieldRef[] in ReadRecord executionPa…
Mar 20, 2026
fd6250e
style(workflow-executor): fix prettier formatting in read-record-step…
Mar 20, 2026
b0bb7c8
refactor(workflow-executor): rename Zod schema keys to remove "displa…
Mar 20, 2026
9b9a1df
refactor(workflow-executor): introduce executor hierarchy and central…
Mar 20, 2026
c8b9247
refactor(workflow-executor): apply Template Method pattern and consol…
Mar 20, 2026
39d3e5d
refactor(workflow-executor): add load-related-record executor, typed …
Mar 21, 2026
a83c6ee
feat(workflow-executor): add userMessage to error hierarchy for end-u…
Mar 21, 2026
ff60271
feat(workflow-executor): add Logger port, ConsoleLogger adapter and i…
Mar 21, 2026
8295118
refactor(workflow-executor): remove redundant level field from Consol…
Mar 21, 2026
b7d414d
refactor(workflow-executor): extract StepSummaryBuilder, add load-rel…
Mar 21, 2026
c19db33
refactor(workflow-executor): named query types, save actionResult, mo…
Mar 21, 2026
971cd67
style(workflow-executor): remove unused Id import and inline getRelat…
Mar 21, 2026
91f240b
refactor(workflow-executor): move remoteTools out of ExecutionContext…
Mar 21, 2026
02c5af6
refactor(workflow-executor): centralise cause logging in BaseStepExec…
Mar 23, 2026
e4db841
feat(workflow-executor): add agentPortError and safe-agent-port wrapper
Mar 23, 2026
699ce28
feat(workflow-executor): add McpTaskStepExecutionData types and mcp-t…
Mar 23, 2026
b370b0a
feat(ai-proxy): re-export RemoteTool and langchain core types
Mar 23, 2026
581ecfd
refactor(workflow-executor): replace @langchain/core imports with @fo…
Mar 23, 2026
369a398
chore(workflow-executor): remove direct @langchain/core dependency
Mar 23, 2026
013560e
feat(workflow-executor): implement Runner polling loop and step dispatch
Mar 23, 2026
6a76a2d
feat(workflow-executor): add formattedResponse after MCP tool execution
Mar 23, 2026
062d12d
refactor(workflow-executor): add McpTaskStepOutcome and decouple McpT…
Mar 23, 2026
0eceffb
refactor(workflow-executor): move module-level helpers to private sta…
Mar 23, 2026
1635102
refactor(workflow-executor): extract getExecutor into StepExecutorFac…
Mar 23, 2026
4892437
refactor(workflow-executor): never-throw executor contract + ErrorSte…
Mar 23, 2026
0870aa4
refactor(workflow-executor): remove ErrorStepExecutor, inline error h…
Mar 23, 2026
a55c23e
feat(workflow-executor): add getPendingStepExecutionsForRun to fix tr…
Mar 23, 2026
d381ac3
fix: add claude
Mar 24, 2026
511f368
refactor(workflow-executor): simplify load-related-record pending dat…
Mar 24, 2026
a5d6249
docs(workflow-executor): replace invented route names with TODOs in c…
Mar 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions WORKFLOW-EXECUTOR-CONTRACT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
# Workflow Executor — Contract Types

> Types exchanged between the **orchestrator (server)**, the **executor (agent-nodejs)**, and the **frontend**.
> Last updated: 2026-03-24

---

## 1. Polling

**`GET /liana/v1/workflow-step-executions/pending?runId=<runId>`**

The executor polls for the current pending step of a run. The server must return **one object** (not an array), or `null` if the run is not found.

```typescript
interface PendingStepExecution {
runId: string;
stepId: string;
stepIndex: number;
baseRecordRef: RecordRef;
stepDefinition: StepDefinition;
previousSteps: Step[];
userConfirmed?: boolean; // true = user confirmed a pending action on this step
}
```

> **`null` response** → executor throws `RunNotFoundError` → HTTP 404 returned to caller.

### RecordRef

Lightweight pointer to a specific record.

```typescript
interface RecordRef {
collectionName: string;
recordId: Array<string | number>;
stepIndex: number; // index of the workflow step that loaded this record
}
```

### Step

History entry for an already-executed step (used in `previousSteps`).

```typescript
interface Step {
stepDefinition: StepDefinition;
stepOutcome: StepOutcome;
}
```

### StepDefinition

Discriminated union on `type`.

```typescript
type StepDefinition =
| ConditionStepDefinition
| RecordTaskStepDefinition
| McpTaskStepDefinition;

interface ConditionStepDefinition {
type: "condition";
options: [string, ...string[]]; // at least one option required
prompt?: string;
aiConfigName?: string;
}

interface RecordTaskStepDefinition {
type: "read-record"
| "update-record"
| "trigger-action"
| "load-related-record";
prompt?: string;
aiConfigName?: string;
automaticExecution?: boolean;
}

interface McpTaskStepDefinition {
type: "mcp-task";
mcpServerId?: string;
prompt?: string;
aiConfigName?: string;
automaticExecution?: boolean;
}
```

### StepOutcome

What the executor previously reported for each past step (used in `previousSteps`).

```typescript
type StepOutcome =
| ConditionStepOutcome
| RecordTaskStepOutcome
| McpTaskStepOutcome;

interface ConditionStepOutcome {
type: "condition";
stepId: string;
stepIndex: number;
status: "success" | "error" | "manual-decision";
selectedOption?: string; // present when status = "success"
error?: string; // present when status = "error"
}

interface RecordTaskStepOutcome {
type: "record-task";
stepId: string;
stepIndex: number;
status: "success" | "error" | "awaiting-input";
error?: string; // present when status = "error"
}

interface McpTaskStepOutcome {
type: "mcp-task";
stepId: string;
stepIndex: number;
status: "success" | "error" | "awaiting-input";
error?: string; // present when status = "error"
}
```

---

## 2. Step Result

**`POST /liana/v1/workflow-step-executions/<runId>/complete`**

After executing a step, the executor posts the outcome back to the server. The body is one of the `StepOutcome` shapes above.

> ⚠️ **NEVER contains client data** (field values, AI reasoning, etc.) — those stay in the `RunStore` on the client side.

---

## 3. Pending Data

Steps that require user input pause with `status: "awaiting-input"`. The frontend writes `pendingData` to unblock them via a dedicated endpoint on the executor HTTP server.

> **TODO** — The pending-data write endpoint is not yet implemented. Route, method, and per-step-type body shapes are TBD (PRD-240).

Once written, the frontend calls `POST /runs/:runId/trigger` and the executor resumes with `userConfirmed: true`.

### update-record — user picks a field + value to write

> **TODO** — Pending-data write endpoint TBD (PRD-240).

```typescript
interface UpdateRecordPendingData {
name: string; // technical field name
displayName: string; // label shown in the UI
value: string; // value chosen by the user
}
```

### trigger-action — user confirmation only

No payload required from the frontend. The executor selects the action and writes `pendingData` itself (action name + displayName) to the RunStore. The frontend just confirms:

```
POST /runs/:runId/trigger
```

### load-related-record — user picks the relation and/or the record

The frontend can override **both** the relation (field) and the selected record.

> **Current status** — The frontend cannot yet override the AI selection. The executor HTTP server does not yet expose the pending-data write endpoint. Until it is implemented, the executor writes the AI's pick directly into `selectedRecordId`.

```typescript
// Written by the executor; overwritable by the frontend via the pending-data endpoint (TBD)
interface LoadRelatedRecordPendingData {
name: string; // technical relation name
displayName: string; // label shown in the UI
relatedCollectionName: string; // collection of the related record
suggestedFields?: string[]; // fields suggested for display
selectedRecordId: Array<string|number>; // AI's pick; overwritten by the frontend via the pending-data endpoint
}
```

The executor initially writes the AI's pick into `selectedRecordId`. The pending-data endpoint overwrites it (and optionally `name`, `displayName`, `relatedCollectionName`) when the user changes the selection.

#### Future endpoint — pending-data write (not yet implemented)

> **TODO** — Route and method TBD (PRD-240).

Request body:

```typescript
{
selectedRecordId?: Array<string | number>; // record chosen by the user
name?: string; // relation changed
displayName?: string; // relation changed
relatedCollectionName?: string; // required if name is provided
}
```

Response: `204 No Content`.

The frontend calls this endpoint **before** `POST /runs/:runId/trigger`. On the next poll, `userConfirmed: true` and the executor reads `selectedRecordId` from the RunStore.

### mcp-task — user confirmation only

No payload required from the frontend. The executor selects the tool and writes `pendingData` itself (tool name + input) to the RunStore. The frontend just confirms:

```
POST /runs/:runId/trigger
```

The executor resumes with `userConfirmed: true` and executes the pre-selected tool.

---

## Flow Summary

```
Orchestrator ──► GET pending?runId=X ──► Executor
executes step
┌───────────────┴───────────────┐
needs input done
│ │
status: awaiting-input POST /complete
│ (StepOutcome)
Frontend writes pendingData
to executor HTTP server TODO: route TBD
POST /runs/:runId/trigger
(next poll: userConfirmed = true)
Executor resumes
```
7 changes: 7 additions & 0 deletions packages/ai-proxy/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export { default as ProviderDispatcher } from './provider-dispatcher';
export * from './provider-dispatcher';
export * from './ai-client';
export * from './remote-tools';
export { default as RemoteTool } from './remote-tool';
export * from './router';
export * from './mcp-client';
export * from './oauth-token-injector';
Expand All @@ -16,3 +17,9 @@ export * from './errors';
export function validMcpConfigurationOrThrow(mcpConfig: McpConfiguration) {
return McpConfigChecker.check(mcpConfig);
}

export type { BaseChatModel } from '@langchain/core/language_models/chat_models';
export type { BaseMessage } from '@langchain/core/messages';
export { HumanMessage, SystemMessage } from '@langchain/core/messages';
export type { StructuredToolInterface } from '@langchain/core/tools';
export { DynamicStructuredTool } from '@langchain/core/tools';
12 changes: 3 additions & 9 deletions packages/ai-proxy/test/ai-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ describe('Model validation', () => {
expect(
() =>
new AiClient({
aiConfigurations: [
{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4' },
],
aiConfigurations: [{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4' }],
}),
).toThrow(AIModelNotSupportedError);
});
Expand All @@ -34,9 +32,7 @@ describe('Model validation', () => {
expect(
() =>
new AiClient({
aiConfigurations: [
{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4o' },
],
aiConfigurations: [{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4o' }],
}),
).not.toThrow();
});
Expand Down Expand Up @@ -143,9 +139,7 @@ describe('getModel', () => {
'Warn',
expect.stringContaining("AI configuration 'non-existent' not found"),
);
expect(createBaseChatModelMock).toHaveBeenCalledWith(
expect.objectContaining({ name: 'gpt4' }),
);
expect(createBaseChatModelMock).toHaveBeenCalledWith(expect.objectContaining({ name: 'gpt4' }));
expect(result).toBe(fakeModel);
});
});
Expand Down
2 changes: 1 addition & 1 deletion packages/ai-proxy/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import {
AIProviderError,
AIProviderUnavailableError,
AITooManyRequestsError,
AIUnauthorizedError,
AIToolNotFoundError,
AIToolUnprocessableError,
AIUnauthorizedError,
McpConfigError,
McpConflictError,
McpConnectionError,
Expand Down
9 changes: 2 additions & 7 deletions packages/ai-proxy/test/langchain-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,11 @@ describe('LangChainAdapter', () => {
{
role: 'assistant',
content: '',
tool_calls: [
{ id: 'call_1', function: { name: 'my_tool', arguments: 'not-json' } },
],
tool_calls: [{ id: 'call_1', function: { name: 'my_tool', arguments: 'not-json' } }],
},
]),
).toThrow(
new AIBadRequestError(
"Invalid JSON in tool_calls arguments for tool 'my_tool': not-json",
),
new AIBadRequestError("Invalid JSON in tool_calls arguments for tool 'my_tool': not-json"),
);
});
});
Expand Down Expand Up @@ -256,5 +252,4 @@ describe('LangChainAdapter', () => {
);
});
});

});
8 changes: 2 additions & 6 deletions packages/ai-proxy/test/provider-dispatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ describe('ProviderDispatcher', () => {
const thrown = await dispatcher.dispatch(buildBody()).catch(e => e);

expect(thrown).toBeInstanceOf(AIProviderUnavailableError);
expect(thrown.message).toBe(
'OpenAI server error (HTTP 500): Internal Server Error',
);
expect(thrown.message).toBe('OpenAI server error (HTTP 500): Internal Server Error');
expect(thrown.provider).toBe('OpenAI');
expect(thrown.providerStatusCode).toBe(500);
expect(thrown.baseBusinessErrorName).toBe('InternalServerError');
Expand Down Expand Up @@ -468,9 +466,7 @@ describe('ProviderDispatcher', () => {
.catch(e => e);

expect(thrown).toBeInstanceOf(AIProviderUnavailableError);
expect(thrown.message).toBe(
'Anthropic server error (HTTP 503): Service Unavailable',
);
expect(thrown.message).toBe('Anthropic server error (HTTP 503): Service Unavailable');
expect(thrown.provider).toBe('Anthropic');
expect(thrown.providerStatusCode).toBe(503);
expect(thrown.baseBusinessErrorName).toBe('InternalServerError');
Expand Down
10 changes: 8 additions & 2 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ──

```
src/
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError, StepPersistenceError, NoRelationshipFieldsError, RelatedRecordNotFoundError
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring)
├── types/ # Core type definitions (@draft)
│ ├── step-definition.ts # StepType enum + step definition interfaces
Expand All @@ -61,7 +61,9 @@ src/
│ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers)
│ ├── condition-step-executor.ts # AI-powered condition step (chooses among options)
│ ├── read-record-step-executor.ts # AI-powered record field reading step
│ └── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow)
│ ├── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow)
│ ├── trigger-record-action-step-executor.ts # AI-powered action trigger step (with confirmation flow)
│ └── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow)
├── http/ # HTTP server (optional, for frontend data access)
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger
└── index.ts # Barrel exports
Expand All @@ -74,7 +76,11 @@ src/
- **Privacy** — Zero client data leaves the client's infrastructure. `StepOutcome` is sent to the orchestrator and must NEVER contain client data. Privacy-sensitive information (e.g. AI reasoning) must stay in `StepExecutionData` (persisted in the RunStore, client-side only).
- **Ports (IO injection)** — All external IO goes through injected port interfaces, keeping the core pure and testable.
- **AI integration** — Uses `@langchain/core` (`BaseChatModel`, `DynamicStructuredTool`) for AI-powered steps. `ExecutionContext.model` is a `BaseChatModel`.
- **Error hierarchy** — All domain errors must extend `WorkflowExecutorError` (defined in `src/errors.ts`). This ensures executors can distinguish domain errors (caught → error outcome) from infrastructure errors (uncaught → propagate to caller). Never throw a plain `Error` for a domain error case.
- **Dual error messages** — `WorkflowExecutorError` carries two messages: `message` (technical, for dev logs) and `userMessage` (human-readable, surfaced to the Forest Admin UI via `stepOutcome.error`). The mapping happens in a single place: `base-step-executor.ts` uses `error.userMessage` when building the error outcome. When adding a new error subclass, always provide a distinct `userMessage` oriented toward end-users (no collection names, field names, or AI internals). If `userMessage` is omitted in the constructor call, it falls back to `message`.
- **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
- **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once.
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getPendingStepExecutions()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightSteps` (to avoid running the same step twice concurrently).

## Commands

Expand Down
9 changes: 9 additions & 0 deletions packages/workflow-executor/jest.config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
/* eslint-disable import/no-relative-packages */
import path from 'path';

import jestConfig from '../../jest.config';

// Jest < 30 doesn't resolve wildcard exports in package.json.
// @anthropic-ai/sdk uses "./lib/*" exports that need this workaround.
const anthropicSdkDir = path.dirname(require.resolve('@anthropic-ai/sdk'));

export default {
...jestConfig,
collectCoverageFrom: ['<rootDir>/src/**/*.ts'],
testMatch: ['<rootDir>/test/**/*.test.ts'],
moduleNameMapper: {
'^@anthropic-ai/sdk/(.*)$': `${anthropicSdkDir}/$1`,
},
};
Loading
Loading