Skip to content

fix(workflow-executor): operation activity log targets the acted record (PRD-442 #1)#1628

Open
Scra3 wants to merge 11 commits into
feat/prd-214-server-step-mapperfrom
fix/prd-442-activity-log-target
Open

fix(workflow-executor): operation activity log targets the acted record (PRD-442 #1)#1628
Scra3 wants to merge 11 commits into
feat/prd-214-server-step-mapperfrom
fix/prd-442-activity-log-target

Conversation

@Scra3
Copy link
Copy Markdown
Member

@Scra3 Scra3 commented Jun 4, 2026

Context

fixes PRD-442

A workflow step's operation activity-log entry (audit trail) pointed at the run's trigger record, even when the step acted on a record loaded earlier in the run — possibly in another collection. Root cause: the audit args were built before doExecute() (so the acted record wasn't resolved yet) and used context.collectionId + baseRecordRef.

Changes

Audit moved into a dedicated AgentWithLog wrapper. Activity-log responsibility left BaseStepExecutor / OperationStepExecutor (deleted) and now lives in AgentWithLog, which wraps AgentPort and emits one audit entry per data-access / write call (pending → succeeded/failed). The target is derived from the call itself — recordId = query.id, collectionId resolved from the call's collection — so each entry points at the acted record and its own collection, and fires only when the operation actually happens. Covers read / update / trigger-action / load-related; the MCP step uses the generic logged() (its side effect is a tool invocation, audited against the run's base record).

Acted collection's numeric id. CollectionSchema gains a required collectionId, resolved from the live schema via the new SchemaCache.getOrLoad(name, loader) (lazy cache-or-fetch, decoupled from WorkflowPort). The audit is not optional, so a missing id fails loud at the boundary (DomainValidationError).

Fail-loud schema resolution. AgentClientAgentPort.resolveSchema now throws SchemaNotCachedError instead of returning a synthetic fallback schema (which had a wrong primaryKeyFields / collectionId). Unreachable in the normal flow — AgentWithLog resolves (and caches) the schema right before delegating — so it's a true invariant guard.

Idempotency unchanged, ordering preserved. Idempotency stays entirely in the executors. The executing write-ahead marker is saved in a beforeCall thunk that AgentWithLog runs after createPending, before the side effect — so an audit-creation failure never leaves an orphan executing marker, and AgentWithLog stays ignorant of run state.

Activity-log failures surface as user-facing step errors; markFailed carries a local-only diagnostic message (the server status endpoint accepts just { status }).

⚠️ Coordinated deploy (cross-repo)

The orchestrator must include collectionId in every getCollectionSchema response. Until it does, steps on that collection fail at schema parse — so this must ship in tandem with the orchestrator change.

Scope

Bug #1 (wrong target), plus the confirmation-flow premature / duplicate symptom of #2: logging at the operation point means a confirmation step logs nothing while awaiting input, nothing on reject, and exactly once on confirm (covered by tests). Remaining #2 work: exactly-once across network retries (orchestrator-side dedup of identical (runId, stepIndex) outcomes).

Verification

  • yarn workspace @forestadmin/workflow-executor build
  • 922 tests — incl. a cross-collection repro (trigger on customers, act on orders#99 → audit targets orders, not the trigger), per-executor target assertions, AgentWithLog unit tests (beforeCall ordering, createPending-throws, userMessage vs 'Unexpected error'), reject-emits-no-log, and getOrLoad hit/miss ✅
  • lint: 0 errors ✅

🤖 Generated with Claude Code

Note

Context

fixes PRD-442

A workflow step's operation activity-log entry (audit trail) pointed at the run's trigger record, even when the step acted on a record loaded earlier in the run — possibly in another collection. Root cause: the audit args were built before doExecute() (so the acted record wasn't resolved yet) and used context.collectionId + baseRecordRef.

Changes

Audit moved into a dedicated AgentWithLog wrapper. Activity-log responsibility left BaseStepExecutor / OperationStepExecutor (deleted) and now lives in AgentWithLog, which wraps AgentPort and emits one audit entry per data-access / write call (pending → succeeded/failed). The target is derived from the call itself — recordId = query.id, collectionId resolved from the call's collection — so each entry points at the acted record and its own collection, and fires only when the operation actually happens. Covers read / update / trigger-action / load-related; the MCP step uses the generic logged() (its side effect is a tool invocation, audited against the run's base record).

Acted collection's numeric id. CollectionSchema gains a required collectionId, resolved from the live schema via the new SchemaCache.getOrLoad(name, loader) (lazy cache-or-fetch, decoupled from WorkflowPort). The audit is not optional, so a missing id fails loud at the boundary (DomainValidationError).

Fail-loud schema resolution. AgentClientAgentPort.resolveSchema now throws SchemaNotCachedError instead of returning a synthetic fallback schema (which had a wrong primaryKeyFields / collectionId). Unreachable in the normal flow — AgentWithLog resolves (and caches) the schema right before delegating — so it's a true invariant guard.

Idempotency unchanged, ordering preserved. Idempotency stays entirely in the executors. The executing write-ahead marker is saved in a beforeCall thunk that AgentWithLog runs after createPending, before the side effect — so an audit-creation failure never leaves an orphan executing marker, and AgentWithLog stays ignorant of run state.

Activity-log failures surface as user-facing step errors; markFailed carries a local-only diagnostic message (the server status endpoint accepts just { status }).

⚠️ Coordinated deploy (cross-repo)

The orchestrator must include collectionId in every getCollectionSchema response. Until it does, steps on that collection fail at schema parse — so this must ship in tandem with the orchestrator change.

Scope

Bug #1 (wrong target), plus the confirmation-flow premature / duplicate symptom of #2: logging at the operation point means a confirmation step logs nothing while awaiting input, nothing on reject, and exactly once on confirm (covered by tests). Remaining #2 work: exactly-once across network retries (orchestrator-side dedup of identical (runId, stepIndex) outcomes).

Verification

  • yarn workspace @forestadmin/workflow-executor build
  • 922 tests — incl. a cross-collection repro (trigger on customers, act on orders#99 → audit targets orders, not the trigger), per-executor target assertions, AgentWithLog unit tests (beforeCall ordering, createPending-throws, userMessage vs 'Unexpected error'), reject-emits-no-log, and getOrLoad hit/miss ✅
  • lint: 0 errors ✅

🤖 Generated with Claude Code

Changes since #1628 opened

  • Introduced SchemaResolver class to bind schema resolution to a specific workflow run context [453f88a]
  • Refactored executors.AgentWithLog to use SchemaResolver instead of individual dependencies [453f88a]
  • Added schemaResolver property to types.ExecutionContext interface and updated context construction [453f88a]
  • Updated executors.BaseStepExecutor and executors.RecordStepExecutor to use SchemaResolver [453f88a]
  • Updated test files to construct and provide SchemaResolver in test contexts [453f88a]
  • Removed error message parameter from activity log failure marking throughout the workflow executor [73dd015]
  • Added test case verifying AgentWithLog.updateRecord error handling when beforeCall throws [3629570]
  • Removed schemaCache property from ExecutionContext interface in workflow-executor package [8bb1ba6]
  • Updated test helper functions across workflow-executor test suite to remove schemaCache from mock contexts [8bb1ba6]
  • Updated documentation comments in workflow-executor schema and activity log interfaces [8bb1ba6]

@linear-code
Copy link
Copy Markdown

linear-code Bot commented Jun 4, 2026

PRD-442

@qltysh
Copy link
Copy Markdown

qltysh Bot commented Jun 4, 2026

4 new issues

Tool Category Rule Count
qlty Structure Function with high complexity (count = 11): execute 2
qlty Duplication Found 21 lines of similar code in 2 locations (mass = 103) 1
qlty Structure Function with many returns (count = 5): execute 1

@qltysh
Copy link
Copy Markdown

qltysh Bot commented Jun 4, 2026

Qlty


Coverage Impact

Unable to calculate total coverage change because base branch coverage was not found.

Modified Files with Diff Coverage (13)

RatingFile% DiffUncovered Line #s
New Coverage rating: A
packages/workflow-executor/src/executors/step-executor-factory.ts100.0%
New Coverage rating: A
...ow-executor/src/executors/load-related-record-step-executor.ts100.0%
New Coverage rating: A
...s/workflow-executor/src/executors/read-record-step-executor.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/record-step-executor.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/base-step-executor.ts100.0%
New Coverage rating: A
...workflow-executor/src/executors/update-record-step-executor.ts100.0%
New Coverage rating: A
...-executor/src/executors/trigger-record-action-step-executor.ts100.0%
New Coverage rating: A
...ages/workflow-executor/src/adapters/agent-client-agent-port.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/errors.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/index.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/mcp-step-executor.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/executors/agent-with-log.ts100.0%
New Coverage rating: A
packages/workflow-executor/src/schema-resolver.ts100.0%
Total100.0%
🚦 See full report on Qlty Cloud »

🛟 Help
  • Diff Coverage: Coverage for added or modified lines of code (excludes deleted files). Learn more.

  • Total Coverage: Coverage for the whole repository, calculated as the sum of all File Coverage. Learn more.

  • File Coverage: Covered Lines divided by Covered Lines plus Missed Lines. (Excludes non-executable lines including blank lines and comments.)

    • Indirect Changes: Changes to File Coverage for files that were not modified in this PR. Learn more.

…rd (PRD-442 #1)

The operation audit-trail entry pointed at the run's trigger record, even
when the step acted on a record loaded earlier in the run — possibly in
another collection. Root cause: buildActivityLogArgs ran before doExecute
(so the acted record wasn't resolved yet) and used context.collectionId +
baseRecordRef.

Record executors now emit the entry at the point of the agent call, via a
new base helper withActivityLog(args, fn), targeting selectedRecordRef and
its collection's numeric id (read from the live schema). CollectionSchema
gains a required collectionId — the audit is not optional, so a missing id
fails loud at the boundary (DomainValidationError); the orchestrator must
include it in getCollectionSchema responses (coordinated deploy).

Scope: bug #1 (wrong target) only. As a consequence of logging at the
operation point, a confirmation step no longer logs before the write — the
remaining #2 work (exactly-once / no-log-on-reject, MCP step) is a follow-up.
The MCP executor still uses context.collectionId via the existing wrapper.

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@Scra3 Scra3 force-pushed the fix/prd-442-activity-log-target branch from bdff84b to e8f778b Compare June 4, 2026 15:01
alban bertolini and others added 2 commits June 4, 2026 17:46
…logOperation

Move the activity-log machinery out of BaseStepExecutor into a new
OperationStepExecutor (Base -> Operation -> Record/Mcp). It owns the
`operation` descriptor, `logOperation(record, fn)` and the now-private
`withActivityLog` helper, so the base class no longer carries any audit
concern.

McpStepExecutor now extends OperationStepExecutor and logs its tool call
against the run base record via context.collectionId. RecordStepExecutor
overrides operationCollectionId to resolve the acted record's own
collection id.

Relocate the activity-log tests from base-step-executor.test.ts to a new
operation-step-executor.test.ts.

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Tighten operation typing: CreateActivityLogArgs now uses the lib's
  ActivityLogAction/ActivityLogType (removes the `as ActivityLogAction`
  cast); extract OperationDescriptor (Pick) reused by the MCP executor.
  collectionId is now required on CreateActivityLogArgs.
- Drop the dead `errorMessage` param from ActivityLogPort.markFailed: the
  server status endpoint only accepts { status }, so the message went
  nowhere. The failure cause is still logged by BaseStepExecutor.
- Move the `executing` write-ahead marker inside the logOperation callback
  (after createPending, before the side effect) in update/trigger/mcp so an
  activity-log creation failure never leaves an orphan executing marker.
- Fix stale comment in base-step-executor + CLAUDE.md idempotency section.
- Reword the misleading MCP formatting-fallback log line.
- Tests: cross-collection activity-log target for update + trigger, log on
  the load-related awaiting-input branch, no-orphan-marker on createPending
  failure, and markFailed single-arg assertions.

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
return (
cached ?? {
collectionName,
collectionId: collectionName,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not works

alban bertolini and others added 3 commits June 4, 2026 20:16
Move the activity-log audit out of OperationStepExecutor into a dedicated
AgentWithLog collaborator that wraps AgentPort. Each data-access call now
emits one audit entry (pending -> success/failed); the action/type are
intrinsic to the method and the target (collectionId, recordId) is derived
from the call, so the per-executor `operation` descriptor and the
`operationCollectionId` hook are gone.

Idempotency stays in the executors. Write methods take a `beforeCall` thunk
run between createPending and the side effect, so the executor persists its
`executing` write-ahead marker in the correct order (createPending ->
executing -> write) without AgentWithLog knowing anything about idempotency.
MCP (whose side effect is a tool invocation, not an AgentPort call) uses the
generic `logged()` method.

Also restore the local-only `errorMessage` on markFailed: it never reaches
the server (status endpoint accepts only { status }) but is kept for the
adapter's diagnostic log line.

- delete OperationStepExecutor; RecordStepExecutor + McpStepExecutor extend
  BaseStepExecutor directly
- extract shared loadCollectionSchema helper (reused by getCollectionSchema
  and AgentWithLog collectionId resolution)
- getActionFormInfo / probe stay on the raw agentPort (intentionally not audited)
- new agent-with-log.test.ts (audit per method, beforeCall ordering,
  createPending-throws, userMessage vs 'Unexpected error'); relocate the
  audit suite out of the deleted operation-step-executor.test.ts

fixes PRD-442

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- AgentWithLog: replace LoggedOperation with AuditTarget
  (Omit<CreateActivityLogArgs,'renderingId'>), reused by logged() and
  audit() so the audit-arg shape stays anchored to the port type.
- CLAUDE.md: update the idempotency section to reference AgentWithLog and
  the beforeCall thunk (the deleted OperationStepExecutor.logOperation).
- tests: assert stepOutcome.error (audit-creation failure message) on the
  no-orphan-marker case; pin getSingleRelatedData (xToOne) audit action.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
resolveSchema fabricated a CollectionSchema on cache miss, including a
buggy `collectionId: cached.collectionId` (cached is undefined in that
branch). The fallback is unreachable in the normal flow — AgentWithLog
resolves (and caches, via the shared SchemaCache) the schema right before
delegating to the agent port — so throw a SchemaNotCachedError instead of
returning a fabricated schema with a wrong primaryKeyFields/collectionId.

- new SchemaNotCachedError (domain error)
- agent-client-agent-port test: fallback case becomes a throw assertion

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
const cached = this.schemaCache.get(collectionName);

if (!cached) {
// eslint-disable-next-line no-console
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should never occur because schema is already loaded at this point

Move the cache-or-fetch logic onto SchemaCache as getOrLoad(name, load):
the cache owns "check cache, else load via the injected thunk, then store",
while staying decoupled from WorkflowPort (the loader closure captures both
the port and the per-step runId). Removes the free loadCollectionSchema
helper; RecordStepExecutor.getCollectionSchema and AgentWithLog.resolveCollectionId
both go through getOrLoad.

Also pin the confirmation-flow audit behavior (PRD-442 #2 premature/duplicate
symptom, fixed as a consequence of logging at the operation point): a rejected
step emits no activity-log entry. Plus direct getOrLoad unit tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
// `load` is injected so the cache stays decoupled from the orchestrator port.
async getOrLoad(
collectionName: string,
load: () => Promise<CollectionSchema>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to accept a load arg ? is it not always the same logic for getting the schema ?

This means you could access a cached value that was not loaded from the same loader as the one you provide

Copy link
Copy Markdown
Member Author

@Scra3 Scra3 Jun 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduce a schemaResolver to avoid to pass this ugly getOrLoad

} catch (err) {
const errorMessage =
err instanceof WorkflowExecutorError ? err.userMessage : 'Unexpected error';
void this.activityLogPort.markFailed(handle, errorMessage);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If beforeCall throws, we markFailed though the side effect never ran — the audit then shows a failed write on a record that was never touched. Intended, or should we only markFailed around run()?


private async resolveCollectionId(collectionName: string): Promise<string> {
const schema = await this.schemaCache.getOrLoad(collectionName, () =>
this.workflowPort.getCollectionSchema(collectionName, this.runId),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uncovered (qlty): a test with a real SchemaCache miss would exercise the workflowPort loader path here.

alban bertolini and others added 4 commits June 5, 2026 15:46
…Resolver

Both getOrLoad callers rewrote the same loader closure, only because the
process-wide SchemaCache singleton can't hold the per-run runId that
getCollectionSchema needs. Bind (schemaCache, workflowPort, runId) once in a
SchemaResolver instead, and make SchemaCache a pure store again. The resolver
feeds the same shared cache AgentClientAgentPort reads from, preserving the
SchemaNotCachedError invariant. AgentWithLogDeps drops from 6 to 4 fields.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The step error is already logged/surfaced by base-step-executor when rethrown,
so threading it into markFailed only enriched a rare double-fault diagnostic.
Drop the param: markFailed(handle) is now symmetric with markSucceeded, and
AgentWithLog no longer computes a userMessage/'Unexpected error' string.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
When beforeCall (the write-ahead marker save) throws, the datasource write
must not run and the pending entry is resolved to failed — never left
orphan-pending. Documents the intended behavior raised in review.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
After the SchemaResolver extraction no executor reads context.schemaCache —
all schema access goes through context.schemaResolver. Exposing the raw cache
next to the resolver advertised a second, unsafe path (a direct .get() bypasses
the resolver's read-through fetch, re-opening the SchemaNotCachedError footgun).
The factory keeps cfg.schemaCache to wire the resolver and the agent-port.

Also soften the SchemaResolver header comment (the guard is unreachable under
normal TTLs, not absolutely) and note markFailed must be called on a rethrow path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Member

@nbouliol nbouliol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code LGTM, needs manual testing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants