diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts new file mode 100644 index 000000000..1e1bbfde1 --- /dev/null +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -0,0 +1,640 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { Logger } from "../utils/logger"; +import { TaskRunEventStreamSender } from "./event-stream-sender"; + +const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete"; + +async function readRequestBody(init?: RequestInit): Promise { + const body = init?.body; + if (!body) { + return ""; + } + if (typeof body === "string") { + return body; + } + if (body instanceof Uint8Array) { + return new TextDecoder().decode(body); + } + if (body instanceof ReadableStream) { + const reader = body.getReader(); + const chunks: Uint8Array[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + chunks.push(value); + } + const length = chunks.reduce((sum, chunk) => sum + chunk.length, 0); + const bytes = new Uint8Array(length); + let offset = 0; + for (const chunk of chunks) { + bytes.set(chunk, offset); + offset += chunk.length; + } + return new TextDecoder().decode(bytes); + } + return String(body); +} + +function parseLines(body: string): Record[] { + const trimmed = body.trim(); + if (!trimmed) { + return []; + } + return trimmed.split("\n").map((line) => JSON.parse(line)); +} + +function eventSequences(body: string): number[] { + return parseLines(body) + .map((line) => line.seq) + .filter((seq): seq is number => typeof seq === "number"); +} + +function completionSequences(body: string): number[] { + return parseLines(body) + .filter((line) => line.type === STREAM_COMPLETE_CONTROL_TYPE) + .map((line) => line.final_seq) + .filter((seq): seq is number => typeof seq === "number"); +} + +function responseForBody(body: string, lastAcceptedSeq = 0): Response { + const sequences = eventSequences(body); + const acceptedSeq = sequences.at(-1) ?? lastAcceptedSeq; + return new Response(JSON.stringify({ last_accepted_seq: acceptedSeq }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); +} + +function createSender( + options: Partial< + ConstructorParameters[0] + > = {}, +): TaskRunEventStreamSender { + return new TaskRunEventStreamSender({ + apiUrl: "http://localhost:8000/", + projectId: 1, + taskId: "task-1", + runId: "run-1", + token: "ingest-token", + logger: new Logger({ debug: false }), + ...options, + }); +} + +describe("TaskRunEventStreamSender", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("streams ordered NDJSON events with the run-scoped token", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender(); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[1][0]).toBe( + "http://localhost:8000/api/projects/1/tasks/task-1/runs/run-1/event_stream/", + ); + expect(fetchMock.mock.calls[1][1]?.headers).toEqual({ + Authorization: "Bearer ingest-token", + "Content-Type": "application/x-ndjson", + }); + expect(fetchMock.mock.calls[1][1]?.headers).not.toHaveProperty( + "X-PostHog-Event-Stream-Complete", + ); + + expect(parseLines(requestBodies[1])).toEqual([ + { + seq: 1, + event: { type: "notification", notification: { method: "first" } }, + }, + { + seq: 2, + event: { type: "notification", notification: { method: "second" } }, + }, + { type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 2 }, + ]); + }); + + it("aborts a stuck ingest response after closing the request body", async () => { + let aborted = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + void readRequestBody(init); + return new Promise((_resolve, reject) => { + init.signal?.addEventListener("abort", () => { + aborted = true; + reject(new DOMException("aborted", "AbortError")); + }); + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + requestTimeoutMs: 1, + retryDelayMs: 1, + stopTimeoutMs: 1, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(fetchMock.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(aborted).toBe(true); + }); + + it("waits for the final ingest response before stop resolves", async () => { + const ingestRequest: { resolve?: (response: Response) => void } = {}; + let stopped = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + void readRequestBody(init); + return new Promise((resolve) => { + ingestRequest.resolve = resolve; + }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender(); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + const stopPromise = sender.stop().then(() => { + stopped = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(stopped).toBe(false); + + const resolveIngest = ingestRequest.resolve; + if (!resolveIngest) { + throw new Error("expected ingest request to be in flight"); + } + resolveIngest( + new Response(JSON.stringify({ last_accepted_seq: 1 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }), + ); + await stopPromise; + + expect(stopped).toBe(true); + }); + + it("streams only a completion control line on shutdown without buffered events", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender(); + + await sender.stop(); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(requestBodies[0]).toBe(""); + expect(parseLines(requestBodies[1])).toEqual([ + { type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 0 }, + ]); + }); + + it.each([ + { + name: "when the buffer is full", + senderOptions: { maxBufferedEvents: 1 }, + events: [ + { type: "notification", notification: { method: "first" } }, + { type: "notification", notification: { method: "second" } }, + ], + acceptedMethod: "first", + }, + { + name: "when an event is oversized", + senderOptions: { maxEventBytes: 120 }, + events: [ + { + type: "notification", + notification: { + method: "oversized", + params: { message: "x".repeat(200) }, + }, + }, + { type: "notification", notification: { method: "small" } }, + ], + acceptedMethod: "small", + }, + ])( + "drops events before assigning sequence $name", + async ({ senderOptions, events, acceptedMethod }) => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender(senderOptions); + + for (const event of events) { + sender.enqueue(event); + } + await sender.stop(); + + expect(parseLines(requestBodies[1])).toEqual([ + { + seq: 1, + event: { + type: "notification", + notification: { method: acceptedMethod }, + }, + }, + { type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 1 }, + ]); + }, + ); + + it("accepts an event at the next sequence size boundary", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const event = { + type: "notification", + notification: { method: "boundary" }, + }; + const maxEventBytes = new TextEncoder().encode( + JSON.stringify({ seq: 1, event }), + ).length; + + const sender = createSender({ maxEventBytes }); + + sender.enqueue(event); + await sender.stop(); + + expect(parseLines(requestBodies[1])).toEqual([ + { + seq: 1, + event: { + type: "notification", + notification: { method: "boundary" }, + }, + }, + { type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 1 }, + ]); + }); + + it("rolls capped streams on stop", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ maxStreamEvents: 1 }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(requestBodies).toHaveLength(3); + expect(eventSequences(requestBodies[1])).toEqual([1]); + expect(completionSequences(requestBodies[1])).toEqual([]); + expect(eventSequences(requestBodies[2])).toEqual([2]); + expect(completionSequences(requestBodies[2])).toEqual([2]); + }); + + it("retries stop drain after a transient ingest failure", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + requestBodies.push(body); + if (requestBodies.length === 1) { + return new Response("temporary failure", { status: 503 }); + } + + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + retryDelayMs: 1, + stopTimeoutMs: 100, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(requestBodies.map(eventSequences)).toEqual([[1], [1]]); + expect(completionSequences(requestBodies[1])).toEqual([1]); + }); + + it("retries when the active stream response rejects before shutdown", async () => { + const requestBodies: string[] = []; + let failedStream = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + if (!failedStream) { + failedStream = true; + throw new TypeError("fetch failed"); + } + + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + retryDelayMs: 1, + stopTimeoutMs: 100, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(requestBodies.map(eventSequences)).toEqual([[1]]); + expect(completionSequences(requestBodies[0])).toEqual([1]); + }); + + it("stops retrying after the stop deadline", async () => { + const requestBodies: string[] = []; + const warnings: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + requestBodies.push(body); + return new Response("temporary failure", { status: 503 }); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + logger: new Logger({ + debug: false, + onLog: (level, _scope, message) => { + if (level === "warn") { + warnings.push(message); + } + }, + }), + retryDelayMs: 5, + stopTimeoutMs: 1, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(requestBodies).toHaveLength(1); + expect(eventSequences(requestBodies[0])).toEqual([1]); + expect(warnings).toContain( + "Task run event ingest stop deadline reached before fully completing transport", + ); + }); + + it("continues after a payload error acknowledges a valid prefix", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBodies.push(body); + + if (requestBodies.length === 1) { + return new Response( + JSON.stringify({ + error: "Too many events in request", + last_accepted_seq: 1, + }), + { + status: 413, + headers: { "Content-Type": "application/json" }, + }, + ); + } + + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + retryDelayMs: 1, + stopTimeoutMs: 100, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(requestBodies.map(eventSequences)).toEqual([[1, 2], [2]]); + expect(completionSequences(requestBodies[1])).toEqual([2]); + }); + + it("starts after the server's last accepted sequence on restart", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 42 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender(); + + sender.enqueue({ + type: "notification", + notification: { method: "after-restart" }, + }); + sender.enqueue({ type: "notification", notification: { method: "next" } }); + await sender.stop(); + + expect(eventSequences(requestBodies[0])).toEqual([43, 44]); + expect(completionSequences(requestBodies[0])).toEqual([44]); + }); + + it("rebases buffered events after a sequence gap response", async () => { + const requestBodies: string[] = []; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + if (!body) { + return new Response(JSON.stringify({ last_accepted_seq: 42 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + requestBodies.push(body); + if (requestBodies.length === 1) { + return new Response( + JSON.stringify({ + error: "Expected sequence 1, got 43", + last_accepted_seq: 0, + }), + { + status: 409, + headers: { "Content-Type": "application/json" }, + }, + ); + } + + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + retryDelayMs: 1, + stopTimeoutMs: 100, + }); + + sender.enqueue({ + type: "notification", + notification: { method: "after-expiry" }, + }); + sender.enqueue({ type: "notification", notification: { method: "next" } }); + await sender.stop(); + + expect(requestBodies.map(eventSequences)).toEqual([ + [43, 44], + [1, 2], + ]); + expect(completionSequences(requestBodies[1])).toEqual([2]); + }); + + it("reconnects and replays only events after the server's accepted prefix", async () => { + const requestBodies: string[] = []; + let syncCount = 0; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + if (!body) { + syncCount += 1; + return new Response( + JSON.stringify({ last_accepted_seq: syncCount === 1 ? 0 : 1 }), + { + status: 200, + headers: { "Content-Type": "application/json" }, + }, + ); + } + + requestBodies.push(body); + if (requestBodies.length === 1) { + throw new Error("connection reset"); + } + + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + retryDelayMs: 1, + stopTimeoutMs: 100, + }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + sender.enqueue({ + type: "notification", + notification: { method: "second" }, + }); + await sender.stop(); + + expect(requestBodies.map(eventSequences)).toEqual([[1, 2], [2]]); + expect(completionSequences(requestBodies[1])).toEqual([2]); + }); +}); diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts new file mode 100644 index 000000000..1c0808249 --- /dev/null +++ b/packages/agent/src/server/event-stream-sender.ts @@ -0,0 +1,647 @@ +import { Buffer } from "node:buffer"; +import type { Logger } from "../utils/logger"; + +interface TaskRunEventStreamSenderConfig { + apiUrl: string; + projectId: number; + taskId: string; + runId: string; + token: string; + logger: Logger; + maxBufferedEvents?: number; + flushDelayMs?: number; + retryDelayMs?: number; + requestTimeoutMs?: number; + stopTimeoutMs?: number; + maxBatchEvents?: number; + maxBatchBytes?: number; + maxEventBytes?: number; + maxStreamEvents?: number; + maxStreamBytes?: number; + streamWindowMs?: number; +} + +interface EventEnvelope { + seq: number; + event: Record; +} + +interface IngestResponse { + last_accepted_seq?: unknown; +} + +interface ActiveStream { + abortController: AbortController; + writer: WritableStreamDefaultWriter; + responsePromise: Promise; + startedAtMs: number; + sentThroughSeq: number; + sentEvents: number; + sentBytes: number; +} + +type StreamingRequestInit = RequestInit & { duplex: "half" }; + +const DEFAULT_MAX_BUFFERED_EVENTS = 20_000; +const DEFAULT_MAX_STREAM_EVENTS = 900; +const DEFAULT_MAX_STREAM_BYTES = 4_000_000; +const DEFAULT_MAX_EVENT_BYTES = 900_000; +const DEFAULT_WRITE_DELAY_MS = 0; +const DEFAULT_RETRY_DELAY_MS = 1_000; +const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; +const DEFAULT_STOP_TIMEOUT_MS = 30_000; +const DEFAULT_STREAM_WINDOW_MS = 5 * 60 * 1_000; +const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete"; + +export class TaskRunEventStreamSender { + private readonly ingestUrl: string; + private readonly maxBufferedEvents: number; + private readonly maxStreamEvents: number; + private readonly maxStreamBytes: number; + private readonly maxEventBytes: number; + private readonly writeDelayMs: number; + private readonly retryDelayMs: number; + private readonly requestTimeoutMs: number; + private readonly stopTimeoutMs: number; + private readonly streamWindowMs: number; + private readonly encoder = new TextEncoder(); + private sequence = 0; + private lastKnownAcceptedSeq = 0; + private bufferedEvents: EventEnvelope[] = []; + private writeTimer: ReturnType | null = null; + private writePromise: Promise | null = null; + private streamClosePromise: Promise | null = null; + private activeStream: ActiveStream | null = null; + private stopPromise: Promise | null = null; + private stopped = false; + private sequenceSynced = false; + private sequenceInitialized = false; + private transportCompleted = false; + private droppedBeforeSequenceCount = 0; + private bufferRevision = 0; + + constructor(private readonly config: TaskRunEventStreamSenderConfig) { + const apiUrl = config.apiUrl.replace(/\/$/, ""); + this.ingestUrl = `${apiUrl}/api/projects/${config.projectId}/tasks/${encodeURIComponent( + config.taskId, + )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; + this.maxBufferedEvents = + config.maxBufferedEvents ?? DEFAULT_MAX_BUFFERED_EVENTS; + this.maxStreamEvents = + config.maxStreamEvents ?? + config.maxBatchEvents ?? + DEFAULT_MAX_STREAM_EVENTS; + this.maxStreamBytes = + config.maxStreamBytes ?? config.maxBatchBytes ?? DEFAULT_MAX_STREAM_BYTES; + this.maxEventBytes = config.maxEventBytes ?? DEFAULT_MAX_EVENT_BYTES; + this.writeDelayMs = config.flushDelayMs ?? DEFAULT_WRITE_DELAY_MS; + this.retryDelayMs = config.retryDelayMs ?? DEFAULT_RETRY_DELAY_MS; + this.requestTimeoutMs = + config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; + this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; + this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS; + } + + enqueue(event: Record): void { + if (this.stopped) return; + + if (!this.canAcceptEvent(event)) { + return; + } + + const envelope: EventEnvelope = { + seq: ++this.sequence, + event, + }; + this.bufferedEvents.push(envelope); + this.scheduleWrite(); + } + + async stop(): Promise { + if (this.stopPromise) { + await this.stopPromise; + return; + } + + this.stopped = true; + + if (this.writeTimer) { + clearTimeout(this.writeTimer); + this.writeTimer = null; + } + + this.stopPromise = this.drainForStop(); + await this.stopPromise; + } + + private scheduleWrite(delayMs = this.writeDelayMs): void { + if (this.writeTimer || this.writePromise || this.stopped) return; + + this.writeTimer = setTimeout(() => { + this.writeTimer = null; + void this.flush(); + }, delayMs); + } + + private async drainForStop(): Promise { + const startedAtMs = Date.now(); + const deadlineAtMs = startedAtMs + this.stopTimeoutMs; + + while (!this.transportCompleted) { + const previousLength = this.bufferedEvents.length; + const previousRevision = this.bufferRevision; + + try { + await this.flush(); + await this.writeCompletionLine(); + await this.closeActiveStream(); + this.transportCompleted = true; + return; + } catch (error) { + this.config.logger.warn( + "Task run event ingest stop request failed", + this.describeError(error), + ); + } + + const madeProgress = + this.bufferedEvents.length < previousLength || + this.bufferRevision !== previousRevision; + if (!madeProgress && !(await this.waitBeforeStopRetry(deadlineAtMs))) { + this.warnStopDeadlineReached(startedAtMs); + return; + } + + if (Date.now() >= deadlineAtMs && !this.transportCompleted) { + this.warnStopDeadlineReached(startedAtMs); + return; + } + } + } + + private async flush(): Promise { + if (this.writePromise) { + await this.writePromise.catch(() => undefined); + } + + if (this.bufferedEvents.length === 0) { + return true; + } + + const previousBufferLength = this.bufferedEvents.length; + const writePromise = this.writeBufferedEvents(); + this.writePromise = writePromise; + + try { + await writePromise; + if (!this.stopped) { + await this.closeActiveStream(); + } + return this.bufferedEvents.length < previousBufferLength; + } catch (error) { + this.config.logger.warn( + "Task run event ingest stream write failed", + this.describeError(error), + ); + await this.abortActiveStream(); + if (!this.stopped) { + this.scheduleWrite(this.retryDelayMs); + } + return false; + } finally { + if (this.writePromise === writePromise) { + this.writePromise = null; + } + if (!this.stopped && this.hasUnwrittenBufferedEvents()) { + this.scheduleWrite(0); + } + } + } + + private async writeBufferedEvents(): Promise { + while (true) { + const stream = await this.ensureActiveStream(); + const nextEvent = this.bufferedEvents.find( + (event) => event.seq > stream.sentThroughSeq, + ); + if (!nextEvent) { + return; + } + + const line = `${this.serializeEnvelope(nextEvent)}\n`; + const lineBytes = Buffer.byteLength(line, "utf8"); + if (this.shouldRollStreamBeforeWriting(stream, lineBytes)) { + await this.closeActiveStream(); + continue; + } + + await stream.writer.write(this.encoder.encode(line)); + stream.sentThroughSeq = nextEvent.seq; + stream.sentEvents += 1; + stream.sentBytes += lineBytes; + } + } + + private hasUnwrittenBufferedEvents(): boolean { + const sentThroughSeq = + this.activeStream?.sentThroughSeq ?? this.lastKnownAcceptedSeq; + return this.bufferedEvents.some((event) => event.seq > sentThroughSeq); + } + + private async writeCompletionLine(): Promise { + await this.syncSequenceWithServer(); + + while (true) { + const stream = await this.ensureActiveStream(); + const hasUnwrittenEvents = this.bufferedEvents.some( + (event) => event.seq > stream.sentThroughSeq, + ); + if (hasUnwrittenEvents) { + await this.writeBufferedEvents(); + continue; + } + + const line = `${JSON.stringify({ + type: STREAM_COMPLETE_CONTROL_TYPE, + final_seq: this.sequence, + })}\n`; + const lineBytes = Buffer.byteLength(line, "utf8"); + if ( + this.shouldRollStreamBeforeWriting(stream, lineBytes, { + ignoreEventCount: true, + }) + ) { + await this.closeActiveStream(); + continue; + } + + await stream.writer.write(this.encoder.encode(line)); + stream.sentBytes += lineBytes; + return; + } + } + + private shouldRollStreamBeforeWriting( + stream: ActiveStream, + lineBytes: number, + options: { ignoreEventCount?: boolean } = {}, + ): boolean { + if ( + !options.ignoreEventCount && + stream.sentEvents > 0 && + stream.sentEvents >= this.maxStreamEvents + ) { + return true; + } + if ( + stream.sentBytes > 0 && + stream.sentBytes + lineBytes > this.maxStreamBytes + ) { + return true; + } + return Date.now() - stream.startedAtMs >= this.streamWindowMs; + } + + private async ensureActiveStream(): Promise { + if (this.streamClosePromise) { + await this.streamClosePromise.catch(() => undefined); + } + + if (this.activeStream) { + return this.activeStream; + } + + await this.syncSequenceWithServer(); + + const bodyStream = new TransformStream(); + const abortController = new AbortController(); + const requestInit: StreamingRequestInit = { + method: "POST", + headers: this.buildHeaders(), + body: bodyStream.readable as BodyInit, + signal: abortController.signal, + duplex: "half", + }; + const responsePromise = fetch(this.ingestUrl, requestInit); + const activeStream: ActiveStream = { + abortController, + writer: bodyStream.writable.getWriter(), + responsePromise, + startedAtMs: Date.now(), + sentThroughSeq: this.lastKnownAcceptedSeq, + sentEvents: 0, + sentBytes: 0, + }; + this.activeStream = activeStream; + responsePromise.catch((error) => { + void this.handleActiveStreamResponseFailure(activeStream, error); + }); + return activeStream; + } + + private async handleActiveStreamResponseFailure( + stream: ActiveStream, + error: unknown, + ): Promise { + if (this.activeStream !== stream) { + return; + } + + this.config.logger.warn( + "Task run event ingest stream request failed", + this.describeError(error), + ); + try { + await this.abortActiveStream(); + } catch (abortError) { + this.config.logger.warn( + "Task run event ingest stream abort failed", + this.describeError(abortError), + ); + } + if (!this.stopped && this.bufferedEvents.length > 0) { + this.scheduleWrite(this.retryDelayMs); + } + } + + private async closeActiveStream(): Promise { + if (this.streamClosePromise) { + await this.streamClosePromise; + return; + } + + const stream = this.activeStream; + if (!stream) { + return; + } + + const closePromise = this.closeStream(stream); + this.streamClosePromise = closePromise; + try { + await closePromise; + } finally { + if (this.activeStream === stream) { + this.activeStream = null; + } + if (this.streamClosePromise === closePromise) { + this.streamClosePromise = null; + } + } + } + + private async closeStream(stream: ActiveStream): Promise { + try { + await stream.writer.close(); + } catch (error) { + stream.abortController.abort(); + this.sequenceSynced = false; + throw error; + } + + let response: Response; + try { + response = await this.waitForResponseWithTimeout( + stream.responsePromise, + stream.abortController, + ); + } catch (error) { + stream.abortController.abort(); + this.sequenceSynced = false; + throw error; + } + + await this.applyIngestResponse(response, "Event ingest stream"); + this.sequenceSynced = true; + } + + private async abortActiveStream(): Promise { + const stream = this.activeStream; + if (!stream) { + return; + } + + stream.abortController.abort(); + try { + await stream.writer.abort(); + } catch { + // The writer may already be closed by fetch after the abort. + } finally { + if (this.activeStream === stream) { + this.activeStream = null; + } + this.sequenceSynced = false; + } + } + + private async waitBeforeStopRetry(deadlineAtMs: number): Promise { + const remainingMs = deadlineAtMs - Date.now(); + if (remainingMs <= 0) { + return false; + } + + await new Promise((resolve) => + setTimeout(resolve, Math.min(this.retryDelayMs, remainingMs)), + ); + return Date.now() < deadlineAtMs; + } + + private warnStopDeadlineReached(startedAtMs: number): void { + this.config.logger.warn( + "Task run event ingest stop deadline reached before fully completing transport", + { + remaining: this.bufferedEvents.length, + stopTimeoutMs: this.stopTimeoutMs, + elapsedMs: Date.now() - startedAtMs, + }, + ); + } + + private async syncSequenceWithServer(): Promise { + if (this.sequenceSynced) return; + + const response = await this.fetchWithTimeout({ + method: "POST", + headers: this.buildHeaders(), + body: "", + }); + const responseBody = await this.parseResponse(response); + + if (!response.ok) { + throw new Error( + `Event ingest sequence sync returned HTTP ${response.status}: ${responseBody.text.slice(0, 300)}`, + ); + } + + const lastAcceptedSeq = responseBody.parsed?.last_accepted_seq; + if (typeof lastAcceptedSeq === "number" && lastAcceptedSeq > 0) { + if (!this.sequenceInitialized) { + this.bufferedEvents = this.bufferedEvents.map((event) => ({ + ...event, + seq: event.seq + lastAcceptedSeq, + })); + this.sequence += lastAcceptedSeq; + this.bufferRevision += 1; + } else { + this.acceptThrough(lastAcceptedSeq); + if (lastAcceptedSeq > this.sequence) { + this.sequence = lastAcceptedSeq; + } + } + this.lastKnownAcceptedSeq = lastAcceptedSeq; + } + + this.sequenceSynced = true; + this.sequenceInitialized = true; + } + + private async fetchWithTimeout(init: RequestInit): Promise { + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, this.requestTimeoutMs); + + try { + return await fetch(this.ingestUrl, { + ...init, + signal: abortController.signal, + }); + } finally { + clearTimeout(timeout); + } + } + + private async waitForResponseWithTimeout( + responsePromise: Promise, + abortController: AbortController, + ): Promise { + const timeout = setTimeout(() => { + abortController.abort(); + }, this.requestTimeoutMs); + + try { + return await responsePromise; + } finally { + clearTimeout(timeout); + } + } + + private async applyIngestResponse( + response: Response, + label: string, + ): Promise { + const responseBody = await this.parseResponse(response); + const lastAcceptedSeq = responseBody.parsed?.last_accepted_seq; + if (typeof lastAcceptedSeq === "number") { + this.acceptThrough(lastAcceptedSeq); + if (lastAcceptedSeq > this.sequence) { + this.sequence = lastAcceptedSeq; + } + this.lastKnownAcceptedSeq = lastAcceptedSeq; + if (response.status === 409) { + this.rebaseBufferedEvents(lastAcceptedSeq); + } + } + + if (!response.ok) { + throw new Error( + `${label} returned HTTP ${response.status}: ${responseBody.text.slice(0, 300)}`, + ); + } + } + + private acceptThrough(lastAcceptedSeq: number): void { + const previousLength = this.bufferedEvents.length; + this.bufferedEvents = this.bufferedEvents.filter( + (event) => event.seq > lastAcceptedSeq, + ); + if (this.bufferedEvents.length !== previousLength) { + this.bufferRevision += 1; + } + } + + private buildHeaders(): Record { + return { + Authorization: `Bearer ${this.config.token}`, + "Content-Type": "application/x-ndjson", + }; + } + + private rebaseBufferedEvents(lastAcceptedSeq: number): void { + let nextSeq = lastAcceptedSeq + 1; + this.bufferedEvents = this.bufferedEvents.map((event) => ({ + ...event, + seq: nextSeq++, + })); + this.sequence = nextSeq - 1; + this.sequenceSynced = true; + this.sequenceInitialized = true; + this.lastKnownAcceptedSeq = lastAcceptedSeq; + this.bufferRevision += 1; + } + + private async parseResponse( + response: Response, + ): Promise<{ parsed: IngestResponse | null; text: string }> { + const text = await response.text(); + if (!text) { + return { parsed: null, text }; + } + + try { + return { parsed: JSON.parse(text) as IngestResponse, text }; + } catch { + return { parsed: null, text }; + } + } + + private canAcceptEvent(event: Record): boolean { + const eventBytes = Buffer.byteLength( + this.serializeEnvelope({ seq: this.sequence + 1, event }), + "utf8", + ); + if (eventBytes > this.maxEventBytes) { + this.config.logger.warn("Dropped oversized task run event", { + eventBytes, + maxEventBytes: this.maxEventBytes, + }); + return false; + } + + if (this.bufferedEvents.length >= this.maxBufferedEvents) { + this.droppedBeforeSequenceCount += 1; + if ( + this.droppedBeforeSequenceCount === 1 || + this.droppedBeforeSequenceCount % 100 === 0 + ) { + this.config.logger.warn( + "Dropped task run event before assigning sequence due to backpressure", + { + dropped: this.droppedBeforeSequenceCount, + maxBufferedEvents: this.maxBufferedEvents, + }, + ); + } + return false; + } + + if (this.droppedBeforeSequenceCount > 0) { + this.config.logger.warn("Task run event ingest recovered after drops", { + dropped: this.droppedBeforeSequenceCount, + }); + this.droppedBeforeSequenceCount = 0; + } + + return true; + } + + private serializeEnvelope(envelope: EventEnvelope): string { + return JSON.stringify({ seq: envelope.seq, event: envelope.event }); + } + + private describeError(error: unknown): unknown { + if (error instanceof Error) { + return { message: error.message, stack: error.stack }; + } + return error; + } +}