Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
1107759
feat(workflow-executor): scaffold @forestadmin/workflow-executor package
matthv Mar 17, 2026
4510b7b
feat(workflow-executor): finalize scaffold — clean CLAUDE.md, remove …
Mar 17, 2026
17f26ca
fix(workflow-executor): address review — lint test dir, improve test,…
Mar 17, 2026
29f5646
feat(workflow-executor): define foundational types and port interface…
Scra3 Mar 18, 2026
127b579
feat(workflow-executor): implement condition step executor (AI-only) …
Scra3 Mar 18, 2026
cb8036b
feat(workflow-executor): implement AgentPort adapter using agent-clie…
matthv Mar 18, 2026
0ebae51
feat(ai-proxy): add programmatic API to aiClient (#1492)
Scra3 Mar 18, 2026
c25a953
feat(workflow-executor): implement WorkflowPort adapter using foresta…
matthv Mar 18, 2026
c9877fe
feat(workflow-executor): add ReadRecordStepExecutor (#1497)
Scra3 Mar 19, 2026
613ec1b
feat(workflow-executor): add HTTP server and WorkflowRunner scaffold …
matthv Mar 20, 2026
39de72a
refactor(workflow-executor): workflow steps (#1502)
Scra3 Mar 24, 2026
de39c30
feat(workflow-executor): add JWT auth and secret validation to HTTP s…
matthv Mar 24, 2026
9399bb7
refactor(workflow-executor): move userConfirmed to RunStore pendingDa…
Scra3 Mar 25, 2026
50583e8
feat(workflow-executor): add RunStore implementations (InMemoryStore …
matthv Mar 25, 2026
6e7858a
feat(workflow-executor): type-specific PATCH body validation (#1508)
Scra3 Mar 25, 2026
a52c00a
refactor(workflow-executor): encapsulate pending-data business logic …
Scra3 Mar 25, 2026
cf7d069
docs(workflow-executor): update contract to match implementation (#1511)
Scra3 Mar 25, 2026
cf8e699
feat(workflow-executor): add buildInMemoryExecutor and buildDatabaseE…
Scra3 Mar 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
- plugin-aws-s3
- plugin-export-advanced
- plugin-flattener
- workflow-executor
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
Expand Down
5 changes: 5 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ yarn workspace @forestadmin/agent test
5. Are edge cases handled?
6. Is the naming clear and consistent?

## Git Workflow

The **main working branch** for workflow-executor development is `feat/prd-214-setup-workflow-executor-package`.
All feature branches for this area should be based on and PRs targeted at this branch (not `main`).

## Linear Tickets

### MCP Setup
Expand Down
285 changes: 285 additions & 0 deletions WORKFLOW-EXECUTOR-CONTRACT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
# Workflow Executor — Contract Types

> Types exchanged between the **orchestrator (server)**, the **executor (agent-nodejs)**, and the **frontend**.
> Last updated: 2026-03-26

---

## 1. Polling

**`GET /liana/v1/workflow-step-executions/pending?runId=<runId>`**

The executor polls for the current pending step of a run. The server must return **one object** (not an array), or `null` if the run is not found.

```typescript
interface StepUser {
id: number;
email: string;
firstName: string;
lastName: string;
team: string;
renderingId: number;
role: string;
permissionLevel: string;
tags: Record<string, string>;
}

interface PendingStepExecution {
runId: string;
stepId: string;
stepIndex: number;
baseRecordRef: RecordRef;
stepDefinition: StepDefinition;
previousSteps: Step[];
user: StepUser; // identity of the user who initiated the step
}
```

> **`null` response** → executor throws `RunNotFoundError` → HTTP 404 returned to caller.

### CollectionSchema

Schema of a collection, returned by the orchestrator via `GET /liana/v1/collections/:collectionName`. Used by the executor to resolve primary keys and action endpoints.

```typescript
interface CollectionSchema {
collectionName: string;
collectionDisplayName: string;
primaryKeyFields: string[];
fields: FieldSchema[];
actions: ActionSchema[];
}

interface FieldSchema {
fieldName: string;
displayName: string;
isRelationship: boolean;
relationType?: "BelongsTo" | "HasMany" | "HasOne";
relatedCollectionName?: string;
}

interface ActionSchema {
name: string;
displayName: string;
endpoint: string; // route path used by the agent to execute the action
}
```

### RecordRef

Lightweight pointer to a specific record.

```typescript
interface RecordRef {
collectionName: string;
recordId: Array<string | number>;
stepIndex: number; // index of the workflow step that loaded this record
}
```

### Step

History entry for an already-executed step (used in `previousSteps`).

```typescript
interface Step {
stepDefinition: StepDefinition;
stepOutcome: StepOutcome;
}
```

### StepDefinition

Discriminated union on `type`.

```typescript
type StepDefinition =
| ConditionStepDefinition
| RecordTaskStepDefinition
| McpTaskStepDefinition;

interface ConditionStepDefinition {
type: "condition";
options: [string, ...string[]]; // at least one option required
prompt?: string;
aiConfigName?: string;
}

interface RecordTaskStepDefinition {
type: "read-record"
| "update-record"
| "trigger-action"
| "load-related-record";
prompt?: string;
aiConfigName?: string;
automaticExecution?: boolean;
}

interface McpTaskStepDefinition {
type: "mcp-task";
mcpServerId?: string;
prompt?: string;
aiConfigName?: string;
automaticExecution?: boolean;
}
```

### StepOutcome

What the executor previously reported for each past step (used in `previousSteps`).

```typescript
type StepOutcome =
| ConditionStepOutcome
| RecordTaskStepOutcome
| McpTaskStepOutcome;

interface ConditionStepOutcome {
type: "condition";
stepId: string;
stepIndex: number;
status: "success" | "error";
selectedOption?: string; // present when status = "success"
error?: string; // present when status = "error"
}

interface RecordTaskStepOutcome {
type: "record-task";
stepId: string;
stepIndex: number;
status: "success" | "error" | "awaiting-input";
error?: string; // present when status = "error"
}

interface McpTaskStepOutcome {
type: "mcp-task";
stepId: string;
stepIndex: number;
status: "success" | "error" | "awaiting-input";
error?: string; // present when status = "error"
}
```

---

## 2. Step Result

**`POST /liana/v1/workflow-step-executions/<runId>/complete`**

After executing a step, the executor posts the outcome back to the server. The body is one of the `StepOutcome` shapes above.

> **NEVER contains client data** (field values, AI reasoning, etc.) — those stay in the `RunStore` on the client side.

---

## 3. Pending Data

Steps that require user input pause with `status: "awaiting-input"`. The executor writes its AI-selected data to `pendingData` in the RunStore. The frontend can then override fields and confirm via the pending-data endpoint.

**`PATCH /runs/:runId/steps/:stepIndex/pending-data`**

The frontend writes user overrides + confirmation to the executor HTTP server. Request bodies are validated per step type with strict Zod schemas — unknown fields are rejected with `400`.

Once written, the frontend calls `POST /runs/:runId/trigger`. On the next execution, the executor reads `pendingData` from the RunStore and checks `userConfirmed`:
- `undefined` → returns `awaiting-input` again (the step is not yet actionable)
- `true` → execute the confirmed action
- `false` → skip the step (mark as success)

### update-record — user picks a field + value to write

The executor writes the AI's field selection to `pendingData`. The frontend can override `value` and confirm.

Stored in RunStore:
```typescript
interface UpdateRecordPendingData {
name: string; // technical field name (set by executor)
displayName: string; // label shown in the UI (set by executor)
value: string; // AI-proposed value; overridable by frontend
userConfirmed?: boolean; // set by frontend via PATCH
}
```

PATCH request body:
```typescript
{
userConfirmed: boolean;
value?: string; // optional override of AI-proposed value
}
```

### trigger-action & mcp-task — user confirmation only

The executor selects the action (or MCP tool) and writes `pendingData` to the RunStore. The frontend cannot override any executor-selected data — it only confirms or rejects.

PATCH request body (same for both types):
```typescript
{
userConfirmed: boolean;
}
```

### load-related-record — user picks the relation and/or the record

The executor writes the AI's relation selection to `pendingData`. The frontend can override the relation, the selected record, or both.

Stored in RunStore:
```typescript
interface LoadRelatedRecordPendingData {
name: string; // technical relation name
displayName: string; // label shown in the UI
suggestedFields?: string[]; // fields suggested for display (set by executor)
selectedRecordId: Array<string|number>; // AI's pick; overridable by frontend
userConfirmed?: boolean; // set by frontend via PATCH
}
```

> `relatedCollectionName` is **not** stored in `pendingData` — the executor re-derives it from the `FieldSchema` at execution time using the (possibly overridden) relation `name`.

PATCH request body:
```typescript
{
userConfirmed: boolean;
name?: string; // override relation
displayName?: string; // override relation label
selectedRecordId?: Array<string|number>; // override selected record (min 1 element)
}
```

### Responses

| Status | Meaning |
|---|---|
| `204 No Content` | Pending data updated successfully |
| `400` | Invalid body — type mismatch, unknown fields, or empty `selectedRecordId` |
| `404` | Step not found, no `pendingData`, or step type does not support confirmation |

---

## Flow Summary

```
Orchestrator ──► GET pending?runId=X ──► Executor
executes step
┌───────────────┴───────────────┐
needs input done
│ │
status: awaiting-input POST /complete
│ (StepOutcome)
Executor writes pendingData
to RunStore (AI selection)
Frontend reads pendingData
via GET /runs/:runId
Frontend overrides + confirms
PATCH /runs/:runId/steps/:stepIndex/pending-data
{ userConfirmed: true/false } → 204
POST /runs/:runId/trigger
Executor resumes
(reads userConfirmed from pendingData)
```
65 changes: 65 additions & 0 deletions packages/ai-proxy/src/ai-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import type { McpConfiguration } from './mcp-client';
import type { AiConfiguration } from './provider';
import type { Logger } from '@forestadmin/datasource-toolkit';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';

import { createBaseChatModel } from './create-base-chat-model';
import { AINotConfiguredError } from './errors';
import getAiConfiguration from './get-ai-configuration';
import McpClient from './mcp-client';
import validateAiConfigurations from './validate-ai-configurations';

// eslint-disable-next-line import/prefer-default-export
export class AiClient {
private readonly aiConfigurations: AiConfiguration[];
private readonly logger?: Logger;
private readonly modelCache = new Map<string, BaseChatModel>();
private mcpClient?: McpClient;

constructor(params?: { aiConfigurations?: AiConfiguration[]; logger?: Logger }) {
this.aiConfigurations = params?.aiConfigurations ?? [];
this.logger = params?.logger;

validateAiConfigurations(this.aiConfigurations);
}

getModel(aiName?: string): BaseChatModel {
const config = getAiConfiguration(this.aiConfigurations, aiName, this.logger);
if (!config) throw new AINotConfiguredError();

const cached = this.modelCache.get(config.name);
if (cached) return cached;

const model = createBaseChatModel(config);
this.modelCache.set(config.name, model);

return model;
}

async loadRemoteTools(mcpConfig: McpConfiguration): Promise<McpClient['tools']> {
await this.closeMcpClient('Error closing previous MCP connection');

const newClient = new McpClient(mcpConfig, this.logger);
const tools = await newClient.loadTools();
this.mcpClient = newClient;

return tools;
}
Comment on lines +39 to +47
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium src/ai-client.ts:39

In loadRemoteTools, if newClient.loadTools() throws, the McpClient instance is orphaned and never closed. Since this.mcpClient is only assigned after success, closeConnections() cannot reach the failed client, leaving resources leaked. Consider wrapping loadTools() in a try/finally to ensure cleanup on failure.

  async loadRemoteTools(mcpConfig: McpConfiguration): Promise<McpClient['tools']> {
     await this.closeMcpClient('Error closing previous MCP connection');
 
     const newClient = new McpClient(mcpConfig, this.logger);
-    const tools = await newClient.loadTools();
-    this.mcpClient = newClient;
+    try {
+      const tools = await newClient.loadTools();
+      this.mcpClient = newClient;
 
-    return tools;
+      return tools;
+    } catch (error) {
+      await newClient.closeConnections();
+      throw error;
+    }
   }
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/ai-proxy/src/ai-client.ts around lines 39-47:

In `loadRemoteTools`, if `newClient.loadTools()` throws, the `McpClient` instance is orphaned and never closed. Since `this.mcpClient` is only assigned after success, `closeConnections()` cannot reach the failed client, leaving resources leaked. Consider wrapping `loadTools()` in a try/finally to ensure cleanup on failure.

Evidence trail:
packages/ai-proxy/src/ai-client.ts lines 39-45 (loadRemoteTools function) and lines 52-60 (closeMcpClient function) at REVIEWED_COMMIT. The code shows newClient is instantiated at line 42, loadTools() is called at line 43, and this.mcpClient is assigned at line 44. If line 43 throws, line 44 never executes, leaving newClient orphaned with no cleanup path.


async closeConnections(): Promise<void> {
await this.closeMcpClient('Error during MCP connection cleanup');
}

private async closeMcpClient(errorMessage: string): Promise<void> {
if (!this.mcpClient) return;

try {
await this.mcpClient.closeConnections();
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
this.logger?.('Error', errorMessage, err);
} finally {
this.mcpClient = undefined;
}
}
}
26 changes: 26 additions & 0 deletions packages/ai-proxy/src/create-base-chat-model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { AiConfiguration } from './provider';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';

import { ChatAnthropic } from '@langchain/anthropic';
import { ChatOpenAI } from '@langchain/openai';

import { AIBadRequestError } from './errors';

// eslint-disable-next-line import/prefer-default-export
export function createBaseChatModel(config: AiConfiguration): BaseChatModel {
if (config.provider === 'openai') {
const { provider, name, ...opts } = config;

return new ChatOpenAI({ maxRetries: 0, ...opts });
}

if (config.provider === 'anthropic') {
const { provider, name, model, ...opts } = config;

return new ChatAnthropic({ maxRetries: 0, ...opts, model });
}

throw new AIBadRequestError(
`Unsupported AI provider '${(config as { provider: string }).provider}'.`,
);
}
Loading
Loading