feat(task-graph): binary-streaming framework + result-as-reference #561
sroussey wants to merge 27 commits
Spec 1 — binary-delta streaming framework
-----------------------------------------
Adds a `binary-delta` variant to `StreamEvent` (analogous to `text-delta` /
`object-delta`) plus an `x-stream: "binary"` annotation on output port
schemas, so a task can `executeStream` byte chunks the same way it streams
text or structured objects. New port helpers (`getBinaryPortId`,
`getBinaryPortFormat`, `getStreamingPorts`), a `materializeBinary`
assembler (Blob for `format: "blob"`/absent, ArrayBuffer for
`format: "binary"`), and a `getOutputStreamMode` adopter let downstream
code branch cleanly on binary mode without reaching for `any`.
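As a rough illustration of the assembler's contract, the sketch below joins accumulated chunks and returns a `Blob` when the format is `"blob"` or absent, and an `ArrayBuffer` when it is `"binary"`. The signature (a chunk array plus a format argument) is an assumption made for this example; the PR's actual `materializeBinary` may take different parameters.

```typescript
type BinaryFormat = "blob" | "binary";

// Hypothetical signature: joins chunks into one payload shaped by `format`.
function materializeBinary(
  chunks: readonly Uint8Array[],
  format: BinaryFormat = "blob"
): Blob | ArrayBuffer {
  const total = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0);
  const buffer = new ArrayBuffer(total);
  const joined = new Uint8Array(buffer);
  let offset = 0;
  for (const chunk of chunks) {
    joined.set(chunk, offset); // copy each chunk at its running offset
    offset += chunk.byteLength;
  }
  // "binary" yields the raw ArrayBuffer; "blob" (the default) wraps it.
  return format === "binary" ? buffer : new Blob([buffer]);
}
```

A downstream consumer can then branch on `instanceof Blob` versus `instanceof ArrayBuffer` without casting.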
StreamProcessor accumulates `binary-delta` chunks per port and merges
them into the enriched finish event so downstream dataflows see the
materialized payload (or, for explicit binary finish payloads, the
artifact wins per Spec 1's precedence rule).
StreamPump adds the graph-aware decision (`canStreamBinaryToCache`,
`anyConsumerNeedsMaterialized`) and the `pipeBinaryToCache` assembly
helper that turns a task's `binary-delta` events into an `AsyncIterable`
ready to drive a streaming cache sink.
`TaskOutputRepository` gains an optional `saveOutputStream` sink so
file-backed (or other stream-capable) caches can ingest bytes without
materializing the full payload; `supportsStreaming()` and the
`RunPrivateCacheRepo` wrapper forward the capability correctly.
Spec 2 — result-as-reference
----------------------------
Builds on Spec 1 to close the queue-row-bloat hole: when the cache backing
supports streaming, the runner pipes the binary bytes straight to the
cache and places a `CacheRef` placeholder in `Output` at the port slot.
Downstream `Output` consumers (and the queue row) see a small envelope
(`{ $ref, size?, mime? }`) instead of the full payload, while the bytes
live in the cache for hydration on demand.
Pieces:
- `CacheRef` type + `isCacheRef` type guard (`cache/CacheRef.ts`).
- `resolveOutput` walker (`cache/resolveRef.ts`) — pure recursive walker
that hydrates refs through a caller-supplied resolver. Identity is
preserved when no descendant matches the optional filter; class
instances (`Error`, `URL`, custom classes) survive with prototype
intact; `Map`/`Set` are walked through so nested refs resolve; opaque
leaves are `Blob`/`ArrayBuffer`/`TypedArray`/`Date`/`RegExp`/`Promise`.
- `resolveJobOutput` queue-boundary bridge (`cache/resolveJobOutput.ts`)
accepting either a `CacheRefResolver` function or any object exposing
`getOutputByRef` (`TaskOutputRepository` shape).
- `IRunConfig.referenceThresholdBytes` (default 64 KiB; `0` forces ref
for every binary output).
- `TaskOutputRepository.saveOutputStream` now returns `Promise<CacheRef>`;
new `getOutputByRef` / `getOutputStreamByRef` readers complete the
contract.
- `CacheCoordinator.getBinaryRefSinksByPolicy` derives a per-port
`BinaryRefSink` map; `hydrateRefsBelowThreshold` rehydrates refs whose
committed size falls below the configured threshold (schema-restricted
to binary streaming ports so legitimate `{$ref: string}` fields in
non-binary slots are not mistakenly hit against the cache).
- `StreamProcessor` routes `binary-delta` chunks to a `BinaryRefSink`
via a small `BinaryStreamRouter` producer-consumer pump.
- `TaskRunner` reads the threshold, builds sinks, threads them through
`StreamProcessor`, and rehydrates below-threshold refs in the post-run
pass — saveByPolicy then writes the small ref-bearing Output.
- StreamProcessor TEES when both an accumulator and a router exist for
a port (graph context where the cache can stream AND a downstream
edge needs materialized bytes): the emitted finish event carries the
materialized Blob/ArrayBuffer for edge consumers; `finalOutput`
carries the CacheRef so the queue/cache row stays small.
- `RunPrivateCacheRepo` forwards all three new optional methods,
mirroring the backing's true capability on the wrapper instance
(assigning `undefined` when the backing lacks them) so callers
probing `typeof === "function"` see the truth.
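The threshold rule described in the list above can be sketched as a small predicate. `shouldHydrateInline` is a hypothetical name, and treating a ref with no `size` as "keep the ref" is an assumption drawn from the description of `size` as optional; the PR's `hydrateRefsBelowThreshold` may decide differently.

```typescript
// Default taken from the PR description: 64 KiB.
const DEFAULT_REFERENCE_THRESHOLD_BYTES = 64 * 1024;

// Hypothetical helper: true when a committed ref is small enough to be
// replaced by its inline bytes in the post-run pass.
function shouldHydrateInline(
  ref: { size?: number },
  thresholdBytes: number = DEFAULT_REFERENCE_THRESHOLD_BYTES
): boolean {
  if (thresholdBytes === 0) return false; // 0 forces a ref for every binary output
  if (ref.size === undefined) return false; // assumption: unknown size keeps the ref
  return ref.size < thresholdBytes;
}
```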
Tests cover binary-delta accumulation + explicit-finish-payload
precedence, port helpers, cache decision + assembly, runner pipe + force-ref +
threshold rehydrate, tee for the graph + materializing-consumer case,
saved-row size + cross-process serialization round-trip + dangling-ref
best-effort, and the walker / `resolveOutput` / `resolveJobOutput`
surface (class instances, Map/Set, sparse-ref filter, concurrency bound,
identity preservation).
Adds a same-process channel so a holder of a `JobHandle` can subscribe
to a running job's stream events (text deltas, object deltas,
binary-delta chunks, snapshot, finish, error, phase) instead of only
the terminal result.
Worker side
-----------
- `IJobExecuteContext` gains an optional `emitStreamEvent(event)` method.
- `JobQueueWorker` plumbs a per-job event emitter through into the
execute context so a run-fn can call `ctx.emitStreamEvent(...)` to
publish stream chunks as they're produced.
Server side
-----------
- `JobQueueServer.forwardToClients("handleJobStream", jobId, event)`
fans the event to every attached client by direct method invocation —
pure in-memory, no `postMessage`, no serialization, no worker thread.
The channel is intentionally same-process only; storage-backed cross-
process clients see state transitions through `subscribeToChanges`
but receive no incremental stream events.
Client side
-----------
- `JobHandle.onStream(callback)` is exposed only when the client is
server-attached (`this.server` set); callers branch on
`typeof handle.onStream === "function"`.
- Each listener invocation is wrapped in try/catch so one throwing
subscriber does not abort delivery to the rest or break the dispatch.
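The per-listener isolation described above might look like the following sketch. `StreamFanout` and its `errors` array are hypothetical names for illustration; the actual `JobHandle.onStream` plumbing is not shown in this conversation.

```typescript
type StreamListener<E> = (event: E) => void;

// Hypothetical in-memory fan-out: each listener runs in its own try/catch.
class StreamFanout<E> {
  private readonly listeners = new Set<StreamListener<E>>();
  readonly errors: unknown[] = [];

  // Registers a listener and returns an unsubscribe function.
  onStream(listener: StreamListener<E>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  dispatch(event: E): void {
    // Iterate over a copy so a listener may unsubscribe during dispatch.
    for (const listener of [...this.listeners]) {
      try {
        listener(event);
      } catch (err) {
        // One throwing subscriber must not stop delivery to the rest.
        this.errors.push(err);
      }
    }
  }
}
```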
Tests
-----
- `JobQueueStream.test.ts` proves end-to-end same-process delivery: a
worker's `emitStreamEvent` calls reach every `JobHandle.onStream`
listener in order.
- `JobQueueStreamWorker.integration.test.ts` (+ its `.fixture.mjs`)
validates the underlying Node `worker_threads` transfer mechanism
the design relies on for any future cross-thread queue host: binary
chunks emitted from a worker thread transfer (not copy) to the host
per `WorkerServerBase.extractTransferables`. The docblock spells
out that this is a Node-primitive validation, NOT a test of the
current package's behavior — today's queue channel is entirely
same-process and the test exists as a navigational marker for a
future hosted-in-thread variant.
…ly collisions
`CacheRef` was discriminated by shape alone: any object with `{ $ref: string }`
satisfied `isCacheRef`, including JSON-Schema `$ref` pointers embedded in
metadata. The cache-ref resolver walks task outputs and calls
`getOutputByRef(ref)` on every match — so any code path that surfaces an
attacker-influenced `{$ref: "cache://OTHER_RUN/secret"}` shape (e.g. a tool
result, an AI structured-output field, a parsed-JSON document) could trick
`resolveJobOutput` / `resolveOutput` into reading bytes from another run or
tenant's private cache slot.
This patch adds a literal `kind: "task-graph/CacheRef"` brand discriminator
that:
- survives JSON serialization across queue rows / IPC (Symbol-based brands
would be erased by `JSON.stringify` and break cross-process resolution);
- is checked by `isCacheRef` alongside the `$ref` string;
- is applied uniformly by a new `makeCacheRef(...)` helper that callers
use to construct refs.
`CacheCoordinator.getBinaryRefSinksByPolicy` and
`RunPrivateCacheRepo.saveOutputStream` now defensively re-wrap the value
returned by legacy backings (`isCacheRef(raw) ? raw : makeCacheRef(raw)`),
so a backing that pre-dates the brand still produces a discriminator-bearing
ref when seen through the framework. In-tree test repositories and callers
are updated to use `makeCacheRef`.
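A minimal sketch of the branded guard follows, assuming a `makeCacheRef` that takes the ref string plus optional metadata. That parameter list is an assumption for this example; only the `kind` literal and the `$ref` string check are taken from the description above.

```typescript
const CACHE_REF_KIND = "task-graph/CacheRef";

interface CacheRef {
  readonly kind: typeof CACHE_REF_KIND;
  readonly $ref: string;
  readonly size?: number;
  readonly mime?: string;
}

// Hypothetical constructor signature; always applies the brand.
function makeCacheRef(ref: string, extra: { size?: number; mime?: string } = {}): CacheRef {
  return { kind: CACHE_REF_KIND, $ref: ref, ...extra };
}

// Requires BOTH the string brand and a string `$ref`, so a bare
// JSON-Schema style `{ $ref: "..." }` no longer matches.
function isCacheRef(value: unknown): value is CacheRef {
  if (value === null || typeof value !== "object") return false;
  const o = value as Record<string, unknown>;
  return o.kind === CACHE_REF_KIND && typeof o.$ref === "string";
}
```

Because the brand is an ordinary string property, it survives `JSON.parse(JSON.stringify(ref))`, which a Symbol-based brand would not.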
Test coverage:
- `CacheRef.test.ts` now expects shape-only `{$ref: string}` to be rejected
and exercises JSON round-trip preserving the brand.
- `resolveOutput.test.ts` adds a case where a JSON-Schema-shaped
`{schema: {$ref: "#/$defs/Foo"}}` is left untouched and the resolver is
never called (identity preserved).
- `resolveJobOutput.test.ts` adds the cross-tenant attack case: an
attacker-supplied `{note: {$ref: "cache://OTHER_RUN/secret"}}` never
invokes `getOutputByRef`.
…b"|"binary"
`materializeBinary` previously accepted any string and silently coerced
unknown values (including casing typos like `"Blob"`) to the ArrayBuffer
branch. A task author writing `format: "Blob"` would unknowingly produce an
ArrayBuffer where every downstream consumer expected a Blob — and the
mismatch only surfaced at the consumer (often as a misleading runtime
error during streaming, or worse, silent data corruption when the consumer
duck-typed both shapes).
This patch establishes a canonical `BinaryFormat = "blob" | "binary"` type
and routes every binary-port consumer through a single
`assertBinaryFormat(schema, port)` helper:
- `undefined` and `"blob"` resolve to `"blob"` (the documented default);
- `"binary"` resolves to `"binary"`;
- anything else throws with the allowed vocabulary in the message.
`materializeBinary` now takes the canonical `BinaryFormat` directly and
`StreamProcessor` / `CacheCoordinator.hydrateRefsBelowThreshold` both call
`assertBinaryFormat` before invoking it.
`TaskRegistry.registerTask` runs the same check at registration time over
every output port with `x-stream: "binary"`, so the typo fails near the
task definition rather than during a streaming run. The task is not added
to the registry when the check fails.
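The three resolution rules above can be sketched as follows. The schema parameter is reduced to an object with an optional `format` field, which is an assumption; the real helper presumably receives the full port schema.

```typescript
type BinaryFormat = "blob" | "binary";

// Sketch of the canonicalizing check; `schema` is simplified for illustration.
function assertBinaryFormat(schema: { format?: unknown }, port: string): BinaryFormat {
  const format = schema.format;
  if (format === undefined || format === "blob") return "blob"; // documented default
  if (format === "binary") return "binary";
  throw new Error(
    `Port "${port}": unsupported binary format ${JSON.stringify(format)}; ` +
      `allowed values are "blob" and "binary"`
  );
}
```

With this shape, the casing typo `"Blob"` fails loudly instead of falling through to the ArrayBuffer branch.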
Test coverage:
- `StreamBinaryTypes.test.ts` replaces the now-removed "unknown format =
binary" behavior with `assertBinaryFormat` cases for `"blob"`,
`"binary"`, undefined-default, the casing typo `"Blob"`, and an unknown
value (`"wat"`).
- `TaskRegistry.test.ts` adds cases asserting registration throws on a
binary port with `format: "Blob"`, and succeeds on `"blob"` /
`"binary"`.
- `Spec2QueueRowAndRehydrate.test.ts` adds symmetric rehydration cases:
`format: "blob"` rehydrates into a `Blob`, `format: "binary"` into an
`ArrayBuffer`.
…efault 8 MiB)
The streaming binary router buffered chunks without bound. A fast producer
(e.g. an AI image / audio generator yielding 1 MiB chunks) feeding a slow
sink (remote object store, throttled FS) would let the producer race ahead
and accumulate the entire payload in memory before the sink saw the first
chunk — turning a notionally O(1) streaming path into peak-residency O(N).
The old comment even acknowledged the issue ("backpressure: there is none")
and offloaded the problem onto the sink author.
This patch:
- Introduces `DEFAULT_BINARY_HIGH_WATER_BYTES = 8 MiB` in `StreamTypes.ts`.
- `BinaryStreamRouter` now tracks `bufferedBytes` (sum of un-consumed
chunk sizes). `push(chunk)` returns a Promise that resolves
immediately while `bufferedBytes < highWaterMarkBytes`, and parks the
producer until the consumer drains under the mark otherwise. `end()`
and `fail()` BOTH release any parked producer so an abort mid-park
does not leak the Promise.
- `StreamProcessor` now awaits `router.push(...)` on every `binary-delta`
yield, so the byte-bounded backpressure applies for tasks running
through the standard streaming path.
- `IRunConfig.binaryHighWaterBytes` lets callers override per-run.
Threaded through `TaskRunner` → `StreamProcessor.run` deps.
- `IExecuteContext.binaryBackpressure?: () => Promise<void>` is a
cooperative hook for tasks that emit via a side channel and cannot
use the awaited `push` path; the StreamProcessor and StreamPump
install router-aware implementations, and an absent runtime supplies
a no-op (free for tasks that don't call it).
- `StreamPump.pipeBinaryToCache` (the EventEmitter path used for the
cache-ingest tee) gets the same byte-counted queue and returns a
`backpressure()` function alongside `promise` / `detach`.
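The byte-counted parking behaviour can be approximated with the sketch below. `BoundedByteQueue`, `take()`, and `parkedProducers` are hypothetical names, and the sketch releases every parked producer once the buffer drops under the mark, which is a simplification rather than a description of `BinaryStreamRouter`'s internals.

```typescript
// Hypothetical byte-bounded queue: push() parks once the mark is reached.
class BoundedByteQueue {
  private readonly chunks: Uint8Array[] = [];
  private waiters: Array<() => void> = [];
  private buffered = 0;
  private ended = false;
  private readonly highWaterMarkBytes: number;

  constructor(highWaterMarkBytes: number) {
    this.highWaterMarkBytes = highWaterMarkBytes;
  }

  get bufferedBytes(): number {
    return this.buffered;
  }

  get parkedProducers(): number {
    return this.waiters.length;
  }

  // Enqueues the chunk, then resolves at once while under the mark;
  // otherwise the returned Promise stays pending until a drain or end().
  push(chunk: Uint8Array): Promise<void> {
    if (this.ended) return Promise.reject(new Error("push after end"));
    this.chunks.push(chunk);
    this.buffered += chunk.byteLength;
    if (this.buffered < this.highWaterMarkBytes) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
    });
  }

  // Consumer side: removes one chunk and wakes producers when under the mark.
  take(): Uint8Array | undefined {
    const chunk = this.chunks.shift();
    if (chunk === undefined) return undefined;
    this.buffered -= chunk.byteLength;
    if (this.buffered < this.highWaterMarkBytes) this.releaseAll();
    return chunk;
  }

  // Ending also releases parked producers so an abort cannot leak a Promise.
  end(): void {
    this.ended = true;
    this.releaseAll();
  }

  private releaseAll(): void {
    const parked = this.waiters;
    this.waiters = [];
    for (const resolve of parked) resolve();
  }
}
```

Because the chunk is enqueued before the check, the peak buffer in this sketch is bounded by the mark plus one chunk, matching the bound the tests below assert.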
Test coverage in `StreamingBackpressure.test.ts` adds a "binary
backpressure" describe block:
- 100 × 1 MiB through a slow (50 ms / chunk) sink with a 4 MiB
high-water mark: peak buffer stays at or below `mark + 1 chunk`
and every byte is delivered.
- End-to-end: 100 MiB through `StreamProcessor.run` with the same
high-water mark, asserting full delivery without drops.
- Abort-while-parked: a producer parked at the high-water mark sees
its `push()` Promise settle within 100 ms of `r.end()`.
Adds supportsStreamingReads() to TaskOutputRepository (mirrored by RunPrivateCacheRepo), plus streamRefViaBacking/byteIterableFromBlob and the RefStreamBacking shape in resolveRef. Extracts the shared in-memory streaming repo test double into packages/test bindings.
…om cache
Streams a completed job's binary output out of the cache backing by port or single-ref discovery, adapting inline Blob/ArrayBuffer/Uint8Array values. makeJobOutputStreamResolver produces the injectable resolver shape for job-queue (which cannot depend on task-graph).
…it replay
StreamPump.anyConsumerAcceptsBinaryStream inspects outgoing edges for binary-to-binary stream pass-through; the graph runner threads the result into each task run as IRunConfig.hasStreamingConsumers.
On a cache hit whose binary ports hold CacheRefs, CacheCoordinator now mirrors the fresh-run event contract: cached bytes replay as chunked binary-delta events for stream-capable consumers and hydrate into the enriched finish event for materializing consumers, while the returned output keeps the ref. Dangling refs convert the hit into a miss so the task re-executes and rewrites the entry. Consumer needs are graph-computed (anyConsumerNeedsMaterialized / anyConsumerAcceptsBinaryStream) and threaded through IRunConfig.
Branded CacheRefs reaching a task's resolved inputs are resolved against the run's cache registry (private first, then deterministic) and inlined per the port's format annotation before validation and cache-key computation. Binary-streaming ports with a live input stream are skipped; unresolvable refs fail the task with a named-port error.
…decar files
First production streaming-capable output cache (node/bun, exported via a new common-server entry): JSON rows through FsFolderTabularStorage, binary payloads as sidecar blob files written incrementally and published by atomic rename. Deterministic blob naming from (taskType, input fingerprint) overwrites instead of leaking; refs from foreign or path-traversal shaped $refs never resolve. Includes a generic stream-out contract suite run against both the in-memory and FS repositories, and an end-to-end cache-hit replay through the FS backing.
…inary results
JobQueueClientOptions accepts an injected outputStreamResolver (built via task-graph's makeJobOutputStreamResolver; the dependency edge points the other way, so the resolver is structural). When configured, handles expose outputStream(port?) which awaits completion and streams the binary result out of the output cache without materializing it.
…tory
Review findings: prefix-scoped row deletions (RunPrivateCacheRepo.clearRun, CacheJanitor sweeps) now cascade to blob sidecar files instead of leaking them; blob names fingerprint the raw taskType so lossy sanitization cannot make two task types share a blob file; a failed write or rename removes its .tmp instead of stranding it. Also clears the shared test repo between job-queue outputStream tests.
…am listeners on abort/error
pipeBinaryToCache had no production callers (StreamProcessor's BinaryStreamRouter owns live byte delivery to the cache sink), so the duplicate queue/backpressure implementation and its test scaffolding are gone. createStreamFromTaskEvents now also terminates on the task's abort/error events (which never emit stream_end), closing the edge stream and detaching listeners instead of leaking them and leaving downstream readers waiting forever.
…ngle-binary-port streaming
Review findings: (1) saveByPolicy now runs before below-threshold
rehydration so JSON-row backings persist the serializable CacheRef
envelope instead of an inline Blob that stringifies to {} — and cache
hits apply the same hydration before returning, so small outputs come
back inline on both paths. (2) canStreamBinaryToCache and
getBinaryRefSinksByPolicy both require exactly one binary streaming
port; multi-port tasks fall back to accumulation instead of silently
dropping every port without a sink.
sroussey
left a comment
Outside review — one HIGH follow-up before merge
Pulled this PR into a focused review (security / DX / correctness). The 337b688 fix is correct for the streaming-capable backings (the cache row carries the wire-form CacheRef; cache-hit applies the same below-threshold hydration as a fresh run, so semantics are symmetric). Brand discrimination on CacheRef, the FsFolder REF_PATTERN, and the schema-restricted hydration path together close the obvious cross-tenant / unscoped-resolve risks within a single trust boundary.
One HIGH item that survives the 337b688 fix — flagging here because it requires this branch's code to reproduce, and the fix is small enough to land in this PR rather than as a follow-up.
H-1 — Silent Blob → "{}" corruption in non-streaming cache backings
Where: packages/task-graph/src/task/CacheCoordinator.ts ~218-227 (the saveByPolicy → serializeOutputPorts path).
Symptom: When a cacheable task has an x-stream: "binary" output port AND the configured TaskOutputRepository does NOT implement saveOutputStream (TaskOutputTabularRepository directly → SQLite/Postgres/InMemory; IndexedDbTaskOutputRepository), the StreamProcessor accumulation path materializes binary deltas into a Blob. Then serializeOutputPorts looks up a PortCodec for format: "blob" / "binary". No such codec is registered (grep confirms only imageCacheCodec and this PR's own codecs), so the lookup returns undefined and the raw Blob is handed to JSON.stringify → "{}". Cache row is empty; next cache hit returns nothing useful.
The 337b688 fix only rescues backings that DO implement saveOutputStream (by turning the binary output into a CacheRef so the row carries the envelope, not the Blob). Backings that don't stream still hit the corruption path.
Suggested fix — default PortCodec for blob / binary (Option A)
Preserves user-facing contract (existing non-streaming consumers keep working), matches the precedent set by imageCacheCodec.
New file: packages/task-graph/src/cache/BinaryPortCodec.ts
/**
* @license
* Copyright 2026 Steven Roussey <sroussey@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/
import { registerPortCodec } from "@workglow/util";
export interface BinaryPortWire {
readonly __binaryPortWire: 1;
readonly base64: string;
readonly size: number;
readonly mime: string | undefined;
}
function isBinaryPortWire(v: unknown): v is BinaryPortWire {
if (v === null || typeof v !== "object") return false;
const o = v as Record<string, unknown>;
return o.__binaryPortWire === 1 && typeof o.base64 === "string" && typeof o.size === "number";
}
function bytesToBase64(bytes: Uint8Array): string {
if (typeof Buffer !== "undefined") {
return Buffer.from(bytes.buffer, bytes.byteOffset, bytes.byteLength).toString("base64");
}
let bin = "";
for (let i = 0; i < bytes.length; i++) bin += String.fromCharCode(bytes[i] ?? 0);
return btoa(bin);
}
function base64ToBytes(b64: string): Uint8Array {
if (typeof Buffer !== "undefined") {
const buf = Buffer.from(b64, "base64");
return new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
}
const bin = atob(b64);
const out = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
return out;
}
async function serializeBinary(value: unknown): Promise<BinaryPortWire> {
if (value instanceof Blob) {
const bytes = new Uint8Array(await value.arrayBuffer());
return {
__binaryPortWire: 1,
base64: bytesToBase64(bytes),
size: bytes.byteLength,
mime: value.type === "" ? undefined : value.type,
};
}
if (value instanceof ArrayBuffer) {
const bytes = new Uint8Array(value);
return { __binaryPortWire: 1, base64: bytesToBase64(bytes), size: bytes.byteLength, mime: undefined };
}
throw new Error("BinaryPortCodec.serialize: value is not a Blob or ArrayBuffer");
}
registerPortCodec<Blob | ArrayBuffer, BinaryPortWire>("blob", {
async serialize(value) { return serializeBinary(value); },
async deserialize(wire): Promise<Blob> {
if (!isBinaryPortWire(wire)) throw new Error("BinaryPortCodec(blob).deserialize: input is not a BinaryPortWire");
const bytes = base64ToBytes(wire.base64);
// Copy only this view's bytes: in Node, a small Buffer can be a window onto a larger pooled ArrayBuffer.
return new Blob([bytes.slice()], wire.mime ? { type: wire.mime } : undefined);
},
});
registerPortCodec<Blob | ArrayBuffer, BinaryPortWire>("binary", {
async serialize(value) { return serializeBinary(value); },
async deserialize(wire): Promise<ArrayBuffer> {
if (!isBinaryPortWire(wire)) throw new Error("BinaryPortCodec(binary).deserialize: input is not a BinaryPortWire");
const bytes = base64ToBytes(wire.base64);
return bytes.buffer.slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength);
},
});
Wire registration: packages/task-graph/src/common.ts (side-effect import next to imageCacheCodec's):
import "./cache/BinaryPortCodec";
No change needed to CacheCoordinator.ts: the existing getPortCodec(prop.format) lookup will now hit the new codecs.
Tests (suggested) — packages/test/src/test/task-graph/CacheSerialization.test.ts
- Blob round-trip via non-streaming backing. Task with `format: "blob"` port returns `new Blob([new Uint8Array([1,2,3,4])], { type: "application/octet-stream" })`. Run against `SpyRepo`. Assert `JSON.stringify(repo.saved[0])` is NOT `'{"bytes":{}}'` AND `repo.saved[0].bytes.__binaryPortWire === 1`. Re-run → cache hit → assert rehydrated `Blob` bytes deep-equal `[1,2,3,4]` and `blob.type === "application/octet-stream"`.
- ArrayBuffer round-trip (`format: "binary"`). Same shape; rehydrated value is `ArrayBuffer` (not `Blob`); bytes match.
- Streaming backing still produces a `CacheRef`. `SpyRepo` with `saveOutputStream`; assert `repo.saved[0].bytes` matches `isCacheRef(...)` and is NOT a `BinaryPortWire`, which confirms the codec doesn't interfere with the ref path.
Notes
- Memory: base64 is ~1.33× the binary size + UTF-16 spike inside `JSON.stringify`. The PR's `IRunConfig.referenceThresholdBytes` only governs ref-vs-inline on streaming-capable backings; non-streaming backings have no equivalent guard. Mitigation is out of scope here, but the codec docstring could call this out (small payloads only; use a streaming-capable backing for large blobs).
- Cross-runtime: `Blob` is available in Node 18+, Bun, and all browsers; `Buffer` is gated behind `typeof Buffer !== "undefined"`. Same gating as `imageCacheCodec`.
- Registration ordering: last-writer-wins, same contract as `imageCacheCodec`.
Happy to send a follow-up PR against this branch if it's easier than folding it in here — let me know.
Other findings (not blockers; included as a heads-up — see below)
MEDIUM / LOW findings from the review
M-1 — resolveJobOutputStream(handle, backing) without explicit port resolves any branded CacheRef reachable in the output — cache/resolveJobOutput.ts:64-83. collectCacheRefs walks every property including user-controlled fields; a hand-crafted {kind: "task-graph/CacheRef", $ref: "..."} embedded in metadata would stream through the backing. The same protection hydrateRefsBelowThreshold applies (schema-binary-port allow-list) is missing here. Either pass the task's output schema and filter to declared binary ports, or document that callers must pass port when the producer is not fully trusted.
M-2 — BinaryStreamRouter.wakeDrain releases ALL chained drain waiters at once — task/StreamProcessor.ts:444-456, 520-528. Even when consuming a single chunk only freed a small fraction of the buffer. Producers re-park immediately if still over the mark, but the thundering herd briefly pushes bufferedBytes above highWaterMarkBytes by N chained chunk-pushes before the next park. Wake one waiter at a time, or have each waiter re-check the mark before resolving its outer Promise.
M-3 — resolveOutput walker doesn't preserve Symbol-keyed / non-enumerable properties or properly reconstitute class instances with private fields — cache/resolveRef.ts:179-195. Object.keys(source) is string-enumerable-only; Object.create(proto) skips the constructor, so a class Foo { #x = 1 } instance loses its private field. Real-world impact is small (task outputs are usually plain data) but the docstring claims "the returned clone preserves their prototype" without noting these caveats. Narrow the docstring, or fall back to leaf when proto !== Object.prototype && proto !== null.
M-4 — RunPrivateCacheRepo shadows prototype overrides with undefined — cache/RunPrivateCacheRepo.ts:53-60. A caller using repo.saveOutputStream?.(args) is fine; repo.saveOutputStream(args) without the check gets TypeError. Worth a one-line docstring note: "capability probing MUST use typeof === 'function' or ?.()".
M-5 — FsFolderTaskOutputRepository.clearOlderThan — storage/FsFolderTaskOutputRepository.ts:135-142. deleteBlobsByPrefix("", cutoff) deletes all blob files whose mtime is older than cutoff with no row-presence check; a pruned-row whose blob is newer (concurrent write) gets the row removed but the blob kept until the next sweep. Self-healing; worth a docstring note or row-presence check.
M-6 — replayBinaryRefs emits binary-delta events synchronously with no backpressure — task/CacheCoordinator.ts:200-213. Stream-reading from FsFolder is paced by createReadStream's default 64 KiB chunks, so memory is bounded for that backing — but a future getOutputStreamByRef implementation that yields the entire payload as one chunk would blow up consumer memory. Document the implementation requirement, or pass the consumer's high-water mark through.
L-1 — CacheRef.size is "best-effort" but FsFolderTaskOutputRepository is the only first-party backing that populates it; third-party saveOutputStream implementations may silently bypass below-threshold inlining. Flag in saveOutputStream's docstring.
L-2 — resolveOutput's Promise.all parallel walk doesn't short-circuit on a thrown resolver error. Use Promise.allSettled + first-error-throw, or accept it.
L-3 — RunPrivateCacheRepo.fallbackWarned is process-static; tests exercising the fallback twice get one warning total. _resetWarnedForTests would help isolation.
L-5 — BinaryStreamRouter private getters _bufferedBytes / _highWaterMarkBytes are exposed as test hooks via public get. Collapse to a single _getMetrics() to avoid leaking internal state into the public type.
Overall this is a large, carefully-architected slice with thorough test coverage. The brand discrimination on CacheRef, the FsFolder REF_PATTERN, and the schema-restricted hydration close the obvious risks. H-1 above is the one item I'd hold the merge for; everything else is non-blocking.
https://claude.ai/code/session_017TXwbp2GD6jvXrELz6msZW
Generated by Claude Code
…ackings
Review follow-up (H-1): a cacheable task with a binary output port on a
NON-streaming backing accumulates an inline Blob/ArrayBuffer that
JSON.stringify silently turns into {} in the row. New BinaryPortCodec
registers default codecs for format blob/binary that encode inline
bytes as a base64 BinaryPortWire envelope and decode back to the
port's declared type. CacheRefs and unknown shapes pass through
unchanged in both directions so streaming-backed rows keep their ref
envelopes verbatim. Also adds the review's docstring hardening notes:
explicit-port guidance for portless resolveJobOutputStream, bounded
chunk requirement on getOutputStreamByRef, size population on
saveOutputStream refs, and capability-probing rules for
RunPrivateCacheRepo.
H-1 addressed in 1d2f730: default …
One deliberate deviation from the suggested codec: serialize/deserialize pass …
Tests: Blob and ArrayBuffer round-trips through a non-streaming JSON-row backing (cache hit returns original bytes + mime), and ref-path non-interference (streaming rows still store the branded ref, no wire envelope). The old "contrast" test asserting the raw-Blob bloat row now asserts the encoded envelope. Full task-graph+task suites: 1987 passed.
Also landed the doc-hardening notes for M-1 (explicit …
Generated by Claude Code
… subtrees
The walker recursed into every reachable object without a visited set, so a self-referential output or a shared sub-tree stack-overflowed the resolver loop. Thread a `WeakSet<object>` through `walk`, `hasMatchingRef`, and `collectCacheRefs`; revisited objects short-circuit by reference rather than recursing. Cycles preserve their topology: refs inside the cycle are not rewritten on the back-edge.
Errors carry `message` / `stack` as own non-enumerable properties and `URL` exposes everything via prototype accessors. The generic `Object.keys()` clone in `walk` would have dropped that data while preserving the prototype, leaving a hollow shell. Add `Error` and `URL` to `isLeaf` (and the matching skip in `collectCacheRefs`) so they pass through by reference instead of being restructurally cloned.
…OutputRepository
The atomic-rename pattern only guarantees a published name pointing at the right inode; it doesn't guarantee that the data has reached storage. On a crash between the rename and the OS flushing dirty pages, the published blob name can resolve to zero bytes, which is the very partial-blob scenario the rename was meant to avoid. Add `handle.sync()` before close so the data is durable when the rename announces it. Skip the directory fsync (cache semantics tolerate a renamed-but-unflushed-directory crash; it just forces a recompute).
… row commit fails
A streaming binary save is a two-phase operation: the sink writes the blob (producing a CacheRef) and then the row commit points at it. If the row commit failed (or the process died between the two), the blob persisted on disk with nothing referencing it, and the row-driven cleanup paths would never find it. Add an optional `deleteOutputByRef` hook on `TaskOutputRepository`, implement it in `FsFolderTaskOutputRepository`, and expose a `CacheCoordinator.cleanupOrphanBlobsForBinaryPorts` helper that the runner calls on `saveByPolicy` failure to drop just-written blobs before re-throwing. Document that periodic `clearOlderThan` is still required to catch the hard-kill case that races the in-band cleanup.
…putRepository deterministic tier
Blob names in the deterministic-cache path are `(sanitize(taskType), fingerprint(inputs))` with no tenant axis. Two tenants on a shared backing with identical inputs resolve to the same name, which creates a blob-existence side channel for sensitive inputs. Document the single-tenant assumption on the class and the fingerprinting site, and point operators at the supported wrappers (per-tenant folder/prefix or `RunPrivateCacheRepo`) for the multi-tenant case. Behavior is unchanged.
Pushed 5 HIGH-priority follow-up fixes on top of …
No departures from the suggested fixes; the one tweak worth flagging is in …
Generated by Claude Code
The previous walker walked any object whose prototype was not
Object.prototype unless isLeaf listed it as opaque (e.g. Error, URL).
Class instances whose data lives on the prototype (accessors) or in
private slots (Headers, Request, Response, FormData, URLSearchParams,
ReadableStream, and any user-defined class) were still walked via
Object.keys() and silently cloned to empty objects.
Invert the policy: walk only plain objects (Object.prototype / null
prototype), Array, Map, and Set. Every class instance is opaque and
returned by reference. The cycle/short-circuit logic in hasMatchingRef
and walk now reaches the plain-object branch only for plain objects, so
the prototype-preserving Object.create(proto) branch in walk collapses
to {}.
Mirror the same opaque-by-default policy in collectCacheRefs in
resolveJobOutput.ts so both walkers stop at the same boundary.
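The opaque-by-default policy, combined with the `WeakSet` cycle guard from the earlier commit, can be sketched as a small collector. This is an illustrative stand-in, not the PR's `collectCacheRefs`: the signature, the local `isCacheRef`, and the choice to walk only `Map` values (not keys) are assumptions.

```typescript
const CACHE_REF_KIND = "task-graph/CacheRef";
interface CacheRef {
  readonly kind: typeof CACHE_REF_KIND;
  readonly $ref: string;
}

function isCacheRef(value: unknown): value is CacheRef {
  if (value === null || typeof value !== "object") return false;
  const o = value as Record<string, unknown>;
  return o.kind === CACHE_REF_KIND && typeof o.$ref === "string";
}

// Only Object.prototype / null-prototype objects count as plain.
function isPlainObject(value: object): boolean {
  const proto: unknown = Object.getPrototypeOf(value);
  return proto === Object.prototype || proto === null;
}

// Hypothetical collector: walks plain objects, arrays, Map values and Set
// members; every other class instance is treated as an opaque leaf.
function collectCacheRefs(
  node: unknown,
  seen: WeakSet<object> = new WeakSet<object>(),
  out: CacheRef[] = []
): CacheRef[] {
  if (node === null || typeof node !== "object") return out;
  if (seen.has(node)) return out; // cycle or shared subtree: already visited
  seen.add(node);
  if (isCacheRef(node)) {
    out.push(node);
  } else if (Array.isArray(node)) {
    for (const item of node) collectCacheRefs(item, seen, out);
  } else if (node instanceof Set) {
    for (const item of node) collectCacheRefs(item, seen, out);
  } else if (node instanceof Map) {
    for (const value of node.values()) collectCacheRefs(value, seen, out);
  } else if (isPlainObject(node)) {
    for (const value of Object.values(node)) collectCacheRefs(value, seen, out);
  }
  return out;
}
```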
…s between rename and dir-metadata flush

On ext4 `data=ordered` (and similar journaled filesystems) the rename of the temp blob to its published name is not durable until the parent directory's metadata is also flushed. A crash between the rename and that metadata flush can leave the published name visible but pointing at stale (zero-byte) content — the file handle was already fsync'd, but the directory entry change is lost.

After the rename, open the blobs directory and call `sync()` on the handle, then close it. Run this best-effort: swallow `EPERM`, `EINVAL`, `ENOTSUP`, `EISDIR` from the dir-open for filesystems / platforms that reject opening a directory for fsync (the rename is still the durability boundary; on a recompute the cache simply re-runs the task).

Add an integration test that exercises the happy path round-trip and a 16-way concurrent-write scenario to confirm the dir-sync does not break normal flow or serialize writes incorrectly. The unsupported-FS error codes can't be naturally produced on a Linux tmp dir, so they're covered by the swallow list in code review.
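A minimal sketch of the rename-then-directory-fsync step, using Node's `fs/promises` API. The function name `publishDurably` and the `IGNORABLE_OPEN_ERRORS` constant are hypothetical, and the sketch assumes the temp file's own handle was already fsynced before this point, as the commit message states.

```typescript
import { open, rename } from "node:fs/promises";
import { dirname } from "node:path";

// Codes the commit message lists as safe to swallow when opening the directory.
const IGNORABLE_OPEN_ERRORS = new Set(["EPERM", "EINVAL", "ENOTSUP", "EISDIR"]);

async function publishDurably(tmpPath: string, finalPath: string): Promise<void> {
  await rename(tmpPath, finalPath);

  // Best-effort: some platforms refuse to open a directory for fsync.
  const dir = await open(dirname(finalPath), "r").catch((err: unknown) => {
    const code = (err as { code?: string }).code;
    if (code !== undefined && IGNORABLE_OPEN_ERRORS.has(code)) return undefined;
    throw err;
  });
  if (!dir) return;

  try {
    // Flush the directory entry so the new name survives a crash.
    await dir.sync();
  } finally {
    await dir.close();
  }
}
```

Errors from `sync()` itself propagate in this sketch; whether the real implementation also swallows them is not stated in the commit message.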
…rphan-cleanup races

The blob name was `<sanitized-taskType>_<fingerprint>.bin`, derived entirely from `(taskType, inputs)`. Two runners executing the same task with the same inputs would both write to the same path. If runner A's row-commit failed and triggered the orphan-blob cleanup (`cleanupOrphanBlobsForBinaryPorts` -> `deleteOutputByRef`), A would unlink the blob that runner B's successfully-committed row was still pointing at — a silent data-loss race.

Append a per-write `randomUUID()` suffix to the blob filename so each `saveOutputStream` invocation lands at a unique path. Concurrent writers no longer share a file, so a row-cleanup on one path can't touch the other writer's blob. The published `$ref` still carries the sanitized taskType prefix, so prefix-scoped pruning (`deleteByTaskTypePrefix` / `clearOlderThanWithTaskTypePrefix`) keeps cascading correctly.

The existing REF_PATTERN regex already accepts both the new `<taskType>_<fingerprint>_<uuid>.bin` shape and the legacy `<taskType>_<fingerprint>.bin` shape, so old refs written by previous versions of this repository still resolve via `getOutputByRef`.

Tests cover (1) two concurrent writers with identical inputs producing distinct readable refs, (2) the cleanup race — A's `deleteOutputByRef` leaves B's blob intact, and (3) backward compatibility with legacy un-suffixed blob filenames.
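The naming scheme can be sketched as follows. The actual `REF_PATTERN` is not shown in this PR excerpt, so `BLOB_NAME` below is a hypothetical pattern that assumes the sanitized task type contains only letters, digits, and hyphens and that the fingerprint is lowercase hex; `makeBlobName` is likewise an illustrative name.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical pattern: the UUID group is optional so legacy
// `<taskType>_<fingerprint>.bin` names keep matching.
const BLOB_NAME =
  /^([A-Za-z0-9-]+)_([0-9a-f]+)(?:_([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}))?\.bin$/;

// Each call yields a distinct path, so concurrent writers never share a file.
function makeBlobName(sanitizedTaskType: string, fingerprint: string): string {
  return `${sanitizedTaskType}_${fingerprint}_${randomUUID()}.bin`;
}
```

Keeping the task type as the leading segment is what lets prefix-scoped pruning continue to work without knowing the UUID.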
Three follow-up review fixes pushed on top of the current branch: 1. The previous walker only opted Inverted the policy in 2. The temp blob file handle was fsynced before rename, but the containing directory was not. On ext4 After the rename, 3. Blob name was Appended a per-write New tests cover: concurrent writers with identical inputs producing distinct refs, A's Verification: Generated by Claude Code |
|
Two HIGH findings from a fresh review pass on this PR — both narrowly scoped, both currently uncovered by tests. Patches and test sketches below. HIGH 1 —
|
| import { withJobErrorDiagnostics } from "./JobErrorDiagnostics"; | ||
| import { storageToClass } from "./JobStorageConverters"; | ||
| import type { StreamEventLike } from "./JobQueueEventListeners"; | ||
| import { classToStorage, storageToClass } from "./JobStorageConverters"; |
Spec 1 — binary-delta streaming framework
-----------------------------------------
Adds a `binary-delta` variant to `StreamEvent` (analogous to `text-delta` /
`object-delta`) plus an `x-stream: "binary"` annotation on output port
schemas, so a task can `executeStream` byte chunks the same way it streams
text or structured objects. New port helpers (`getBinaryPortId`,
`getBinaryPortFormat`, `getStreamingPorts`), a `materializeBinary`
assembler (Blob for `format: "blob"`/absent, ArrayBuffer for
`format: "binary"`), and a `getOutputStreamMode` adopter let downstream
code branch cleanly on binary mode without reaching for `any`.
StreamProcessor accumulates `binary-delta` chunks per port and merges
them into the enriched finish event so downstream dataflows see the
materialized payload (or, for explicit binary finish payloads, the
artifact wins per Spec 1's precedence rule).
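To make the assembly rule concrete, here is a minimal sketch of turning accumulated chunks into a Blob or an ArrayBuffer depending on the port format. The name `materializeChunks` and its signature are assumptions for illustration; the PR's `materializeBinary` may take different arguments.

```typescript
type BinaryFormat = "blob" | "binary";

// Hypothetical assembler: Blob for "blob" or an absent format, ArrayBuffer for "binary".
function materializeChunks(
  chunks: readonly Uint8Array[],
  format?: BinaryFormat
): Blob | ArrayBuffer {
  const total = chunks.reduce((n, c) => n + c.byteLength, 0);
  const buffer = new ArrayBuffer(total);
  const view = new Uint8Array(buffer);

  // Copy each chunk into its slot, preserving arrival order.
  let offset = 0;
  for (const chunk of chunks) {
    view.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return format === "binary" ? buffer : new Blob([buffer]);
}
```

Copying into one fresh buffer keeps the result independent of the producer's chunk memory, at the cost of holding the full payload once.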
StreamPump adds the graph-aware decision (`canStreamBinaryToCache`,
`anyConsumerNeedsMaterialized`) and the `pipeBinaryToCache` assembly
helper that turns a task's `binary-delta` events into an `AsyncIterable`
ready to drive a streaming cache sink.
`TaskOutputRepository` gains an optional `saveOutputStream` sink so
file-backed (or other stream-capable) caches can ingest bytes without
materializing the full payload; `supportsStreaming()` and the
`RunPrivateCacheRepo` wrapper forward the capability correctly.
Spec 2 — result-as-reference
----------------------------
Builds on Spec 1 to close the queue-row-bloat hole: when the cache backing
supports streaming, the runner pipes the binary bytes straight to the
cache and places a `CacheRef` placeholder in `Output` at the port slot.
Downstream `Output` consumers (and the queue row) see a small envelope
(`{ \$ref, size?, mime? }`) instead of the full payload, while the bytes
live in the cache for hydration on demand.
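The envelope shape and a structural guard for it might be sketched like this. The field names come from the description above; `isCacheRefLike` is a hypothetical guard written for illustration and may be stricter or looser than the PR's `isCacheRef`.

```typescript
// Envelope placed in the output slot instead of the bytes themselves.
interface CacheRef {
  $ref: string;
  size?: number;
  mime?: string;
}

// Hypothetical structural check: required string `$ref`, optional typed extras.
function isCacheRefLike(value: unknown): value is CacheRef {
  if (value === null || typeof value !== "object") return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.$ref === "string" &&
    (v.size === undefined || typeof v.size === "number") &&
    (v.mime === undefined || typeof v.mime === "string")
  );
}
```

A purely structural check like this also matches any ordinary object that happens to carry a string `$ref`, so a guard alone cannot tell a cache reference from user data that merely looks like one.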
Pieces:
- `CacheRef` type + `isCacheRef` type guard (cache/CacheRef.ts).
- `resolveOutput` walker (cache/resolveRef.ts) — pure recursive walker
  that hydrates refs through a caller-supplied resolver. Identity is
  preserved when no descendant matches the optional filter; class
  instances (`Error`, `URL`, custom classes) survive with prototype
  intact; `Map`/`Set` are walked through so nested refs resolve; opaque
  leaves are `Blob`/`ArrayBuffer`/`TypedArray`/`Date`/`RegExp`/`Promise`.
- `resolveJobOutput` queue-boundary bridge (cache/resolveJobOutput.ts)
  accepting either a `CacheRefResolver` function or any object exposing
  `getOutputByRef` (`TaskOutputRepository` shape).
- `IRunConfig.referenceThresholdBytes` (default 64 KiB; `0` forces ref
  for every binary output).
- `TaskOutputRepository.saveOutputStream` now returns `Promise<CacheRef>`;
  new `getOutputByRef` / `getOutputStreamByRef` readers complete the
  contract.
- `CacheCoordinator.getBinaryRefSinksByPolicy` derives a per-port
  `BinaryRefSink` map; `hydrateRefsBelowThreshold` rehydrates refs whose
  committed size falls below the configured threshold (schema-restricted
  to binary streaming ports so legitimate `{\$ref: string}` fields in
  non-binary slots are not mistakenly hit against the cache).
- `StreamProcessor` routes `binary-delta` chunks to a `BinaryRefSink`
  via a small `BinaryStreamRouter` producer-consumer pump.
- `TaskRunner` reads the threshold, builds sinks, threads them through
  `StreamProcessor`, and rehydrates below-threshold refs in the post-run
  pass — saveByPolicy then writes the small ref-bearing Output.
- … a port (graph context where the cache can stream AND a downstream
  edge needs materialized bytes): the emitted finish event carries the
  materialized Blob/ArrayBuffer for edge consumers; `finalOutput`
  carries the CacheRef so the queue/cache row stays small.
- `RunPrivateCacheRepo` forwards all three new optional methods,
  mirroring the backing's true capability on the wrapper instance
  (assigning `undefined` when the backing lacks them) so callers
  probing `typeof === "function"` see the truth.
Tests cover binary-delta accumulation + explicit-finish-payload
precedence, port helpers, cache decision + assembly, runner pipe + force-ref +
threshold rehydrate, tee for the graph + materializing-consumer case,
saved-row size + cross-process serialization round-trip + dangling-ref
best-effort, and the walker / `resolveOutput` / `resolveJobOutput`
surface (class instances, Map/Set, sparse-ref filter, concurrency bound,
identity preservation).
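The capability-mirroring behaviour described for `RunPrivateCacheRepo` can be illustrated with a small wrapper. Everything below is a hypothetical sketch: `StreamCapableRepo`, `MirroringWrapper`, and the method signatures are assumptions, and only two of the three optional methods are shown for brevity.

```typescript
// Hypothetical subset of the optional streaming surface.
interface StreamCapableRepo {
  saveOutputStream?: (chunks: AsyncIterable<Uint8Array>) => Promise<{ $ref: string }>;
  getOutputByRef?: (ref: string) => Promise<Uint8Array | undefined>;
}

class MirroringWrapper implements StreamCapableRepo {
  saveOutputStream?: StreamCapableRepo["saveOutputStream"];
  getOutputByRef?: StreamCapableRepo["getOutputByRef"];

  constructor(backing: StreamCapableRepo) {
    // Bind when present, leave undefined when absent, so that
    // `typeof wrapper.saveOutputStream === "function"` reflects the backing.
    this.saveOutputStream = backing.saveOutputStream?.bind(backing);
    this.getOutputByRef = backing.getOutputByRef?.bind(backing);
  }
}
```

Declaring the methods on the class prototype instead would make every wrapper look stream-capable, which is the false positive this pattern avoids.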