diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index 72784f739..29548a939 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -203,6 +203,247 @@ describe("AgentServer HTTP Mode", () => { }); describe("turn completion", () => { + function stubSessionCleanup(testServer: unknown): { + cleanupSession: (options?: { + completeEventStream?: boolean; + }) => Promise; + eventStreamSender: { + enqueue: ReturnType; + stop: ReturnType; + }; + } { + const cleanupServer = testServer as { + session: unknown; + eventStreamSender: { + enqueue: ReturnType; + stop: ReturnType; + }; + captureCheckpointState: ReturnType; + cleanupSession: (options?: { + completeEventStream?: boolean; + }) => Promise; + }; + cleanupServer.captureCheckpointState = vi.fn(async () => {}); + cleanupServer.eventStreamSender = { + enqueue: vi.fn(), + stop: vi.fn(async () => {}), + }; + cleanupServer.session = { + payload: { run_id: "run-1" }, + pendingHandoffGitState: undefined, + logWriter: { flush: vi.fn(async () => {}) }, + acpConnection: { cleanup: vi.fn(async () => {}) }, + sseController: { close: vi.fn() }, + }; + return cleanupServer; + } + + it("keeps event ingest open for non-terminal session cleanup", async () => { + const testServer = stubSessionCleanup(createServer()); + + await testServer.cleanupSession(); + + expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled(); + expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled(); + }); + + it("stops event ingest for terminal session cleanup without fake task completion", async () => { + const testServer = stubSessionCleanup(createServer()); + + await testServer.cleanupSession({ completeEventStream: true }); + + expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled(); + expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce(); + }); + + it("writes terminal failure status before completing event ingest", async () => { + const order: string[] = []; + const testServer = new AgentServer({ + port, + jwtPublicKey: TEST_PUBLIC_KEY, + repositoryPath: repo.path, + apiUrl: "http://localhost:8000", + apiKey: "test-api-key", + projectId: 1, + mode: "interactive", + taskId: "test-task-id", + runId: "test-run-id", + }) as unknown as { + eventStreamSender: { + enqueue: (event: Record) => void; + stop: () => Promise; + }; + posthogAPI: { + updateTaskRun: ( + taskId: string, + runId: string, + payload: Record, + ) => Promise; + }; + signalTaskComplete( + payload: JwtPayload, + stopReason: string, + errorMessage?: string, + ): Promise; + }; + testServer.eventStreamSender = { + enqueue: vi.fn(() => { + order.push("enqueue"); + }), + stop: vi.fn(async () => { + order.push("stop"); + }), + }; + testServer.posthogAPI = { + updateTaskRun: vi.fn(async () => { + order.push("update"); + return {}; + }), + }; + + await testServer.signalTaskComplete( + { + run_id: "run-1", + task_id: "task-1", + team_id: 1, + user_id: 1, + distinct_id: "distinct-id", + mode: "interactive", + }, + "error", + "boom", + ); + + expect(order).toEqual(["enqueue", "update", "stop"]); + expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledWith( + expect.objectContaining({ + type: "notification", + notification: expect.objectContaining({ + method: "_posthog/error", + params: expect.objectContaining({ error: "boom" }), + }), + }), + ); + expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledWith( + "task-1", + "run-1", + { + status: "failed", + error_message: "boom", + }, + ); + }); + + it("still stops event ingest when terminal failure status update fails", async () => { + const testServer = new AgentServer({ + port, + jwtPublicKey: TEST_PUBLIC_KEY, + repositoryPath: repo.path, + apiUrl: "http://localhost:8000", + apiKey: "test-api-key", + projectId: 1, + mode: "interactive", + taskId: "test-task-id", + runId: "test-run-id", + }) as unknown as { + eventStreamSender: { + enqueue: (event: Record) => void; + stop: () => Promise; + }; + posthogAPI: { + updateTaskRun: ( + taskId: string, + runId: string, + payload: Record, + ) => Promise; + }; + signalTaskComplete( + payload: JwtPayload, + stopReason: string, + errorMessage?: string, + ): Promise; + }; + testServer.eventStreamSender = { + enqueue: vi.fn(), + stop: vi.fn(async () => {}), + }; + testServer.posthogAPI = { + updateTaskRun: vi.fn(async () => { + throw new Error("update failed"); + }), + }; + + await testServer.signalTaskComplete( + { + run_id: "run-1", + task_id: "task-1", + team_id: 1, + user_id: 1, + distinct_id: "distinct-id", + mode: "interactive", + }, + "error", + "boom", + ); + + expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledOnce(); + expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledOnce(); + expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce(); + }); + + it("leaves event ingest open for non-error stop reasons", async () => { + const testServer = new AgentServer({ + port, + jwtPublicKey: TEST_PUBLIC_KEY, + repositoryPath: repo.path, + apiUrl: "http://localhost:8000", + apiKey: "test-api-key", + projectId: 1, + mode: "interactive", + taskId: "test-task-id", + runId: "test-run-id", + }) as unknown as { + eventStreamSender: { + enqueue: (event: Record) => void; + stop: () => Promise; + }; + posthogAPI: { + updateTaskRun: ( + taskId: string, + runId: string, + payload: Record, + ) => Promise; + }; + signalTaskComplete( + payload: JwtPayload, + stopReason: string, + ): Promise; + }; + testServer.eventStreamSender = { + enqueue: vi.fn(), + stop: vi.fn(async () => {}), + }; + testServer.posthogAPI = { + updateTaskRun: vi.fn(async () => ({})), + }; + + await testServer.signalTaskComplete( + { + run_id: "run-1", + task_id: "task-1", + team_id: 1, + user_id: 1, + distinct_id: "distinct-id", + mode: "interactive", + }, + "end_turn", + ); + + expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled(); + expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled(); + expect(testServer.posthogAPI.updateTaskRun).not.toHaveBeenCalled(); + }); + it("persists structured turn completion notifications", () => { const appendRawLine = vi.fn(); const testServer = new AgentServer({ diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index af4998a4a..e39c1c323 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -54,6 +54,7 @@ import { normalizeCloudPromptContent, promptBlocksToText, } from "./cloud-prompt"; +import { TaskRunEventStreamSender } from "./event-stream-sender"; import { type JwtPayload, JwtValidationError, validateJwt } from "./jwt"; import { handoffLocalGitStateSchema, @@ -228,6 +229,7 @@ export class AgentServer { private session: ActiveSession | null = null; private app: Hono; private posthogAPI: PostHogAPIClient; + private eventStreamSender: TaskRunEventStreamSender | null = null; private questionRelayedToSlack = false; private detectedPrUrl: string | null = null; private lastReportedBranch: string | null = null; @@ -292,6 +294,17 @@ export class AgentServer { getApiKey: () => config.apiKey, userAgent: `posthog/cloud.hog.dev; version: ${config.version ?? packageJson.version}`, }); + if (config.eventIngestToken) { + this.eventStreamSender = new TaskRunEventStreamSender({ + apiUrl: config.apiUrl, + projectId: config.projectId, + taskId: config.taskId, + runId: config.runId, + token: config.eventIngestToken, + logger: this.logger.child("EventIngest"), + streamWindowMs: config.eventIngestStreamWindowMs, + }); + } this.app = this.createApp(); } @@ -555,7 +568,9 @@ export class AgentServer { this.logger.debug("Stopping agent server..."); if (this.session) { - await this.cleanupSession(); + await this.cleanupSession({ completeEventStream: true }); + } else { + await this.eventStreamSender?.stop(); } if (this.server) { @@ -1791,6 +1806,12 @@ ${attributionInstructions} const status = "failed"; + this.enqueueTaskTerminalEvent(POSTHOG_NOTIFICATIONS.ERROR, { + source: "agent_server", + stopReason, + error: errorMessage ?? "Agent error", + }); + try { await this.posthogAPI.updateTaskRun(payload.task_id, payload.run_id, { status, @@ -1799,9 +1820,28 @@ ${attributionInstructions} this.logger.debug("Task completion signaled", { status, stopReason }); } catch (error) { this.logger.error("Failed to signal task completion", error); + } finally { + await this.eventStreamSender?.stop(); } } + private enqueueTaskTerminalEvent( + method: + | typeof POSTHOG_NOTIFICATIONS.TASK_COMPLETE + | typeof POSTHOG_NOTIFICATIONS.ERROR, + params: Record, + ): void { + this.eventStreamSender?.enqueue({ + type: "notification", + timestamp: new Date().toISOString(), + notification: { + jsonrpc: "2.0", + method, + params, + }, + }); + } + private configureEnvironment({ isInternal = false, }: { @@ -2199,7 +2239,11 @@ ${attributionInstructions} } } - private async cleanupSession(): Promise { + private async cleanupSession({ + completeEventStream = false, + }: { + completeEventStream?: boolean; + } = {}): Promise { if (!this.session) return; this.logger.debug("Cleaning up session"); @@ -2238,6 +2282,10 @@ ${attributionInstructions} this.session.sseController.close(); } + if (completeEventStream) { + await this.eventStreamSender?.stop(); + } + this.pendingEvents = []; this.lastReportedBranch = null; this.session = null; @@ -2321,9 +2369,13 @@ ${attributionInstructions} } private broadcastEvent(event: Record): void { + if (!this.session) return; + + this.eventStreamSender?.enqueue(event); + if (this.session?.sseController) { this.sendSseEvent(this.session.sseController, event); - } else if (this.session) { + } else { // Buffer events during initialization (sseController not yet attached) this.pendingEvents.push(event); } diff --git a/packages/agent/src/server/bin.ts b/packages/agent/src/server/bin.ts index d72a91ba3..36bfe7a0e 100644 --- a/packages/agent/src/server/bin.ts +++ b/packages/agent/src/server/bin.ts @@ -32,6 +32,15 @@ const envSchema = z.object({ POSTHOG_CODE_REASONING_EFFORT: z .enum(["low", "medium", "high", "xhigh", "max"]) .optional(), + POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(), + POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS: z + .string() + .regex( + /^[1-9]\d*$/, + "POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS must be a positive integer", + ) + .transform((value) => parseInt(value, 10)) + .optional(), }); const program = new Command(); @@ -148,6 +157,9 @@ program const server = new AgentServer({ port: parseInt(options.port, 10), jwtPublicKey: env.JWT_PUBLIC_KEY, + eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN, + eventIngestStreamWindowMs: + env.POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS, repositoryPath: options.repositoryPath, apiUrl: env.POSTHOG_API_URL, apiKey: env.POSTHOG_PERSONAL_API_KEY, diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index 3ab5f9f6b..60b97e9ca 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -67,6 +67,46 @@ function responseForBody(body: string, lastAcceptedSeq = 0): Response { }); } +type StreamingRequestInit = RequestInit & { duplex: "half" }; + +function createFetchStreamingUpload({ + url, + headers, + abortController, +}: { + url: string; + headers: Record; + abortController: AbortController; +}) { + const bodyStream = new TransformStream(); + const writer = bodyStream.writable.getWriter(); + const requestInit: StreamingRequestInit = { + method: "POST", + headers, + body: bodyStream.readable as BodyInit, + signal: abortController.signal, + duplex: "half", + }; + + return { + write(chunk: Uint8Array): Promise { + return writer.write(chunk); + }, + close(): Promise { + return writer.close(); + }, + async abort(): Promise { + abortController.abort(); + try { + await writer.abort(); + } catch { + // The fetch mock may have already closed the body reader. + } + }, + responsePromise: fetch(url, requestInit), + }; +} + function createSender( options: Partial< ConstructorParameters[0] @@ -79,6 +119,7 @@ function createSender( runId: "run-1", token: "ingest-token", logger: new Logger({ debug: false }), + createStreamingUpload: createFetchStreamingUpload, ...options, }); } @@ -198,6 +239,39 @@ describe("TaskRunEventStreamSender", () => { ]); }); + it("closes an idle active ingest request after the stream window elapses", async () => { + const requestBodies: string[] = []; + let activeStreamClosed = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return responseForBody(await readRequestBody(init)); + } + + const body = await readRequestBody(init); + activeStreamClosed = true; + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ flushDelayMs: 0, streamWindowMs: 5 }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await waitFor(() => fetchMock.mock.calls.length === 2); + expect(activeStreamClosed).toBe(false); + + await waitFor(() => activeStreamClosed, 200); + expect(eventSequences(requestBodies[0])).toEqual([1]); + expect(completionSequences(requestBodies[0])).toEqual([]); + + await sender.stop(); + + expect(eventSequences(requestBodies[1])).toEqual([]); + expect(completionSequences(requestBodies[1])).toEqual([1]); + }); + it("aborts a stuck ingest response after closing the request body", async () => { let aborted = false; const fetchMock = vi.fn( diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 0c3ca6554..0620bd758 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -1,5 +1,10 @@ import { Buffer } from "node:buffer"; import type { Logger } from "../utils/logger"; +import { + createNodeStreamingUpload, + type StreamingUpload, + type StreamingUploadFactory, +} from "./streaming-upload"; interface TaskRunEventStreamSenderConfig { apiUrl: string; @@ -13,12 +18,11 @@ interface TaskRunEventStreamSenderConfig { retryDelayMs?: number; requestTimeoutMs?: number; stopTimeoutMs?: number; - maxBatchEvents?: number; - maxBatchBytes?: number; maxEventBytes?: number; maxStreamEvents?: number; maxStreamBytes?: number; streamWindowMs?: number; + createStreamingUpload?: StreamingUploadFactory; } interface EventEnvelope { @@ -32,21 +36,20 @@ interface IngestResponse { interface ActiveStream { abortController: AbortController; - writer: WritableStreamDefaultWriter; + upload: StreamingUpload; responsePromise: Promise; startedAtMs: number; sentThroughSeq: number; sentEvents: number; sentBytes: number; + windowTimer: ReturnType | null; } -type StreamingRequestInit = RequestInit & { duplex: "half" }; - const DEFAULT_MAX_BUFFERED_EVENTS = 20_000; const DEFAULT_MAX_STREAM_EVENTS = 900; const DEFAULT_MAX_STREAM_BYTES = 4_000_000; const DEFAULT_MAX_EVENT_BYTES = 900_000; -const DEFAULT_WRITE_DELAY_MS = 0; +const DEFAULT_FLUSH_DELAY_MS = 0; const DEFAULT_RETRY_DELAY_MS = 1_000; const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; const DEFAULT_STOP_TIMEOUT_MS = 30_000; @@ -59,17 +62,18 @@ export class TaskRunEventStreamSender { private readonly maxStreamEvents: number; private readonly maxStreamBytes: number; private readonly maxEventBytes: number; - private readonly writeDelayMs: number; + private readonly flushDelayMs: number; private readonly retryDelayMs: number; private readonly requestTimeoutMs: number; private readonly stopTimeoutMs: number; private readonly streamWindowMs: number; + private readonly createStreamingUpload: StreamingUploadFactory; private readonly encoder = new TextEncoder(); private sequence = 0; private lastKnownAcceptedSeq = 0; private bufferedEvents: EventEnvelope[] = []; - private writeTimer: ReturnType | null = null; - private writePromise: Promise | null = null; + private flushTimer: ReturnType | null = null; + private flushPromise: Promise | null = null; private streamClosePromise: Promise | null = null; private activeStream: ActiveStream | null = null; private stopPromise: Promise | null = null; @@ -87,19 +91,17 @@ export class TaskRunEventStreamSender { )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; this.maxBufferedEvents = config.maxBufferedEvents ?? DEFAULT_MAX_BUFFERED_EVENTS; - this.maxStreamEvents = - config.maxStreamEvents ?? - config.maxBatchEvents ?? - DEFAULT_MAX_STREAM_EVENTS; - this.maxStreamBytes = - config.maxStreamBytes ?? config.maxBatchBytes ?? DEFAULT_MAX_STREAM_BYTES; + this.maxStreamEvents = config.maxStreamEvents ?? DEFAULT_MAX_STREAM_EVENTS; + this.maxStreamBytes = config.maxStreamBytes ?? DEFAULT_MAX_STREAM_BYTES; this.maxEventBytes = config.maxEventBytes ?? DEFAULT_MAX_EVENT_BYTES; - this.writeDelayMs = config.flushDelayMs ?? DEFAULT_WRITE_DELAY_MS; + this.flushDelayMs = config.flushDelayMs ?? DEFAULT_FLUSH_DELAY_MS; this.retryDelayMs = config.retryDelayMs ?? DEFAULT_RETRY_DELAY_MS; this.requestTimeoutMs = config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS; + this.createStreamingUpload = + config.createStreamingUpload ?? createNodeStreamingUpload; } enqueue(event: Record): void { @@ -114,7 +116,7 @@ export class TaskRunEventStreamSender { event, }; this.bufferedEvents.push(envelope); - this.scheduleWrite(); + this.scheduleFlush(); } async stop(): Promise { @@ -125,20 +127,20 @@ export class TaskRunEventStreamSender { this.stopped = true; - if (this.writeTimer) { - clearTimeout(this.writeTimer); - this.writeTimer = null; + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; } this.stopPromise = this.drainForStop(); await this.stopPromise; } - private scheduleWrite(delayMs = this.writeDelayMs): void { - if (this.writeTimer || this.writePromise || this.stopped) return; + private scheduleFlush(delayMs = this.flushDelayMs): void { + if (this.flushTimer || this.flushPromise || this.stopped) return; - this.writeTimer = setTimeout(() => { - this.writeTimer = null; + this.flushTimer = setTimeout(() => { + this.flushTimer = null; void this.flush(); }, delayMs); } @@ -180,8 +182,8 @@ export class TaskRunEventStreamSender { } private async flush(): Promise { - if (this.writePromise) { - await this.writePromise.catch(() => undefined); + if (this.flushPromise) { + await this.flushPromise.catch(() => undefined); } if (this.bufferedEvents.length === 0) { @@ -189,11 +191,11 @@ export class TaskRunEventStreamSender { } const previousBufferLength = this.bufferedEvents.length; - const writePromise = this.writeBufferedEvents(); - this.writePromise = writePromise; + const flushPromise = this.flushBufferedEvents(); + this.flushPromise = flushPromise; try { - await writePromise; + await flushPromise; return this.bufferedEvents.length < previousBufferLength; } catch (error) { this.config.logger.warn( @@ -202,20 +204,20 @@ export class TaskRunEventStreamSender { ); await this.abortActiveStream(); if (!this.stopped) { - this.scheduleWrite(this.retryDelayMs); + this.scheduleFlush(this.retryDelayMs); } return false; } finally { - if (this.writePromise === writePromise) { - this.writePromise = null; + if (this.flushPromise === flushPromise) { + this.flushPromise = null; } if (!this.stopped && this.hasUnwrittenBufferedEvents()) { - this.scheduleWrite(0); + this.scheduleFlush(0); } } } - private async writeBufferedEvents(): Promise { + private async flushBufferedEvents(): Promise { while (true) { const stream = await this.ensureActiveStream(); const nextEvent = this.bufferedEvents.find( @@ -232,7 +234,7 @@ export class TaskRunEventStreamSender { continue; } - await stream.writer.write(this.encoder.encode(line)); + await stream.upload.write(this.encoder.encode(line)); stream.sentThroughSeq = nextEvent.seq; stream.sentEvents += 1; stream.sentBytes += lineBytes; @@ -254,7 +256,7 @@ export class TaskRunEventStreamSender { (event) => event.seq > stream.sentThroughSeq, ); if (hasUnwrittenEvents) { - await this.writeBufferedEvents(); + await this.flushBufferedEvents(); continue; } @@ -272,7 +274,7 @@ export class TaskRunEventStreamSender { continue; } - await stream.writer.write(this.encoder.encode(line)); + await stream.upload.write(this.encoder.encode(line)); stream.sentBytes += lineBytes; return; } @@ -310,32 +312,77 @@ export class TaskRunEventStreamSender { await this.syncSequenceWithServer(); - const bodyStream = new TransformStream(); const abortController = new AbortController(); - const requestInit: StreamingRequestInit = { - method: "POST", + const upload = this.createStreamingUpload({ + url: this.ingestUrl, headers: this.buildHeaders(), - body: bodyStream.readable as BodyInit, - signal: abortController.signal, - duplex: "half", - }; - const responsePromise = fetch(this.ingestUrl, requestInit); + abortController, + }); const activeStream: ActiveStream = { abortController, - writer: bodyStream.writable.getWriter(), - responsePromise, + upload, + responsePromise: upload.responsePromise, startedAtMs: Date.now(), sentThroughSeq: this.lastKnownAcceptedSeq, sentEvents: 0, sentBytes: 0, + windowTimer: null, }; this.activeStream = activeStream; - responsePromise.catch((error) => { + this.scheduleStreamWindowClose(activeStream); + upload.responsePromise.catch((error) => { void this.handleActiveStreamResponseFailure(activeStream, error); }); return activeStream; } + private scheduleStreamWindowClose( + stream: ActiveStream, + delayOverrideMs?: number, + ): void { + this.clearStreamWindowClose(stream); + // Rotate long-lived uploads even when the agent goes idle; this is a + // transport boundary, not a batching window. + const delayMs = + delayOverrideMs ?? + Math.max(0, stream.startedAtMs + this.streamWindowMs - Date.now()); + stream.windowTimer = setTimeout(() => { + stream.windowTimer = null; + void this.closeExpiredStream(stream); + }, delayMs); + } + + private clearStreamWindowClose(stream: ActiveStream): void { + if (!stream.windowTimer) { + return; + } + clearTimeout(stream.windowTimer); + stream.windowTimer = null; + } + + private async closeExpiredStream(stream: ActiveStream): Promise { + if (this.activeStream !== stream || this.stopped) { + return; + } + + if (this.flushPromise) { + this.scheduleStreamWindowClose(stream, 50); + return; + } + + try { + await this.closeActiveStream(); + } catch (error) { + this.config.logger.warn( + "Task run event ingest stream window close failed", + this.describeError(error), + ); + if (!this.stopped && this.bufferedEvents.length > 0) { + this.scheduleFlush(this.retryDelayMs); + } + } + } + private async handleActiveStreamResponseFailure( stream: ActiveStream, error: unknown, @@ -357,7 +404,7 @@ export class TaskRunEventStreamSender { ); } if (!this.stopped && this.bufferedEvents.length > 0) { - this.scheduleWrite(this.retryDelayMs); + this.scheduleFlush(this.retryDelayMs); } } @@ -377,6 +424,7 @@ export class TaskRunEventStreamSender { try { await closePromise; } finally { + this.clearStreamWindowClose(stream); if (this.activeStream === stream) { this.activeStream = null; } @@ -388,7 +436,7 @@ export class TaskRunEventStreamSender { private async closeStream(stream: ActiveStream): Promise { try { - await stream.writer.close(); + await stream.upload.close(); } catch (error) { stream.abortController.abort(); this.sequenceSynced = false; @@ -418,10 +466,11 @@ export class TaskRunEventStreamSender { } stream.abortController.abort(); + this.clearStreamWindowClose(stream); try { - await stream.writer.abort(); + await stream.upload.abort(); } catch { - // The writer may already be closed by fetch after the abort. + // The upload may already be closed by the transport after the abort. } finally { if (this.activeStream === stream) { this.activeStream = null; diff --git a/packages/agent/src/server/streaming-upload.ts b/packages/agent/src/server/streaming-upload.ts new file mode 100644 index 000000000..fbc34d8dc --- /dev/null +++ b/packages/agent/src/server/streaming-upload.ts @@ -0,0 +1,160 @@ +import { Buffer } from "node:buffer"; +import { + type ClientRequest, + request as httpRequest, + type IncomingHttpHeaders, +} from "node:http"; +import { request as httpsRequest } from "node:https"; +import { URL } from "node:url"; + +export interface StreamingUpload { + write(chunk: Uint8Array): Promise; + close(): Promise; + abort(): Promise; + responsePromise: Promise; +} + +export interface StreamingUploadFactoryInput { + url: string; + headers: Record; + abortController: AbortController; +} + +export type StreamingUploadFactory = ( + input: StreamingUploadFactoryInput, +) => StreamingUpload; + +function headersFromIncoming(headers: IncomingHttpHeaders): Headers { + const result = new Headers(); + for (const [name, value] of Object.entries(headers)) { + if (value === undefined) { + continue; + } + if (Array.isArray(value)) { + for (const item of value) { + result.append(name, item); + } + } else { + result.set(name, String(value)); + } + } + return result; +} + +function abortError(): Error { + const error = new Error("aborted"); + error.name = "AbortError"; + return error; +} + +function writeRequestChunk( + request: ClientRequest, + chunk: Uint8Array, +): Promise { + return new Promise((resolve, reject) => { + const onError = (error: Error): void => { + request.off("error", onError); + reject(error); + }; + request.once("error", onError); + request.write(Buffer.from(chunk), (error?: Error | null) => { + request.off("error", onError); + if (error) { + reject(error); + return; + } + resolve(); + }); + }); +} + +function closeRequest(request: ClientRequest): Promise { + return new Promise((resolve, reject) => { + const onError = (error: Error): void => { + request.off("error", onError); + reject(error); + }; + request.once("error", onError); + request.end(() => { + request.off("error", onError); + resolve(); + }); + }); +} + +export function createNodeStreamingUpload({ + url, + headers, + abortController, +}: StreamingUploadFactoryInput): StreamingUpload { + const parsedUrl = new URL(url); + const requestFactory = + parsedUrl.protocol === "https:" + ? httpsRequest + : parsedUrl.protocol === "http:" + ? httpRequest + : undefined; + if (!requestFactory) { + throw new Error(`Unsupported event ingest protocol: ${parsedUrl.protocol}`); + } + const request = requestFactory(parsedUrl, { + method: "POST", + headers, + }); + + let closed = false; + const responsePromise = new Promise((resolve, reject) => { + request.on("response", (response) => { + const chunks: Buffer[] = []; + response.on("data", (chunk: Buffer | string) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + response.on("end", () => { + resolve( + new Response(Buffer.concat(chunks), { + status: response.statusCode ?? 0, + statusText: response.statusMessage, + headers: headersFromIncoming(response.headers), + }), + ); + }); + response.on("error", reject); + }); + request.on("error", reject); + }); + + const abortRequest = (): void => { + closed = true; + if (!request.destroyed) { + request.destroy(abortError()); + } + }; + abortController.signal.addEventListener("abort", abortRequest, { + once: true, + }); + void responsePromise + .finally(() => { + abortController.signal.removeEventListener("abort", abortRequest); + }) + .catch(() => undefined); + + return { + async write(chunk: Uint8Array): Promise { + if (closed) { + throw new Error("Cannot write to closed event ingest stream"); + } + await writeRequestChunk(request, chunk); + }, + async close(): Promise { + if (closed) { + return; + } + closed = true; + await closeRequest(request); + }, + async abort(): Promise { + abortRequest(); + }, + responsePromise, + }; +} diff --git a/packages/agent/src/server/types.ts b/packages/agent/src/server/types.ts index 10cf96fc7..d11cb7748 100644 --- a/packages/agent/src/server/types.ts +++ b/packages/agent/src/server/types.ts @@ -15,6 +15,8 @@ export interface AgentServerConfig { apiKey: string; projectId: number; jwtPublicKey: string; // RS256 public key for JWT verification + eventIngestToken?: string; + eventIngestStreamWindowMs?: number; mode: AgentMode; taskId: string; runId: string;