-
Notifications
You must be signed in to change notification settings - Fork 10
feat(workflow-executor): scaffold @forestadmin/workflow-executor package #1493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Scra3
wants to merge
18
commits into
main
Choose a base branch
from
feat/prd-214-setup-workflow-executor-package
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+16,017
−420
Open
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
1107759
feat(workflow-executor): scaffold @forestadmin/workflow-executor package
matthv 4510b7b
feat(workflow-executor): finalize scaffold — clean CLAUDE.md, remove …
17f26ca
fix(workflow-executor): address review — lint test dir, improve test,…
29f5646
feat(workflow-executor): define foundational types and port interface…
Scra3 127b579
feat(workflow-executor): implement condition step executor (AI-only) …
Scra3 cb8036b
feat(workflow-executor): implement AgentPort adapter using agent-clie…
matthv 0ebae51
feat(ai-proxy): add programmatic API to aiClient (#1492)
Scra3 c25a953
feat(workflow-executor): implement WorkflowPort adapter using foresta…
matthv c9877fe
feat(workflow-executor): add ReadRecordStepExecutor (#1497)
Scra3 613ec1b
feat(workflow-executor): add HTTP server and WorkflowRunner scaffold …
matthv 39de72a
refactor(workflow-executor): workflow steps (#1502)
Scra3 de39c30
feat(workflow-executor): add JWT auth and secret validation to HTTP s…
matthv 9399bb7
refactor(workflow-executor): move userConfirmed to RunStore pendingDa…
Scra3 50583e8
feat(workflow-executor): add RunStore implementations (InMemoryStore …
matthv 6e7858a
feat(workflow-executor): type-specific PATCH body validation (#1508)
Scra3 a52c00a
refactor(workflow-executor): encapsulate pending-data business logic …
Scra3 cf7d069
docs(workflow-executor): update contract to match implementation (#1511)
Scra3 cf8e699
feat(workflow-executor): add buildInMemoryExecutor and buildDatabaseE…
Scra3 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,285 @@ | ||
| # Workflow Executor — Contract Types | ||
|
|
||
| > Types exchanged between the **orchestrator (server)**, the **executor (agent-nodejs)**, and the **frontend**. | ||
| > Last updated: 2026-03-26 | ||
|
|
||
| --- | ||
|
|
||
| ## 1. Polling | ||
|
|
||
| **`GET /liana/v1/workflow-step-executions/pending?runId=<runId>`** | ||
|
|
||
| The executor polls for the current pending step of a run. The server must return **one object** (not an array), or `null` if the run is not found. | ||
|
|
||
| ```typescript | ||
| interface StepUser { | ||
| id: number; | ||
| email: string; | ||
| firstName: string; | ||
| lastName: string; | ||
| team: string; | ||
| renderingId: number; | ||
| role: string; | ||
| permissionLevel: string; | ||
| tags: Record<string, string>; | ||
| } | ||
|
|
||
| interface PendingStepExecution { | ||
| runId: string; | ||
| stepId: string; | ||
| stepIndex: number; | ||
| baseRecordRef: RecordRef; | ||
| stepDefinition: StepDefinition; | ||
| previousSteps: Step[]; | ||
| user: StepUser; // identity of the user who initiated the step | ||
| } | ||
| ``` | ||
|
|
||
| > **`null` response** → executor throws `RunNotFoundError` → HTTP 404 returned to caller. | ||
|
|
||
| ### CollectionSchema | ||
|
|
||
| Schema of a collection, returned by the orchestrator via `GET /liana/v1/collections/:collectionName`. Used by the executor to resolve primary keys and action endpoints. | ||
|
|
||
| ```typescript | ||
| interface CollectionSchema { | ||
| collectionName: string; | ||
| collectionDisplayName: string; | ||
| primaryKeyFields: string[]; | ||
| fields: FieldSchema[]; | ||
| actions: ActionSchema[]; | ||
| } | ||
|
|
||
| interface FieldSchema { | ||
| fieldName: string; | ||
| displayName: string; | ||
| isRelationship: boolean; | ||
| relationType?: "BelongsTo" | "HasMany" | "HasOne"; | ||
| relatedCollectionName?: string; | ||
| } | ||
|
|
||
| interface ActionSchema { | ||
| name: string; | ||
| displayName: string; | ||
| endpoint: string; // route path used by the agent to execute the action | ||
| } | ||
| ``` | ||
|
|
||
| ### RecordRef | ||
|
|
||
| Lightweight pointer to a specific record. | ||
|
|
||
| ```typescript | ||
| interface RecordRef { | ||
| collectionName: string; | ||
| recordId: Array<string | number>; | ||
| stepIndex: number; // index of the workflow step that loaded this record | ||
| } | ||
| ``` | ||
|
|
||
| ### Step | ||
|
|
||
| History entry for an already-executed step (used in `previousSteps`). | ||
|
|
||
| ```typescript | ||
| interface Step { | ||
| stepDefinition: StepDefinition; | ||
| stepOutcome: StepOutcome; | ||
| } | ||
| ``` | ||
|
|
||
| ### StepDefinition | ||
|
|
||
| Discriminated union on `type`. | ||
|
|
||
| ```typescript | ||
| type StepDefinition = | ||
| | ConditionStepDefinition | ||
| | RecordTaskStepDefinition | ||
| | McpTaskStepDefinition; | ||
|
|
||
| interface ConditionStepDefinition { | ||
| type: "condition"; | ||
| options: [string, ...string[]]; // at least one option required | ||
| prompt?: string; | ||
| aiConfigName?: string; | ||
| } | ||
|
|
||
| interface RecordTaskStepDefinition { | ||
| type: "read-record" | ||
| | "update-record" | ||
| | "trigger-action" | ||
| | "load-related-record"; | ||
| prompt?: string; | ||
| aiConfigName?: string; | ||
| automaticExecution?: boolean; | ||
| } | ||
|
|
||
| interface McpTaskStepDefinition { | ||
| type: "mcp-task"; | ||
| mcpServerId?: string; | ||
| prompt?: string; | ||
| aiConfigName?: string; | ||
| automaticExecution?: boolean; | ||
| } | ||
| ``` | ||
|
|
||
| ### StepOutcome | ||
|
|
||
| What the executor previously reported for each past step (used in `previousSteps`). | ||
|
|
||
| ```typescript | ||
| type StepOutcome = | ||
| | ConditionStepOutcome | ||
| | RecordTaskStepOutcome | ||
| | McpTaskStepOutcome; | ||
|
|
||
| interface ConditionStepOutcome { | ||
| type: "condition"; | ||
| stepId: string; | ||
| stepIndex: number; | ||
| status: "success" | "error"; | ||
| selectedOption?: string; // present when status = "success" | ||
| error?: string; // present when status = "error" | ||
| } | ||
|
|
||
| interface RecordTaskStepOutcome { | ||
| type: "record-task"; | ||
| stepId: string; | ||
| stepIndex: number; | ||
| status: "success" | "error" | "awaiting-input"; | ||
| error?: string; // present when status = "error" | ||
| } | ||
|
|
||
| interface McpTaskStepOutcome { | ||
| type: "mcp-task"; | ||
| stepId: string; | ||
| stepIndex: number; | ||
| status: "success" | "error" | "awaiting-input"; | ||
| error?: string; // present when status = "error" | ||
| } | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## 2. Step Result | ||
|
|
||
| **`POST /liana/v1/workflow-step-executions/<runId>/complete`** | ||
|
|
||
| After executing a step, the executor posts the outcome back to the server. The body is one of the `StepOutcome` shapes above. | ||
|
|
||
| > **NEVER contains client data** (field values, AI reasoning, etc.) — those stay in the `RunStore` on the client side. | ||
|
|
||
| --- | ||
|
|
||
| ## 3. Pending Data | ||
|
|
||
| Steps that require user input pause with `status: "awaiting-input"`. The executor writes its AI-selected data to `pendingData` in the RunStore. The frontend can then override fields and confirm via the pending-data endpoint. | ||
|
|
||
| **`PATCH /runs/:runId/steps/:stepIndex/pending-data`** | ||
|
|
||
| The frontend writes user overrides + confirmation to the executor HTTP server. Request bodies are validated per step type with strict Zod schemas — unknown fields are rejected with `400`. | ||
|
|
||
| Once written, the frontend calls `POST /runs/:runId/trigger`. On the next execution, the executor reads `pendingData` from the RunStore and checks `userConfirmed`: | ||
| - `undefined` → returns `awaiting-input` again (the step is not yet actionable) | ||
| - `true` → execute the confirmed action | ||
| - `false` → skip the step (mark as success) | ||
|
|
||
| ### update-record — user picks a field + value to write | ||
|
|
||
| The executor writes the AI's field selection to `pendingData`. The frontend can override `value` and confirm. | ||
|
|
||
| Stored in RunStore: | ||
| ```typescript | ||
| interface UpdateRecordPendingData { | ||
| name: string; // technical field name (set by executor) | ||
| displayName: string; // label shown in the UI (set by executor) | ||
| value: string; // AI-proposed value; overridable by frontend | ||
| userConfirmed?: boolean; // set by frontend via PATCH | ||
| } | ||
| ``` | ||
|
|
||
| PATCH request body: | ||
| ```typescript | ||
| { | ||
| userConfirmed: boolean; | ||
| value?: string; // optional override of AI-proposed value | ||
| } | ||
| ``` | ||
|
|
||
| ### trigger-action & mcp-task — user confirmation only | ||
|
|
||
| The executor selects the action (or MCP tool) and writes `pendingData` to the RunStore. The frontend cannot override any executor-selected data — it only confirms or rejects. | ||
|
|
||
| PATCH request body (same for both types): | ||
| ```typescript | ||
| { | ||
| userConfirmed: boolean; | ||
| } | ||
| ``` | ||
|
|
||
| ### load-related-record — user picks the relation and/or the record | ||
|
|
||
| The executor writes the AI's relation selection to `pendingData`. The frontend can override the relation, the selected record, or both. | ||
|
|
||
| Stored in RunStore: | ||
| ```typescript | ||
| interface LoadRelatedRecordPendingData { | ||
| name: string; // technical relation name | ||
| displayName: string; // label shown in the UI | ||
| suggestedFields?: string[]; // fields suggested for display (set by executor) | ||
| selectedRecordId: Array<string|number>; // AI's pick; overridable by frontend | ||
| userConfirmed?: boolean; // set by frontend via PATCH | ||
| } | ||
| ``` | ||
|
|
||
| > `relatedCollectionName` is **not** stored in `pendingData` — the executor re-derives it from the `FieldSchema` at execution time using the (possibly overridden) relation `name`. | ||
|
|
||
| PATCH request body: | ||
| ```typescript | ||
| { | ||
| userConfirmed: boolean; | ||
| name?: string; // override relation | ||
| displayName?: string; // override relation label | ||
| selectedRecordId?: Array<string|number>; // override selected record (min 1 element) | ||
| } | ||
| ``` | ||
|
|
||
| ### Responses | ||
|
|
||
| | Status | Meaning | | ||
| |---|---| | ||
| | `204 No Content` | Pending data updated successfully | | ||
| | `400` | Invalid body — type mismatch, unknown fields, or empty `selectedRecordId` | | ||
| | `404` | Step not found, no `pendingData`, or step type does not support confirmation | | ||
|
|
||
| --- | ||
|
|
||
| ## Flow Summary | ||
|
|
||
| ``` | ||
| Orchestrator ──► GET pending?runId=X ──► Executor | ||
| │ | ||
| executes step | ||
| │ | ||
| ┌───────────────┴───────────────┐ | ||
| needs input done | ||
| │ │ | ||
| status: awaiting-input POST /complete | ||
| │ (StepOutcome) | ||
| │ | ||
| Executor writes pendingData | ||
| to RunStore (AI selection) | ||
| │ | ||
| Frontend reads pendingData | ||
| via GET /runs/:runId | ||
| │ | ||
| Frontend overrides + confirms | ||
| PATCH /runs/:runId/steps/:stepIndex/pending-data | ||
| { userConfirmed: true/false } → 204 | ||
| │ | ||
| POST /runs/:runId/trigger | ||
| │ | ||
| Executor resumes | ||
| (reads userConfirmed from pendingData) | ||
| ``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| import type { McpConfiguration } from './mcp-client'; | ||
| import type { AiConfiguration } from './provider'; | ||
| import type { Logger } from '@forestadmin/datasource-toolkit'; | ||
| import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; | ||
|
|
||
| import { createBaseChatModel } from './create-base-chat-model'; | ||
| import { AINotConfiguredError } from './errors'; | ||
| import getAiConfiguration from './get-ai-configuration'; | ||
| import McpClient from './mcp-client'; | ||
| import validateAiConfigurations from './validate-ai-configurations'; | ||
|
|
||
| // eslint-disable-next-line import/prefer-default-export | ||
| export class AiClient { | ||
| private readonly aiConfigurations: AiConfiguration[]; | ||
| private readonly logger?: Logger; | ||
| private readonly modelCache = new Map<string, BaseChatModel>(); | ||
| private mcpClient?: McpClient; | ||
|
|
||
| constructor(params?: { aiConfigurations?: AiConfiguration[]; logger?: Logger }) { | ||
| this.aiConfigurations = params?.aiConfigurations ?? []; | ||
| this.logger = params?.logger; | ||
|
|
||
| validateAiConfigurations(this.aiConfigurations); | ||
| } | ||
|
|
||
| getModel(aiName?: string): BaseChatModel { | ||
| const config = getAiConfiguration(this.aiConfigurations, aiName, this.logger); | ||
| if (!config) throw new AINotConfiguredError(); | ||
|
|
||
| const cached = this.modelCache.get(config.name); | ||
| if (cached) return cached; | ||
|
|
||
| const model = createBaseChatModel(config); | ||
| this.modelCache.set(config.name, model); | ||
|
|
||
| return model; | ||
| } | ||
|
|
||
| async loadRemoteTools(mcpConfig: McpConfiguration): Promise<McpClient['tools']> { | ||
| await this.closeMcpClient('Error closing previous MCP connection'); | ||
|
|
||
| const newClient = new McpClient(mcpConfig, this.logger); | ||
| const tools = await newClient.loadTools(); | ||
| this.mcpClient = newClient; | ||
|
|
||
| return tools; | ||
| } | ||
|
|
||
| async closeConnections(): Promise<void> { | ||
| await this.closeMcpClient('Error during MCP connection cleanup'); | ||
| } | ||
|
|
||
| private async closeMcpClient(errorMessage: string): Promise<void> { | ||
| if (!this.mcpClient) return; | ||
|
|
||
| try { | ||
| await this.mcpClient.closeConnections(); | ||
| } catch (error) { | ||
| const err = error instanceof Error ? error : new Error(String(error)); | ||
| this.logger?.('Error', errorMessage, err); | ||
| } finally { | ||
| this.mcpClient = undefined; | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| import type { AiConfiguration } from './provider'; | ||
| import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; | ||
|
|
||
| import { ChatAnthropic } from '@langchain/anthropic'; | ||
| import { ChatOpenAI } from '@langchain/openai'; | ||
|
|
||
| import { AIBadRequestError } from './errors'; | ||
|
|
||
| // eslint-disable-next-line import/prefer-default-export | ||
| export function createBaseChatModel(config: AiConfiguration): BaseChatModel { | ||
| if (config.provider === 'openai') { | ||
| const { provider, name, ...opts } = config; | ||
|
|
||
| return new ChatOpenAI({ maxRetries: 0, ...opts }); | ||
| } | ||
|
|
||
| if (config.provider === 'anthropic') { | ||
| const { provider, name, model, ...opts } = config; | ||
|
|
||
| return new ChatAnthropic({ maxRetries: 0, ...opts, model }); | ||
| } | ||
|
|
||
| throw new AIBadRequestError( | ||
| `Unsupported AI provider '${(config as { provider: string }).provider}'.`, | ||
| ); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 Medium
src/ai-client.ts:39In
loadRemoteTools, ifnewClient.loadTools()throws, theMcpClientinstance is orphaned and never closed. Sincethis.mcpClientis only assigned after success,closeConnections()cannot reach the failed client, leaving resources leaked. Consider wrappingloadTools()in a try/finally to ensure cleanup on failure.async loadRemoteTools(mcpConfig: McpConfiguration): Promise<McpClient['tools']> { await this.closeMcpClient('Error closing previous MCP connection'); const newClient = new McpClient(mcpConfig, this.logger); - const tools = await newClient.loadTools(); - this.mcpClient = newClient; + try { + const tools = await newClient.loadTools(); + this.mcpClient = newClient; - return tools; + return tools; + } catch (error) { + await newClient.closeConnections(); + throw error; + } }🚀 Reply "fix it for me" or copy this AI Prompt for your agent: