From ab01cf45124e240d8a8e578f51373eac1919342d Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 18:26:16 +0200 Subject: [PATCH 1/3] fix(stream): remove completed/errored streams from registry after buffer TTL Every SSE stream that finished (completed or errored) while its original client was still connected stayed in the StreamRegistry forever: the success path called _cleanupStream synchronously before the client's close event fired (so clients.size was still > 0 and nothing was scheduled), the close handler installed by _createNewStream had no cleanup-scheduling branch, and the error path never scheduled cleanup at all. Since every analytics query is one SSE stream whose final event is the full result, a busy server retained up to maxActiveStreams (1000) event buffers, generators, abort controllers, and trace contexts in heap. Factor the TTL-based removal scheduling into _scheduleRemovalAfterTTL and call it from every termination path: both close handlers, the success path, the abort path, and the error path. The event buffer remains available for reconnect replay for bufferTTL after the last client disconnects; a reconnect during the TTL window refreshes lastAccess so the pending timer no-ops and a fresh TTL starts when that client disconnects. Cleanup timers are unref()'d so they don't keep the process alive. Also drop the never-referenced cleanupInterval and maxPersistentBuffers keys from streamDefaults; the per-stream timers now cover every termination path, so no periodic sweeper is wired. 
Co-authored-by: Isaac Signed-off-by: MarioCadenas --- packages/appkit/src/stream/defaults.ts | 2 - packages/appkit/src/stream/stream-manager.ts | 58 ++++-- .../appkit/src/stream/tests/stream.test.ts | 179 ++++++++++++++++++ 3 files changed, 219 insertions(+), 20 deletions(-) diff --git a/packages/appkit/src/stream/defaults.ts b/packages/appkit/src/stream/defaults.ts index c8fc91591..a7e307b44 100644 --- a/packages/appkit/src/stream/defaults.ts +++ b/packages/appkit/src/stream/defaults.ts @@ -2,8 +2,6 @@ export const streamDefaults = { bufferSize: 100, maxEventSize: 1024 * 1024, // 1MB bufferTTL: 10 * 60 * 1000, // 10 minutes - cleanupInterval: 5 * 60 * 1000, // 5 minutes - maxPersistentBuffers: 10000, // 10000 buffers heartbeatInterval: 10 * 1000, // 10 seconds maxActiveStreams: 1000, // 1000 streams } as const; diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index f21ff586e..ae98fc717 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -148,13 +148,7 @@ export class StreamManager { } // cleanup if stream is completed and no clients are connected - if (streamEntry.isCompleted && streamEntry.clients.size === 0) { - setTimeout(() => { - if (streamEntry.clients.size === 0) { - this.streamRegistry.remove(streamEntry.streamId); - } - }, this.bufferTTL); - } + this._scheduleRemovalAfterTTL(streamEntry); }); // if stream is completed, close connection @@ -235,6 +229,11 @@ export class StreamManager { new DOMException("Client disconnected", "AbortError"), ); } + + // if the stream already finished (completed or errored), schedule + // registry removal once the buffer TTL elapses so completed streams + // don't accumulate in the registry forever + this._scheduleRemovalAfterTTL(streamEntry); }); await this._processGeneratorInBackground(streamEntry); @@ -288,8 +287,9 @@ export class StreamManager { // close all clients this._closeAllClients(streamEntry); - // 
cleanup if no clients are connected - this._cleanupStream(streamEntry); + // cleanup if no clients are connected (clients that are still + // connected schedule cleanup from their close handlers instead) + this._scheduleRemovalAfterTTL(streamEntry); } catch (error) { const errorMsg = error instanceof Error ? error.message : "Internal server error"; @@ -301,7 +301,7 @@ export class StreamManager { logger.info("Stream aborted by client (code=%s)", errorCode); streamEntry.isCompleted = true; this._closeAllClients(streamEntry); - this._cleanupStream(streamEntry); + this._scheduleRemovalAfterTTL(streamEntry); return; } @@ -328,6 +328,10 @@ export class StreamManager { true, ); streamEntry.isCompleted = true; + + // cleanup if no clients are connected (clients that are still + // connected schedule cleanup from their close handlers instead) + this._scheduleRemovalAfterTTL(streamEntry); } }); } @@ -400,15 +404,33 @@ export class StreamManager { } } - // cleanup stream if no clients are connected - private _cleanupStream(streamEntry: StreamEntry): void { - if (streamEntry.clients.size === 0) { - setTimeout(() => { - if (streamEntry.clients.size === 0) { - this.streamRegistry.remove(streamEntry.streamId); - } - }, this.bufferTTL); + // schedule registry removal once a finished (completed or errored) stream + // has no connected clients. The event buffer stays available for + // reconnect replay for `bufferTTL` after the last client disconnects; + // after that the stream entry is removed from the registry so it can be + // garbage collected. 
+ private _scheduleRemovalAfterTTL(streamEntry: StreamEntry): void { + if (!streamEntry.isCompleted || streamEntry.clients.size > 0) { + return; } + + // mark the moment the stream became idle so a reconnect during the TTL + // window (which refreshes lastAccess) makes the pending timer a no-op + streamEntry.lastAccess = Date.now(); + + const timer = setTimeout(() => { + // no-op if a client reconnected during the TTL window; the reconnect's + // own close handler schedules a fresh removal when it disconnects + if ( + streamEntry.clients.size === 0 && + Date.now() - streamEntry.lastAccess >= this.bufferTTL + ) { + this.streamRegistry.remove(streamEntry.streamId); + } + }, this.bufferTTL); + + // don't keep the process alive just to clean up finished streams + timer.unref?.(); } private _categorizeError(error: unknown): SSEErrorCode { diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index 2939dd4dd..168bfda40 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ b/packages/appkit/src/stream/tests/stream.test.ts @@ -731,6 +731,185 @@ describe("StreamManager", () => { }); }); + describe("registry cleanup after stream termination", () => { + // default bufferTTL from streamDefaults + const BUFFER_TTL = 10 * 60 * 1000; + + function getRegistry(manager: StreamManager) { + return (manager as any).streamRegistry as { + has(streamId: string): boolean; + }; + } + + function captureCloseHandler(mockRes: { on: ReturnType }) { + const handlers: (() => void)[] = []; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") handlers.push(handler); + }); + return () => { + for (const handler of handlers) { + handler(); + } + }; + } + + test("removes a completed stream from the registry after buffer TTL once the client disconnects", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-completed-123"; + const { mockRes } = createMockResponse(); + const 
fireClose = captureCloseHandler(mockRes); + + async function* generator() { + yield { type: "message", data: "result" }; + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + // server ended the response; the close event fires asynchronously + fireClose(); + + // buffer is retained for reconnect replay during the TTL window + await vi.advanceTimersByTimeAsync(BUFFER_TTL - 1); + expect(registry.has(streamId)).toBe(true); + + // once the TTL elapses, the stream is removed from the registry + await vi.advanceTimersByTimeAsync(1); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("removes an errored stream from the registry after buffer TTL once the client disconnects", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-errored-123"; + const { mockRes } = createMockResponse(); + const fireClose = captureCloseHandler(mockRes); + + async function* generator() { + yield { type: "start" }; + throw new Error("Boom"); + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + fireClose(); + + await vi.advanceTimersByTimeAsync(BUFFER_TTL - 1); + expect(registry.has(streamId)).toBe(true); + + await vi.advanceTimersByTimeAsync(1); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("removes an errored stream even when the client disconnected before the error", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-errored-disconnected-123"; + const { mockRes } = createMockResponse(); + const fireClose = captureCloseHandler(mockRes); + + async function* generator() { + yield { type: "start" }; + // simulate client going away mid-stream + fireClose(); + throw new Error("Boom"); + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + 
const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + await vi.advanceTimersByTimeAsync(BUFFER_TTL); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("keeps the buffer available for reconnect replay within the TTL window", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-replay-123"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + const fireClose1 = captureCloseHandler(mockRes1); + + async function* generator1() { + yield { type: "message", data: "event1" }; + yield { type: "message", data: "event2" }; + } + + await streamManager.stream(mockRes1 as any, generator1, { streamId }); + fireClose1(); + + // reconnect halfway through the TTL window + await vi.advanceTimersByTimeAsync(BUFFER_TTL / 2); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": eventIds[0], + }); + const fireClose2 = captureCloseHandler(mockRes2); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream(mockRes2 as any, generator2, { streamId }); + + // missed events are replayed from the buffer + const replayedData = events2 + .filter((e) => e.startsWith("data: ")) + .map((e) => e.replace("data: ", "").replace("\n\n", "")); + expect(replayedData.length).toBe(1); + expect(replayedData[0]).toContain("event2"); + + fireClose2(); + + // the timer from the first disconnect fires now but must no-op since + // the reconnect refreshed the stream's last access + await vi.advanceTimersByTimeAsync(BUFFER_TTL / 2); + expect(registry.has(streamId)).toBe(true); + + // a full TTL after the second disconnect, the stream is removed + await vi.advanceTimersByTimeAsync(BUFFER_TTL / 2); + 
expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("does not remove a completed stream while a client is still connected", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-still-connected-123"; + const { mockRes } = createMockResponse(); + + async function* generator() { + yield { type: "message", data: "result" }; + } + + // mockRes.on never fires "close", so the client stays connected + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + await vi.advanceTimersByTimeAsync(BUFFER_TTL * 2); + expect(registry.has(streamId)).toBe(true); + + vi.useRealTimers(); + }); + }); + describe("error categorization", () => { async function captureErrorEvent( manager: StreamManager, From fccd6b3dbbf46f39993e6c0514b35a0240c770bc Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 19:16:07 +0200 Subject: [PATCH 2/3] fix(appkit): decouple stream cleanup from close events and prefer evicting completed streams Follow-up to the stream registry leak fix, addressing review findings: - _closeAllClients now removes the clients it deliberately ends from the stream entry, so registry removal is scheduled on every terminal path (complete/error/abort) instead of hinging on the transport emitting 'close'. The close handlers' later deletes remain safe no-ops. - At most one removal timer per stream: the timer handle is stored on the StreamEntry, rescheduling clears the prior timer, and a reconnect-attach cancels any pending removal. The fire-time lastAccess re-check stays as a safety net. - Registry eviction now prefers the oldest completed stream (waiting out its buffer TTL) and only falls back to LRU when every stream is live, so a full registry no longer evicts live streams while dead ones survive. 
StreamRegistry is backed by a Map instead of RingBuffer: ring slot overwrites clobbered an unrelated live entry whenever the evicted stream wasn't in the oldest insertion slot. - Remove unused StreamConfig keys cleanupInterval and maxPersistentBuffers (never read; their only definition site was already removed). - Tests: mock response end() now marks the response ended and fires close handlers asynchronously like Node; replaced the test that encoded the leaky behavior with active-stream-retained and completed-stream-removed-without-close coverage; added end()-fires-close cleanup, eviction preference, and replay boundary (newest id, evicted id) tests. - knip: set ignoreExportsUsedInFile so RingBuffer (still used inside buffers.ts by EventRingBuffer) isn't flagged once the registry stops importing it. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- knip.json | 1 + packages/appkit/src/stream/stream-manager.ts | 48 +++-- packages/appkit/src/stream/stream-registry.ts | 67 ++++--- .../src/stream/tests/stream-registry.test.ts | 84 +++++++++ .../appkit/src/stream/tests/stream.test.ts | 172 +++++++++++++++++- packages/appkit/src/stream/types.ts | 5 + packages/shared/src/execute.ts | 4 +- 7 files changed, 338 insertions(+), 43 deletions(-) diff --git a/knip.json b/knip.json index 251cc61eb..1e48feaf3 100644 --- a/knip.json +++ b/knip.json @@ -1,5 +1,6 @@ { "$schema": "https://unpkg.com/knip@5/schema.json", + "ignoreExportsUsedInFile": true, "ignoreWorkspaces": [ "packages/shared", "packages/lakebase", diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index ae98fc717..82b69708c 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -115,6 +115,13 @@ export class StreamManager { } } + // a reconnect cancels any pending registry removal so the entry isn't + // pulled out from under the newly attached client + if (streamEntry.removalTimer) { + 
clearTimeout(streamEntry.removalTimer); + streamEntry.removalTimer = undefined; + } + // add client to stream entry streamEntry.clients.add(res); streamEntry.lastAccess = Date.now(); @@ -157,6 +164,11 @@ export class StreamManager { // cleanup operation this.activeOperations.delete(streamOperation); clearInterval(heartbeat); + // we deliberately ended this client, so drop it from the entry and + // schedule removal now instead of relying on the transport's `close` + // event (the close handler's later delete is a safe no-op) + streamEntry.clients.delete(res); + this._scheduleRemovalAfterTTL(streamEntry); } } private async _createNewStream( @@ -284,11 +296,10 @@ export class StreamManager { streamEntry.isCompleted = true; - // close all clients + // close all clients (this also drops them from the entry, so the + // removal below is scheduled regardless of whether the transport + // ever emits `close`) this._closeAllClients(streamEntry); - - // cleanup if no clients are connected (clients that are still - // connected schedule cleanup from their close handlers instead) this._scheduleRemovalAfterTTL(streamEntry); } catch (error) { const errorMsg = @@ -329,8 +340,10 @@ export class StreamManager { ); streamEntry.isCompleted = true; - // cleanup if no clients are connected (clients that are still - // connected schedule cleanup from their close handlers instead) + // the broadcast above already ended the connected clients; drop them + // from the entry so removal is scheduled regardless of whether the + // transport ever emits `close` + this._closeAllClients(streamEntry); this._scheduleRemovalAfterTTL(streamEntry); } }); @@ -395,13 +408,18 @@ export class StreamManager { } } - // close all connected clients + // close all connected clients and remove them from the stream entry. + // We are deliberately terminating these connections, so cleanup must not + // depend on the transport emitting a `close` event for each client (it may + // never fire). 
The close handlers' later `clients.delete(...)` calls remain + // safe no-ops. private _closeAllClients(streamEntry: StreamEntry): void { for (const client of streamEntry.clients) { if (!client.writableEnded) { client.end(); } } + streamEntry.clients.clear(); } // schedule registry removal once a finished (completed or errored) stream @@ -414,13 +432,21 @@ export class StreamManager { return; } + // at most one removal timer per stream: rescheduling replaces any + // pending timer instead of stacking a new one (each pending timer pins + // the entry's buffer/generator/trace context for the full TTL) + if (streamEntry.removalTimer) { + clearTimeout(streamEntry.removalTimer); + } + // mark the moment the stream became idle so a reconnect during the TTL // window (which refreshes lastAccess) makes the pending timer a no-op streamEntry.lastAccess = Date.now(); - const timer = setTimeout(() => { - // no-op if a client reconnected during the TTL window; the reconnect's - // own close handler schedules a fresh removal when it disconnects + streamEntry.removalTimer = setTimeout(() => { + streamEntry.removalTimer = undefined; + // safety net: no-op if a client reconnected during the TTL window; + // a fresh removal is scheduled when that client disconnects if ( streamEntry.clients.size === 0 && Date.now() - streamEntry.lastAccess >= this.bufferTTL @@ -430,7 +456,7 @@ export class StreamManager { }, this.bufferTTL); // don't keep the process alive just to clean up finished streams - timer.unref?.(); + streamEntry.removalTimer.unref?.(); } private _categorizeError(error: unknown): SSEErrorCode { diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index 18af1e333..8bbf7cc51 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -1,29 +1,31 @@ -import { RingBuffer } from "./buffers"; import { SSEErrorCode, type StreamEntry } from "./types"; export class 
StreamRegistry { - private streams: RingBuffer; + // keyed storage with explicit, policy-driven eviction. A ring buffer is + // unsuitable here: it overwrites by insertion slot, so evicting an entry + // chosen by policy (e.g. a completed stream) and then adding would + // silently clobber an unrelated live stream sitting in the oldest slot. + private streams: Map; + private maxActiveStreams: number; constructor(maxActiveStreams: number) { - this.streams = new RingBuffer( - maxActiveStreams, - (entry) => entry.streamId, - ); + this.streams = new Map(); + this.maxActiveStreams = maxActiveStreams; } // add a stream to the registry add(entry: StreamEntry): void { // enforce hard cap - if (this.streams.getSize() >= this.streams.capacity) { + if (this.streams.size >= this.maxActiveStreams) { this._evictOldestStream(entry.streamId); } - this.streams.add(entry); + this.streams.set(entry.streamId, entry); } // get a stream from the registry get(streamId: string): StreamEntry | null { - return this.streams.get(streamId); + return this.streams.get(streamId) ?? null; } // check if a stream exists in the registry @@ -33,42 +35,56 @@ export class StreamRegistry { // remove a stream from the registry remove(streamId: string): void { - this.streams.remove(streamId); + this.streams.delete(streamId); } // get the number of streams in the registry size(): number { - return this.streams.getSize(); + return this.streams.size; } clear(): void { - const allStreams = this.streams.getAll(); - - for (const stream of allStreams) { + for (const stream of this.streams.values()) { stream.abortController.abort("Server shutdown"); + if (stream.removalTimer) { + clearTimeout(stream.removalTimer); + stream.removalTimer = undefined; + } } this.streams.clear(); } - // evict the oldest stream from the registry + // evict the oldest stream from the registry, preferring completed streams. 
+ // Completed streams waiting out their buffer TTL can look recently + // accessed, so plain LRU could evict a live stream while dead ones + // survive. Prefer the oldest completed stream when one exists and fall + // back to LRU over all streams otherwise. private _evictOldestStream(excludeStreamId: string): void { - const allStreams = this.streams.getAll(); + const allStreams = this.streams.values(); let oldestStream: StreamEntry | null = null; let oldestAccess = Infinity; + let oldestCompletedStream: StreamEntry | null = null; + let oldestCompletedAccess = Infinity; - // find the least recently accessed stream + // find the least recently accessed stream (overall and completed-only) for (const stream of allStreams) { - if ( - stream.streamId !== excludeStreamId && - stream.lastAccess < oldestAccess - ) { + if (stream.streamId === excludeStreamId) continue; + + if (stream.lastAccess < oldestAccess) { oldestStream = stream; oldestAccess = stream.lastAccess; } + + if (stream.isCompleted && stream.lastAccess < oldestCompletedAccess) { + oldestCompletedStream = stream; + oldestCompletedAccess = stream.lastAccess; + } } - // abort the oldest stream + oldestStream = oldestCompletedStream ?? 
oldestStream; + + // abort the evicted stream if (oldestStream) { // broadcast stream eviction error to all clients for (const client of oldestStream.clients) { @@ -84,7 +100,12 @@ export class StreamRegistry { } } oldestStream.abortController.abort("Stream evicted"); - this.streams.remove(oldestStream.streamId); + // a pending removal timer would otherwise pin the evicted entry + if (oldestStream.removalTimer) { + clearTimeout(oldestStream.removalTimer); + oldestStream.removalTimer = undefined; + } + this.streams.delete(oldestStream.streamId); } } } diff --git a/packages/appkit/src/stream/tests/stream-registry.test.ts b/packages/appkit/src/stream/tests/stream-registry.test.ts index d3f70e95a..343039394 100644 --- a/packages/appkit/src/stream/tests/stream-registry.test.ts +++ b/packages/appkit/src/stream/tests/stream-registry.test.ts @@ -262,6 +262,90 @@ describe("StreamRegistry", () => { expect(abortController1.signal.reason).toBe("Stream evicted"); }); + + test("should prefer evicting a completed stream over an older live stream", () => { + const liveAbortController = new AbortController(); + + // the live stream is the LRU candidate, but a completed stream + // (waiting out its buffer TTL) exists and must be evicted first + registry.add( + createMockStreamEntry("live-old", { + lastAccess: 100, + abortController: liveAbortController, + }), + ); + registry.add( + createMockStreamEntry("completed-recent", { + lastAccess: 300, + isCompleted: true, + }), + ); + registry.add(createMockStreamEntry("live-recent", { lastAccess: 200 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("completed-recent")).toBe(false); + expect(registry.has("live-old")).toBe(true); + expect(registry.has("live-recent")).toBe(true); + expect(registry.has("stream-4")).toBe(true); + expect(liveAbortController.signal.aborted).toBe(false); + }); + + test("should evict the oldest completed stream when several are completed", () => { + 
registry.add( + createMockStreamEntry("completed-old", { + lastAccess: 200, + isCompleted: true, + }), + ); + registry.add( + createMockStreamEntry("completed-recent", { + lastAccess: 300, + isCompleted: true, + }), + ); + registry.add(createMockStreamEntry("live", { lastAccess: 100 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("completed-old")).toBe(false); + expect(registry.has("completed-recent")).toBe(true); + expect(registry.has("live")).toBe(true); + }); + + test("should fall back to LRU eviction when no stream is completed", () => { + registry.add(createMockStreamEntry("stream-1", { lastAccess: 300 })); + registry.add(createMockStreamEntry("stream-2", { lastAccess: 100 })); + registry.add(createMockStreamEntry("stream-3", { lastAccess: 200 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("stream-2")).toBe(false); + expect(registry.has("stream-1")).toBe(true); + expect(registry.has("stream-3")).toBe(true); + expect(registry.has("stream-4")).toBe(true); + }); + + test("should clear a pending removal timer on the evicted completed stream", () => { + vi.useFakeTimers(); + const removalTimer = setTimeout(() => {}, 60_000); + + registry.add( + createMockStreamEntry("completed", { + lastAccess: 100, + isCompleted: true, + removalTimer, + }), + ); + registry.add(createMockStreamEntry("stream-2", { lastAccess: 200 })); + registry.add(createMockStreamEntry("stream-3", { lastAccess: 300 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("completed")).toBe(false); + expect(vi.getTimerCount()).toBe(0); + vi.useRealTimers(); + }); }); describe("eviction SSE broadcast", () => { diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index 168bfda40..c3d7035b9 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ 
b/packages/appkit/src/stream/tests/stream.test.ts @@ -3,14 +3,27 @@ import { StreamManager } from "../index"; function createMockResponse(headers: Record = {}) { const events: string[] = []; + const closeHandlers: (() => void)[] = []; const mockRes = { setHeader: vi.fn(), flushHeaders: vi.fn(), write: vi.fn((data: string) => { events.push(data); }), - end: vi.fn(), - on: vi.fn(), + // mirror Node's lifecycle: end() marks the response as ended and the + // 'close' event fires asynchronously afterwards + end: vi.fn(() => { + if (mockRes.writableEnded) return; + mockRes.writableEnded = true; + queueMicrotask(() => { + for (const handler of closeHandlers) { + handler(); + } + }); + }), + on: vi.fn((event: string, handler: () => void) => { + if (event === "close") closeHandlers.push(handler); + }), writableEnded: false, req: { headers: headers, @@ -689,6 +702,81 @@ describe("StreamManager", () => { expect(events2.some((e) => e.includes("STREAM_FORBIDDEN"))).toBe(true); }); + test("replays nothing when last-event-id is the newest buffered event", async () => { + const streamId = "replay-newest-123"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + + async function* generator1() { + yield { type: "message", data: "event1" }; + yield { type: "message", data: "event2" }; + yield { type: "message", data: "event3" }; + } + + await streamManager.stream(mockRes1 as any, generator1, { streamId }); + + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + const newestEventId = eventIds[eventIds.length - 1]; + + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": newestEventId, + }); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream(mockRes2 as any, generator2, { streamId }); + + // the client is already caught up: zero replayed events and no + // buffer-overflow warning + expect(events2.filter((e) 
=> e.startsWith("data: ")).length).toBe(0); + expect(events2.some((e) => e.includes("BUFFER_OVERFLOW_RESTART"))).toBe( + false, + ); + expect(mockRes2.end).toHaveBeenCalled(); + }); + + test("sends a buffer overflow warning when last-event-id was evicted from the ring", async () => { + const streamId = "replay-evicted-456"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + + async function* generator1() { + for (let i = 0; i < 5; i++) { + yield { type: "message", count: i }; + } + } + + await streamManager.stream(mockRes1 as any, generator1, { + streamId, + bufferSize: 2, + }); + + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + expect(eventIds.length).toBe(5); + + // the first event id has been pushed out of the size-2 ring buffer + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": eventIds[0], + }); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream(mockRes2 as any, generator2, { streamId }); + + expect(events2.some((e) => e.includes("BUFFER_OVERFLOW_RESTART"))).toBe( + true, + ); + expect(events2.some((e) => e.includes("should-not-run"))).toBe(false); + }); + test("should replay successfully when within buffer capacity", async () => { const streamId = "no-overflow-test-456"; @@ -890,22 +978,94 @@ describe("StreamManager", () => { vi.useRealTimers(); }); - test("does not remove a completed stream while a client is still connected", async () => { + test("does not remove an active stream while a client is still connected", async () => { vi.useFakeTimers(); - const streamId = "cleanup-still-connected-123"; + const streamId = "cleanup-active-connected-123"; const { mockRes } = createMockResponse(); + let release!: () => void; + const gate = new Promise((resolve) => { + release = resolve; + }); + + async function* generator() { + yield { type: "start" }; + await gate; + yield { 
type: "end" }; + } + + const streamPromise = streamManager.stream(mockRes as any, generator, { + streamId, + }); + + // let the generator yield its first event and park on the gate + await vi.advanceTimersByTimeAsync(0); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + // an active stream with a connected client is never removed, + // no matter how much time passes + await vi.advanceTimersByTimeAsync(BUFFER_TTL * 2); + expect(registry.has(streamId)).toBe(true); + + release(); + await streamPromise; + vi.useRealTimers(); + }); + + test("removes a completed stream after TTL even if the transport never emits close", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-no-close-123"; + const { mockRes } = createMockResponse(); + + // simulate a transport that never emits the 'close' event + mockRes.on.mockImplementation(() => {}); + mockRes.end.mockImplementation(() => { + mockRes.writableEnded = true; + }); + async function* generator() { yield { type: "message", data: "result" }; } - // mockRes.on never fires "close", so the client stays connected await streamManager.stream(mockRes as any, generator, { streamId }); const registry = getRegistry(streamManager); - await vi.advanceTimersByTimeAsync(BUFFER_TTL * 2); expect(registry.has(streamId)).toBe(true); + // cleanup must not hinge on the transport emitting 'close' + await vi.advanceTimersByTimeAsync(BUFFER_TTL); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("removes the entry after buffer TTL when end() fires the close event", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-end-close-123"; + const { mockRes } = createMockResponse(); + + async function* generator() { + yield { type: "message", data: "result" }; + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + // the generator completed with the client attached, so the server + // ended the response; the mock fires 'close' 
asynchronously + expect(mockRes.end).toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(0); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + await vi.advanceTimersByTimeAsync(BUFFER_TTL - 1); + expect(registry.has(streamId)).toBe(true); + + await vi.advanceTimersByTimeAsync(1); + expect(registry.has(streamId)).toBe(false); + vi.useRealTimers(); }); }); diff --git a/packages/appkit/src/stream/types.ts b/packages/appkit/src/stream/types.ts index bb6f65f6e..616ddcb14 100644 --- a/packages/appkit/src/stream/types.ts +++ b/packages/appkit/src/stream/types.ts @@ -49,6 +49,11 @@ export interface StreamEntry { lastAccess: number; abortController: AbortController; traceContext: Context; + /** + * Pending registry-removal timer. At most one removal timer exists per + * stream; scheduling a new one clears any previous timer first. + */ + removalTimer?: NodeJS.Timeout; } export interface StreamOperation { diff --git a/packages/shared/src/execute.ts b/packages/shared/src/execute.ts index 62ac45bfb..aae28cfb4 100644 --- a/packages/shared/src/execute.ts +++ b/packages/shared/src/execute.ts @@ -1,14 +1,12 @@ import type { CacheConfig } from "./cache"; -/** SSE stream configuration for `executeStream()`. Controls buffer sizes, heartbeat interval, and cleanup behavior. */ +/** SSE stream configuration for `executeStream()`. Controls buffer sizes, buffer retention, and heartbeat interval. 
*/ export interface StreamConfig { userSignal?: AbortSignal; streamId?: string; bufferSize?: number; maxEventSize?: number; bufferTTL?: number; - cleanupInterval?: number; - maxPersistentBuffers?: number; heartbeatInterval?: number; maxActiveStreams?: number; } From 68537358b945a803105a6a9ab508cefc946537b4 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 19:25:28 +0200 Subject: [PATCH 3/3] fix(appkit): scope knip suppression and harden stream registry eviction edge cases - revert repo-wide ignoreExportsUsedInFile in knip.json; replace with a file-scoped ignore entry for packages/appkit/src/stream/buffers.ts only - StreamRegistry.add(): only evict when inserting a genuinely new key, so re-adding an existing streamId at capacity no longer destroys an unrelated live stream for a net-zero insert - eviction now aborts with DOMException("Stream evicted", "AbortError") to match the manager's terminal abort paths so the error categorizer routes eviction as an abort rather than a stream failure - update stale RingBuffer comments in stream-registry tests to Map semantics and adjust eviction/abort-reason test expectations Co-authored-by: Isaac Signed-off-by: MarioCadenas --- knip.json | 2 +- packages/appkit/src/stream/stream-registry.ts | 16 ++++++++-- .../src/stream/tests/stream-registry.test.ts | 32 ++++++++++--------- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/knip.json b/knip.json index 1e48feaf3..430316b56 100644 --- a/knip.json +++ b/knip.json @@ -1,6 +1,5 @@ { "$schema": "https://unpkg.com/knip@5/schema.json", - "ignoreExportsUsedInFile": true, "ignoreWorkspaces": [ "packages/shared", "packages/lakebase", @@ -24,6 +23,7 @@ "packages/appkit/src/core/agent/tools/index.ts", "packages/appkit/src/core/agent/load-agents.ts", "packages/appkit/src/connectors/mcp/index.ts", + "packages/appkit/src/stream/buffers.ts", "packages/appkit/src/typedoc.entry.ts", "template/**", "tools/**", diff --git a/packages/appkit/src/stream/stream-registry.ts 
b/packages/appkit/src/stream/stream-registry.ts index 8bbf7cc51..371605075 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -15,8 +15,13 @@ export class StreamRegistry { // add a stream to the registry add(entry: StreamEntry): void { - // enforce hard cap - if (this.streams.size >= this.maxActiveStreams) { + // enforce hard cap, but only when inserting a genuinely new key: + // re-adding an existing streamId updates in place and doesn't grow the + // map, so evicting another stream for it would destroy an innocent one + if ( + !this.streams.has(entry.streamId) && + this.streams.size >= this.maxActiveStreams + ) { this._evictOldestStream(entry.streamId); } @@ -99,7 +104,12 @@ export class StreamRegistry { } } } - oldestStream.abortController.abort("Stream evicted"); + // abort with a DOMException AbortError so the error categorizer routes + // eviction as an abort (matching the manager's terminal paths) rather + // than a stream failure + oldestStream.abortController.abort( + new DOMException("Stream evicted", "AbortError"), + ); // a pending removal timer would otherwise pin the evicted entry if (oldestStream.removalTimer) { clearTimeout(oldestStream.removalTimer); diff --git a/packages/appkit/src/stream/tests/stream-registry.test.ts b/packages/appkit/src/stream/tests/stream-registry.test.ts index 343039394..26395d444 100644 --- a/packages/appkit/src/stream/tests/stream-registry.test.ts +++ b/packages/appkit/src/stream/tests/stream-registry.test.ts @@ -209,26 +209,25 @@ describe("StreamRegistry", () => { expect(registry.has("stream-4")).toBe(true); }); - test("should exclude the stream being added from eviction", () => { - // This tests the excludeStreamId parameter: if a stream with the same - // ID as the one being added already exists and is the oldest, it should - // still be excluded from eviction. 
In practice, the new stream won't be - // in the registry yet when eviction runs, so excludeStreamId prevents - // misidentification. + test("should not evict when re-adding an existing key at capacity", () => { + // Re-adding an existing streamId updates the Map entry in place and + // doesn't grow the registry, so no other stream should be evicted for + // a net-zero insert. registry.add(createMockStreamEntry("stream-1", { lastAccess: 100 })); registry.add(createMockStreamEntry("stream-2", { lastAccess: 200 })); registry.add(createMockStreamEntry("stream-3", { lastAccess: 300 })); - // Add stream with id "stream-1" again; eviction should skip "stream-1" - // even though stream-1 has the oldest lastAccess, because it's the - // excludeStreamId. stream-2 should be evicted instead. + // Add stream with id "stream-1" again at capacity; it replaces the + // existing entry without triggering eviction. registry.add(createMockStreamEntry("stream-1", { lastAccess: 400 })); - // stream-1 is updated (RingBuffer updates existing keys in place) + // stream-1 is updated (Map.set replaces the existing key in place) expect(registry.has("stream-1")).toBe(true); - // stream-2 should have been evicted as it was the oldest non-excluded - expect(registry.has("stream-2")).toBe(false); + expect(registry.get("stream-1")?.lastAccess).toBe(400); + // no stream should have been evicted + expect(registry.has("stream-2")).toBe(true); expect(registry.has("stream-3")).toBe(true); + expect(registry.size()).toBe(3); }); test("should abort the evicted stream's AbortController", () => { @@ -247,7 +246,7 @@ describe("StreamRegistry", () => { expect(abortController1.signal.aborted).toBe(true); }); - test("should abort with 'Stream evicted' reason", () => { + test("should abort with a 'Stream evicted' AbortError reason", () => { const abortController1 = new AbortController(); registry.add( createMockStreamEntry("stream-1", { @@ -260,7 +259,10 @@ describe("StreamRegistry", () => { 
registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); - expect(abortController1.signal.reason).toBe("Stream evicted"); + const reason = abortController1.signal.reason; + expect(reason).toBeInstanceOf(DOMException); + expect(reason.name).toBe("AbortError"); + expect(reason.message).toBe("Stream evicted"); }); test("should prefer evicting a completed stream over an older live stream", () => { @@ -603,7 +605,7 @@ describe("StreamRegistry", () => { registry.add(entry1); registry.add(entry2); - // The RingBuffer updates in place for same key + // The Map updates in place for same key expect(registry.size()).toBe(1); const retrieved = registry.get("stream-1"); expect(retrieved?.lastAccess).toBe(200);