Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/task-graph/src/task-graph/SubGraphEventBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import type { TaskGraph } from "./TaskGraph";
* run cannot leak subscriptions (which would double-emit on a later run).
*/
export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskGraph): () => void {
// A subgraph bridging to itself would re-emit each event back onto the same
// graph it just observed, looping forever. This cannot arise from normal
// composition (a compound task's subGraph and parentGraph are distinct
// instances) but guard anyway so a malformed hierarchy degrades to a no-op.
if (subGraph === parentGraph) return () => {};
const offs = [
subGraph.subscribe("task_complete", (id, out) => parentGraph.emit("task_complete", id, out)),
subGraph.subscribe("task_progress", (id, p, m, ...a) =>
Expand Down
10 changes: 10 additions & 0 deletions packages/task-graph/src/task/IteratorTaskRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { uuid4 } from "@workglow/util";
import { Dataflow } from "../task-graph/Dataflow";
import { bridgeSubGraphTaskEvents } from "../task-graph/SubGraphEventBridge";
import { TaskGraph } from "../task-graph/TaskGraph";
import { GraphAsTaskRunner } from "./GraphAsTaskRunner";
import type { ITaskConstructor } from "./ITask";
Expand Down Expand Up @@ -293,6 +294,14 @@ export class IteratorTaskRunner<
};
const unsubscribeGraphProgress = graphClone.subscribe("graph_progress", onGraphProgress);

// Bubble inner-task events up to the parent graph so subgraph children of an
// iteration surface as individual task events on the top-level stream
// (previews + progress), matching GraphAsTask / While / Fallback. A fresh
// clone is bridged per iteration, so tear down in finally — otherwise each
// discarded clone leaks its parentGraph subscriptions for the iterator's life.
const parentGraph = this.task.parentGraph;
const unbridge = parentGraph ? bridgeSubGraphTaskEvents(graphClone, parentGraph) : () => {};

try {
const results = await graphClone.run<TaskOutput>(input as TaskInput, {
parentSignal: this.currentCtx?.abortController.signal,
Expand All @@ -311,6 +320,7 @@ export class IteratorTaskRunner<
) as TaskOutput;
} finally {
unsubscribeGraphProgress();
unbridge();
if (this.aggregatingParentMapProgress && this.mapPartialIterationCount > 0) {
this.mapPartialProgress[index] = 100;
this.emitMapParentProgressFromPartials();
Expand Down
53 changes: 53 additions & 0 deletions packages/test/src/test/task-graph/TaskCompleteEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import {
Dataflow,
FallbackTask,
GraphAsTask,
MapTask,
Task,
TaskGraph,
TaskRegistry,
WhileTask,
Workflow,
} from "@workglow/task-graph";
import type { DataPortSchema } from "@workglow/util/schema";
import { describe, expect, it } from "vitest";
Expand Down Expand Up @@ -154,6 +156,35 @@ class TCEStream extends Task<{ value: number }, { text: string }> {
}
TaskRegistry.registerTask(TCEStream as never);

// Map-iterable: doubles a single `item`. Used to verify that MapTask iterations
// (which run cloned subgraphs) bubble their inner task events to the top graph.
class TCEMapItem extends Task<{ item: number }, { processed: number }> {
static override readonly type = "TCEMapItem";
static override readonly category = "Test";
static override title = "Map item double";
static override description = "Doubles a single item";
static override inputSchema(): DataPortSchema {
return {
type: "object",
properties: { item: { type: "number" } },
required: ["item"],
additionalProperties: true,
} as const satisfies DataPortSchema;
}
static override outputSchema(): DataPortSchema {
return {
type: "object",
properties: { processed: { type: "number" } },
required: ["processed"],
additionalProperties: false,
} as const satisfies DataPortSchema;
}
override async execute(input: { item: number }): Promise<{ processed: number }> {
return { processed: input.item * 2 };
}
}
TaskRegistry.registerTask(TCEMapItem as never);

describe("task_complete graph event", () => {
it("emits task_complete for every completed task with its output", async () => {
const graph = new TaskGraph();
Expand Down Expand Up @@ -298,6 +329,28 @@ describe("task_complete graph event", () => {
expect(seen).toContain("fbody");
});

it("bubbles task_complete from a Map loop's per-iteration children", async () => {
const workflow = new Workflow();
workflow.map({ maxIterations: "unbounded" }).addTask(TCEMapItem).endMap();
const mapTask = workflow.graph.getTasks()[0] as MapTask;

const innerOutputs: number[] = [];
const unsub = workflow.graph.subscribe("task_complete", (taskId, output) => {
// The map node itself emits the aggregated array output; inner iterations
// emit a scalar `processed`. Each cloned iteration gets a fresh uuid, so we
// count the inner completions rather than assert on their ids.
if (String(taskId) !== mapTask.id && typeof output?.processed === "number") {
innerOutputs.push(output.processed);
}
});

await workflow.run({ item: [1, 2, 3] });
unsub();

// One bridged inner task_complete per item, carrying the doubled value.
expect(innerOutputs.sort((a, b) => a - b)).toEqual([2, 4, 6]);
});

it("does not leak the subgraph bridge after a failed run (no double-emit on re-run)", async () => {
let attempt = 0;
class FailFirstThenPass extends Task<{ value: number }, { value: number }> {
Expand Down
Loading