diff --git a/packages/appkit/src/stream/defaults.ts b/packages/appkit/src/stream/defaults.ts index c8fc9159..304a6311 100644 --- a/packages/appkit/src/stream/defaults.ts +++ b/packages/appkit/src/stream/defaults.ts @@ -6,4 +6,5 @@ export const streamDefaults = { maxPersistentBuffers: 10000, // 10000 buffers heartbeatInterval: 10 * 1000, // 10 seconds maxActiveStreams: 1000, // 1000 streams + disconnectGraceMs: 15_000, // 15 seconds } as const; diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index f21ff586..14741d64 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -18,6 +18,7 @@ export class StreamManager { private sseWriter: SSEWriter; private maxEventSize: number; private bufferTTL: number; + private disconnectGraceMs: number; constructor(options?: StreamConfig) { this.streamRegistry = new StreamRegistry( @@ -26,6 +27,8 @@ export class StreamManager { this.sseWriter = new SSEWriter(); this.maxEventSize = options?.maxEventSize ?? streamDefaults.maxEventSize; this.bufferTTL = options?.bufferTTL ?? streamDefaults.bufferTTL; + this.disconnectGraceMs = + options?.disconnectGraceMs ?? streamDefaults.disconnectGraceMs; this.activeOperations = new Set(); } @@ -74,6 +77,7 @@ export class StreamManager { // abort all active operations abortAll(): void { + // pending disconnect-grace timers are cleared by streamRegistry.clear() below this.activeOperations.forEach((operation) => { if (operation.heartbeat) clearInterval(operation.heartbeat); operation.controller.abort( @@ -115,6 +119,9 @@ export class StreamManager { } } + // a reconnecting client cancels the pending disconnect-grace abort + this._clearGraceTimer(streamEntry); + // add client to stream entry streamEntry.clients.add(res); streamEntry.lastAccess = Date.now(); @@ -140,11 +147,9 @@ export class StreamManager { streamEntry.clients.delete(res); this.activeOperations.delete(streamOperation); - // Stop the generator when no clients remain + // grace-abort instead of aborting now, so a reconnect can resume if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - streamEntry.abortController.abort( - new DOMException("All clients disconnected", "AbortError"), - ); + this._scheduleGraceAbort(streamEntry); } // cleanup if stream is completed and no clients are connected @@ -228,12 +233,9 @@ export class StreamManager { this.activeOperations.delete(streamOperation); streamEntry.clients.delete(res); - // Stop the generator when no clients remain so polling loops - // (e.g. jobs runAndWait) don't keep running in the background. + // grace-abort instead of aborting now, so a reconnect can resume if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - abortController.abort( - new DOMException("Client disconnected", "AbortError"), - ); + this._scheduleGraceAbort(streamEntry); } }); @@ -285,6 +287,9 @@ export class StreamManager { streamEntry.isCompleted = true; + // no late grace abort should fire on a completed stream + this._clearGraceTimer(streamEntry); + // close all clients this._closeAllClients(streamEntry); @@ -300,6 +305,7 @@ export class StreamManager { if (errorCode === SSEErrorCode.STREAM_ABORTED) { logger.info("Stream aborted by client (code=%s)", errorCode); streamEntry.isCompleted = true; + this._clearGraceTimer(streamEntry); this._closeAllClients(streamEntry); this._cleanupStream(streamEntry); return; @@ -328,6 +334,7 @@ export class StreamManager { true, ); streamEntry.isCompleted = true; + this._clearGraceTimer(streamEntry); } }); } @@ -400,6 +407,33 @@ export class StreamManager { } } + // abort the generator after the grace window unless a client reconnects first + private _scheduleGraceAbort(streamEntry: StreamEntry): void { + // clear any existing timer to avoid stacking + this._clearGraceTimer(streamEntry); + + const timer = setTimeout(() => { + streamEntry.disconnectGraceTimer = undefined; + if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { + streamEntry.abortController.abort( + new DOMException("Client disconnected (grace expired)", "AbortError"), + ); + } + }, this.disconnectGraceMs); + + // never keep the process alive solely for a grace timer + timer.unref?.(); + streamEntry.disconnectGraceTimer = timer; + } + + // clear a pending disconnect-grace timer, if any + private _clearGraceTimer(streamEntry: StreamEntry): void { + if (streamEntry.disconnectGraceTimer) { + clearTimeout(streamEntry.disconnectGraceTimer); + streamEntry.disconnectGraceTimer = undefined; + } + } + // cleanup stream if no clients are connected private _cleanupStream(streamEntry: StreamEntry): void { if (streamEntry.clients.size === 0) { diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index 18af1e33..129f53e5 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -45,12 +45,21 @@ export class StreamRegistry { const allStreams = this.streams.getAll(); for (const stream of allStreams) { + this._clearGraceTimer(stream); stream.abortController.abort("Server shutdown"); } this.streams.clear(); } + // clear a pending grace timer so a removed stream isn't pinned until it fires + private _clearGraceTimer(stream: StreamEntry): void { + if (stream.disconnectGraceTimer) { + clearTimeout(stream.disconnectGraceTimer); + stream.disconnectGraceTimer = undefined; + } + } + // evict the oldest stream from the registry private _evictOldestStream(excludeStreamId: string): void { const allStreams = this.streams.getAll(); @@ -83,6 +92,7 @@ export class StreamRegistry { } } } + this._clearGraceTimer(oldestStream); oldestStream.abortController.abort("Stream evicted"); this.streams.remove(oldestStream.streamId); } diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index 2939dd4d..1de49b5f 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ b/packages/appkit/src/stream/tests/stream.test.ts @@ -194,7 +194,8 @@ describe("StreamManager", () => { expect(streamManager.getActiveCount()).toBe(0); }); - test("should abort generator when last client disconnects", async () => { + test("should abort generator after grace window when last client disconnects", async () => { + vi.useFakeTimers(); const { mockRes } = createMockResponse(); let closeHandler: (() => void) | undefined; @@ -220,15 +221,191 @@ describe("StreamManager", () => { const streamPromise = streamManager.stream(mockRes as any, generator); // Let the generator yield "start" and enter the signal wait - await new Promise((resolve) => setTimeout(resolve, 10)); + await vi.advanceTimersByTimeAsync(10); - // Simulate client disconnect + // Simulate client disconnect — abort should NOT fire immediately if (closeHandler) closeHandler(); + await vi.advanceTimersByTimeAsync(0); + expect(signalAborted).toBe(false); + // After the grace window elapses with no reconnect, the generator aborts + await vi.advanceTimersByTimeAsync(15_000); await streamPromise; expect(signalAborted).toBe(true); expect(streamManager.getActiveCount()).toBe(0); + + vi.useRealTimers(); + }); + }); + + describe("disconnect grace window", () => { + test("does NOT abort the generator if a client reconnects within the grace window", async () => { + vi.useFakeTimers(); + const streamId = "grace-reconnect-123"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + let closeHandler1: (() => void) | undefined; + mockRes1.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler1 = handler; + }); + + // Finite generator that emits one event, then waits on a manual gate + // before emitting the rest — mirrors the reconnect demo's pacing. + let releaseRest: (() => void) | undefined; + const restGate = new Promise((resolve) => { + releaseRest = resolve; + }); + + async function* generator(signal: AbortSignal) { + yield { type: "tick", count: 1 }; + await restGate; + if (signal.aborted) return; + yield { type: "tick", count: 2 }; + yield { type: "tick", count: 3 }; + } + + const streamPromise = streamManager.stream(mockRes1 as any, generator, { + streamId, + }); + + // first event emitted + await vi.advanceTimersByTimeAsync(0); + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + expect(eventIds.length).toBe(1); + + // client disconnects (clients -> 0) while the generator is still active + if (closeHandler1) closeHandler1(); + + // reconnect well within the 15s grace window + await vi.advanceTimersByTimeAsync(3_000); + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": eventIds[0], + }); + mockRes2.on.mockImplementation(() => {}); + + const reconnectPromise = streamManager.stream( + mockRes2 as any, + async function* () { + yield { type: "should-not-run" }; + }, + { streamId }, + ); + + // release the rest of the original generator now that we've reconnected + releaseRest?.(); + await vi.advanceTimersByTimeAsync(0); + await Promise.all([streamPromise, reconnectPromise]); + + // The reconnecting client must receive the subsequent events — proving + // the generator was NOT aborted during the disconnect. + const reconnectData = events2 + .filter((e) => e.startsWith("data: ")) + .map((e) => e.replace("data: ", "").replace("\n\n", "")); + expect(reconnectData.some((d) => d.includes('"count":2'))).toBe(true); + expect(reconnectData.some((d) => d.includes('"count":3'))).toBe(true); + // the placeholder generator passed on reconnect must never run + expect(events2.some((e) => e.includes("should-not-run"))).toBe(false); + + vi.useRealTimers(); + }); + + test("aborts the generator when no client reconnects within the grace window", async () => { + vi.useFakeTimers(); + const { mockRes } = createMockResponse(); + let closeHandler: (() => void) | undefined; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler = handler; + }); + + let signalAborted = false; + async function* generator(signal: AbortSignal) { + yield { type: "start" }; + await new Promise((resolve) => { + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + signalAborted = signal.aborted; + } + + const streamPromise = streamManager.stream(mockRes as any, generator); + await vi.advanceTimersByTimeAsync(0); + + if (closeHandler) closeHandler(); + + // Just before the window expires, the generator is still alive. + await vi.advanceTimersByTimeAsync(14_999); + expect(signalAborted).toBe(false); + + // Crossing the window aborts the abandoned generator (jobs cleanup). + await vi.advanceTimersByTimeAsync(1); + await streamPromise; + expect(signalAborted).toBe(true); + + vi.useRealTimers(); + }); + + test("respects a custom disconnectGraceMs", async () => { + vi.useFakeTimers(); + const customManager = new StreamManager({ disconnectGraceMs: 1_000 }); + const { mockRes } = createMockResponse(); + let closeHandler: (() => void) | undefined; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler = handler; + }); + + let signalAborted = false; + async function* generator(signal: AbortSignal) { + yield { type: "start" }; + await new Promise((resolve) => { + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + signalAborted = signal.aborted; + } + + const streamPromise = customManager.stream(mockRes as any, generator); + await vi.advanceTimersByTimeAsync(0); + if (closeHandler) closeHandler(); + + await vi.advanceTimersByTimeAsync(999); + expect(signalAborted).toBe(false); + await vi.advanceTimersByTimeAsync(1); + await streamPromise; + expect(signalAborted).toBe(true); + + vi.useRealTimers(); + }); + + test("stream completion clears the pending grace timer (no late abort)", async () => { + vi.useFakeTimers(); + const { mockRes } = createMockResponse(); + let closeHandler: (() => void) | undefined; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler = handler; + }); + + const abortSpy = vi.fn(); + + async function* generator(signal: AbortSignal) { + signal.addEventListener("abort", abortSpy, { once: true }); + yield { type: "only" }; + // generator finishes here -> stream becomes completed + } + + const streamPromise = streamManager.stream(mockRes as any, generator); + await streamPromise; + + // Stream has completed. A late close handler must not schedule an abort, + // and any timer that might have been pending must not fire. + if (closeHandler) closeHandler(); + await vi.advanceTimersByTimeAsync(30_000); + + expect(abortSpy).not.toHaveBeenCalled(); + + vi.useRealTimers(); }); }); diff --git a/packages/appkit/src/stream/types.ts b/packages/appkit/src/stream/types.ts index bb6f65f6..e9a6bbc0 100644 --- a/packages/appkit/src/stream/types.ts +++ b/packages/appkit/src/stream/types.ts @@ -49,6 +49,8 @@ export interface StreamEntry { lastAccess: number; abortController: AbortController; traceContext: Context; + // pending grace-window abort, set while the last client is disconnected + disconnectGraceTimer?: NodeJS.Timeout; } export interface StreamOperation { diff --git a/packages/shared/src/execute.ts b/packages/shared/src/execute.ts index 62ac45bf..708a892b 100644 --- a/packages/shared/src/execute.ts +++ b/packages/shared/src/execute.ts @@ -11,6 +11,9 @@ export interface StreamConfig { maxPersistentBuffers?: number; heartbeatInterval?: number; maxActiveStreams?: number; + // ms to keep a generator alive after the last client disconnects, so a + // reconnecting client can resume before the stream is aborted + disconnectGraceMs?: number; } /** Retry configuration for the RetryInterceptor. Uses exponential backoff with full jitter between attempts. */