Skip to content

feat(task-graph): binary-streaming framework + result-as-reference#561

Open
sroussey wants to merge 27 commits into
mainfrom
claude/peaceful-lamport-0aa97d
Open

feat(task-graph): binary-streaming framework + result-as-reference#561
sroussey wants to merge 27 commits into
mainfrom
claude/peaceful-lamport-0aa97d

Conversation

@sroussey

Copy link
Copy Markdown
Collaborator

Spec 1 — binary-delta streaming framework

Adds a binary-delta variant to StreamEvent (analogous to text-delta /
object-delta) plus an x-stream: "binary" annotation on output port
schemas, so a task can executeStream byte chunks the same way it streams
text or structured objects. New port helpers (getBinaryPortId,
getBinaryPortFormat, getStreamingPorts), a materializeBinary
assembler (Blob for format: "blob"/absent, ArrayBuffer for
format: "binary"), and a getOutputStreamMode adopter let downstream
code branch cleanly on binary mode without reaching for any.

StreamProcessor accumulates binary-delta chunks per port and merges
them into the enriched finish event so downstream dataflows see the
materialized payload (or, for explicit binary finish payloads, the
artifact wins per Spec 1's precedence rule).

StreamPump adds the graph-aware decision (canStreamBinaryToCache,
anyConsumerNeedsMaterialized) and the pipeBinaryToCache assembly
helper that turns a task's binary-delta events into an AsyncIterable
ready to drive a streaming cache sink.

TaskOutputRepository gains an optional saveOutputStream sink so
file-backed (or other stream-capable) caches can ingest bytes without
materializing the full payload; supportsStreaming() and the
RunPrivateCacheRepo wrapper forward the capability correctly.

Spec 2 — result-as-reference

Builds on Spec 1 to close the queue-row-bloat hole: when the cache backing
supports streaming, the runner pipes the binary bytes straight to the
cache and places a CacheRef placeholder in Output at the port slot.
Downstream Output consumers (and the queue row) see a small envelope
({ \$ref, size?, mime? }) instead of the full payload, while the bytes
live in the cache for hydration on demand.

Pieces:

  • CacheRef type + isCacheRef type guard (cache/CacheRef.ts).
  • resolveOutput walker (cache/resolveRef.ts) — pure recursive walker
    that hydrates refs through a caller-supplied resolver. Identity is
    preserved when no descendant matches the optional filter; class
    instances (Error, URL, custom classes) survive with prototype
    intact; Map/Set are walked through so nested refs resolve; opaque
    leaves are Blob/ArrayBuffer/TypedArray/Date/RegExp/Promise.
  • resolveJobOutput queue-boundary bridge (cache/resolveJobOutput.ts)
    accepting either a CacheRefResolver function or any object exposing
    getOutputByRef (TaskOutputRepository shape).
  • IRunConfig.referenceThresholdBytes (default 64 KiB; 0 forces ref
    for every binary output).
  • TaskOutputRepository.saveOutputStream now returns Promise<CacheRef>;
    new getOutputByRef / getOutputStreamByRef readers complete the
    contract.
  • CacheCoordinator.getBinaryRefSinksByPolicy derives a per-port
    BinaryRefSink map; hydrateRefsBelowThreshold rehydrates refs whose
    committed size falls below the configured threshold (schema-restricted
    to binary streaming ports so legitimate {\$ref: string} fields in
    non-binary slots are not mistakenly hit against the cache).
  • StreamProcessor routes binary-delta chunks to a BinaryRefSink
    via a small BinaryStreamRouter producer-consumer pump.
  • TaskRunner reads the threshold, builds sinks, threads them through
    StreamProcessor, and rehydrates below-threshold refs in the post-run
    pass — saveByPolicy then writes the small ref-bearing Output.
  • StreamProcessor TEES when both an accumulator and a router exist for
    a port (graph context where the cache can stream AND a downstream
    edge needs materialized bytes): the emitted finish event carries the
    materialized Blob/ArrayBuffer for edge consumers; finalOutput
    carries the CacheRef so the queue/cache row stays small.
  • RunPrivateCacheRepo forwards all three new optional methods,
    mirroring the backing's true capability on the wrapper instance
    (assigning undefined when the backing lacks them) so callers
    probing typeof === "function" see the truth.

Tests cover binary-delta accumulation + explicit-finish-payload
precedence, port helpers, cache decision + assembly, runner pipe + force-ref +
threshold rehydrate, tee for the graph + materializing-consumer case,
saved-row size + cross-process serialization round-trip + dangling-ref
best-effort, and the walker / resolveOutput / resolveJobOutput
surface (class instances, Map/Set, sparse-ref filter, concurrency bound,
identity preservation).

claude added 16 commits June 8, 2026 23:36
Spec 1 — binary-delta streaming framework
-----------------------------------------
Adds a `binary-delta` variant to `StreamEvent` (analogous to `text-delta` /
`object-delta`) plus an `x-stream: "binary"` annotation on output port
schemas, so a task can `executeStream` byte chunks the same way it streams
text or structured objects. New port helpers (`getBinaryPortId`,
`getBinaryPortFormat`, `getStreamingPorts`), a `materializeBinary`
assembler (Blob for `format: "blob"`/absent, ArrayBuffer for
`format: "binary"`), and a `getOutputStreamMode` adopter let downstream
code branch cleanly on binary mode without reaching for `any`.

StreamProcessor accumulates `binary-delta` chunks per port and merges
them into the enriched finish event so downstream dataflows see the
materialized payload (or, for explicit binary finish payloads, the
artifact wins per Spec 1's precedence rule).

StreamPump adds the graph-aware decision (`canStreamBinaryToCache`,
`anyConsumerNeedsMaterialized`) and the `pipeBinaryToCache` assembly
helper that turns a task's `binary-delta` events into an `AsyncIterable`
ready to drive a streaming cache sink.

`TaskOutputRepository` gains an optional `saveOutputStream` sink so
file-backed (or other stream-capable) caches can ingest bytes without
materializing the full payload; `supportsStreaming()` and the
`RunPrivateCacheRepo` wrapper forward the capability correctly.

Spec 2 — result-as-reference
----------------------------
Builds on Spec 1 to close the queue-row-bloat hole: when the cache backing
supports streaming, the runner pipes the binary bytes straight to the
cache and places a `CacheRef` placeholder in `Output` at the port slot.
Downstream `Output` consumers (and the queue row) see a small envelope
(`{ \$ref, size?, mime? }`) instead of the full payload, while the bytes
live in the cache for hydration on demand.

Pieces:

- `CacheRef` type + `isCacheRef` type guard (`cache/CacheRef.ts`).
- `resolveOutput` walker (`cache/resolveRef.ts`) — pure recursive walker
  that hydrates refs through a caller-supplied resolver. Identity is
  preserved when no descendant matches the optional filter; class
  instances (`Error`, `URL`, custom classes) survive with prototype
  intact; `Map`/`Set` are walked through so nested refs resolve; opaque
  leaves are `Blob`/`ArrayBuffer`/`TypedArray`/`Date`/`RegExp`/`Promise`.
- `resolveJobOutput` queue-boundary bridge (`cache/resolveJobOutput.ts`)
  accepting either a `CacheRefResolver` function or any object exposing
  `getOutputByRef` (`TaskOutputRepository` shape).
- `IRunConfig.referenceThresholdBytes` (default 64 KiB; `0` forces ref
  for every binary output).
- `TaskOutputRepository.saveOutputStream` now returns `Promise<CacheRef>`;
  new `getOutputByRef` / `getOutputStreamByRef` readers complete the
  contract.
- `CacheCoordinator.getBinaryRefSinksByPolicy` derives a per-port
  `BinaryRefSink` map; `hydrateRefsBelowThreshold` rehydrates refs whose
  committed size falls below the configured threshold (schema-restricted
  to binary streaming ports so legitimate `{\$ref: string}` fields in
  non-binary slots are not mistakenly hit against the cache).
- `StreamProcessor` routes `binary-delta` chunks to a `BinaryRefSink`
  via a small `BinaryStreamRouter` producer-consumer pump.
- `TaskRunner` reads the threshold, builds sinks, threads them through
  `StreamProcessor`, and rehydrates below-threshold refs in the post-run
  pass — saveByPolicy then writes the small ref-bearing Output.
- StreamProcessor TEES when both an accumulator and a router exist for
  a port (graph context where the cache can stream AND a downstream
  edge needs materialized bytes): the emitted finish event carries the
  materialized Blob/ArrayBuffer for edge consumers; `finalOutput`
  carries the CacheRef so the queue/cache row stays small.
- `RunPrivateCacheRepo` forwards all three new optional methods,
  mirroring the backing's true capability on the wrapper instance
  (assigning `undefined` when the backing lacks them) so callers
  probing `typeof === "function"` see the truth.

Tests cover binary-delta accumulation + explicit-finish-payload
precedence, port helpers, cache decision + assembly, runner pipe + force-ref +
threshold rehydrate, tee for the graph + materializing-consumer case,
saved-row size + cross-process serialization round-trip + dangling-ref
best-effort, and the walker / `resolveOutput` / `resolveJobOutput`
surface (class instances, Map/Set, sparse-ref filter, concurrency bound,
identity preservation).
Adds a same-process channel so a holder of a `JobHandle` can subscribe
to a running job's stream events (text deltas, object deltas,
binary-delta chunks, snapshot, finish, error, phase) instead of only
the terminal result.

Worker side
-----------
- `IJobExecuteContext` gains an optional `emitStreamEvent(event)` method.
- `JobQueueWorker` plumbs a per-job event emitter through into the
  execute context so a run-fn can call `ctx.emitStreamEvent(...)` to
  publish stream chunks as they're produced.

Server side
-----------
- `JobQueueServer.forwardToClients("handleJobStream", jobId, event)`
  fans the event to every attached client by direct method invocation —
  pure in-memory, no `postMessage`, no serialization, no worker thread.
  The channel is intentionally same-process only; storage-backed cross-
  process clients see state transitions through `subscribeToChanges`
  but receive no incremental stream events.

Client side
-----------
- `JobHandle.onStream(callback)` is exposed only when the client is
  server-attached (`this.server` set); callers branch on
  `typeof handle.onStream === "function"`.
- Each listener invocation is wrapped in try/catch so one throwing
  subscriber does not abort delivery to the rest or break the dispatch.

Tests
-----
- `JobQueueStream.test.ts` proves end-to-end same-process delivery: a
  worker's `emitStreamEvent` calls reach every `JobHandle.onStream`
  listener in order.
- `JobQueueStreamWorker.integration.test.ts` (+ its `.fixture.mjs`)
  validates the underlying Node `worker_threads` transfer mechanism
  the design relies on for any future cross-thread queue host: binary
  chunks emitted from a worker thread transfer (not copy) to the host
  per `WorkerServerBase.extractTransferables`. The docblock spells
  out that this is a Node-primitive validation, NOT a test of the
  current package's behavior — today's queue channel is entirely
  same-process and the test exists as a navigational marker for a
  future hosted-in-thread variant.
…ly collisions

`CacheRef` was discriminated by shape alone: any object with `{ $ref: string }`
satisfied `isCacheRef`, including JSON-Schema `$ref` pointers embedded in
metadata. The cache-ref resolver walks task outputs and calls
`getOutputByRef(ref)` on every match — so any code path that surfaces an
attacker-influenced `{$ref: "cache://OTHER_RUN/secret"}` shape (e.g. a tool
result, an AI structured-output field, a parsed-JSON document) could trick
`resolveJobOutput` / `resolveOutput` into reading bytes from another run or
tenant's private cache slot.

This patch adds a literal `kind: "task-graph/CacheRef"` brand discriminator
that:
  - survives JSON serialization across queue rows / IPC (Symbol-based brands
    would be erased by `JSON.stringify` and break cross-process resolution);
  - is checked by `isCacheRef` alongside the `$ref` string;
  - is applied uniformly by a new `makeCacheRef(...)` helper that callers
    use to construct refs.

`CacheCoordinator.getBinaryRefSinksByPolicy` and
`RunPrivateCacheRepo.saveOutputStream` now defensively re-wrap the value
returned by legacy backings (`isCacheRef(raw) ? raw : makeCacheRef(raw)`),
so a backing that pre-dates the brand still produces a discriminator-bearing
ref when seen through the framework. In-tree test repositories and callers
are updated to use `makeCacheRef`.

Test coverage:
  - `CacheRef.test.ts` now expects shape-only `{$ref: string}` to be rejected
    and exercises JSON round-trip preserving the brand.
  - `resolveOutput.test.ts` adds a case where a JSON-Schema-shaped
    `{schema: {$ref: "#/\$defs/Foo"}}` is left untouched and the resolver is
    never called (identity preserved).
  - `resolveJobOutput.test.ts` adds the cross-tenant attack case: an
    attacker-supplied `{note: {\$ref: "cache://OTHER_RUN/secret"}}` never
    invokes `getOutputByRef`.
…b"|"binary"

`materializeBinary` previously accepted any string and silently coerced
unknown values (including casing typos like `"Blob"`) to the ArrayBuffer
branch. A task author writing `format: "Blob"` would unknowingly produce an
ArrayBuffer where every downstream consumer expected a Blob — and the
mismatch only surfaced at the consumer (often as a misleading runtime
error during streaming, or worse, silent data corruption when the consumer
duck-typed both shapes).

This patch establishes a canonical `BinaryFormat = "blob" | "binary"` type
and routes every binary-port consumer through a single
`assertBinaryFormat(schema, port)` helper:

  - `undefined` and `"blob"` resolve to `"blob"` (the documented default);
  - `"binary"` resolves to `"binary"`;
  - anything else throws with the allowed vocabulary in the message.

`materializeBinary` now takes the canonical `BinaryFormat` directly and
`StreamProcessor` / `CacheCoordinator.hydrateRefsBelowThreshold` both call
`assertBinaryFormat` before invoking it.

`TaskRegistry.registerTask` runs the same check at registration time over
every output port with `x-stream: "binary"`, so the typo fails near the
task definition rather than during a streaming run. The task is not added
to the registry when the check fails.

Test coverage:
  - `StreamBinaryTypes.test.ts` replaces the now-removed "unknown format =
    binary" behavior with `assertBinaryFormat` cases for `"blob"`,
    `"binary"`, undefined-default, the casing typo `"Blob"`, and an unknown
    value (`"wat"`).
  - `TaskRegistry.test.ts` adds cases asserting registration throws on a
    binary port with `format: "Blob"`, and succeeds on `"blob"` /
    `"binary"`.
  - `Spec2QueueRowAndRehydrate.test.ts` adds symmetric rehydration cases:
    `format: "blob"` rehydrates into a `Blob`, `format: "binary"` into an
    `ArrayBuffer`.
…efault 8 MiB)

The streaming binary router buffered chunks without bound. A fast producer
(e.g. an AI image / audio generator yielding 1 MiB chunks) feeding a slow
sink (remote object store, throttled FS) would let the producer race ahead
and accumulate the entire payload in memory before the sink saw the first
chunk — turning a notionally O(1) streaming path into peak-residency O(N).
The old comment even acknowledged the issue ("backpressure: there is none")
and offloaded the problem onto the sink author.

This patch:

  - Introduces `DEFAULT_BINARY_HIGH_WATER_BYTES = 8 MiB` in `StreamTypes.ts`.
  - `BinaryStreamRouter` now tracks `bufferedBytes` (sum of un-consumed
    chunk sizes). `push(chunk)` returns a Promise that resolves
    immediately while `bufferedBytes < highWaterMarkBytes`, and parks the
    producer until the consumer drains under the mark otherwise. `end()`
    and `fail()` BOTH release any parked producer so an abort mid-park
    does not leak the Promise.
  - `StreamProcessor` `await router.push(...)` on every `binary-delta`
    yield, so the byte-bounded backpressure applies for tasks running
    through the standard streaming path.
  - `IRunConfig.binaryHighWaterBytes` lets callers override per-run.
    Threaded through `TaskRunner` → `StreamProcessor.run` deps.
  - `IExecuteContext.binaryBackpressure?: () => Promise<void>` is a
    cooperative hook for tasks that emit via a side channel and cannot
    use the awaited `push` path; the StreamProcessor and StreamPump
    install router-aware implementations, and an absent runtime supplies
    a no-op (free for tasks that don't call it).
  - `StreamPump.pipeBinaryToCache` (the EventEmitter path used for the
    cache-ingest tee) gets the same byte-counted queue and returns a
    `backpressure()` function alongside `promise` / `detach`.

Test coverage in `StreamingBackpressure.test.ts` adds a "binary
backpressure" describe block:

  - 100 × 1 MiB through a slow (50 ms / chunk) sink with a 4 MiB
    high-water mark: peak buffer stays at or below `mark + 1 chunk`
    and every byte is delivered.
  - End-to-end: 100 MiB through `StreamProcessor.run` with the same
    high-water mark, asserting full delivery without drops.
  - Abort-while-parked: a producer parked at the high-water mark sees
    its `push()` Promise settle within 100 ms of `r.end()`.
Adds supportsStreamingReads() to TaskOutputRepository (mirrored by
RunPrivateCacheRepo), plus streamRefViaBacking/byteIterableFromBlob and the
RefStreamBacking shape in resolveRef. Extracts the shared in-memory
streaming repo test double into packages/test bindings.
…om cache

Streams a completed job's binary output out of the cache backing by port or
single-ref discovery, adapting inline Blob/ArrayBuffer/Uint8Array values.
makeJobOutputStreamResolver produces the injectable resolver shape for
job-queue (which cannot depend on task-graph).
…it replay

StreamPump.anyConsumerAcceptsBinaryStream inspects outgoing edges for
binary-to-binary stream pass-through; the graph runner threads the result
into each task run as IRunConfig.hasStreamingConsumers.
On a cache hit whose binary ports hold CacheRefs, CacheCoordinator now
mirrors the fresh-run event contract: cached bytes replay as chunked
binary-delta events for stream-capable consumers and hydrate into the
enriched finish event for materializing consumers, while the returned
output keeps the ref. Dangling refs convert the hit into a miss so the
task re-executes and rewrites the entry. Consumer needs are graph-computed
(anyConsumerNeedsMaterialized / anyConsumerAcceptsBinaryStream) and
threaded through IRunConfig.
Branded CacheRefs reaching a task's resolved inputs are resolved against
the run's cache registry (private first, then deterministic) and inlined
per the port's format annotation before validation and cache-key
computation. Binary-streaming ports with a live input stream are skipped;
unresolvable refs fail the task with a named-port error.
…decar files

First production streaming-capable output cache (node/bun, exported via a
new common-server entry): JSON rows through FsFolderTabularStorage, binary
payloads as sidecar blob files written incrementally and published by
atomic rename. Deterministic blob naming from (taskType, input
fingerprint) overwrites instead of leaking; refs from foreign or
path-traversal shaped $refs never resolve. Includes a generic stream-out
contract suite run against both the in-memory and FS repositories, and an
end-to-end cache-hit replay through the FS backing.
…inary results

JobQueueClientOptions accepts an injected outputStreamResolver (built via
task-graph's makeJobOutputStreamResolver — the dependency edge points the
other way, so the resolver is structural). When configured, handles expose
outputStream(port?) which awaits completion and streams the binary result
out of the output cache without materializing it.
…tory

Review findings: prefix-scoped row deletions (RunPrivateCacheRepo.clearRun,
CacheJanitor sweeps) now cascade to blob sidecar files instead of leaking
them; blob names fingerprint the raw taskType so lossy sanitization cannot
make two task types share a blob file; a failed write or rename removes its
.tmp instead of stranding it. Also clears the shared test repo between
job-queue outputStream tests.
…am listeners on abort/error

pipeBinaryToCache had no production callers — StreamProcessor's
BinaryStreamRouter owns live byte delivery to the cache sink — so the
duplicate queue/backpressure implementation and its test scaffolding are
gone. createStreamFromTaskEvents now also terminates on the task's
abort/error events (which never emit stream_end), closing the edge stream
and detaching listeners instead of leaking them and leaving downstream
readers waiting forever.
@github-actions

github-actions Bot commented Jun 11, 2026

Copy link
Copy Markdown

Coverage Report

Status Category Percentage Covered / Total
🔵 Lines 62.52% 25544 / 40853
🔵 Statements 62.36% 26421 / 42363
🔵 Functions 63.5% 4822 / 7593
🔵 Branches 51.34% 12601 / 24543
File CoverageNo changed files found.
Generated in workflow #2569 for commit 7c67ab4 by the Vitest Coverage Report Action

@sroussey

Copy link
Copy Markdown
Collaborator Author

@cursoragent /review

@cursor

cursor Bot commented Jun 11, 2026

Copy link
Copy Markdown

You need to increase your spend limit or enable usage-based billing to run background agents. Go to Cursor

…ngle-binary-port streaming

Review findings: (1) saveByPolicy now runs before below-threshold
rehydration so JSON-row backings persist the serializable CacheRef
envelope instead of an inline Blob that stringifies to {} — and cache
hits apply the same hydration before returning, so small outputs come
back inline on both paths. (2) canStreamBinaryToCache and
getBinaryRefSinksByPolicy both require exactly one binary streaming
port; multi-port tasks fall back to accumulation instead of silently
dropping every port without a sink.

@sroussey sroussey left a comment

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outside review — one HIGH follow-up before merge

Pulled this PR into a focused review (security / DX / correctness). The 337b688 fix is correct for the streaming-capable backings (the cache row carries the wire-form CacheRef; cache-hit applies the same below-threshold hydration as a fresh run, so semantics are symmetric). Brand discrimination on CacheRef, the FsFolder REF_PATTERN, and the schema-restricted hydration path together close the obvious cross-tenant / unscoped-resolve risks within a single trust boundary.

One HIGH item that survives the 337b688 fix — flagging here because it requires this branch's code to reproduce, and the fix is small enough to land in this PR rather than as a follow-up.

H-1 — Silent Blob"{}" corruption in non-streaming cache backings

Where: packages/task-graph/src/task/CacheCoordinator.ts ~218-227 (the saveByPolicyserializeOutputPorts path).

Symptom: When a cacheable task has an x-stream: "binary" output port AND the configured TaskOutputRepository does NOT implement saveOutputStream (TaskOutputTabularRepository directly → SQLite/Postgres/InMemory; IndexedDbTaskOutputRepository), the StreamProcessor accumulation path materializes binary deltas into a Blob. Then serializeOutputPorts looks up a PortCodec for format: "blob" / "binary". No such codec is registered (grep confirms only imageCacheCodec and this PR's own codecs), so the lookup returns undefined and the raw Blob is handed to JSON.stringify"{}". Cache row is empty; next cache hit returns nothing useful.

The 337b688 fix only rescues backings that DO implement saveOutputStream (by turning the binary output into a CacheRef so the row carries the envelope, not the Blob). Backings that don't stream still hit the corruption path.

Suggested fix — default PortCodec for blob / binary (Option A)

Preserves user-facing contract (existing non-streaming consumers keep working), matches the precedent set by imageCacheCodec.

New file: packages/task-graph/src/cache/BinaryPortCodec.ts

/**
 * @license
 * Copyright 2026 Steven Roussey <sroussey@gmail.com>
 * SPDX-License-Identifier: Apache-2.0
 */

import { registerPortCodec } from "@workglow/util";

export interface BinaryPortWire {
  readonly __binaryPortWire: 1;
  readonly base64: string;
  readonly size: number;
  readonly mime: string | undefined;
}

function isBinaryPortWire(v: unknown): v is BinaryPortWire {
  if (v === null || typeof v !== "object") return false;
  const o = v as Record<string, unknown>;
  return o.__binaryPortWire === 1 && typeof o.base64 === "string" && typeof o.size === "number";
}

function bytesToBase64(bytes: Uint8Array): string {
  if (typeof Buffer !== "undefined") {
    return Buffer.from(bytes.buffer, bytes.byteOffset, bytes.byteLength).toString("base64");
  }
  let bin = "";
  for (let i = 0; i < bytes.length; i++) bin += String.fromCharCode(bytes[i] ?? 0);
  return btoa(bin);
}

function base64ToBytes(b64: string): Uint8Array {
  if (typeof Buffer !== "undefined") {
    const buf = Buffer.from(b64, "base64");
    return new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
  }
  const bin = atob(b64);
  const out = new Uint8Array(bin.length);
  for (let i = 0; i < bin.length; i++) out[i] = bin.charCodeAt(i);
  return out;
}

async function serializeBinary(value: unknown): Promise<BinaryPortWire> {
  if (value instanceof Blob) {
    const bytes = new Uint8Array(await value.arrayBuffer());
    return {
      __binaryPortWire: 1,
      base64: bytesToBase64(bytes),
      size: bytes.byteLength,
      mime: value.type === "" ? undefined : value.type,
    };
  }
  if (value instanceof ArrayBuffer) {
    const bytes = new Uint8Array(value);
    return { __binaryPortWire: 1, base64: bytesToBase64(bytes), size: bytes.byteLength, mime: undefined };
  }
  throw new Error("BinaryPortCodec.serialize: value is not a Blob or ArrayBuffer");
}

registerPortCodec<Blob | ArrayBuffer, BinaryPortWire>("blob", {
  async serialize(value) { return serializeBinary(value); },
  async deserialize(wire): Promise<Blob> {
    if (!isBinaryPortWire(wire)) throw new Error("BinaryPortCodec(blob).deserialize: input is not a BinaryPortWire");
    const bytes = base64ToBytes(wire.base64);
    return new Blob([bytes.buffer as ArrayBuffer], wire.mime ? { type: wire.mime } : undefined);
  },
});

registerPortCodec<Blob | ArrayBuffer, BinaryPortWire>("binary", {
  async serialize(value) { return serializeBinary(value); },
  async deserialize(wire): Promise<ArrayBuffer> {
    if (!isBinaryPortWire(wire)) throw new Error("BinaryPortCodec(binary).deserialize: input is not a BinaryPortWire");
    const bytes = base64ToBytes(wire.base64);
    return bytes.buffer.slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength);
  },
});

Wire registration: packages/task-graph/src/common.ts — side-effect import next to imageCacheCodec's:

import "./cache/BinaryPortCodec";

No change needed to CacheCoordinator.ts — the existing getPortCodec(prop.format) lookup will now hit the new codecs.

Tests (suggested) — packages/test/src/test/task-graph/CacheSerialization.test.ts

  1. Blob round-trip via non-streaming backing. Task with format: "blob" port returns new Blob([new Uint8Array([1,2,3,4])], { type: "application/octet-stream" }). Run against SpyRepo. Assert JSON.stringify(repo.saved[0]) is NOT '{"bytes":{}}' AND repo.saved[0].bytes.__binaryPortWire === 1. Re-run → cache hit → assert rehydrated Blob bytes deep-equal [1,2,3,4] and blob.type === "application/octet-stream".
  2. ArrayBuffer round-trip (format: "binary"). Same shape; rehydrated value is ArrayBuffer (not Blob); bytes match.
  3. Streaming backing still produces a CacheRef. SpyRepo with saveOutputStream; assert repo.saved[0].bytes matches isCacheRef(...) and is NOT a BinaryPortWire — confirms codec doesn't interfere with the ref path.

Notes

  • Memory: base64 is ~1.33× the binary size + UTF-16 spike inside JSON.stringify. The PR's IRunConfig.referenceThresholdBytes only governs ref-vs-inline on streaming-capable backings; non-streaming backings have no equivalent guard. Mitigation is out of scope here, but the codec docstring could call this out (small payloads only — use a streaming-capable backing for large blobs).
  • Cross-runtime: Blob is available in Node 18+, Bun, and all browsers; Buffer is gated behind typeof Buffer !== "undefined". Same gating as imageCacheCodec.
  • Registration ordering: last-writer-wins, same contract as imageCacheCodec.

Happy to send a follow-up PR against this branch if it's easier than folding it in here — let me know.


Other findings (not blockers; included as a heads-up — see below)

MEDIUM / LOW findings from the review

M-1 — resolveJobOutputStream(handle, backing) without explicit port resolves any branded CacheRef reachable in the outputcache/resolveJobOutput.ts:64-83. collectCacheRefs walks every property including user-controlled fields; a hand-crafted {kind: "task-graph/CacheRef", $ref: "..."} embedded in metadata would stream through the backing. The same protection hydrateRefsBelowThreshold applies (schema-binary-port allow-list) is missing here. Either pass the task's output schema and filter to declared binary ports, or document that callers must pass port when the producer is not fully trusted.

M-2 — BinaryStreamRouter.wakeDrain releases ALL chained drain waiters at oncetask/StreamProcessor.ts:444-456, 520-528. Even when consuming a single chunk only freed a small fraction of the buffer. Producers re-park immediately if still over the mark, but the thundering herd briefly pushes bufferedBytes above highWaterMarkBytes by N chained chunk-pushes before the next park. Wake one waiter at a time, or have each waiter re-check the mark before resolving its outer Promise.

M-3 — resolveOutput walker doesn't preserve Symbol-keyed / non-enumerable properties or properly reconstitute class instances with private fieldscache/resolveRef.ts:179-195. Object.keys(source) is string-enumerable-only; Object.create(proto) skips the constructor, so a class Foo { #x = 1 } instance loses its private field. Real-world impact is small (task outputs are usually plain data) but the docstring claims "the returned clone preserves their prototype" without noting these caveats. Narrow the docstring, or fall back to leaf when proto !== Object.prototype && proto !== null.

M-4 — RunPrivateCacheRepo shadows prototype overrides with undefinedcache/RunPrivateCacheRepo.ts:53-60. A caller using repo.saveOutputStream?.(args) is fine; repo.saveOutputStream(args) without the check gets TypeError. Worth a one-line docstring note: "capability probing MUST use typeof === 'function' or ?.()".

M-5 — FsFolderTaskOutputRepository.clearOlderThanstorage/FsFolderTaskOutputRepository.ts:135-142. deleteBlobsByPrefix("", cutoff) deletes all blob files whose mtime is older than cutoff with no row-presence check; a pruned-row whose blob is newer (concurrent write) gets the row removed but the blob kept until the next sweep. Self-healing; worth a docstring note or row-presence check.

M-6 — replayBinaryRefs emits binary-delta events synchronously with no backpressuretask/CacheCoordinator.ts:200-213. Stream-reading from FsFolder is paced by createReadStream's default 64 KiB chunks, so memory is bounded for that backing — but a future getOutputStreamByRef implementation that yields the entire payload as one chunk would blow up consumer memory. Document the implementation requirement, or pass the consumer's high-water mark through.

L-1CacheRef.size is "best-effort" but FsFolderTaskOutputRepository is the only first-party backing that populates it; third-party saveOutputStream implementations may silently bypass below-threshold inlining. Flag in saveOutputStream's docstring.
L-2resolveOutput's Promise.all parallel walk doesn't short-circuit on a thrown resolver error. Use Promise.allSettled + first-error-throw, or accept it.
L-3RunPrivateCacheRepo.fallbackWarned is process-static; tests exercising the fallback twice get one warning total. _resetWarnedForTests would help isolation.
L-5BinaryStreamRouter private getters _bufferedBytes / _highWaterMarkBytes are exposed as test hooks via public get. Collapse to a single _getMetrics() to avoid leaking internal state into the public type.


Overall this is a large, carefully-architected slice with thorough test coverage. The brand discrimination on CacheRef, the FsFolder REF_PATTERN, and the schema-restricted hydration close the obvious risks. H-1 above is the one item I'd hold the merge for; everything else is non-blocking.

https://claude.ai/code/session_017TXwbp2GD6jvXrELz6msZW


Generated by Claude Code

…ackings

Review follow-up (H-1): a cacheable task with a binary output port on a
NON-streaming backing accumulates an inline Blob/ArrayBuffer that
JSON.stringify silently turns into {} in the row. New BinaryPortCodec
registers default codecs for format blob/binary that encode inline
bytes as a base64 BinaryPortWire envelope and decode back to the
port's declared type. CacheRefs and unknown shapes pass through
unchanged in both directions so streaming-backed rows keep their ref
envelopes verbatim. Also adds the review's docstring hardening notes:
explicit-port guidance for portless resolveJobOutputStream, bounded
chunk requirement on getOutputStreamByRef, size population on
saveOutputStream refs, and capability-probing rules for
RunPrivateCacheRepo.

Copy link
Copy Markdown
Collaborator Author

H-1 addressed in 1d2f730: default blob/binary port codecs (cache/BinaryPortCodec.ts, registered via side-effect import in the package entry) encode inline binary values as a base64 BinaryPortWire envelope so non-streaming JSON-row backings round-trip instead of persisting {}.

One deliberate deviation from the suggested codec: serialize/deserialize pass CacheRefs and unknown shapes through unchanged rather than throwing — serializeOutputPorts/deserializeOutputPorts apply the codec to every format: "blob" port, including ports whose value is the ref envelope on the streaming path, so a strict-throw codec would break ref rows (and the suggested test 3).

Tests: Blob and ArrayBuffer round-trips through a non-streaming JSON-row backing (cache hit returns original bytes + mime), and ref-path non-interference (streaming rows still store the branded ref, no wire envelope). The old "contrast" test asserting the raw-Blob bloat row now asserts the encoded envelope. Full task-graph+task suites: 1987 passed.

Also landed the doc-hardening notes for M-1 (explicit port for untrusted producers), M-4 (capability probing contract), M-6 (bounded chunk requirement on getOutputStreamByRef), and L-1 (populate size on refs). M-2/M-3/L-2/L-3/L-5 left as non-blockers per the review.


Generated by Claude Code

claude added 5 commits June 12, 2026 18:24
… subtrees

The walker recursed into every reachable object without a visited set, so a
self-referential output or a shared sub-tree stack-overflowed the resolver
loop. Thread a `WeakSet<object>` through `walk`, `hasMatchingRef`, and
`collectCacheRefs`; revisited objects short-circuit by reference rather than
recursing. Cycles preserve their topology — refs inside the cycle are not
rewritten on the back-edge.
Errors carry `message` / `stack` as own non-enumerable properties and `URL`
exposes everything via prototype accessors. The generic `Object.keys()` clone
in `walk` would have dropped that data while preserving the prototype, leaving
a hollow shell. Add `Error` and `URL` to `isLeaf` (and the matching skip in
`collectCacheRefs`) so they pass through by reference instead of being
restructurally cloned.
…OutputRepository

The atomic-rename pattern only guarantees a published name pointing at the
right inode; it doesn't guarantee that the data has reached storage. On a
crash between the rename and the OS flushing dirty pages, the published blob
name can resolve to zero bytes — the very partial-blob scenario the rename
was meant to avoid. Add `handle.sync()` before close so the data is durable
when the rename announces it. Skip the directory fsync (cache semantics
tolerate a renamed-but-unflushed-directory crash; it just forces a recompute).
… row commit fails

A streaming binary save is a two-phase operation: the sink writes the blob
(producing a CacheRef) and then the row commit points at it. If the row
commit failed (or the process died between the two), the blob persisted on
disk with nothing referencing it, and the row-driven cleanup paths would
never find it. Add an optional `deleteOutputByRef` hook on
`TaskOutputRepository`, implement it in `FsFolderTaskOutputRepository`, and
expose a `CacheCoordinator.cleanupOrphanBlobsForBinaryPorts` helper that the
runner calls on `saveByPolicy` failure to drop just-written blobs before
re-throwing. Document that periodic `clearOlderThan` is still required to
catch the hard-kill case that races the in-band cleanup.
…putRepository deterministic tier

Blob names in the deterministic-cache path are `(sanitize(taskType),
fingerprint(inputs))` with no tenant axis. Two tenants on a shared backing
with identical inputs resolve to the same name — a blob-existence side
channel for sensitive inputs. Document the single-tenant assumption on the
class and the fingerprinting site, and point operators at the supported
wrappers (per-tenant folder/prefix or `RunPrivateCacheRepo`) for the
multi-tenant case. Behavior is unchanged.

Copy link
Copy Markdown
Collaborator Author

Pushed 5 HIGH-priority follow-up fixes on top of 1d2f730. All packages/test/src/test/task-graph tests (875) pass on this branch; one unrelated pre-existing failure remains in JobQueueStreamWorker.integration.test.ts (Node worker_threads transfer semantics) that is identical with or without these commits.

  • b04a42dfix(task-graph): guard resolveOutput walker against cycles and shared subtrees. Threads a WeakSet<object> through walk, hasMatchingRef, and collectCacheRefs so a self-referential output or a shared sub-tree no longer stack-overflows the resolver. Cycles preserve their topology (the back-edge is returned by reference rather than rewritten).
  • a846500fix(task-graph): treat Error and URL as opaque leaves in ref walker. Error.message/stack are own non-enumerable, URL is all prototype accessors — the generic Object.keys() clone was silently dropping their data while keeping the prototype. Adds both to isLeaf (and the matching collectCacheRefs skip) so they pass through by reference.
  • 4fd581ffix(task-graph): fsync blob temp handle before rename in FsFolderTaskOutputRepository. Adds await handle.sync() before close in saveOutputStream so the atomic-rename pattern actually delivers the durability promise its JSDoc claims. Directory fsync intentionally skipped (cache semantics tolerate a rename loss; cost not justified).
  • 16bb667fix(task-graph): clean up orphan blobs when stream-write succeeds but row commit fails. Adds optional deleteOutputByRef on TaskOutputRepository, implements it in FsFolderTaskOutputRepository, and a CacheCoordinator.cleanupOrphanBlobsForBinaryPorts helper the runner invokes on saveByPolicy rejection to best-effort delete just-written blobs before re-throwing. clearOlderThan JSDoc updated to note that periodic janitor sweeps are still required to catch hard-kill cases that race the in-band cleanup. Covered by a new TaskRunnerRefPath test that injects a failing saveOutput and asserts the blob is dropped.
  • 0fca60dfix(task-graph): document single-tenant assumption of FsFolderTaskOutputRepository deterministic tier. Doc-only. Calls out that the deterministic blob naming (sanitize(taskType), fingerprint(inputs)) carries no tenant axis and shares blobs across tenants with identical inputs — an existence side channel. Points operators at per-tenant prefixes or RunPrivateCacheRepo for the multi-tenant case. Behavior unchanged.

No departures from the suggested fixes; the one tweak worth flagging is in b04a42dhasMatchingRef is called with a fresh WeakSet from inside walk (separate from walk's own visited set) so the pre-scan still terminates on cycles without forcing the walker to share state with the containment check.


Generated by Claude Code

The previous walker walked any object with prototype != Object.prototype
when isLeaf opted them in (e.g. Error, URL). Class instances whose data
lives on the prototype (accessors) or in private slots — Headers,
Request, Response, FormData, URLSearchParams, ReadableStream, and any
user-defined class — were still walked via Object.keys() and silently
cloned to empty objects.

Invert the policy: walk only plain objects (Object.prototype / null
prototype), Array, Map, and Set. Every class instance is opaque and
returned by reference. The cycle/short-circuit logic in hasMatchingRef
and walk now reach the plain-object branch only for plain objects, so
the prototype-preserving Object.create(proto) branch in walk collapses
to {}.

Mirror the same opaque-by-default policy in collectCacheRefs in
resolveJobOutput.ts so both walkers stop at the same boundary.
claude added 2 commits June 13, 2026 08:41
…s between rename and dir-metadata flush

On ext4 `data=ordered` (and similar journaled filesystems) the rename of
the temp blob to its published name is not durable until the parent
directory's metadata is also flushed. A crash between the rename and
that metadata flush can leave the published name visible but pointing
at stale (zero-byte) content — the file handle was already fsync'd, but
the directory entry change is lost.

After the rename, open the blobs directory and call `sync()` on the
handle, then close it. Run this best-effort: swallow `EPERM`, `EINVAL`,
`ENOTSUP`, `EISDIR` from the dir-open for filesystems / platforms that
reject opening a directory for fsync (the rename is still the
durability boundary; on a recompute the cache simply re-runs the task).

Add an integration test that exercises the happy path round-trip and a
16-way concurrent-write scenario to confirm the dir-sync does not break
normal flow or serialize writes incorrectly. The unsupported-FS error
codes can't be naturally produced on a Linux tmp dir, so they're
covered by the swallow list in code review.
…rphan-cleanup races

The blob name was `<sanitized-taskType>_<fingerprint>.bin`, derived
entirely from `(taskType, inputs)`. Two runners executing the same
task with the same inputs would both write to the same path. If
runner A's row-commit failed and triggered the orphan-blob cleanup
(`cleanupOrphanBlobsForBinaryPorts` -> `deleteOutputByRef`), A would
unlink the blob that runner B's successfully-committed row was still
pointing at — a silent data-loss race.

Append a per-write `randomUUID()` suffix to the blob filename so each
`saveOutputStream` invocation lands at a unique path. Concurrent writers
no longer share a file, so a row-cleanup on one path can't touch the
other writer's blob.

The published `$ref` still carries the sanitized taskType prefix, so
prefix-scoped pruning (`deleteByTaskTypePrefix` /
`clearOlderThanWithTaskTypePrefix`) keeps cascading correctly. The
existing REF_PATTERN regex already accepts both the new
`<taskType>_<fingerprint>_<uuid>.bin` shape and the legacy
`<taskType>_<fingerprint>.bin` shape, so old refs written by previous
versions of this repository still resolve via `getOutputByRef`.

Tests cover (1) two concurrent writers with identical inputs producing
distinct readable refs, (2) the cleanup race — A's `deleteOutputByRef`
leaves B's blob intact, and (3) backward compatibility with legacy
un-suffixed blob filenames.

Copy link
Copy Markdown
Collaborator Author

Three follow-up review fixes pushed on top of the current branch:

1. b4fe120 — Ref walker: opaque-by-default for class instances

The previous walker only opted Error and URL into the opaque set. Every other class instance (Headers, Request, Response, FormData, URLSearchParams, ReadableStream, user-defined classes with private fields, etc.) was still walked via Object.keys() and silently cloned to an empty object — data lives on the prototype or in private slots and Object.keys() cannot see it.

Inverted the policy in resolveRef.ts and mirrored it in resolveJobOutput.collectCacheRefs: walk only plain objects (prototype Object.prototype / null), Array, Map, Set. Every class instance is opaque and returned by reference. The prototype-preserving Object.create(proto) branch collapses now that only plain objects reach it. New tests cover Headers, Request, Response, FormData, URLSearchParams, Error, URL, and a user class with private fields.

2. 87d9f4e — Directory fsync after rename

The temp blob file handle was fsynced before rename, but the containing directory was not. On ext4 data=ordered (and similar journaled filesystems) a crash between the rename and the directory metadata flush can leave the published name pointing at zero bytes.

After the rename, open(blobsDir, "r")dir.sync()dir.close(), wrapped in try/catch that swallows EPERM, EINVAL, ENOTSUP, EISDIR for filesystems that reject opening a directory for fsync. Integration test covers the happy round-trip + 16-way concurrent writes; the unsupported-FS error codes can't naturally fire on a Linux tmp dir, so they're covered by the swallow list in code review.

3. 7c67ab4 — Per-write unique blob suffix (the CRITICAL race)

Blob name was <sanitized-taskType>_<fingerprint>.bin, derived entirely from (taskType, inputs). Two concurrent runners with the same (taskType, inputs) wrote to the same path. If runner A's row-commit failed and triggered the orphan cleanup (cleanupOrphanBlobsForBinaryPortsdeleteOutputByRef), A would unlink the blob that runner B's successfully-committed row still pointed at — silent data loss.

Appended a per-write randomUUID() suffix: <sanitized-taskType>_<fingerprint>_<uuid>.bin. Each saveOutputStream lands at a unique path; concurrent writers can't race on the same file. The sanitized taskType prefix is unchanged, so prefix-scoped pruning still cascades correctly. The existing REF_PATTERN regex already matches both the new and legacy shapes, so old refs written by previous versions continue to resolve.

New tests cover: concurrent writers with identical inputs producing distinct refs, A's deleteOutputByRef cleanup leaving B's blob intact, and backward compatibility with legacy un-suffixed blob names.

Verification: bun scripts/test.ts graph vitest → 91 files / 890 tests passing; bun run build:types clean.


Generated by Claude Code

Copy link
Copy Markdown
Collaborator Author

Two HIGH findings from a fresh review pass on this PR — both narrowly scoped, both currently uncovered by tests. Patches and test sketches below.

HIGH 1 — RunPrivateCacheRepo does not forward deleteOutputByRef

File: packages/task-graph/src/cache/RunPrivateCacheRepo.ts (constructor ~lines 42–67; missing method).

Failure mode: CacheCoordinator.cleanupOrphanBlobsForBinaryPorts (CacheCoordinator.ts:332) guards typeof cache.deleteOutputByRef !== "function" and silently returns when missing. RunPrivateCacheRepo overrides saveOutputStream / getOutputByRef / getOutputStreamByRef but never overrides deleteOutputByRef; the base class declares it optional. When a run-private cache writes a streamed blob successfully and the row-commit then fails (the exact case commit 16bb667 was added to fix), orphan cleanup becomes a silent no-op — the blob lingers on disk until the periodic janitor sweep.

Patch (sketch):

In the constructor, immediately after the getOutputStreamByRef capability-shadow block (around line 65):

if (typeof backing.deleteOutputByRef !== "function") {
  (this as { deleteOutputByRef?: unknown }).deleteOutputByRef = undefined;
}

New method, after getOutputStreamByRef (~line 154):

public override deleteOutputByRef(ref: CacheRef): Promise<void> {
  if (typeof this.backing.deleteOutputByRef !== "function") return Promise.resolve();
  return this.backing.deleteOutputByRef(ref);
}

Do not inspect the ref string for the run id — refs are opaque to the wrapper, and cross-run isolation is already structural via the __run:<runId>::<taskType> namespacing.

Tests to add (extend packages/test/src/test/task-graph-cache/RunPrivateCacheRepo.test.ts):

  • Backing supports stream: save → exists → deleteOutputByRef(ref) → blob gone, getOutputByRef(ref) returns undefined.
  • Regression pin: FsFolder backing wrapped in RunPrivateCacheRepo; monkey-patch saveOutput to throw after a successful streamed write; drive cleanupOrphanBlobsForBinaryPorts; assert the blob file no longer exists on disk. (This is the test that fails on the current PR head.)
  • Backing without deleteOutputByRef: assert typeof wrapper.deleteOutputByRef === "undefined".
  • Cross-run isolation: two wrappers run-A/run-B sharing one FsFolder backing; A's delete leaves B's blob intact.

HIGH 2 — BinaryStreamRouter swallows sink rejection → producer hangs forever

File: packages/task-graph/src/task/StreamProcessor.ts (constructor ~lines 430–440; push parks at high-water mark ~lines 448–464; fail ~lines 475–481).

Failure mode: The constructor invokes this.refPromise = sink(this.iterable()) then attaches this.refPromise.catch(() => {}), suppressing the unhandled rejection but NOT propagating it. If a sink rejects BEFORE iterating (e.g. FsFolderTaskOutputRepository.saveOutputStream's await open(tmpPath, "w") throws EACCES / ENOSPC, or RunPrivateCacheRepo.saveOutputStream throws because the backing's stream method is missing), the iterable's next() is never called, chunkNotify is never set, and the producer's await router.push(chunk) parks at the 8 MiB high-water mark forever. executeStream's top-level catch only runs after the for-await exits — but the loop is itself awaiting that very push. The task hangs until external abort.

Patch:

Replace the swallowed catch in the constructor with a real handler:

this.refPromise.catch((err: unknown) => {
  this.fail(err instanceof Error ? err : new Error(String(err)));
});

fail() (lines 475–481) already sets failure, flips finished = true, and calls wakeChunk() + wakeDrain(), so the parked producer wakes and subsequent pushes early-return at the existing if (this.finished) return Promise.resolve(); check. The attached .catch returns a discarded chain — the original this.refPromise rejection still surfaces through await router.ref(). No caller-side change needed.

Tests to add (extend packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts, each with vitest { timeout: 2000 }):

  • Synchronous sink rejection before iteration: sink that returns Promise.reject(new Error("EACCES")) without iterating. Drive a producer that pushes ~10 MiB as small chunks (above the 8 MiB high-water mark). Assert (i) loop completes — 2s timeout is the failure signal, (ii) await router.ref() rejects with "EACCES".
  • Asynchronous sink rejection after partial iteration: sink consumes 3 chunks then throws; assert producer eventually returns and await router.ref() rejects.
  • Happy-path regression: existing sink behavior unchanged.

Verify BinaryStreamRouter is exported from packages/task-graph/src/index.ts; if not, add the named export so tests can import it.


Happy to push these if it's preferable to land them here vs as a follow-up PR.


Generated by Claude Code

import { withJobErrorDiagnostics } from "./JobErrorDiagnostics";
import { storageToClass } from "./JobStorageConverters";
import type { StreamEventLike } from "./JobQueueEventListeners";
import { classToStorage, storageToClass } from "./JobStorageConverters";
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants