Open
27 commits
e357456
feat(task-graph): binary-streaming framework + result-as-reference
claude Jun 4, 2026
509c80b
feat(job-queue): in-process stream observability via JobHandle.onStream
claude Jun 4, 2026
a1e2995
fix(task-graph): brand CacheRef with literal kind to prevent shape-on…
claude Jun 9, 2026
1c2fd0a
fix(task-graph): canonicalize binary stream format vocabulary to "blo…
claude Jun 9, 2026
9588cb1
fix(task-graph): byte-bounded backpressure in binary stream router (d…
claude Jun 9, 2026
9e94707
feat(task-graph): streaming read helpers for cache refs
claude Jun 10, 2026
48c30df
feat(task-graph): resolveJobOutputStream for streaming job results fr…
claude Jun 10, 2026
0bd182d
feat(task-graph): expose binary stream-consumer detection for cache-h…
claude Jun 10, 2026
81f2a18
feat(task-graph): cache-hit replay and hydration of binary cache refs
claude Jun 10, 2026
8760535
feat(task-graph): hydrate cache refs in task inputs before execute
claude Jun 10, 2026
393991d
feat(task-graph): FsFolderTaskOutputRepository with streaming blob si…
claude Jun 10, 2026
e685842
feat(job-queue): capability-gated JobHandle.outputStream for cached b…
claude Jun 10, 2026
9069cfe
docs(task-graph): document binary cache stream-out in EXECUTION_MODEL
claude Jun 10, 2026
df8528e
fix(task-graph): blob lifecycle hardening in FsFolderTaskOutputReposi…
claude Jun 11, 2026
616fa41
refactor(task-graph): remove dead pipeBinaryToCache; detach edge-stre…
claude Jun 11, 2026
19cad0a
Merge remote-tracking branch 'origin/main' into claude/peaceful-lampo…
claude Jun 11, 2026
337b688
fix(task-graph): cache rows store refs, not inline binary; enforce si…
claude Jun 11, 2026
1d2f730
fix(task-graph): default blob/binary port codecs for JSON-row cache b…
claude Jun 11, 2026
b04a42d
fix(task-graph): guard resolveOutput walker against cycles and shared…
claude Jun 12, 2026
a846500
fix(task-graph): treat Error and URL as opaque leaves in ref walker
claude Jun 12, 2026
4fd581f
fix(task-graph): fsync blob temp handle before rename in FsFolderTask…
claude Jun 12, 2026
16bb667
fix(task-graph): clean up orphan blobs when stream-write succeeds but…
claude Jun 12, 2026
0fca60d
fix(task-graph): document single-tenant assumption of FsFolderTaskOut…
claude Jun 12, 2026
b4fe120
fix(task-graph): treat all class instances as opaque in ref walker
claude Jun 13, 2026
87d9f4e
fix(task-graph): fsync blobs directory after rename to survive crashe…
claude Jun 13, 2026
7c67ab4
fix(task-graph): stamp FsFolder blobs with unique suffix to prevent o…
claude Jun 13, 2026
6e00067
Merge branch 'main' into claude/peaceful-lamport-0aa97d
sroussey Jun 24, 2026
8 changes: 7 additions & 1 deletion packages/job-queue/src/job/Job.ts
@@ -6,7 +6,7 @@

import { JobStatus } from "../queue-storage/IQueueStorage";
import { JobError } from "./JobError";
import type { JobProgressListener } from "./JobQueueEventListeners";
import type { JobProgressListener, StreamEventLike } from "./JobQueueEventListeners";

export { JobStatus };

@@ -20,6 +20,12 @@ export interface IJobExecuteContext {
message?: string,
details?: Record<string, any> | null
) => Promise<void>;
/**
* OPTIONAL. Present only when the worker's transport can deliver stream
* events. Jobs MUST NOT retain references to chunk buffers after calling
* this (buffers may be transferred across a worker boundary and detached).
*/
emitStreamEvent?: (event: StreamEventLike) => void;
}

/**
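A minimal sketch of how a job body might use the optional `emitStreamEvent` hook added to `IJobExecuteContext` above. Only the optionality of the hook and the rule about not touching a chunk buffer after emitting it come from the diff. The `ExecuteContextLike` interface, the `runChunkedJob` function, the `chunk` payload field, and the `"data"` port name are assumptions made for illustration.

```typescript
// Local stand-in for the structural event type introduced in this PR.
type StreamEventLike = { type: string; port?: string; [k: string]: unknown };

// Hypothetical, trimmed-down view of IJobExecuteContext (only the new hook).
interface ExecuteContextLike {
  emitStreamEvent?: (event: StreamEventLike) => void;
}

// Hypothetical job body: emits one event per chunk when the transport
// supports streaming, and always returns the total byte count.
function runChunkedJob(chunks: Uint8Array[], context: ExecuteContextLike): number {
  let total = 0;
  for (const chunk of chunks) {
    // Read everything we need BEFORE emitting: the buffer may be transferred
    // and detached once the event has been handed to the transport.
    const size = chunk.byteLength;
    context.emitStreamEvent?.({ type: "binary-delta", port: "data", chunk });
    total += size;
  }
  // Optional chaining keeps the job valid on transports without the hook.
  context.emitStreamEvent?.({ type: "finish", port: "data" });
  return total;
}
```

Because the hook is optional, the same job runs unchanged on storage-only workers; it simply produces no stream events there.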
104 changes: 103 additions & 1 deletion packages/job-queue/src/job/JobQueueClient.ts
@@ -26,6 +26,8 @@ import {
JobQueueEventListeners,
JobQueueEventParameters,
JobQueueEvents,
JobStreamListener,
type StreamEventLike,
} from "./JobQueueEventListeners";
import type { JobQueueServer } from "./JobQueueServer";
import { storageToClass } from "./JobStorageConverters";
@@ -38,15 +40,50 @@ export interface JobHandle<Output> {
waitFor(): Promise<Output>;
abort(): Promise<void>;
onProgress(callback: JobProgressListener): () => void;
/**
* OPTIONAL — present only when this handle's transport can deliver stream
* events (a same-process server-attached queue). Absent on storage-only
* backends; callers branch on `typeof handle.onStream === "function"`.
*/
onStream?(callback: JobStreamListener): () => void;
/**
* OPTIONAL — present only when the client was configured with an
* `outputStreamResolver` (an output cache backing reachable from this
* process). Awaits the job's completion, then streams the binary result
* back out of the output cache without materializing it. `port` selects the
* output port; omit it when the output holds exactly one cache ref.
* Resolves `undefined` when there is nothing binary to stream (or the
* cache entry was evicted).
*/
outputStream?(port?: string): Promise<AsyncIterable<Uint8Array> | undefined>;
}

/**
* Resolves a completed job's output value to a byte stream. Injected via
* {@link JobQueueClientOptions.outputStreamResolver} because the cache layer
* that understands output refs lives above this package in the dependency
* graph (`@workglow/task-graph` exports `makeJobOutputStreamResolver` to
* build one from a cache backing).
*/
export type JobOutputStreamResolver = (
output: unknown,
port?: string
) => Promise<AsyncIterable<Uint8Array> | undefined>;

/**
* Options for creating a JobQueueClient
*/
export interface JobQueueClientOptions<Input, Output> {
readonly messageQueue: IMessageQueue<JobStorageFormat<Input, Output>>;
readonly jobStore: IJobStore<Input, Output>;
readonly queueName: string;
/**
* OPTIONAL — enables `JobHandle.outputStream` on handles from this client.
* Deployments whose output cache backing is reachable from this process
* inject a resolver (see `makeJobOutputStreamResolver` in
* `@workglow/task-graph`); without it, handles omit the method.
*/
readonly outputStreamResolver?: JobOutputStreamResolver;
}

/**
@@ -61,6 +98,7 @@ export class JobQueueClient<Input, Output> {
protected readonly events = new EventEmitter<JobQueueEventListeners<Input, Output>>();
protected server: JobQueueServer<Input, Output> | null = null;
protected storageUnsubscribe: (() => void) | null = null;
protected readonly outputStreamResolver: JobOutputStreamResolver | undefined;

/**
* Map of job IDs to their pending promise resolvers
@@ -78,6 +116,11 @@ export class JobQueueClient<Input, Output> {
*/
protected readonly jobProgressListeners: Map<unknown, Set<JobProgressListener>> = new Map();

/**
* Map of job IDs to their stream listeners
*/
protected readonly jobStreamListeners: Map<unknown, Set<JobStreamListener>> = new Map();

/**
* Last known progress state for each job
*/
@@ -94,6 +137,7 @@ export class JobQueueClient<Input, Output> {
this.queueName = options.queueName;
this.messageQueue = options.messageQueue;
this.jobStore = options.jobStore;
this.outputStreamResolver = options.outputStreamResolver;
}

/**
@@ -391,6 +435,27 @@ export class JobQueueClient<Input, Output> {
};
}

/**
* Subscribe to stream events for a specific job
*/
public onJobStream(jobId: unknown, listener: JobStreamListener): () => void {
if (!this.jobStreamListeners.has(jobId)) {
this.jobStreamListeners.set(jobId, new Set());
}
const listeners = this.jobStreamListeners.get(jobId)!;
listeners.add(listener);

return () => {
const listeners = this.jobStreamListeners.get(jobId);
if (listeners) {
listeners.delete(listener);
if (listeners.size === 0) {
this.jobStreamListeners.delete(jobId);
}
}
};
}

// ========================================================================
// Event handling
// ========================================================================
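The subscribe/unsubscribe shape of `onJobStream` in the hunk above (a `Map` of job IDs to listener `Set`s, with the entry dropped once its last listener leaves) can be sketched in isolation. `JobListenerRegistry` and its `has` helper are hypothetical names used only for this illustration.

```typescript
type Listener<E> = (event: E) => void;

// Hypothetical standalone registry mirroring the Map<jobId, Set<listener>> layout.
class JobListenerRegistry<E> {
  private readonly listeners = new Map<unknown, Set<Listener<E>>>();

  // Registers a listener and returns an idempotent unsubscribe function.
  subscribe(jobId: unknown, listener: Listener<E>): () => void {
    let set = this.listeners.get(jobId);
    if (!set) {
      set = new Set<Listener<E>>();
      this.listeners.set(jobId, set);
    }
    set.add(listener);

    return () => {
      const current = this.listeners.get(jobId);
      if (!current) return; // already cleaned up, nothing to do
      current.delete(listener);
      // Drop the empty set so finished jobs do not accumulate map entries.
      if (current.size === 0) this.listeners.delete(jobId);
    };
  }

  // True while at least one listener is registered for the job.
  has(jobId: unknown): boolean {
    return this.listeners.has(jobId);
  }
}
```

Re-reading the map inside the unsubscribe closure, rather than capturing the original `Set`, keeps a late unsubscribe harmless after a bulk cleanup has already removed the entry.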
@@ -524,23 +589,60 @@ export class JobQueueClient<Input, Output> {
}
}

/**
* Called by server when a job emits a stream event. Listener throws are
* isolated per-listener — one misbehaving subscriber does not interrupt
* delivery to the rest or abort the dispatch itself.
* @internal
*/
public handleJobStream(jobId: unknown, event: StreamEventLike): void {
this.events.emit("job_stream", this.queueName, jobId, event);

const listeners = this.jobStreamListeners.get(jobId);
if (!listeners) return;
for (const listener of listeners) {
try {
listener(event);
} catch (err) {
getLogger().error("JobHandle.onStream listener threw", {
jobId,
error: err instanceof Error ? err.message : String(err),
});
}
}
}

// ========================================================================
// Private helpers
// ========================================================================

private createJobHandle(id: unknown): JobHandle<Output> {
return {
const handle: JobHandle<Output> = {
id,
waitFor: () => this.waitFor(id),
abort: () => this.abort(id),
onProgress: (callback: JobProgressListener) => this.onJobProgress(id, callback),
};
// Stream delivery requires a same-process server-attached transport — the
// same signal `connect()` uses. Storage-only backends omit `onStream`, so
// callers branch on `typeof handle.onStream === "function"`.
if (this.server) {
handle.onStream = (callback: JobStreamListener) => this.onJobStream(id, callback);
}
// Streaming result reads require a cache backing reachable from this
// process; the injected resolver is the capability signal.
const resolver = this.outputStreamResolver;
if (resolver) {
handle.outputStream = async (port?: string) => resolver(await this.waitFor(id), port);
}
return handle;
}

private cleanupJob(jobId: unknown): void {
this.activeJobPromises.delete(jobId);
this.lastKnownProgress.delete(jobId);
this.jobProgressListeners.delete(jobId);
this.jobStreamListeners.delete(jobId);
}

private handleStorageChange(change: QueueChangePayload<Input, Output>): void {
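From the caller's side, both new `JobHandle` members are capability-gated, so a consumer is expected to branch on their presence. The following is a sketch under that reading of the diff: `HandleLike` is a trimmed local stand-in for `JobHandle`, and `subscribeIfSupported` and `countOutputBytes` are hypothetical helpers, not functions in this PR.

```typescript
type StreamEventLike = { type: string; port?: string; [k: string]: unknown };
type JobStreamListener = (event: StreamEventLike) => void;

// Trimmed stand-in for JobHandle: only the two optional members matter here.
interface HandleLike {
  onStream?(callback: JobStreamListener): () => void;
  outputStream?(port?: string): Promise<AsyncIterable<Uint8Array> | undefined>;
}

// Subscribes when the transport supports streaming; otherwise returns a no-op
// unsubscribe so callers can treat both cases uniformly.
function subscribeIfSupported(handle: HandleLike, listener: JobStreamListener): () => void {
  if (typeof handle.onStream === "function") {
    return handle.onStream(listener);
  }
  return () => {};
}

// Streams the binary result chunk by chunk and returns its size, or
// `undefined` when the capability is absent or nothing binary is available.
async function countOutputBytes(handle: HandleLike, port?: string): Promise<number | undefined> {
  if (typeof handle.outputStream !== "function") return undefined;
  const stream = await handle.outputStream(port);
  if (!stream) return undefined;
  let total = 0;
  for await (const chunk of stream) {
    total += chunk.byteLength; // only one chunk is held in memory at a time
  }
  return total;
}
```

Note that `undefined` from `countOutputBytes` conflates "capability absent" with "nothing to stream"; a real caller may want to distinguish the two.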
14 changes: 14 additions & 0 deletions packages/job-queue/src/job/JobQueueEventListeners.ts
@@ -25,6 +25,7 @@ export type JobQueueEventListeners<Input, Output> = {
message: string,
details: Record<string, any> | null
) => void;
job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void;
};

export type JobQueueEvents = keyof JobQueueEventListeners<any, any>;
@@ -46,3 +47,16 @@ export type JobProgressListener = (
message: string,
details: Record<string, any> | null
) => void;

/**
* Minimal structural shape of a stream event crossing the job-queue boundary.
*
* `@workglow/job-queue` sits below `@workglow/task-graph` in the dependency
* graph, so it cannot import task-graph's `StreamEvent`. This structural type
* captures just what the queue plumbing needs; task-graph's `StreamEvent` is
* assignable to it, so real stream producers interoperate transparently.
*/
export type StreamEventLike = { type: string; port?: string; [k: string]: unknown };

/** Listener for stream events emitted by an executing job (delivered in-process via an attached server). */
export type JobStreamListener = (event: StreamEventLike) => void;
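To illustrate the assignability claim in the `StreamEventLike` comment, here is a small sketch with a richer event union passed to code that only knows the structural type. `RichStreamEvent` is a hypothetical stand-in for task-graph's `StreamEvent`, whose real definition is not shown in this diff, and `summarize` is an illustrative helper.

```typescript
type StreamEventLike = { type: string; port?: string; [k: string]: unknown };

// Hypothetical richer union, standing in for a producer-side event type.
type RichStreamEvent =
  | { type: "binary-delta"; port: string; chunk: Uint8Array }
  | { type: "finish"; port?: string; data?: unknown };

// Queue-side plumbing only needs `type` and (optionally) `port`.
function summarize(event: StreamEventLike): string {
  return event.port === undefined ? event.type : `${event.type}@${event.port}`;
}

const delta: RichStreamEvent = { type: "binary-delta", port: "image", chunk: new Uint8Array(4) };

// No cast needed: each union member satisfies the structural shape, and the
// `unknown` index signature admits the extra payload fields.
const label = summarize(delta);
```

One caveat with this approach: the implicit index-signature compatibility applies to object-literal type aliases, so a producer type declared as an `interface` may need an explicit index signature or a cast.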
12 changes: 12 additions & 0 deletions packages/job-queue/src/job/JobQueueServer.ts
@@ -13,6 +13,7 @@ import type { JobStorageFormat, QueueChangePayload } from "../queue-storage/IQue
import { JobStatus } from "../queue-storage/IQueueStorage";
import type { DeadLetter } from "./DeadLetter";
import { Job, JobClass } from "./Job";
import type { StreamEventLike } from "./JobQueueEventListeners";
import { JobQueueClient } from "./JobQueueClient";
import { JobQueueWorker } from "./JobQueueWorker";
import { storageToClass } from "./JobStorageConverters";
@@ -49,6 +50,7 @@ export type JobQueueServerEventListeners<Input, Output> = {
message: string,
details: Record<string, unknown> | null
) => void;
job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void;
};

export type JobQueueServerEvents = keyof JobQueueServerEventListeners<unknown, unknown>;
@@ -479,6 +481,11 @@ export class JobQueueServer<
this.forwardToClients("handleJobProgress", jobId, progress, message, details);
});

worker.on("job_stream", (jobId, event) => {
this.events.emit("job_stream", this.queueName, jobId, event);
this.forwardToClients("handleJobStream", jobId, event);
});

return worker;
}

@@ -502,6 +509,11 @@ export class JobQueueServer<
message: string,
details: Record<string, unknown> | null
): void;
protected forwardToClients(
method: "handleJobStream",
jobId: unknown,
event: StreamEventLike
): void;
protected forwardToClients(method: string, ...args: unknown[]): void {
for (const client of this.clients) {
const fn = (client as any)[method];
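The fan-out introduced across these files (the server forwards each stream event to every attached client, and each client isolates listener failures) can be sketched with two small classes. `MiniServer` and `MiniClient` are hypothetical simplifications: they drop job IDs, queue names, and the logger, and record error messages in an array instead.

```typescript
type StreamEventLike = { type: string; port?: string; [k: string]: unknown };
type JobStreamListener = (event: StreamEventLike) => void;

// Hypothetical client: delivers to every listener, isolating throws.
class MiniClient {
  readonly errors: string[] = [];
  private readonly listeners = new Set<JobStreamListener>();

  onStream(listener: JobStreamListener): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  handleJobStream(event: StreamEventLike): void {
    for (const listener of this.listeners) {
      try {
        listener(event);
      } catch (err) {
        // One failing subscriber must not block the remaining ones.
        this.errors.push(err instanceof Error ? err.message : String(err));
      }
    }
  }
}

// Hypothetical server: forwards each event to all attached clients.
class MiniServer {
  private readonly clients: MiniClient[] = [];

  attach(client: MiniClient): void {
    this.clients.push(client);
  }

  forward(event: StreamEventLike): void {
    for (const client of this.clients) client.handleJobStream(event);
  }
}
```

Catching inside the per-listener loop, rather than around the whole dispatch, is what keeps a misbehaving subscriber from interrupting delivery to the others.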
16 changes: 15 additions & 1 deletion packages/job-queue/src/job/JobQueueWorker.ts
@@ -31,7 +31,8 @@ import {
RetryableJobError,
} from "./JobError";
import { withJobErrorDiagnostics } from "./JobErrorDiagnostics";
import { storageToClass } from "./JobStorageConverters";
import type { StreamEventLike } from "./JobQueueEventListeners";
import { classToStorage, storageToClass } from "./JobStorageConverters";

/**
* Upper bound on {@link JobQueueWorker.getLimiterWakeDelay}. Prevents a
@@ -56,6 +57,7 @@ export type JobQueueWorkerEventListeners<Input, Output> = {
message: string,
details: Record<string, unknown> | null
) => void;
job_stream: (jobId: unknown, event: StreamEventLike) => void;
worker_start: () => void;
worker_stop: () => void;
};
@@ -812,6 +814,7 @@ export class JobQueueWorker<
return await job.execute(job.input, {
signal,
updateProgress: this.updateProgress.bind(this, job.id),
emitStreamEvent: (event) => this.emitStreamEvent(job.id, event),
});
}

@@ -833,6 +836,17 @@ export class JobQueueWorker<
this.events.emit("job_progress", jobId, progress, message, details);
}

/**
* Emit a stream event for a job.
*
* Mirrors {@link updateProgress}: stream events are delivered in-memory via
* the `job_stream` event and forwarded by an attached `JobQueueServer` to
* subscribed clients. Storage is not touched.
*/
protected emitStreamEvent(jobId: unknown, event: StreamEventLike): void {
this.events.emit("job_stream", jobId, event);
}

/** Internal — resolve the active claim for a job id, throw if missing. */
private getClaim(jobId: unknown): IClaim<JobStorageFormat<Input, Output>> | undefined {
return this.activeClaims.get(jobId);
20 changes: 20 additions & 0 deletions packages/task-graph/src/EXECUTION_MODEL.md
@@ -282,6 +282,26 @@ key = sha256(taskType + getCacheVersion() + fingerprint(inputs))

Failed tasks are never cached — only `Ok` results reach `saveOutput`. `saveOutput` is upsert by primary key (last writer wins) — the underlying `TaskOutputTabularRepository` calls `put()` on its tabular storage, so a same-key write replaces the existing row.

### Binary cache stream-out (refs on the read path)

Binary output ports whose bytes were piped into a stream-capable cache carry a branded `CacheRef` in the cached row. On a **cache hit**, the runner mirrors the fresh-run event contract, driven by two graph-computed consumer hints (`IRunConfig.hasStreamingConsumers` / `hasMaterializingConsumers`):

- **Stream-capable consumer** (`x-stream: "binary"` on both ends of an edge): the cached bytes replay as chunked `binary-delta` events, pull-paced from the repository's streaming reader (`getOutputStreamByRef`), so memory stays bounded by the read chunk size. The finish event keeps the ref at the port.
- **Materializing consumer** (target port cannot consume the stream): the ref hydrates into the **enriched finish event** as a `Blob`/`ArrayBuffer` (per the port's `format`), exactly what a fresh run's accumulator would have delivered. The *returned* output still carries the small ref.
- **No consumers**: no reads are performed; the synthetic finish carries the ref unchanged (callers resolve via `resolveOutput` / `resolveJobOutputStream`).
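The three cases above can be sketched as a single generator driven by the two consumer hints. This is an illustrative model, not the runner's code: `CacheRefLike`, `ReplayEvent`, `replayCacheHit`, the `value` field, and the use of a synchronous `Iterable` reader are all assumptions, and the behavior when both hints are set (deltas plus a materialized finish) is a guess.

```typescript
// Hypothetical ref shape; the PR only says refs are branded with a literal `kind`.
type CacheRefLike = { kind: "cache-ref"; id: string };

type ReplayEvent =
  | { type: "binary-delta"; port: string; chunk: Uint8Array }
  | { type: "finish"; port: string; value: CacheRefLike | Uint8Array };

interface ConsumerHints {
  hasStreamingConsumers: boolean;
  hasMaterializingConsumers: boolean;
}

function* replayCacheHit(
  port: string,
  ref: CacheRefLike,
  openReader: () => Iterable<Uint8Array>,
  hints: ConsumerHints
): Generator<ReplayEvent> {
  // No consumers: perform no reads, finish with the ref unchanged.
  if (!hints.hasStreamingConsumers && !hints.hasMaterializingConsumers) {
    yield { type: "finish", port, value: ref };
    return;
  }
  const parts: Uint8Array[] = [];
  // Pull-paced: the next chunk is read only when the consumer asks for more.
  for (const chunk of openReader()) {
    if (hints.hasMaterializingConsumers) parts.push(chunk);
    if (hints.hasStreamingConsumers) yield { type: "binary-delta", port, chunk };
  }
  if (!hints.hasMaterializingConsumers) {
    yield { type: "finish", port, value: ref }; // streaming only: keep the ref
    return;
  }
  // Materializing: hydrate the accumulated bytes into the finish event.
  const bytes = new Uint8Array(parts.reduce((n, p) => n + p.byteLength, 0));
  let offset = 0;
  for (const part of parts) {
    bytes.set(part, offset);
    offset += part.byteLength;
  }
  yield { type: "finish", port, value: bytes };
}
```

In the streaming-only path nothing is accumulated, which is what keeps memory bounded by the read chunk size.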

**Rows store the wire form**: the cached row always carries the `CacheRef`, never inline bytes — JSON-row backings would destroy an inline `Blob`/`ArrayBuffer` (`JSON.stringify(Blob)` is `{}`). Below-threshold hydration to inline bytes applies to the value **returned to the caller**, identically on fresh runs and cache hits.

**Single binary port**: the cache sink keys bytes by `(taskType, inputs)` with no port axis, so cache-streaming supports exactly one binary output port. Tasks with multiple binary ports take the accumulation path (enforced in both `StreamPump.canStreamBinaryToCache` and `CacheCoordinator.getBinaryRefSinksByPolicy`); their inline outputs are only safely cacheable by non-JSON-row backings until per-port refs land.

**Self-healing dangling refs**: when a ref needed for replay or hydration no longer resolves (blob evicted, cache cleared), the hit converts into a **miss** — the task re-executes and rewrites both the row and the bytes. No events are emitted before all refs are validated.
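A sketch of the hit-to-miss conversion described here, assuming a cached row that lists its refs and a blob store that can answer existence checks. `CachedRow`, `readThroughCache`, `blobExists`, `execute`, and `save` are hypothetical names for this illustration.

```typescript
// Hypothetical ref and row shapes for illustration.
type CacheRefLike = { kind: "cache-ref"; id: string };

interface CachedRow<T> {
  output: T;
  refs: CacheRefLike[];
}

function readThroughCache<T>(
  row: CachedRow<T> | undefined,
  blobExists: (id: string) => boolean,
  execute: () => CachedRow<T>,
  save: (row: CachedRow<T>) => void
): { source: "hit" | "miss"; output: T } {
  // Validate EVERY ref up front, before anything is emitted or returned.
  if (row && row.refs.every((ref) => blobExists(ref.id))) {
    return { source: "hit", output: row.output };
  }
  // Missing row or dangling ref: treat as a miss, re-run, and rewrite the row
  // (the real task would also rewrite the bytes while executing).
  const fresh = execute();
  save(fresh);
  return { source: "miss", output: fresh.output };
}
```

Validating all refs before taking the hit path is what makes it safe to guarantee that no events are emitted for a hit that later turns out to be unusable.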

**Input-side hydration**: any branded ref that reaches a task's resolved inputs is hydrated against the run's `CacheRegistry` (private first, then deterministic) before validation and cache-key computation, so ref-bearing inputs fingerprint identically to materialized ones. Binary-streaming input ports with a live input stream are skipped — those consumers take bytes from the stream. An unresolvable input ref fails the task with an error naming the port.
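The lookup order and the two exceptions in this paragraph can be sketched as follows. This is a simplified model that only inspects top-level input values, whereas the paragraph speaks of any ref reaching the resolved inputs; `CacheRefLike`, `BlobLookup`, `isCacheRef`, and `hydrateInputs` are hypothetical names, and the registries are modeled as plain lookup functions.

```typescript
// Hypothetical ref shape; the literal `kind` value is an assumption.
type CacheRefLike = { kind: "cache-ref"; id: string };
type BlobLookup = (id: string) => Uint8Array | undefined;

function isCacheRef(value: unknown): value is CacheRefLike {
  return (
    typeof value === "object" &&
    value !== null &&
    (value as { kind?: unknown }).kind === "cache-ref" &&
    typeof (value as { id?: unknown }).id === "string"
  );
}

function hydrateInputs(
  inputs: Record<string, unknown>,
  registries: BlobLookup[], // ordered: private first, then deterministic
  liveStreamPorts: ReadonlySet<string>
): Record<string, unknown> {
  const hydrated: Record<string, unknown> = {};
  for (const [port, value] of Object.entries(inputs)) {
    // Non-refs pass through; ports fed by a live stream keep their ref
    // because the consumer takes its bytes from the stream instead.
    if (!isCacheRef(value) || liveStreamPorts.has(port)) {
      hydrated[port] = value;
      continue;
    }
    let bytes: Uint8Array | undefined;
    for (const lookup of registries) {
      bytes = lookup(value.id);
      if (bytes) break; // first registry that resolves the ref wins
    }
    if (!bytes) {
      throw new Error(`Unresolvable cache ref on input port "${port}"`);
    }
    hydrated[port] = bytes;
  }
  return hydrated;
}
```

Running this step before fingerprinting is what lets a ref-bearing input produce the same cache key as its materialized equivalent.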

**Queue consumers**: `JobHandle.outputStream(port?)` (present only when the `JobQueueClient` was configured with an `outputStreamResolver`, typically `makeJobOutputStreamResolver(repo)`) awaits completion and streams the binary result out of the cache without materializing it.

`FsFolderTaskOutputRepository` (node/bun) is the production streaming backing. It stores JSON rows via `FsFolderTabularStorage` and bytes as sidecar files under `<folder>/blobs/`, written incrementally and published by atomic rename. Blob files are named `<sanitized-taskType>_<input-fingerprint>.bin`, so a re-run overwrites the previous blob rather than leaking a new one. Two instances over one folder interoperate, which is how a second process can read what the first one wrote.
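The write-then-publish pattern mentioned here (incremental write to a temporary file, fsync, then atomic rename) can be sketched with Node's synchronous `fs` API. This is not the repository's implementation: `publishBlob`, the temp-file naming, and the synchronous `Iterable` input are assumptions, and the sketch leaves out the directory fsync that the commit list mentions.

```typescript
import { closeSync, fsyncSync, mkdirSync, mkdtempSync, openSync, readFileSync, renameSync, rmSync, writeSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Hypothetical helper: writes chunks to a temp file, then publishes it under
// its final name with a rename so readers never observe a partial blob.
function publishBlob(folder: string, name: string, chunks: Iterable<Uint8Array>): string {
  const blobsDir = join(folder, "blobs");
  mkdirSync(blobsDir, { recursive: true });
  const finalPath = join(blobsDir, name);
  const tempPath = `${finalPath}.${process.pid}.tmp`; // assumed temp naming
  const fd = openSync(tempPath, "w");
  try {
    try {
      for (const chunk of chunks) {
        // writeSync may write fewer bytes than requested, so loop until done.
        let offset = 0;
        while (offset < chunk.byteLength) {
          offset += writeSync(fd, chunk, offset, chunk.byteLength - offset);
        }
      }
      fsyncSync(fd); // flush file contents before the rename makes them visible
    } finally {
      closeSync(fd);
    }
    renameSync(tempPath, finalPath); // atomic replace on the same filesystem
  } catch (err) {
    rmSync(tempPath, { force: true }); // do not leave orphaned temp files
    throw err;
  }
  return finalPath;
}

// Usage: publishing twice under one name replaces the first blob.
const folder = mkdtempSync(join(tmpdir(), "blob-sketch-"));
const blobPath = publishBlob(folder, "demo_abc.bin", [new Uint8Array([1, 2]), new Uint8Array([3])]);
publishBlob(folder, "demo_abc.bin", [new Uint8Array([9])]);
const finalBytes = readFileSync(blobPath);
rmSync(folder, { recursive: true, force: true });
```

Because the temp file lives in the same directory as the final path, the rename stays on one filesystem, which is the condition under which it is atomic.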

### Durable execution model

A run is an atomic unit on a single worker. When the worker crashes:
1 change: 1 addition & 0 deletions packages/task-graph/src/bun.ts
@@ -7,3 +7,4 @@
// organize-imports-ignore

export * from "./common";
export * from "./common-server";