From 875a7e892a659536e268b33fc5e63d28a22efc61 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 24 Jun 2026 05:28:19 +0000 Subject: [PATCH] fix(task-graph): bubble subgraph events from iterator/map/reduce loops MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IteratorTask (and MapTask/ReduceTask) ran a cloned subgraph per iteration but never bridged its inner per-task events to the parent graph, so task_complete / task_progress / task_stream_* from iterator children never surfaced on the top-level run stream — unlike GraphAsTask, While, and Fallback. Bridge each iteration's clone and tear it down in finally so discarded clones don't leak their parent subscriptions. Also guard bridgeSubGraphTaskEvents against a subGraph === parentGraph self-loop (degrades to a no-op), and add a Map regression test. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01XNUdjvscCZH6BYzHxecwuW --- .../src/task-graph/SubGraphEventBridge.ts | 5 ++ .../task-graph/src/task/IteratorTaskRunner.ts | 10 ++++ .../test/task-graph/TaskCompleteEvent.test.ts | 53 +++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts index dbf9b511e..24d199f4b 100644 --- a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts +++ b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts @@ -19,6 +19,11 @@ import type { TaskGraph } from "./TaskGraph"; * run cannot leak subscriptions (which would double-emit on a later run). */ export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskGraph): () => void { + // A subgraph bridging to itself would re-emit each event back onto the same + // graph it just observed, looping forever. This cannot arise from normal + // composition (a compound task's subGraph and parentGraph are distinct + // instances) but guard anyway so a malformed hierarchy degrades to a no-op. + if (subGraph === parentGraph) return () => {}; const offs = [ subGraph.subscribe("task_complete", (id, out) => parentGraph.emit("task_complete", id, out)), subGraph.subscribe("task_progress", (id, p, m, ...a) => diff --git a/packages/task-graph/src/task/IteratorTaskRunner.ts b/packages/task-graph/src/task/IteratorTaskRunner.ts index 7ab54ae00..1f0514c8c 100644 --- a/packages/task-graph/src/task/IteratorTaskRunner.ts +++ b/packages/task-graph/src/task/IteratorTaskRunner.ts @@ -6,6 +6,7 @@ import { uuid4 } from "@workglow/util"; import { Dataflow } from "../task-graph/Dataflow"; +import { bridgeSubGraphTaskEvents } from "../task-graph/SubGraphEventBridge"; import { TaskGraph } from "../task-graph/TaskGraph"; import { GraphAsTaskRunner } from "./GraphAsTaskRunner"; import type { ITaskConstructor } from "./ITask"; @@ -293,6 +294,14 @@ export class IteratorTaskRunner< }; const unsubscribeGraphProgress = graphClone.subscribe("graph_progress", onGraphProgress); + // Bubble inner-task events up to the parent graph so subgraph children of an + // iteration surface as individual task events on the top-level stream + // (previews + progress), matching GraphAsTask / While / Fallback. A fresh + // clone is bridged per iteration, so tear down in finally — otherwise each + // discarded clone leaks its parentGraph subscriptions for the iterator's life. + const parentGraph = this.task.parentGraph; + const unbridge = parentGraph ? bridgeSubGraphTaskEvents(graphClone, parentGraph) : () => {}; + try { const results = await graphClone.run(input as TaskInput, { parentSignal: this.currentCtx?.abortController.signal, @@ -311,6 +320,7 @@ export class IteratorTaskRunner< ) as TaskOutput; } finally { unsubscribeGraphProgress(); + unbridge(); if (this.aggregatingParentMapProgress && this.mapPartialIterationCount > 0) { this.mapPartialProgress[index] = 100; this.emitMapParentProgressFromPartials(); diff --git a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts index d54b75a53..380720f31 100644 --- a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts +++ b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts @@ -9,10 +9,12 @@ import { Dataflow, FallbackTask, GraphAsTask, + MapTask, Task, TaskGraph, TaskRegistry, WhileTask, + Workflow, } from "@workglow/task-graph"; import type { DataPortSchema } from "@workglow/util/schema"; import { describe, expect, it } from "vitest"; @@ -154,6 +156,35 @@ class TCEStream extends Task<{ value: number }, { text: string }> { } TaskRegistry.registerTask(TCEStream as never); +// Map-iterable: doubles a single `item`. Used to verify that MapTask iterations +// (which run cloned subgraphs) bubble their inner task events to the top graph. +class TCEMapItem extends Task<{ item: number }, { processed: number }> { + static override readonly type = "TCEMapItem"; + static override readonly category = "Test"; + static override title = "Map item double"; + static override description = "Doubles a single item"; + static override inputSchema(): DataPortSchema { + return { + type: "object", + properties: { item: { type: "number" } }, + required: ["item"], + additionalProperties: true, + } as const satisfies DataPortSchema; + } + static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { processed: { type: "number" } }, + required: ["processed"], + additionalProperties: false, + } as const satisfies DataPortSchema; + } + override async execute(input: { item: number }): Promise<{ processed: number }> { + return { processed: input.item * 2 }; + } +} +TaskRegistry.registerTask(TCEMapItem as never); + describe("task_complete graph event", () => { it("emits task_complete for every completed task with its output", async () => { const graph = new TaskGraph(); @@ -298,6 +329,28 @@ describe("task_complete graph event", () => { expect(seen).toContain("fbody"); }); + it("bubbles task_complete from a Map loop's per-iteration children", async () => { + const workflow = new Workflow(); + workflow.map({ maxIterations: "unbounded" }).addTask(TCEMapItem).endMap(); + const mapTask = workflow.graph.getTasks()[0] as MapTask; + + const innerOutputs: number[] = []; + const unsub = workflow.graph.subscribe("task_complete", (taskId, output) => { + // The map node itself emits the aggregated array output; inner iterations + // emit a scalar `processed`. Each cloned iteration gets a fresh uuid, so we + // count the inner completions rather than assert on their ids. + if (String(taskId) !== mapTask.id && typeof output?.processed === "number") { + innerOutputs.push(output.processed); + } + }); + + await workflow.run({ item: [1, 2, 3] }); + unsub(); + + // One bridged inner task_complete per item, carrying the doubled value. + expect(innerOutputs.sort((a, b) => a - b)).toEqual([2, 4, 6]); + }); + it("does not leak the subgraph bridge after a failed run (no double-emit on re-run)", async () => { let attempt = 0; class FailFirstThenPass extends Task<{ value: number }, { value: number }> {