fix(workflow-executor): operation activity log targets the acted record (PRD-442 #1)#1628
fix(workflow-executor): operation activity log targets the acted record (PRD-442 #1)#1628Scra3 wants to merge 11 commits into
Conversation
4 new issues
|
…rd (PRD-442 #1) The operation audit-trail entry pointed at the run's trigger record, even when the step acted on a record loaded earlier in the run — possibly in another collection. Root cause: buildActivityLogArgs ran before doExecute (so the acted record wasn't resolved yet) and used context.collectionId + baseRecordRef. Record executors now emit the entry at the point of the agent call, via a new base helper withActivityLog(args, fn), targeting selectedRecordRef and its collection's numeric id (read from the live schema). CollectionSchema gains a required collectionId — the audit is not optional, so a missing id fails loud at the boundary (DomainValidationError); the orchestrator must include it in getCollectionSchema responses (coordinated deploy). Scope: bug #1 (wrong target) only. As a consequence of logging at the operation point, a confirmation step no longer logs before the write — the remaining #2 work (exactly-once / no-log-on-reject, MCP step) is a follow-up. The MCP executor still uses context.collectionId via the existing wrapper. fixes PRD-442 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
bdff84b to
e8f778b
Compare
…logOperation Move the activity-log machinery out of BaseStepExecutor into a new OperationStepExecutor (Base -> Operation -> Record/Mcp). It owns the `operation` descriptor, `logOperation(record, fn)` and the now-private `withActivityLog` helper, so the base class no longer carries any audit concern. McpStepExecutor now extends OperationStepExecutor and logs its tool call against the run base record via context.collectionId. RecordStepExecutor overrides operationCollectionId to resolve the acted record's own collection id. Relocate the activity-log tests from base-step-executor.test.ts to a new operation-step-executor.test.ts. fixes PRD-442 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Tighten operation typing: CreateActivityLogArgs now uses the lib's
ActivityLogAction/ActivityLogType (removes the `as ActivityLogAction`
cast); extract OperationDescriptor (Pick) reused by the MCP executor.
collectionId is now required on CreateActivityLogArgs.
- Drop the dead `errorMessage` param from ActivityLogPort.markFailed: the
server status endpoint only accepts { status }, so the message went
nowhere. The failure cause is still logged by BaseStepExecutor.
- Move the `executing` write-ahead marker inside the logOperation callback
(after createPending, before the side effect) in update/trigger/mcp so an
activity-log creation failure never leaves an orphan executing marker.
- Fix stale comment in base-step-executor + CLAUDE.md idempotency section.
- Reword the misleading MCP formatting-fallback log line.
- Tests: cross-collection activity-log target for update + trigger, log on
the load-related awaiting-input branch, no-orphan-marker on createPending
failure, and markFailed single-arg assertions.
fixes PRD-442
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| return ( | ||
| cached ?? { | ||
| collectionName, | ||
| collectionId: collectionName, |
Move the activity-log audit out of OperationStepExecutor into a dedicated
AgentWithLog collaborator that wraps AgentPort. Each data-access call now
emits one audit entry (pending -> success/failed); the action/type are
intrinsic to the method and the target (collectionId, recordId) is derived
from the call, so the per-executor `operation` descriptor and the
`operationCollectionId` hook are gone.
Idempotency stays in the executors. Write methods take a `beforeCall` thunk
run between createPending and the side effect, so the executor persists its
`executing` write-ahead marker in the correct order (createPending ->
executing -> write) without AgentWithLog knowing anything about idempotency.
MCP (whose side effect is a tool invocation, not an AgentPort call) uses the
generic `logged()` method.
Also restore the local-only `errorMessage` on markFailed: it never reaches
the server (status endpoint accepts only { status }) but is kept for the
adapter's diagnostic log line.
- delete OperationStepExecutor; RecordStepExecutor + McpStepExecutor extend
BaseStepExecutor directly
- extract shared loadCollectionSchema helper (reused by getCollectionSchema
and AgentWithLog collectionId resolution)
- getActionFormInfo / probe stay on the raw agentPort (intentionally not audited)
- new agent-with-log.test.ts (audit per method, beforeCall ordering,
createPending-throws, userMessage vs 'Unexpected error'); relocate the
audit suite out of the deleted operation-step-executor.test.ts
fixes PRD-442
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- AgentWithLog: replace LoggedOperation with AuditTarget (Omit<CreateActivityLogArgs,'renderingId'>), reused by logged() and audit() so the audit-arg shape stays anchored to the port type. - CLAUDE.md: update the idempotency section to reference AgentWithLog and the beforeCall thunk (the deleted OperationStepExecutor.logOperation). - tests: assert stepOutcome.error (audit-creation failure message) on the no-orphan-marker case; pin getSingleRelatedData (xToOne) audit action. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
resolveSchema fabricated a CollectionSchema on cache miss, including a buggy `collectionId: cached.collectionId` (cached is undefined in that branch). The fallback is unreachable in the normal flow — AgentWithLog resolves (and caches, via the shared SchemaCache) the schema right before delegating to the agent port — so throw a SchemaNotCachedError instead of returning a fabricated schema with a wrong primaryKeyFields/collectionId. - new SchemaNotCachedError (domain error) - agent-client-agent-port test: fallback case becomes a throw assertion Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| const cached = this.schemaCache.get(collectionName); | ||
|
|
||
| if (!cached) { | ||
| // eslint-disable-next-line no-console |
There was a problem hiding this comment.
should never occur because schema is already loaded at this point
Move the cache-or-fetch logic onto SchemaCache as getOrLoad(name, load): the cache owns "check cache, else load via the injected thunk, then store", while staying decoupled from WorkflowPort (the loader closure captures both the port and the per-step runId). Removes the free loadCollectionSchema helper; RecordStepExecutor.getCollectionSchema and AgentWithLog.resolveCollectionId both go through getOrLoad. Also pin the confirmation-flow audit behavior (PRD-442 #2 premature/duplicate symptom, fixed as a consequence of logging at the operation point): a rejected step emits no activity-log entry. Plus direct getOrLoad unit tests. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| // `load` is injected so the cache stays decoupled from the orchestrator port. | ||
| async getOrLoad( | ||
| collectionName: string, | ||
| load: () => Promise<CollectionSchema>, |
There was a problem hiding this comment.
why do we need to accept a load arg ? is it not always the same logic for getting the schema ?
This means you could access a cached value that was not loaded from the same loader as the one you provide
There was a problem hiding this comment.
I introduce a schemaResolver to avoid to pass this ugly getOrLoad
| } catch (err) { | ||
| const errorMessage = | ||
| err instanceof WorkflowExecutorError ? err.userMessage : 'Unexpected error'; | ||
| void this.activityLogPort.markFailed(handle, errorMessage); |
There was a problem hiding this comment.
If beforeCall throws, we markFailed though the side effect never ran — the audit then shows a failed write on a record that was never touched. Intended, or should we only markFailed around run()?
|
|
||
| private async resolveCollectionId(collectionName: string): Promise<string> { | ||
| const schema = await this.schemaCache.getOrLoad(collectionName, () => | ||
| this.workflowPort.getCollectionSchema(collectionName, this.runId), |
There was a problem hiding this comment.
Uncovered (qlty): a test with a real SchemaCache miss would exercise the workflowPort loader path here.
…Resolver Both getOrLoad callers rewrote the same loader closure, only because the process-wide SchemaCache singleton can't hold the per-run runId that getCollectionSchema needs. Bind (schemaCache, workflowPort, runId) once in a SchemaResolver instead, and make SchemaCache a pure store again. The resolver feeds the same shared cache AgentClientAgentPort reads from, preserving the SchemaNotCachedError invariant. AgentWithLogDeps drops from 6 to 4 fields. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The step error is already logged/surfaced by base-step-executor when rethrown, so threading it into markFailed only enriched a rare double-fault diagnostic. Drop the param: markFailed(handle) is now symmetric with markSucceeded, and AgentWithLog no longer computes a userMessage/'Unexpected error' string. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
When beforeCall (the write-ahead marker save) throws, the datasource write must not run and the pending entry is resolved to failed — never left orphan-pending. Documents the intended behavior raised in review. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
After the SchemaResolver extraction no executor reads context.schemaCache — all schema access goes through context.schemaResolver. Exposing the raw cache next to the resolver advertised a second, unsafe path (a direct .get() bypasses the resolver's read-through fetch, re-opening the SchemaNotCachedError footgun). The factory keeps cfg.schemaCache to wire the resolver and the agent-port. Also soften the SchemaResolver header comment (the guard is unreachable under normal TTLs, not absolutely) and note markFailed must be called on a rethrow path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
nbouliol
left a comment
There was a problem hiding this comment.
Code LGTM, needs manual testing

Context
fixes PRD-442A workflow step's operation activity-log entry (audit trail) pointed at the run's trigger record, even when the step acted on a record loaded earlier in the run — possibly in another collection. Root cause: the audit args were built before
doExecute()(so the acted record wasn't resolved yet) and usedcontext.collectionId+baseRecordRef.Changes
Audit moved into a dedicated
AgentWithLogwrapper. Activity-log responsibility leftBaseStepExecutor/OperationStepExecutor(deleted) and now lives inAgentWithLog, which wrapsAgentPortand emits one audit entry per data-access / write call (pending → succeeded/failed). The target is derived from the call itself —recordId = query.id,collectionIdresolved from the call's collection — so each entry points at the acted record and its own collection, and fires only when the operation actually happens. Covers read / update / trigger-action / load-related; the MCP step uses the genericlogged()(its side effect is a tool invocation, audited against the run's base record).Acted collection's numeric id.
CollectionSchemagains a requiredcollectionId, resolved from the live schema via the newSchemaCache.getOrLoad(name, loader)(lazy cache-or-fetch, decoupled fromWorkflowPort). The audit is not optional, so a missing id fails loud at the boundary (DomainValidationError).Fail-loud schema resolution.
AgentClientAgentPort.resolveSchemanow throwsSchemaNotCachedErrorinstead of returning a synthetic fallback schema (which had a wrongprimaryKeyFields/collectionId). Unreachable in the normal flow —AgentWithLogresolves (and caches) the schema right before delegating — so it's a true invariant guard.Idempotency unchanged, ordering preserved. Idempotency stays entirely in the executors. The
executingwrite-ahead marker is saved in abeforeCallthunk thatAgentWithLogruns aftercreatePending, before the side effect — so an audit-creation failure never leaves an orphanexecutingmarker, andAgentWithLogstays ignorant of run state.Activity-log failures surface as user-facing step errors;
markFailedcarries a local-only diagnostic message (the server status endpoint accepts just{ status }).The orchestrator must include
collectionIdin everygetCollectionSchemaresponse. Until it does, steps on that collection fail at schema parse — so this must ship in tandem with the orchestrator change.Scope
Bug #1 (wrong target), plus the confirmation-flow premature / duplicate symptom of #2: logging at the operation point means a confirmation step logs nothing while awaiting input, nothing on reject, and exactly once on confirm (covered by tests). Remaining #2 work: exactly-once across network retries (orchestrator-side dedup of identical
(runId, stepIndex)outcomes).Verification
yarn workspace @forestadmin/workflow-executor build✅customers, act onorders#99→ audit targetsorders, not the trigger), per-executor target assertions,AgentWithLogunit tests (beforeCall ordering, createPending-throws,userMessagevs'Unexpected error'), reject-emits-no-log, andgetOrLoadhit/miss ✅🤖 Generated with Claude Code
Note
Context
fixes PRD-442A workflow step's operation activity-log entry (audit trail) pointed at the run's trigger record, even when the step acted on a record loaded earlier in the run — possibly in another collection. Root cause: the audit args were built before
doExecute()(so the acted record wasn't resolved yet) and usedcontext.collectionId+baseRecordRef.Changes
Audit moved into a dedicated
AgentWithLogwrapper. Activity-log responsibility leftBaseStepExecutor/OperationStepExecutor(deleted) and now lives inAgentWithLog, which wrapsAgentPortand emits one audit entry per data-access / write call (pending → succeeded/failed). The target is derived from the call itself —recordId = query.id,collectionIdresolved from the call's collection — so each entry points at the acted record and its own collection, and fires only when the operation actually happens. Covers read / update / trigger-action / load-related; the MCP step uses the genericlogged()(its side effect is a tool invocation, audited against the run's base record).Acted collection's numeric id.
CollectionSchemagains a requiredcollectionId, resolved from the live schema via the newSchemaCache.getOrLoad(name, loader)(lazy cache-or-fetch, decoupled fromWorkflowPort). The audit is not optional, so a missing id fails loud at the boundary (DomainValidationError).Fail-loud schema resolution.
AgentClientAgentPort.resolveSchemanow throwsSchemaNotCachedErrorinstead of returning a synthetic fallback schema (which had a wrongprimaryKeyFields/collectionId). Unreachable in the normal flow —AgentWithLogresolves (and caches) the schema right before delegating — so it's a true invariant guard.Idempotency unchanged, ordering preserved. Idempotency stays entirely in the executors. The
executingwrite-ahead marker is saved in abeforeCallthunk thatAgentWithLogruns aftercreatePending, before the side effect — so an audit-creation failure never leaves an orphanexecutingmarker, andAgentWithLogstays ignorant of run state.Activity-log failures surface as user-facing step errors;
markFailedcarries a local-only diagnostic message (the server status endpoint accepts just{ status }).The orchestrator must include
collectionIdin everygetCollectionSchemaresponse. Until it does, steps on that collection fail at schema parse — so this must ship in tandem with the orchestrator change.Scope
Bug #1 (wrong target), plus the confirmation-flow premature / duplicate symptom of #2: logging at the operation point means a confirmation step logs nothing while awaiting input, nothing on reject, and exactly once on confirm (covered by tests). Remaining #2 work: exactly-once across network retries (orchestrator-side dedup of identical
(runId, stepIndex)outcomes).Verification
yarn workspace @forestadmin/workflow-executor build✅customers, act onorders#99→ audit targetsorders, not the trigger), per-executor target assertions,AgentWithLogunit tests (beforeCall ordering, createPending-throws,userMessagevs'Unexpected error'), reject-emits-no-log, andgetOrLoadhit/miss ✅🤖 Generated with Claude Code
Changes since #1628 opened
SchemaResolverclass to bind schema resolution to a specific workflow run context [453f88a]executors.AgentWithLogto useSchemaResolverinstead of individual dependencies [453f88a]schemaResolverproperty totypes.ExecutionContextinterface and updated context construction [453f88a]executors.BaseStepExecutorandexecutors.RecordStepExecutorto useSchemaResolver[453f88a]SchemaResolverin test contexts [453f88a]AgentWithLog.updateRecorderror handling whenbeforeCallthrows [3629570]schemaCacheproperty fromExecutionContextinterface inworkflow-executorpackage [8bb1ba6]workflow-executortest suite to removeschemaCachefrom mock contexts [8bb1ba6]workflow-executorschema and activity log interfaces [8bb1ba6]