Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/stream-subagent-progress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@moonshot-ai/agent-core": minor
"@moonshot-ai/kimi-code": minor
---

Stream background subagent progress into `/tasks` so live output (assistant text and tool-call boundaries) appears while the subagent runs, matching background bash tasks. Previously an agent task's output pane stayed empty until completion.
20 changes: 20 additions & 0 deletions packages/agent-core/src/agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ export class Agent {

private lastLlmConfigLogSignature?: string;

// In-process event taps, fanned out alongside the RPC sink in `emitEvent`.
private readonly eventListeners = new Set<(event: AgentEvent) => void>();

constructor(options: AgentOptions) {
this.type = options.type ?? 'main';
this.kaos = options.kaos;
Expand Down Expand Up @@ -385,8 +388,25 @@ export class Agent {
};
}

onEvent(listener: (event: AgentEvent) => void): () => void {
this.eventListeners.add(listener);
return () => {
this.eventListeners.delete(listener);
};
}

emitEvent(event: AgentEvent): void {
if (this.records.restoring) return;
// size check keeps the common no-tap path off this hot loop
if (this.eventListeners.size > 0) {
for (const listener of this.eventListeners) {
try {
listener(event);
} catch {
/* a tap must not break emission */
}
}
}
void this.rpc?.emitEvent?.(event);
}

Expand Down
50 changes: 50 additions & 0 deletions packages/agent-core/src/session/subagent-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ import {
import { linkAbortSignal, userCancellationReason } from '../utils/abort';
import { collectGitContext } from './git-context';
import type { Session } from './index';
import { formatSubagentEvent, SubagentProgress } from './subagent-progress';
import SUMMARY_CONTINUATION_PROMPT from './summary-continuation.md';

/** Coalesce `assistant.delta` text up to this many bytes before flushing, so
* per-token deltas don't become a storm of tiny `output.log` appends. */
const PROGRESS_FLUSH_THRESHOLD = 512;

/**
* A subagent summary shorter than this many characters triggers one
* follow-up turn that asks the subagent to expand it, so the parent
Expand Down Expand Up @@ -49,6 +54,8 @@ export type SubagentHandle = {
readonly profileName: string;
readonly resumed: boolean;
readonly completion: Promise<SubagentCompletion>;
/** Progress feed, present only for background runs (foreground has no task to feed). */
readonly progress?: SubagentProgress | undefined;
};

export class SessionSubagentHost {
Expand All @@ -74,6 +81,8 @@ export class SessionSubagentHost {
undefined,
this.ownerAgentId,
);
// Tap before runChild so events emitted during child setup aren't missed.
const tap = this.attachProgress(agent, options.runInBackground);
const controller = new AbortController();
const unlinkAbortSignal = linkAbortSignal(options.signal, controller);
this.activeChildren.set(id, {
Expand All @@ -92,6 +101,7 @@ export class SessionSubagentHost {
},
() => this.configureChild(parent, agent, profile),
).finally(() => {
tap?.detach();
unlinkAbortSignal();
this.activeChildren.delete(id);
});
Expand All @@ -101,6 +111,7 @@ export class SessionSubagentHost {
profileName: profile.name,
resumed: false,
completion,
progress: tap?.progress,
};
}

Expand Down Expand Up @@ -131,6 +142,7 @@ export class SessionSubagentHost {

const profileName = child.config.profileName ?? 'subagent';

const tap = this.attachProgress(child, options.runInBackground);
const controller = new AbortController();
const unlinkAbortSignal = linkAbortSignal(options.signal, controller);
this.activeChildren.set(agentId, {
Expand All @@ -155,6 +167,7 @@ export class SessionSubagentHost {
return Promise.resolve();
},
).finally(() => {
tap?.detach();
unlinkAbortSignal();
this.activeChildren.delete(agentId);
});
Expand All @@ -164,6 +177,7 @@ export class SessionSubagentHost {
profileName,
resumed: true,
completion,
progress: tap?.progress,
};
}

Expand All @@ -187,6 +201,42 @@ export class SessionSubagentHost {
return this.session.agents.get(agentId)?.config.profileName;
}

// Tap a background subagent's events into a progress buffer (foreground gets
// none — no task to feed). `detach()` flushes the remainder and unsubscribes.
private attachProgress(
child: Agent,
runInBackground: boolean,
): { readonly progress: SubagentProgress; readonly detach: () => void } | undefined {
if (!runInBackground) return undefined;
const progress = new SubagentProgress();
let pending = '';
const flush = (): void => {
if (pending.length > 0) {
progress.push(pending);
pending = '';
}
};
const unsubscribe = child.onEvent((event) => {
const text = formatSubagentEvent(event);
if (text === undefined) return;
if (event.type === 'assistant.delta') {
pending += text;
if (pending.length >= PROGRESS_FLUSH_THRESHOLD) flush();
} else {
// flush prose before the tool boundary so order is preserved
flush();
progress.push(text);
}
});
return {
progress,
detach: () => {
flush();
unsubscribe();
},
};
}

private resolveProfile(parent: Agent, profileName: string): ResolvedAgentProfile {
const profile =
DEFAULT_AGENT_PROFILES[parent.config.profileName ?? 'agent']?.subagents?.[profileName] ??
Expand Down
52 changes: 52 additions & 0 deletions packages/agent-core/src/session/subagent-progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Live progress feed for a background subagent task. Buffers formatted event
* text emitted between `spawn()` and the manager's `subscribe()`, replaying it
* on subscription so no early output is lost. Single-consumer (the manager).
*/

import type { AgentEvent } from '../rpc';

export class SubagentProgress {
private buffer: string[] = [];
private live: ((chunk: string) => void) | undefined;

push(chunk: string): void {
if (chunk.length === 0) return;
if (this.live !== undefined) {
this.live(chunk);
} else {
this.buffer.push(chunk);
}
}

subscribe(onChunk: (chunk: string) => void): () => void {
// Replay the backlog as one chunk — each delivery is a separate output.log
// append downstream, so joining keeps it as coarse as live coalescing.
if (this.buffer.length > 0) onChunk(this.buffer.join(''));
this.buffer = [];
this.live = onChunk;
return () => {
if (this.live === onChunk) this.live = undefined;
};
}
}

/**
* Format a child-agent event into log text, or `undefined` to skip it. Compact
* by design: tool results become a one-line status (a Read can be megabytes),
* prose comes from `assistant.delta`, thinking deltas are dropped as noise.
*/
export function formatSubagentEvent(event: AgentEvent): string | undefined {
if (event.type === 'assistant.delta') return event.delta;
if (event.type === 'tool.call.started') {
const suffix = event.description !== undefined ? ` — ${event.description}` : '';
return `\n\n$ ${event.name}${suffix}\n`;
}
if (event.type === 'tool.result') {
return event.isError === true ? ' ✗ tool error\n' : ' ✓ done\n';
}
if (event.type === 'turn.ended') {
return event.reason === 'completed' ? undefined : `\n[turn ${event.reason}]\n`;
}
return undefined;
}
27 changes: 25 additions & 2 deletions packages/agent-core/src/tools/background/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ interface ManagedProcess {
stopRequested: boolean;
/** Session dir captured at registration for output.log writes. */
readonly outputSessionDir?: string | undefined;
/** Run once from `finalizeTerminal` (every terminal path) to release an
* agent task's output subscription so its progress listener cannot leak. */
onTerminalCleanup?: (() => void) | undefined;
lifecyclePromise: Promise<void>;
persistWriteQueue: Promise<void>;
outputWriteQueue: Promise<void>;
Expand Down Expand Up @@ -761,7 +764,10 @@ export class BackgroundProcessManager {
/**
* Register a Promise-based agent task (no KaosProcess). Used by
* AgentTool for background subagent dispatch. Agent tasks appear in
* `list()` / `getTask()` but have pid=0 and empty output.
* `list()` / `getTask()` with pid=0. Because there is no live
* `proc.stdout`, intermediate output is streamed via the optional
* `opts.outputSource` (the subagent's formatted progress); without it
* the only output is the final summary appended on completion.
*
* `opts.timeoutMs` wraps the completion in an external deadline. On
* deadline fire, the task is marked `failed` with `timedOut=true`
Expand All @@ -780,6 +786,8 @@ export class BackgroundProcessManager {
agentId?: string;
/** Subagent profile name; surfaced on task info. */
subagentType?: string;
/** Live progress source; subscribed immediately, unsubscribed at terminal state. */
outputSource?: (onChunk: (chunk: string) => void) => () => void;
} = {},
): string {
if (opts.reservation) {
Expand Down Expand Up @@ -826,6 +834,12 @@ export class BackgroundProcessManager {
outputWriteQueue: Promise.resolve(),
};
this.processes.set(taskId, entry);
// Pipe progress into the ring buffer + output.log, like register() does for
// bash stdout; chunks after terminal are dropped, and finalizeTerminal owns
// the unsubscribe so it fires even on the direct stop() path.
entry.onTerminalCleanup = opts.outputSource?.((chunk) => {
if (!TERMINAL_STATUSES.has(entry.status)) this.appendOutput(entry, chunk);
});
void this.persistLive(entry);
this.fireLifecycle('started', this.toInfo(entry));

Expand Down Expand Up @@ -858,7 +872,13 @@ export class BackgroundProcessManager {
// `completion` resolved before deadline.
const r = outcome as { result: string };
if (TERMINAL_STATUSES.has(entry.status)) return;
this.appendOutput(entry, r.result);
// A streaming task already logged the assistant's prose (its tail is the
// final message), so re-appending the result would just duplicate it —
// only fall back to the result when nothing streamed. Non-streaming
// tasks have no other output, so the result is their whole log.
if (opts.outputSource === undefined || entry.outputSizeBytes === 0) {
this.appendOutput(entry, r.result);
}
await this.finalizeTerminal(entry, 'completed', 0);
})
.catch(async (error: unknown) => {
Expand Down Expand Up @@ -1170,6 +1190,9 @@ export class BackgroundProcessManager {
// that here for the awaiting → terminal path.
entry.approvalReason = undefined;
entry.stopRequested = false;
// Runs once (guarded above): release any output subscription.
entry.onTerminalCleanup?.();
entry.onTerminalCleanup = undefined;
await this.persistLive(entry);
this.fireTerminalCallbacks(entry);
this.resolveWaiters(entry);
Expand Down
3 changes: 3 additions & 0 deletions packages/agent-core/src/tools/builtin/collaboration/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,14 @@ export class AgentTool implements BuiltinTool<AgentToolInput> {
}
let taskId: string;
try {
const progress = handle.progress;
taskId = backgroundManager.registerAgentTask(handle.completion, args.description, {
timeoutMs: timeoutMs ?? this.subagentHost.backgroundTaskTimeoutMs,
reservation,
agentId: handle.agentId,
subagentType: handle.profileName,
outputSource:
progress !== undefined ? (onChunk) => progress.subscribe(onChunk) : undefined,
abort: () => {
backgroundController?.abort();
},
Expand Down
49 changes: 49 additions & 0 deletions packages/agent-core/test/agent/on-event.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Agent.onEvent — in-process event tap.
*
* The local fan-out added alongside the RPC sink powers background-subagent
* progress streaming. Its contract: deliver to every listener, stop after
* unsubscribe, and never let a buggy listener break emission.
*/

import { describe, expect, it } from 'vitest';

import type { AgentEvent } from '../../src/rpc';
import { testAgent } from './harness/agent';

const warning = (message: string): AgentEvent => ({ type: 'warning', message });

describe('Agent.onEvent', () => {
it('fans out to every listener and stops a listener after it unsubscribes', () => {
const { agent } = testAgent();
const a: string[] = [];
const b: string[] = [];

const unsubscribeA = agent.onEvent((event) => a.push(event.type));
agent.onEvent((event) => b.push(event.type));

agent.emitEvent(warning('one'));
expect(a).toEqual(['warning']);
expect(b).toEqual(['warning']);

unsubscribeA();
agent.emitEvent(warning('two'));
expect(a).toEqual(['warning']); // unsubscribed — no further delivery
expect(b).toEqual(['warning', 'warning']); // still subscribed
});

it('swallows a listener exception so siblings and emission still run', () => {
const { agent } = testAgent();
const seen: string[] = [];

agent.onEvent(() => {
throw new Error('buggy tap');
});
agent.onEvent((event) => seen.push(event.type));

expect(() => {
agent.emitEvent(warning('x'));
}).not.toThrow();
expect(seen).toEqual(['warning']); // sibling listener still received it
});
});
40 changes: 40 additions & 0 deletions packages/agent-core/test/session/subagent-host.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,46 @@ describe('Session resume permission parent chain', () => {
await session.close();
}
});

it('streams background subagent progress and omits the stream for foreground', async () => {
const detailedText =
'Implemented the requested change across the affected module, traced every call site to confirm nothing else depended on the old behaviour, applied the edit, ran the existing test suite, and verified it still passes end to end before handing the work back to the parent agent.';

// Foreground: no progress stream is created (nothing would consume it).
const fgParent = testAgent();
fgParent.configure();
const fgChild = testAgent();
fgChild.mockNextResponse({ type: 'text', text: detailedText });
const fgHost = new SessionSubagentHost(fakeSession(fgParent.agent, fgChild.agent), 'main');
const fgHandle = await fgHost.spawn('coder', {
parentToolCallId: 'call_agent',
prompt: 'Do it',
description: 'Foreground task',
runInBackground: false,
signal,
});
await fgHandle.completion;
expect(fgHandle.progress).toBeUndefined();

// Background: the progress stream receives the child's formatted output.
const bgParent = testAgent();
bgParent.configure();
const bgChild = testAgent();
bgChild.mockNextResponse({ type: 'text', text: detailedText });
const bgHost = new SessionSubagentHost(fakeSession(bgParent.agent, bgChild.agent), 'main');
const bgHandle = await bgHost.spawn('coder', {
parentToolCallId: 'call_agent',
prompt: 'Do it',
description: 'Background task',
runInBackground: true,
signal,
});
expect(bgHandle.progress).toBeDefined();
const chunks: string[] = [];
bgHandle.progress?.subscribe((chunk) => chunks.push(chunk));
await bgHandle.completion;
expect(chunks.join('')).toContain('traced every call site');
});
});

describe('Session.createAgent', () => {
Expand Down
Loading