Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
9432bb1
feat(workflow-executor): add UpdateRecordStepExecutor with confirmati…
Mar 19, 2026
a70b36a
refactor(workflow-executor): simplify interruption lookup in UpdateRe…
Mar 19, 2026
6aee69c
refactor(workflow-executor): rename toolConfirmationInterruption to p…
Mar 19, 2026
545bbd0
refactor(workflow-executor): throw on missing pendingUpdate instead o…
Mar 19, 2026
ebf033f
refactor(workflow-executor): type executionResult as union instead of…
Mar 19, 2026
61e2cb0
fix(workflow-executor): preserve pendingUpdate after confirmation for…
Mar 19, 2026
31a27c6
refactor(workflow-executor): remove unsafe cast in UpdateRecordStepEx…
Mar 19, 2026
f7a3edc
refactor(workflow-executor): simplify UpdateRecordStepExecutor and im…
Mar 19, 2026
9819917
refactor(workflow-executor): remove redundant buildSuccessResult/buil…
Mar 19, 2026
11b41b5
refactor(workflow-executor): rename history to previousSteps and docu…
Mar 19, 2026
239ca2f
docs(workflow-executor): document pendingUpdate field on UpdateRecord…
Mar 19, 2026
50905bc
refactor(workflow-executor): extract findField helper to deduplicate …
Mar 19, 2026
129290c
refactor(workflow-executor): apply PR review fixes across executors
Mar 19, 2026
bc3d0c6
refactor(workflow-executor): rename AiTask to RecordTask for clarity
Mar 19, 2026
d895154
refactor(workflow-executor): remove unused fields from RecordTaskStep…
Mar 19, 2026
d1651db
feat(workflow-executor): add ToolTaskStepDefinition for MCP tool steps
Mar 19, 2026
6a49657
fix(workflow-executor): fix prettier formatting on StepDefinition uni…
Mar 19, 2026
949d77a
refactor(workflow-executor): introduce UpdateTarget to reduce resolve…
Mar 20, 2026
b939ac7
test(workflow-executor): address PR review gaps on UpdateRecordStepEx…
Mar 20, 2026
180f3fd
refactor(workflow-executor): rename ToolTask→McpTask and automaticCom…
Mar 20, 2026
1309b54
test(workflow-executor): rename misleading stepId 'ai-step' to 'read-…
Mar 20, 2026
d99fe27
refactor(workflow-executor): simplify userInput to userConfirmed boolean
Mar 20, 2026
c884d37
feat(workflow-executor): add TriggerActionStepExecutor with confirmat…
Mar 20, 2026
b28c467
refactor(workflow-executor): propagate actionName through ActionTarge…
Mar 20, 2026
21e4bac
refactor(workflow-executor): extract ActionRef and FieldRef, align Re…
Mar 20, 2026
e4ad28f
refactor(workflow-executor): rename fieldNames→fieldDisplayNames and …
Mar 20, 2026
30687a8
refactor(workflow-executor): use FieldRef[] in ReadRecord executionPa…
Mar 20, 2026
fd6250e
style(workflow-executor): fix prettier formatting in read-record-step…
Mar 20, 2026
b0bb7c8
refactor(workflow-executor): rename Zod schema keys to remove "displa…
Mar 20, 2026
9b9a1df
refactor(workflow-executor): introduce executor hierarchy and central…
Mar 20, 2026
c8b9247
refactor(workflow-executor): apply Template Method pattern and consol…
Mar 20, 2026
39d3e5d
refactor(workflow-executor): add load-related-record executor, typed …
Mar 21, 2026
a83c6ee
feat(workflow-executor): add userMessage to error hierarchy for end-u…
Mar 21, 2026
ff60271
feat(workflow-executor): add Logger port, ConsoleLogger adapter and i…
Mar 21, 2026
8295118
refactor(workflow-executor): remove redundant level field from Consol…
Mar 21, 2026
b7d414d
refactor(workflow-executor): extract StepSummaryBuilder, add load-rel…
Mar 21, 2026
c19db33
refactor(workflow-executor): named query types, save actionResult, mo…
Mar 21, 2026
971cd67
style(workflow-executor): remove unused Id import and inline getRelat…
Mar 21, 2026
91f240b
refactor(workflow-executor): move remoteTools out of ExecutionContext…
Mar 21, 2026
02c5af6
refactor(workflow-executor): centralise cause logging in BaseStepExec…
Mar 23, 2026
e4db841
feat(workflow-executor): add agentPortError and safe-agent-port wrapper
Mar 23, 2026
699ce28
feat(workflow-executor): add McpTaskStepExecutionData types and mcp-t…
Mar 23, 2026
b370b0a
feat(ai-proxy): re-export RemoteTool and langchain core types
Mar 23, 2026
581ecfd
refactor(workflow-executor): replace @langchain/core imports with @fo…
Mar 23, 2026
369a398
chore(workflow-executor): remove direct @langchain/core dependency
Mar 23, 2026
013560e
feat(workflow-executor): implement Runner polling loop and step dispatch
Mar 23, 2026
6a76a2d
feat(workflow-executor): add formattedResponse after MCP tool execution
Mar 23, 2026
062d12d
refactor(workflow-executor): add McpTaskStepOutcome and decouple McpT…
Mar 23, 2026
0eceffb
refactor(workflow-executor): move module-level helpers to private sta…
Mar 23, 2026
1635102
refactor(workflow-executor): extract getExecutor into StepExecutorFac…
Mar 23, 2026
4892437
refactor(workflow-executor): never-throw executor contract + ErrorSte…
Mar 23, 2026
0870aa4
refactor(workflow-executor): remove ErrorStepExecutor, inline error h…
Mar 23, 2026
a55c23e
feat(workflow-executor): add getPendingStepExecutionsForRun to fix tr…
Mar 23, 2026
d381ac3
fix: add claude
Mar 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/ai-proxy/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export { default as ProviderDispatcher } from './provider-dispatcher';
export * from './provider-dispatcher';
export * from './ai-client';
export * from './remote-tools';
export { default as RemoteTool } from './remote-tool';
export * from './router';
export * from './mcp-client';
export * from './oauth-token-injector';
Expand All @@ -16,3 +17,9 @@ export * from './errors';
export function validMcpConfigurationOrThrow(mcpConfig: McpConfiguration) {
return McpConfigChecker.check(mcpConfig);
}

export type { BaseChatModel } from '@langchain/core/language_models/chat_models';
export type { BaseMessage } from '@langchain/core/messages';
export { HumanMessage, SystemMessage } from '@langchain/core/messages';
export type { StructuredToolInterface } from '@langchain/core/tools';
export { DynamicStructuredTool } from '@langchain/core/tools';
12 changes: 3 additions & 9 deletions packages/ai-proxy/test/ai-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ describe('Model validation', () => {
expect(
() =>
new AiClient({
aiConfigurations: [
{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4' },
],
aiConfigurations: [{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4' }],
}),
).toThrow(AIModelNotSupportedError);
});
Expand All @@ -34,9 +32,7 @@ describe('Model validation', () => {
expect(
() =>
new AiClient({
aiConfigurations: [
{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4o' },
],
aiConfigurations: [{ name: 'test', provider: 'openai', apiKey: 'dev', model: 'gpt-4o' }],
}),
).not.toThrow();
});
Expand Down Expand Up @@ -143,9 +139,7 @@ describe('getModel', () => {
'Warn',
expect.stringContaining("AI configuration 'non-existent' not found"),
);
expect(createBaseChatModelMock).toHaveBeenCalledWith(
expect.objectContaining({ name: 'gpt4' }),
);
expect(createBaseChatModelMock).toHaveBeenCalledWith(expect.objectContaining({ name: 'gpt4' }));
expect(result).toBe(fakeModel);
});
});
Expand Down
2 changes: 1 addition & 1 deletion packages/ai-proxy/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import {
AIProviderError,
AIProviderUnavailableError,
AITooManyRequestsError,
AIUnauthorizedError,
AIToolNotFoundError,
AIToolUnprocessableError,
AIUnauthorizedError,
McpConfigError,
McpConflictError,
McpConnectionError,
Expand Down
9 changes: 2 additions & 7 deletions packages/ai-proxy/test/langchain-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,11 @@ describe('LangChainAdapter', () => {
{
role: 'assistant',
content: '',
tool_calls: [
{ id: 'call_1', function: { name: 'my_tool', arguments: 'not-json' } },
],
tool_calls: [{ id: 'call_1', function: { name: 'my_tool', arguments: 'not-json' } }],
},
]),
).toThrow(
new AIBadRequestError(
"Invalid JSON in tool_calls arguments for tool 'my_tool': not-json",
),
new AIBadRequestError("Invalid JSON in tool_calls arguments for tool 'my_tool': not-json"),
);
});
});
Expand Down Expand Up @@ -256,5 +252,4 @@ describe('LangChainAdapter', () => {
);
});
});

});
8 changes: 2 additions & 6 deletions packages/ai-proxy/test/provider-dispatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ describe('ProviderDispatcher', () => {
const thrown = await dispatcher.dispatch(buildBody()).catch(e => e);

expect(thrown).toBeInstanceOf(AIProviderUnavailableError);
expect(thrown.message).toBe(
'OpenAI server error (HTTP 500): Internal Server Error',
);
expect(thrown.message).toBe('OpenAI server error (HTTP 500): Internal Server Error');
expect(thrown.provider).toBe('OpenAI');
expect(thrown.providerStatusCode).toBe(500);
expect(thrown.baseBusinessErrorName).toBe('InternalServerError');
Expand Down Expand Up @@ -468,9 +466,7 @@ describe('ProviderDispatcher', () => {
.catch(e => e);

expect(thrown).toBeInstanceOf(AIProviderUnavailableError);
expect(thrown.message).toBe(
'Anthropic server error (HTTP 503): Service Unavailable',
);
expect(thrown.message).toBe('Anthropic server error (HTTP 503): Service Unavailable');
expect(thrown.provider).toBe('Anthropic');
expect(thrown.providerStatusCode).toBe(503);
expect(thrown.baseBusinessErrorName).toBe('InternalServerError');
Expand Down
11 changes: 9 additions & 2 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Front ◀──▶ Orchestrator ◀──pull/push──▶ Executor ──

```
src/
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError
├── errors.ts # WorkflowExecutorError, MissingToolCallError, MalformedToolCallError, NoRecordsError, NoReadableFieldsError, NoWritableFieldsError, NoActionsError, StepPersistenceError, NoRelationshipFieldsError, RelatedRecordNotFoundError
├── runner.ts # Runner class — main entry point (start/stop/triggerPoll, HTTP server wiring)
├── types/ # Core type definitions (@draft)
│ ├── step-definition.ts # StepType enum + step definition interfaces
Expand All @@ -60,7 +60,10 @@ src/
├── executors/ # Step executor implementations
│ ├── base-step-executor.ts # Abstract base class (context injection + shared helpers)
│ ├── condition-step-executor.ts # AI-powered condition step (chooses among options)
│ └── read-record-step-executor.ts # AI-powered record field reading step
│ ├── read-record-step-executor.ts # AI-powered record field reading step
│ ├── update-record-step-executor.ts # AI-powered record field update step (with confirmation flow)
│ ├── trigger-record-action-step-executor.ts # AI-powered action trigger step (with confirmation flow)
│ └── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow)
├── http/ # HTTP server (optional, for frontend data access)
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger
└── index.ts # Barrel exports
Expand All @@ -73,7 +76,11 @@ src/
- **Privacy** — Zero client data leaves the client's infrastructure. `StepOutcome` is sent to the orchestrator and must NEVER contain client data. Privacy-sensitive information (e.g. AI reasoning) must stay in `StepExecutionData` (persisted in the RunStore, client-side only).
- **Ports (IO injection)** — All external IO goes through injected port interfaces, keeping the core pure and testable.
- **AI integration** — Uses `@langchain/core` (`BaseChatModel`, `DynamicStructuredTool`) for AI-powered steps. `ExecutionContext.model` is a `BaseChatModel`.
- **Error hierarchy** — All domain errors must extend `WorkflowExecutorError` (defined in `src/errors.ts`). This ensures executors can distinguish domain errors (caught → error outcome) from infrastructure errors (uncaught → propagate to caller). Never throw a plain `Error` for a domain error case.
- **Dual error messages** — `WorkflowExecutorError` carries two messages: `message` (technical, for dev logs) and `userMessage` (human-readable, surfaced to the Forest Admin UI via `stepOutcome.error`). The mapping happens in a single place: `base-step-executor.ts` uses `error.userMessage` when building the error outcome. When adding a new error subclass, always provide a distinct `userMessage` oriented toward end-users (no collection names, field names, or AI internals). If `userMessage` is omitted in the constructor call, it falls back to `message`.
- **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`).
- **No recovery/retry** — Once the executor returns a step result to the orchestrator, the step is considered executed. There is no mechanism to re-dispatch a step, so executors must NOT include recovery checks (e.g. checking the RunStore for cached results before executing). Each step executes exactly once.
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getPendingStepExecutions()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightSteps` (to avoid running the same step twice concurrently).

## Commands

Expand Down
9 changes: 9 additions & 0 deletions packages/workflow-executor/jest.config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
/* eslint-disable import/no-relative-packages */
import path from 'path';

import jestConfig from '../../jest.config';

// Jest < 30 doesn't resolve wildcard exports in package.json.
// @anthropic-ai/sdk uses "./lib/*" exports that need this workaround.
const anthropicSdkDir = path.dirname(require.resolve('@anthropic-ai/sdk'));

export default {
...jestConfig,
collectCoverageFrom: ['<rootDir>/src/**/*.ts'],
testMatch: ['<rootDir>/test/**/*.test.ts'],
moduleNameMapper: {
'^@anthropic-ai/sdk/(.*)$': `${anthropicSdkDir}/$1`,
},
};
2 changes: 1 addition & 1 deletion packages/workflow-executor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
"test": "jest"
},
"dependencies": {
"@forestadmin/ai-proxy": "1.6.1",
"@forestadmin/agent-client": "1.4.13",
"@forestadmin/forestadmin-client": "1.37.17",
"@koa/router": "^13.1.0",
"@langchain/core": "1.1.33",
"koa": "^3.0.1",
"zod": "4.3.6"
},
Expand Down
77 changes: 36 additions & 41 deletions packages/workflow-executor/src/adapters/agent-client-agent-port.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
import type { AgentPort } from '../ports/agent-port';
import type {
AgentPort,
ExecuteActionQuery,
GetRecordQuery,
GetRelatedDataQuery,
UpdateRecordQuery,
} from '../ports/agent-port';
import type { CollectionSchema } from '../types/record';
import type { RemoteAgentClient, SelectOptions } from '@forestadmin/agent-client';

import { RecordNotFoundError } from '../errors';

function buildPkFilter(
primaryKeyFields: string[],
recordId: Array<string | number>,
id: Array<string | number>,
): SelectOptions['filters'] {
if (primaryKeyFields.length === 1) {
return { field: primaryKeyFields[0], operator: 'Equal', value: recordId[0] };
return { field: primaryKeyFields[0], operator: 'Equal', value: id[0] };
}

return {
aggregator: 'And',
conditions: primaryKeyFields.map((field, i) => ({
field,
operator: 'Equal',
value: recordId[i],
value: id[i],
})),
};
}

// agent-client methods (update, relation, action) still expect the pipe-encoded string format
function encodePk(recordId: Array<string | number>): string {
return recordId.map(v => String(v)).join('|');
function encodePk(id: Array<string | number>): string {
return id.map(v => String(v)).join('|');
}

function extractRecordId(
Expand All @@ -46,44 +52,39 @@ export default class AgentClientAgentPort implements AgentPort {
this.collectionSchemas = params.collectionSchemas;
}

async getRecord(collectionName: string, recordId: Array<string | number>, fieldNames?: string[]) {
const schema = this.resolveSchema(collectionName);
const records = await this.client.collection(collectionName).list<Record<string, unknown>>({
filters: buildPkFilter(schema.primaryKeyFields, recordId),
async getRecord({ collection, id, fields }: GetRecordQuery) {
const schema = this.resolveSchema(collection);
const records = await this.client.collection(collection).list<Record<string, unknown>>({
filters: buildPkFilter(schema.primaryKeyFields, id),
pagination: { size: 1, number: 1 },
...(fieldNames?.length && { fields: fieldNames }),
...(fields?.length && { fields }),
});

if (records.length === 0) {
throw new RecordNotFoundError(collectionName, encodePk(recordId));
throw new RecordNotFoundError(collection, encodePk(id));
}

return { collectionName, recordId, values: records[0] };
return { collectionName: collection, recordId: id, values: records[0] };
}

async updateRecord(
collectionName: string,
recordId: Array<string | number>,
values: Record<string, unknown>,
) {
async updateRecord({ collection, id, values }: UpdateRecordQuery) {
const updatedRecord = await this.client
.collection(collectionName)
.update<Record<string, unknown>>(encodePk(recordId), values);
.collection(collection)
.update<Record<string, unknown>>(encodePk(id), values);

return { collectionName, recordId, values: updatedRecord };
return { collectionName: collection, recordId: id, values: updatedRecord };
}

async getRelatedData(
collectionName: string,
recordId: Array<string | number>,
relationName: string,
) {
const relatedSchema = this.resolveSchema(relationName);
async getRelatedData({ collection, id, relation, limit, fields }: GetRelatedDataQuery) {
const relatedSchema = this.resolveSchema(relation);

const records = await this.client
.collection(collectionName)
.relation(relationName, encodePk(recordId))
.list<Record<string, unknown>>();
.collection(collection)
.relation(relation, encodePk(id))
.list<Record<string, unknown>>({
...(limit !== null && { pagination: { size: limit, number: 1 } }),
...(fields?.length && { fields }),
});

return records.map(record => ({
collectionName: relatedSchema.collectionName,
Expand All @@ -92,17 +93,11 @@ export default class AgentClientAgentPort implements AgentPort {
}));
}

async executeAction(
collectionName: string,
actionName: string,
recordIds: Array<string | number>[],
): Promise<unknown> {
const encodedIds = recordIds.map(id => encodePk(id));
const action = await this.client
.collection(collectionName)
.action(actionName, { recordIds: encodedIds });

return action.execute();
async executeAction({ collection, action, id }: ExecuteActionQuery): Promise<unknown> {
const encodedId = id?.length ? [encodePk(id)] : [];
const act = await this.client.collection(collection).action(action, { recordIds: encodedId });

return act.execute();
}

private resolveSchema(collectionName: string): CollectionSchema {
Expand Down
7 changes: 7 additions & 0 deletions packages/workflow-executor/src/adapters/console-logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import type { Logger } from '../ports/logger-port';

export default class ConsoleLogger implements Logger {
error(message: string, context: Record<string, unknown>): void {
console.error(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { ServerUtils } from '@forestadmin/forestadmin-client';
// TODO: finalize route paths with the team — these are placeholders
const ROUTES = {
pendingStepExecutions: '/liana/v1/workflow-step-executions/pending',
pendingStepExecutionForRun: (runId: string) =>
`/liana/v1/workflow-step-executions/pending?runId=${encodeURIComponent(runId)}`,
updateStepExecution: (runId: string) => `/liana/v1/workflow-step-executions/${runId}/complete`,
collectionSchema: (collectionName: string) => `/liana/v1/collections/${collectionName}`,
mcpServerConfigs: '/liana/mcp-server-configs-with-details',
Expand All @@ -29,6 +31,14 @@ export default class ForestServerWorkflowPort implements WorkflowPort {
);
}

async getPendingStepExecutionsForRun(runId: string): Promise<PendingStepExecution | null> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 Low adapters/forest-server-workflow-port.ts:34

getPendingStepExecutionsForRun queries a list endpoint that returns an array, but declares return type PendingStepExecution | null. The server will return an array at runtime, causing callers to receive an array typed as a single object. Consider changing the return type to Promise<PendingStepExecution[]> to match the actual response shape.

Suggested change
async getPendingStepExecutionsForRun(runId: string): Promise<PendingStepExecution | null> {
async getPendingStepExecutionsForRun(runId: string): Promise<PendingStepExecution[]> {
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/workflow-executor/src/adapters/forest-server-workflow-port.ts around line 34:

`getPendingStepExecutionsForRun` queries a list endpoint that returns an array, but declares return type `PendingStepExecution | null`. The server will return an array at runtime, causing callers to receive an array typed as a single object. Consider changing the return type to `Promise<PendingStepExecution[]>` to match the actual response shape.

Evidence trail:
- packages/workflow-executor/src/adapters/forest-server-workflow-port.ts lines 10-16 (ROUTES definitions showing same base endpoint)
- packages/workflow-executor/src/adapters/forest-server-workflow-port.ts lines 25-39 (both methods using same endpoint with different return types)
- packages/forestadmin-client/src/utils/server.ts line 72 (ServerUtils.query returns response.body directly without transformation)
- packages/workflow-executor/src/runner.ts lines 91-96 (usage of result as single object)
- packages/workflow-executor/src/ports/workflow-port.ts lines 11-12 (interface definition)

return ServerUtils.query<PendingStepExecution | null>(
this.options,
'get',
ROUTES.pendingStepExecutionForRun(runId),
);
}

async updateStepExecution(runId: string, stepOutcome: StepOutcome): Promise<void> {
await ServerUtils.query(
this.options,
Expand Down
Loading
Loading