From 34e1b34992546aaeec70ee29435b528bb2532251 Mon Sep 17 00:00:00 2001 From: persovt2 Date: Mon, 1 Jun 2026 20:02:09 +0300 Subject: [PATCH] feat(agent-core): stream background subagent progress into /tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background subagents are Promise-based, so BackgroundProcessManager had no live proc.stdout to capture and their /tasks output pane stayed empty until completion. Tap the child agent's event stream via a new in-process Agent.onEvent, format assistant deltas and tool-call boundaries into log text, and feed it through registerAgentTask's outputSource into the task's ring buffer + output.log — matching background bash tasks. Output subscription is released on every terminal path (completed, failed, killed, timeout) via finalizeTerminal. Foreground subagents are unaffected. Closes #292 --- .changeset/stream-subagent-progress.md | 6 + packages/agent-core/src/agent/index.ts | 20 +++ .../agent-core/src/session/subagent-host.ts | 50 ++++++ .../src/session/subagent-progress.ts | 52 ++++++ .../src/tools/background/manager.ts | 27 ++- .../src/tools/builtin/collaboration/agent.ts | 3 + .../agent-core/test/agent/on-event.test.ts | 49 +++++ .../test/session/subagent-host.test.ts | 40 +++++ .../test/session/subagent-progress.test.ts | 102 +++++++++++ .../background/agent-output-stream.test.ts | 167 ++++++++++++++++++ 10 files changed, 514 insertions(+), 2 deletions(-) create mode 100644 .changeset/stream-subagent-progress.md create mode 100644 packages/agent-core/src/session/subagent-progress.ts create mode 100644 packages/agent-core/test/agent/on-event.test.ts create mode 100644 packages/agent-core/test/session/subagent-progress.test.ts create mode 100644 packages/agent-core/test/tools/background/agent-output-stream.test.ts diff --git a/.changeset/stream-subagent-progress.md b/.changeset/stream-subagent-progress.md new file mode 100644 index 00000000..49096973 --- /dev/null +++ b/.changeset/stream-subagent-progress.md @@ -0,0 +1,6 @@ +--- +"@moonshot-ai/agent-core": minor +"@moonshot-ai/kimi-code": minor +--- + +Stream background subagent progress into `/tasks` so live output (assistant text and tool-call boundaries) appears while the subagent runs, matching background bash tasks. Previously an agent task's output pane stayed empty until completion. diff --git a/packages/agent-core/src/agent/index.ts b/packages/agent-core/src/agent/index.ts index d38e0c32..3554482d 100644 --- a/packages/agent-core/src/agent/index.ts +++ b/packages/agent-core/src/agent/index.ts @@ -123,6 +123,9 @@ export class Agent { private lastLlmConfigLogSignature?: string; + // In-process event taps, fanned out alongside the RPC sink in `emitEvent`. + private readonly eventListeners = new Set<(event: AgentEvent) => void>(); + constructor(options: AgentOptions) { this.type = options.type ?? 'main'; this.kaos = options.kaos; @@ -385,8 +388,25 @@ export class Agent { }; } + onEvent(listener: (event: AgentEvent) => void): () => void { + this.eventListeners.add(listener); + return () => { + this.eventListeners.delete(listener); + }; + } + emitEvent(event: AgentEvent): void { if (this.records.restoring) return; + // size check keeps the common no-tap path off this hot loop + if (this.eventListeners.size > 0) { + for (const listener of this.eventListeners) { + try { + listener(event); + } catch { + /* a tap must not break emission */ + } + } + } void this.rpc?.emitEvent?.(event); } diff --git a/packages/agent-core/src/session/subagent-host.ts b/packages/agent-core/src/session/subagent-host.ts index 458e291e..42e45616 100644 --- a/packages/agent-core/src/session/subagent-host.ts +++ b/packages/agent-core/src/session/subagent-host.ts @@ -11,8 +11,13 @@ import { import { linkAbortSignal, userCancellationReason } from '../utils/abort'; import { collectGitContext } from './git-context'; import type { Session } from './index'; +import { formatSubagentEvent, SubagentProgress } from './subagent-progress'; import SUMMARY_CONTINUATION_PROMPT from './summary-continuation.md'; +/** Coalesce `assistant.delta` text up to this many bytes before flushing, so + * per-token deltas don't become a storm of tiny `output.log` appends. */ +const PROGRESS_FLUSH_THRESHOLD = 512; + /** * A subagent summary shorter than this many characters triggers one * follow-up turn that asks the subagent to expand it, so the parent @@ -49,6 +54,8 @@ export type SubagentHandle = { readonly profileName: string; readonly resumed: boolean; readonly completion: Promise; + /** Progress feed, present only for background runs (foreground has no task to feed). */ + readonly progress?: SubagentProgress | undefined; }; export class SessionSubagentHost { @@ -74,6 +81,8 @@ export class SessionSubagentHost { undefined, this.ownerAgentId, ); + // Tap before runChild so events emitted during child setup aren't missed. + const tap = this.attachProgress(agent, options.runInBackground); const controller = new AbortController(); const unlinkAbortSignal = linkAbortSignal(options.signal, controller); this.activeChildren.set(id, { @@ -92,6 +101,7 @@ export class SessionSubagentHost { }, () => this.configureChild(parent, agent, profile), ).finally(() => { + tap?.detach(); unlinkAbortSignal(); this.activeChildren.delete(id); }); @@ -101,6 +111,7 @@ export class SessionSubagentHost { profileName: profile.name, resumed: false, completion, + progress: tap?.progress, }; } @@ -131,6 +142,7 @@ export class SessionSubagentHost { const profileName = child.config.profileName ?? 'subagent'; + const tap = this.attachProgress(child, options.runInBackground); const controller = new AbortController(); const unlinkAbortSignal = linkAbortSignal(options.signal, controller); this.activeChildren.set(agentId, { @@ -155,6 +167,7 @@ export class SessionSubagentHost { return Promise.resolve(); }, ).finally(() => { + tap?.detach(); unlinkAbortSignal(); this.activeChildren.delete(agentId); }); @@ -164,6 +177,7 @@ export class SessionSubagentHost { profileName, resumed: true, completion, + progress: tap?.progress, }; } @@ -187,6 +201,42 @@ export class SessionSubagentHost { return this.session.agents.get(agentId)?.config.profileName; } + // Tap a background subagent's events into a progress buffer (foreground gets + // none — no task to feed). `detach()` flushes the remainder and unsubscribes. + private attachProgress( + child: Agent, + runInBackground: boolean, + ): { readonly progress: SubagentProgress; readonly detach: () => void } | undefined { + if (!runInBackground) return undefined; + const progress = new SubagentProgress(); + let pending = ''; + const flush = (): void => { + if (pending.length > 0) { + progress.push(pending); + pending = ''; + } + }; + const unsubscribe = child.onEvent((event) => { + const text = formatSubagentEvent(event); + if (text === undefined) return; + if (event.type === 'assistant.delta') { + pending += text; + if (pending.length >= PROGRESS_FLUSH_THRESHOLD) flush(); + } else { + // flush prose before the tool boundary so order is preserved + flush(); + progress.push(text); + } + }); + return { + progress, + detach: () => { + flush(); + unsubscribe(); + }, + }; + } + private resolveProfile(parent: Agent, profileName: string): ResolvedAgentProfile { const profile = DEFAULT_AGENT_PROFILES[parent.config.profileName ?? 'agent']?.subagents?.[profileName] ?? diff --git a/packages/agent-core/src/session/subagent-progress.ts b/packages/agent-core/src/session/subagent-progress.ts new file mode 100644 index 00000000..f219b1a1 --- /dev/null +++ b/packages/agent-core/src/session/subagent-progress.ts @@ -0,0 +1,52 @@ +/** + * Live progress feed for a background subagent task. Buffers formatted event + * text emitted between `spawn()` and the manager's `subscribe()`, replaying it + * on subscription so no early output is lost. Single-consumer (the manager). + */ + +import type { AgentEvent } from '../rpc'; + +export class SubagentProgress { + private buffer: string[] = []; + private live: ((chunk: string) => void) | undefined; + + push(chunk: string): void { + if (chunk.length === 0) return; + if (this.live !== undefined) { + this.live(chunk); + } else { + this.buffer.push(chunk); + } + } + + subscribe(onChunk: (chunk: string) => void): () => void { + // Replay the backlog as one chunk — each delivery is a separate output.log + // append downstream, so joining keeps it as coarse as live coalescing. + if (this.buffer.length > 0) onChunk(this.buffer.join('')); + this.buffer = []; + this.live = onChunk; + return () => { + if (this.live === onChunk) this.live = undefined; + }; + } +} + +/** + * Format a child-agent event into log text, or `undefined` to skip it. Compact + * by design: tool results become a one-line status (a Read can be megabytes), + * prose comes from `assistant.delta`, thinking deltas are dropped as noise. + */ +export function formatSubagentEvent(event: AgentEvent): string | undefined { + if (event.type === 'assistant.delta') return event.delta; + if (event.type === 'tool.call.started') { + const suffix = event.description !== undefined ? ` — ${event.description}` : ''; + return `\n\n$ ${event.name}${suffix}\n`; + } + if (event.type === 'tool.result') { + return event.isError === true ? ' ✗ tool error\n' : ' ✓ done\n'; + } + if (event.type === 'turn.ended') { + return event.reason === 'completed' ? undefined : `\n[turn ${event.reason}]\n`; + } + return undefined; +} diff --git a/packages/agent-core/src/tools/background/manager.ts b/packages/agent-core/src/tools/background/manager.ts index c87cbe66..ef822981 100644 --- a/packages/agent-core/src/tools/background/manager.ts +++ b/packages/agent-core/src/tools/background/manager.ts @@ -138,6 +138,9 @@ interface ManagedProcess { stopRequested: boolean; /** Session dir captured at registration for output.log writes. */ readonly outputSessionDir?: string | undefined; + /** Run once from `finalizeTerminal` (every terminal path) to release an + * agent task's output subscription so its progress listener cannot leak. */ + onTerminalCleanup?: (() => void) | undefined; lifecyclePromise: Promise; persistWriteQueue: Promise; outputWriteQueue: Promise; @@ -761,7 +764,10 @@ export class BackgroundProcessManager { /** * Register a Promise-based agent task (no KaosProcess). Used by * AgentTool for background subagent dispatch. Agent tasks appear in - * `list()` / `getTask()` but have pid=0 and empty output. + * `list()` / `getTask()` with pid=0. Because there is no live + * `proc.stdout`, intermediate output is streamed via the optional + * `opts.outputSource` (the subagent's formatted progress); without it + * the only output is the final summary appended on completion. * * `opts.timeoutMs` wraps the completion in an external deadline. On * deadline fire, the task is marked `failed` with `timedOut=true` @@ -780,6 +786,8 @@ export class BackgroundProcessManager { agentId?: string; /** Subagent profile name; surfaced on task info. */ subagentType?: string; + /** Live progress source; subscribed immediately, unsubscribed at terminal state. */ + outputSource?: (onChunk: (chunk: string) => void) => () => void; } = {}, ): string { if (opts.reservation) { @@ -826,6 +834,12 @@ export class BackgroundProcessManager { outputWriteQueue: Promise.resolve(), }; this.processes.set(taskId, entry); + // Pipe progress into the ring buffer + output.log, like register() does for + // bash stdout; chunks after terminal are dropped, and finalizeTerminal owns + // the unsubscribe so it fires even on the direct stop() path. + entry.onTerminalCleanup = opts.outputSource?.((chunk) => { + if (!TERMINAL_STATUSES.has(entry.status)) this.appendOutput(entry, chunk); + }); void this.persistLive(entry); this.fireLifecycle('started', this.toInfo(entry)); @@ -858,7 +872,13 @@ export class BackgroundProcessManager { // `completion` resolved before deadline. const r = outcome as { result: string }; if (TERMINAL_STATUSES.has(entry.status)) return; - this.appendOutput(entry, r.result); + // A streaming task already logged the assistant's prose (its tail is the + // final message), so re-appending the result would just duplicate it — + // only fall back to the result when nothing streamed. Non-streaming + // tasks have no other output, so the result is their whole log. + if (opts.outputSource === undefined || entry.outputSizeBytes === 0) { + this.appendOutput(entry, r.result); + } await this.finalizeTerminal(entry, 'completed', 0); }) .catch(async (error: unknown) => { @@ -1170,6 +1190,9 @@ export class BackgroundProcessManager { // that here for the awaiting → terminal path. entry.approvalReason = undefined; entry.stopRequested = false; + // Runs once (guarded above): release any output subscription. + entry.onTerminalCleanup?.(); + entry.onTerminalCleanup = undefined; await this.persistLive(entry); this.fireTerminalCallbacks(entry); this.resolveWaiters(entry); diff --git a/packages/agent-core/src/tools/builtin/collaboration/agent.ts b/packages/agent-core/src/tools/builtin/collaboration/agent.ts index a726217c..68aae154 100644 --- a/packages/agent-core/src/tools/builtin/collaboration/agent.ts +++ b/packages/agent-core/src/tools/builtin/collaboration/agent.ts @@ -253,11 +253,14 @@ export class AgentTool implements BuiltinTool { } let taskId: string; try { + const progress = handle.progress; taskId = backgroundManager.registerAgentTask(handle.completion, args.description, { timeoutMs: timeoutMs ?? this.subagentHost.backgroundTaskTimeoutMs, reservation, agentId: handle.agentId, subagentType: handle.profileName, + outputSource: + progress !== undefined ? (onChunk) => progress.subscribe(onChunk) : undefined, abort: () => { backgroundController?.abort(); }, diff --git a/packages/agent-core/test/agent/on-event.test.ts b/packages/agent-core/test/agent/on-event.test.ts new file mode 100644 index 00000000..7d2115e4 --- /dev/null +++ b/packages/agent-core/test/agent/on-event.test.ts @@ -0,0 +1,49 @@ +/** + * Agent.onEvent — in-process event tap. + * + * The local fan-out added alongside the RPC sink powers background-subagent + * progress streaming. Its contract: deliver to every listener, stop after + * unsubscribe, and never let a buggy listener break emission. + */ + +import { describe, expect, it } from 'vitest'; + +import type { AgentEvent } from '../../src/rpc'; +import { testAgent } from './harness/agent'; + +const warning = (message: string): AgentEvent => ({ type: 'warning', message }); + +describe('Agent.onEvent', () => { + it('fans out to every listener and stops a listener after it unsubscribes', () => { + const { agent } = testAgent(); + const a: string[] = []; + const b: string[] = []; + + const unsubscribeA = agent.onEvent((event) => a.push(event.type)); + agent.onEvent((event) => b.push(event.type)); + + agent.emitEvent(warning('one')); + expect(a).toEqual(['warning']); + expect(b).toEqual(['warning']); + + unsubscribeA(); + agent.emitEvent(warning('two')); + expect(a).toEqual(['warning']); // unsubscribed — no further delivery + expect(b).toEqual(['warning', 'warning']); // still subscribed + }); + + it('swallows a listener exception so siblings and emission still run', () => { + const { agent } = testAgent(); + const seen: string[] = []; + + agent.onEvent(() => { + throw new Error('buggy tap'); + }); + agent.onEvent((event) => seen.push(event.type)); + + expect(() => { + agent.emitEvent(warning('x')); + }).not.toThrow(); + expect(seen).toEqual(['warning']); // sibling listener still received it + }); +}); diff --git a/packages/agent-core/test/session/subagent-host.test.ts b/packages/agent-core/test/session/subagent-host.test.ts index 558af309..b1cf42e3 100644 --- a/packages/agent-core/test/session/subagent-host.test.ts +++ b/packages/agent-core/test/session/subagent-host.test.ts @@ -820,6 +820,46 @@ describe('Session resume permission parent chain', () => { await session.close(); } }); + + it('streams background subagent progress and omits the stream for foreground', async () => { + const detailedText = + 'Implemented the requested change across the affected module, traced every call site to confirm nothing else depended on the old behaviour, applied the edit, ran the existing test suite, and verified it still passes end to end before handing the work back to the parent agent.'; + + // Foreground: no progress stream is created (nothing would consume it). + const fgParent = testAgent(); + fgParent.configure(); + const fgChild = testAgent(); + fgChild.mockNextResponse({ type: 'text', text: detailedText }); + const fgHost = new SessionSubagentHost(fakeSession(fgParent.agent, fgChild.agent), 'main'); + const fgHandle = await fgHost.spawn('coder', { + parentToolCallId: 'call_agent', + prompt: 'Do it', + description: 'Foreground task', + runInBackground: false, + signal, + }); + await fgHandle.completion; + expect(fgHandle.progress).toBeUndefined(); + + // Background: the progress stream receives the child's formatted output. + const bgParent = testAgent(); + bgParent.configure(); + const bgChild = testAgent(); + bgChild.mockNextResponse({ type: 'text', text: detailedText }); + const bgHost = new SessionSubagentHost(fakeSession(bgParent.agent, bgChild.agent), 'main'); + const bgHandle = await bgHost.spawn('coder', { + parentToolCallId: 'call_agent', + prompt: 'Do it', + description: 'Background task', + runInBackground: true, + signal, + }); + expect(bgHandle.progress).toBeDefined(); + const chunks: string[] = []; + bgHandle.progress?.subscribe((chunk) => chunks.push(chunk)); + await bgHandle.completion; + expect(chunks.join('')).toContain('traced every call site'); + }); }); describe('Session.createAgent', () => { diff --git a/packages/agent-core/test/session/subagent-progress.test.ts b/packages/agent-core/test/session/subagent-progress.test.ts new file mode 100644 index 00000000..5e5365c7 --- /dev/null +++ b/packages/agent-core/test/session/subagent-progress.test.ts @@ -0,0 +1,102 @@ +/** + * SubagentProgress buffer + event formatter. + * + * The buffer decouples spawn-time event capture from registration-time + * consumption: events emitted before the manager subscribes must be + * replayed, not lost. + */ + +import { describe, expect, it } from 'vitest'; + +import type { AgentEvent } from '../../src/rpc'; +import { formatSubagentEvent, SubagentProgress } from '../../src/session/subagent-progress'; + +describe('SubagentProgress', () => { + it('replays the buffered backlog as one joined chunk, then streams live', () => { + const progress = new SubagentProgress(); + progress.push('a'); + progress.push('b'); + + const seen: string[] = []; + progress.subscribe((chunk) => seen.push(chunk)); + expect(seen).toEqual(['ab']); // pre-subscription backlog delivered coalesced + + progress.push('c'); + expect(seen).toEqual(['ab', 'c']); // live delivery, chunk-by-chunk afterwards + }); + + it('ignores empty chunks', () => { + const progress = new SubagentProgress(); + const seen: string[] = []; + progress.subscribe((chunk) => seen.push(chunk)); + progress.push(''); + expect(seen).toEqual([]); + }); + + it('unsubscribe stops live delivery and re-buffers later chunks', () => { + const progress = new SubagentProgress(); + const seen: string[] = []; + const unsubscribe = progress.subscribe((chunk) => seen.push(chunk)); + + progress.push('x'); + unsubscribe(); + progress.push('y'); // no live sink → buffered, not delivered + expect(seen).toEqual(['x']); + }); +}); + +describe('formatSubagentEvent', () => { + it('passes assistant delta text through verbatim', () => { + const event: AgentEvent = { type: 'assistant.delta', turnId: 1, delta: 'hello' }; + expect(formatSubagentEvent(event)).toBe('hello'); + }); + + it('renders a tool-call header with its description', () => { + const event: AgentEvent = { + type: 'tool.call.started', + turnId: 1, + toolCallId: 't1', + name: 'Bash', + args: {}, + description: 'list files', + }; + expect(formatSubagentEvent(event)).toBe('\n\n$ Bash — list files\n'); + }); + + it('renders a tool-call header without a description', () => { + const event: AgentEvent = { + type: 'tool.call.started', + turnId: 1, + toolCallId: 't1', + name: 'Read', + args: {}, + }; + expect(formatSubagentEvent(event)).toBe('\n\n$ Read\n'); + }); + + it('renders a compact tool-result status', () => { + const ok: AgentEvent = { type: 'tool.result', turnId: 1, toolCallId: 't1', output: 'x' }; + const err: AgentEvent = { + type: 'tool.result', + turnId: 1, + toolCallId: 't1', + output: 'x', + isError: true, + }; + expect(formatSubagentEvent(ok)).toBe(' ✓ done\n'); + expect(formatSubagentEvent(err)).toBe(' ✗ tool error\n'); + }); + + it('skips noise events (thinking deltas, clean turn end)', () => { + expect(formatSubagentEvent({ type: 'thinking.delta', turnId: 1, delta: 'z' })).toBeUndefined(); + expect( + formatSubagentEvent({ type: 'turn.ended', turnId: 1, reason: 'completed' }), + ).toBeUndefined(); + }); + + it('flags an abnormal turn end', () => { + expect(formatSubagentEvent({ type: 'turn.ended', turnId: 1, reason: 'failed' })).toBe( + '\n[turn failed]\n', + ); + }); +}); diff --git a/packages/agent-core/test/tools/background/agent-output-stream.test.ts b/packages/agent-core/test/tools/background/agent-output-stream.test.ts new file mode 100644 index 00000000..7cc6866d --- /dev/null +++ b/packages/agent-core/test/tools/background/agent-output-stream.test.ts @@ -0,0 +1,167 @@ +/** + * `registerAgentTask` `outputSource` streaming. + * + * A background subagent is Promise-based, so without an output source the + * task's output stays empty until completion. `outputSource` lets the + * manager stream the subagent's formatted progress into the ring buffer / + * output.log while it runs, mirroring how `register()` pipes a bash task's + * stdout. + */ + +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { BackgroundProcessManager } from '../../../src/tools/background/manager'; + +describe('BackgroundProcessManager.registerAgentTask — outputSource', () => { + const manager = new BackgroundProcessManager(); + + afterEach(() => { + manager._reset(); + vi.useRealTimers(); + }); + + it('streams chunks into the task output before completion', async () => { + let emit!: (chunk: string) => void; + let resolveFn!: (r: { result: string }) => void; + const completion = new Promise<{ result: string }>((res) => { + resolveFn = res; + }); + + const taskId = manager.registerAgentTask(completion, 'streaming', { + outputSource: (onChunk) => { + emit = onChunk; + return () => {}; + }, + }); + + emit('hello '); + emit('world'); + // Visible mid-run, before the completion promise settles. + expect(manager.getOutput(taskId)).toBe('hello world'); + + resolveFn({ result: 'world' }); + await manager.waitForTerminal(taskId); + + // The streamed prose already ends with the final message, so the result is + // not re-appended — the log is exactly what streamed, no duplication. + expect(manager.getOutput(taskId)).toBe('hello world'); + }); + + it('appends the result as the whole log when not streaming', async () => { + let resolveFn!: (r: { result: string }) => void; + const completion = new Promise<{ result: string }>((res) => { + resolveFn = res; + }); + const taskId = manager.registerAgentTask(completion, 'raw'); + + resolveFn({ result: 'just the summary' }); + await manager.waitForTerminal(taskId); + + // No outputSource → the result is the task's only output. + expect(manager.getOutput(taskId)).toBe('just the summary'); + }); + + it('falls back to the result when streaming produced no output', async () => { + let resolveFn!: (r: { result: string }) => void; + const completion = new Promise<{ result: string }>((res) => { + resolveFn = res; + }); + // Subscribes but never emits a chunk. + const taskId = manager.registerAgentTask(completion, 'silent', { + outputSource: () => () => {}, + }); + + resolveFn({ result: 'only the summary' }); + await manager.waitForTerminal(taskId); + + // Nothing streamed → the result is appended so the log is not empty. + expect(manager.getOutput(taskId)).toBe('only the summary'); + }); + + it('unsubscribes the output source when the task completes', async () => { + let unsubscribed = false; + let resolveFn!: (r: { result: string }) => void; + const completion = new Promise<{ result: string }>((res) => { + resolveFn = res; + }); + + const taskId = manager.registerAgentTask(completion, 'unsub', { + outputSource: () => () => { + unsubscribed = true; + }, + }); + + resolveFn({ result: 'done' }); + // Cleanup runs inside finalizeTerminal, before waiters resolve, so it has + // already fired by the time waitForTerminal returns — no drain needed. + await manager.waitForTerminal(taskId); + expect(unsubscribed).toBe(true); + }); + + it('unsubscribes the output source when the task is stopped (killed)', async () => { + let unsubscribed = false; + let rejectFn!: (error: unknown) => void; + const completion = new Promise<{ result: string }>((_res, rej) => { + rejectFn = rej; + }); + + const taskId = manager.registerAgentTask(completion, 'kill', { + // stop() → proc.kill() → abort(); reject as an AbortError so the task is + // recorded as killed (not failed), matching the real subagent abort path. + abort: () => { + const error = new Error('aborted'); + error.name = 'AbortError'; + rejectFn(error); + }, + outputSource: () => () => { + unsubscribed = true; + }, + }); + + const info = await manager.stop(taskId, 'test stop'); + expect(info?.status).toBe('killed'); + expect(unsubscribed).toBe(true); + }); + + it('unsubscribes the output source on the timeout path', async () => { + vi.useFakeTimers({ toFake: ['setTimeout', 'clearTimeout'] }); + let unsubscribed = false; + + const taskId = manager.registerAgentTask(new Promise<{ result: string }>(() => {}), 'timeout', { + timeoutMs: 2_000, + outputSource: () => () => { + unsubscribed = true; + }, + }); + + const terminal = manager.waitForTerminal(taskId); + await vi.advanceTimersByTimeAsync(2_100); + const info = await terminal; + + expect(info?.status).toBe('failed'); + expect(info?.timedOut).toBe(true); + expect(unsubscribed).toBe(true); + }); + + it('drops chunks that arrive after the task is terminal', async () => { + let emit!: (chunk: string) => void; + let resolveFn!: (r: { result: string }) => void; + const completion = new Promise<{ result: string }>((res) => { + resolveFn = res; + }); + + const taskId = manager.registerAgentTask(completion, 'late', { + outputSource: (onChunk) => { + emit = onChunk; + return () => {}; + }, + }); + + resolveFn({ result: 'R' }); + await manager.waitForTerminal(taskId); + + const before = manager.getOutput(taskId); + emit('late chunk'); // guarded out — task is already terminal + expect(manager.getOutput(taskId)).toBe(before); + }); +});