From 3ed3ecd2a690a0c4cf1afb7bca4caccc24ce47e9 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Fri, 12 Jun 2026 12:38:25 +0200 Subject: [PATCH 1/3] fix(appkit): keep SSE generator alive for a grace window so reconnection resumes Commit 4e923408 added an immediate `abortController.abort()` to both `res.on("close")` handlers in StreamManager when the last client disconnects. This killed the generator the instant a client dropped, breaking SSE reconnection: a finite/ongoing generator (e.g. the reconnect demo, or any stream the client briefly disconnects from) was aborted and marked completed before the client could reconnect with Last-Event-ID, so reconnection delivered nothing. Replace the immediate abort with a configurable disconnect grace window (default 15s, via StreamConfig.disconnectGraceMs). On last-client disconnect we schedule a delayed abort instead of aborting now; a client reconnecting within the window cancels the pending timer so the live generator keeps running and reconnection resumes. The grace timer is unref'd so it never keeps the process alive, and is cleared on stream completion/error and in abortAll() so timers never leak. Abandoned-stream cleanup is preserved: a stream with no reconnect still has its generator aborted, just disconnectGraceMs later instead of instantly, so background polling loops (e.g. jobs runAndWait) are still stopped. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- packages/appkit/src/stream/defaults.ts | 1 + packages/appkit/src/stream/stream-manager.ts | 72 ++++++- packages/appkit/src/stream/stream-registry.ts | 5 + .../appkit/src/stream/tests/stream.test.ts | 183 +++++++++++++++++- packages/appkit/src/stream/types.ts | 6 + packages/shared/src/execute.ts | 7 + 6 files changed, 262 insertions(+), 12 deletions(-) diff --git a/packages/appkit/src/stream/defaults.ts b/packages/appkit/src/stream/defaults.ts index c8fc91591..304a63112 100644 --- a/packages/appkit/src/stream/defaults.ts +++ b/packages/appkit/src/stream/defaults.ts @@ -6,4 +6,5 @@ export const streamDefaults = { maxPersistentBuffers: 10000, // 10000 buffers heartbeatInterval: 10 * 1000, // 10 seconds maxActiveStreams: 1000, // 1000 streams + disconnectGraceMs: 15_000, // 15 seconds } as const; diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index f21ff586e..ab2c1e49b 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -18,6 +18,7 @@ export class StreamManager { private sseWriter: SSEWriter; private maxEventSize: number; private bufferTTL: number; + private disconnectGraceMs: number; constructor(options?: StreamConfig) { this.streamRegistry = new StreamRegistry( @@ -26,6 +27,8 @@ export class StreamManager { this.sseWriter = new SSEWriter(); this.maxEventSize = options?.maxEventSize ?? streamDefaults.maxEventSize; this.bufferTTL = options?.bufferTTL ?? streamDefaults.bufferTTL; + this.disconnectGraceMs = + options?.disconnectGraceMs ?? streamDefaults.disconnectGraceMs; this.activeOperations = new Set(); } @@ -74,6 +77,13 @@ export class StreamManager { // abort all active operations abortAll(): void { + // clear any pending disconnect-grace timers so they can't fire late + for (const entry of this.streamRegistry.values()) { + if (entry.disconnectGraceTimer) { + clearTimeout(entry.disconnectGraceTimer); + entry.disconnectGraceTimer = undefined; + } + } this.activeOperations.forEach((operation) => { if (operation.heartbeat) clearInterval(operation.heartbeat); operation.controller.abort( @@ -115,6 +125,10 @@ export class StreamManager { } } + // A client is (re)attaching: cancel any pending disconnect-grace abort so + // a reconnection within the grace window resumes the live generator. + this._clearGraceTimer(streamEntry); + // add client to stream entry streamEntry.clients.add(res); streamEntry.lastAccess = Date.now(); @@ -140,11 +154,12 @@ export class StreamManager { streamEntry.clients.delete(res); this.activeOperations.delete(streamOperation); - // Stop the generator when no clients remain + // When no clients remain, don't abort immediately — start a grace timer + // so a reconnecting client can resume the stream. Only truly-abandoned + // streams (no reconnect within the window) get their generator aborted, + // which stops background polling loops (e.g. jobs runAndWait). if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - streamEntry.abortController.abort( - new DOMException("All clients disconnected", "AbortError"), - ); + this._scheduleGraceAbort(streamEntry); } // cleanup if stream is completed and no clients are connected @@ -228,12 +243,13 @@ export class StreamManager { this.activeOperations.delete(streamOperation); streamEntry.clients.delete(res); - // Stop the generator when no clients remain so polling loops - // (e.g. jobs runAndWait) don't keep running in the background. + // When no clients remain, don't abort the generator immediately — start a + // grace timer so a reconnecting client (e.g. SSE resume with + // Last-Event-ID) can keep the stream alive. Polling loops (e.g. jobs + // runAndWait) of truly-abandoned streams are still stopped, just + // `disconnectGraceMs` later instead of instantly. if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - abortController.abort( - new DOMException("Client disconnected", "AbortError"), - ); + this._scheduleGraceAbort(streamEntry); } }); @@ -285,6 +301,9 @@ export class StreamManager { streamEntry.isCompleted = true; + // no late grace abort should fire on a completed stream + this._clearGraceTimer(streamEntry); + // close all clients this._closeAllClients(streamEntry); @@ -300,6 +319,7 @@ export class StreamManager { if (errorCode === SSEErrorCode.STREAM_ABORTED) { logger.info("Stream aborted by client (code=%s)", errorCode); streamEntry.isCompleted = true; + this._clearGraceTimer(streamEntry); this._closeAllClients(streamEntry); this._cleanupStream(streamEntry); return; @@ -328,6 +348,7 @@ export class StreamManager { true, ); streamEntry.isCompleted = true; + this._clearGraceTimer(streamEntry); } }); } @@ -400,6 +421,39 @@ export class StreamManager { } } + // Schedule a delayed abort for a stream whose last client just disconnected. + // If a client reconnects within the grace window, the timer is cleared in + // `_attachToExistingStream` and the generator keeps running so reconnection + // can resume. Otherwise the generator is aborted once the window expires, + // stopping background polling loops for genuinely abandoned streams. + private _scheduleGraceAbort(streamEntry: StreamEntry): void { + // clear any existing grace timer to avoid stacking + if (streamEntry.disconnectGraceTimer) { + clearTimeout(streamEntry.disconnectGraceTimer); + } + + const timer = setTimeout(() => { + streamEntry.disconnectGraceTimer = undefined; + if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { + streamEntry.abortController.abort( + new DOMException("Client disconnected (grace expired)", "AbortError"), + ); + } + }, this.disconnectGraceMs); + + // never keep the process alive solely for a grace timer + timer.unref?.(); + streamEntry.disconnectGraceTimer = timer; + } + + // clear a pending disconnect-grace timer, if any + private _clearGraceTimer(streamEntry: StreamEntry): void { + if (streamEntry.disconnectGraceTimer) { + clearTimeout(streamEntry.disconnectGraceTimer); + streamEntry.disconnectGraceTimer = undefined; + } + } + // cleanup stream if no clients are connected private _cleanupStream(streamEntry: StreamEntry): void { if (streamEntry.clients.size === 0) { diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index 18af1e333..bdb5cee82 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -41,6 +41,11 @@ export class StreamRegistry { return this.streams.getSize(); } + // get all streams currently in the registry + values(): StreamEntry[] { + return this.streams.getAll(); + } + clear(): void { const allStreams = this.streams.getAll(); diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index 2939dd4dd..1de49b5fd 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ b/packages/appkit/src/stream/tests/stream.test.ts @@ -194,7 +194,8 @@ describe("StreamManager", () => { expect(streamManager.getActiveCount()).toBe(0); }); - test("should abort generator when last client disconnects", async () => { + test("should abort generator after grace window when last client disconnects", async () => { + vi.useFakeTimers(); const { mockRes } = createMockResponse(); let closeHandler: (() => void) | undefined; @@ -220,15 +221,191 @@ describe("StreamManager", () => { const streamPromise = streamManager.stream(mockRes as any, generator); // Let the generator yield "start" and enter the signal wait - await new Promise((resolve) => setTimeout(resolve, 10)); + await vi.advanceTimersByTimeAsync(10); - // Simulate client disconnect + // Simulate client disconnect — abort should NOT fire immediately if (closeHandler) closeHandler(); + await vi.advanceTimersByTimeAsync(0); + expect(signalAborted).toBe(false); + // After the grace window elapses with no reconnect, the generator aborts + await vi.advanceTimersByTimeAsync(15_000); await streamPromise; expect(signalAborted).toBe(true); expect(streamManager.getActiveCount()).toBe(0); + + vi.useRealTimers(); + }); + }); + + describe("disconnect grace window", () => { + test("does NOT abort the generator if a client reconnects within the grace window", async () => { + vi.useFakeTimers(); + const streamId = "grace-reconnect-123"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + let closeHandler1: (() => void) | undefined; + mockRes1.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler1 = handler; + }); + + // Finite generator that emits one event, then waits on a manual gate + // before emitting the rest — mirrors the reconnect demo's pacing. + let releaseRest: (() => void) | undefined; + const restGate = new Promise((resolve) => { + releaseRest = resolve; + }); + + async function* generator(signal: AbortSignal) { + yield { type: "tick", count: 1 }; + await restGate; + if (signal.aborted) return; + yield { type: "tick", count: 2 }; + yield { type: "tick", count: 3 }; + } + + const streamPromise = streamManager.stream(mockRes1 as any, generator, { + streamId, + }); + + // first event emitted + await vi.advanceTimersByTimeAsync(0); + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + expect(eventIds.length).toBe(1); + + // client disconnects (clients -> 0) while the generator is still active + if (closeHandler1) closeHandler1(); + + // reconnect well within the 15s grace window + await vi.advanceTimersByTimeAsync(3_000); + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": eventIds[0], + }); + mockRes2.on.mockImplementation(() => {}); + + const reconnectPromise = streamManager.stream( + mockRes2 as any, + async function* () { + yield { type: "should-not-run" }; + }, + { streamId }, + ); + + // release the rest of the original generator now that we've reconnected + releaseRest?.(); + await vi.advanceTimersByTimeAsync(0); + await Promise.all([streamPromise, reconnectPromise]); + + // The reconnecting client must receive the subsequent events — proving + // the generator was NOT aborted during the disconnect. + const reconnectData = events2 + .filter((e) => e.startsWith("data: ")) + .map((e) => e.replace("data: ", "").replace("\n\n", "")); + expect(reconnectData.some((d) => d.includes('"count":2'))).toBe(true); + expect(reconnectData.some((d) => d.includes('"count":3'))).toBe(true); + // the placeholder generator passed on reconnect must never run + expect(events2.some((e) => e.includes("should-not-run"))).toBe(false); + + vi.useRealTimers(); + }); + + test("aborts the generator when no client reconnects within the grace window", async () => { + vi.useFakeTimers(); + const { mockRes } = createMockResponse(); + let closeHandler: (() => void) | undefined; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler = handler; + }); + + let signalAborted = false; + async function* generator(signal: AbortSignal) { + yield { type: "start" }; + await new Promise((resolve) => { + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + signalAborted = signal.aborted; + } + + const streamPromise = streamManager.stream(mockRes as any, generator); + await vi.advanceTimersByTimeAsync(0); + + if (closeHandler) closeHandler(); + + // Just before the window expires, the generator is still alive. + await vi.advanceTimersByTimeAsync(14_999); + expect(signalAborted).toBe(false); + + // Crossing the window aborts the abandoned generator (jobs cleanup). + await vi.advanceTimersByTimeAsync(1); + await streamPromise; + expect(signalAborted).toBe(true); + + vi.useRealTimers(); + }); + + test("respects a custom disconnectGraceMs", async () => { + vi.useFakeTimers(); + const customManager = new StreamManager({ disconnectGraceMs: 1_000 }); + const { mockRes } = createMockResponse(); + let closeHandler: (() => void) | undefined; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler = handler; + }); + + let signalAborted = false; + async function* generator(signal: AbortSignal) { + yield { type: "start" }; + await new Promise((resolve) => { + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + signalAborted = signal.aborted; + } + + const streamPromise = customManager.stream(mockRes as any, generator); + await vi.advanceTimersByTimeAsync(0); + if (closeHandler) closeHandler(); + + await vi.advanceTimersByTimeAsync(999); + expect(signalAborted).toBe(false); + await vi.advanceTimersByTimeAsync(1); + await streamPromise; + expect(signalAborted).toBe(true); + + vi.useRealTimers(); + }); + + test("stream completion clears the pending grace timer (no late abort)", async () => { + vi.useFakeTimers(); + const { mockRes } = createMockResponse(); + let closeHandler: (() => void) | undefined; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler = handler; + }); + + const abortSpy = vi.fn(); + + async function* generator(signal: AbortSignal) { + signal.addEventListener("abort", abortSpy, { once: true }); + yield { type: "only" }; + // generator finishes here -> stream becomes completed + } + + const streamPromise = streamManager.stream(mockRes as any, generator); + await streamPromise; + + // Stream has completed. A late close handler must not schedule an abort, + // and any timer that might have been pending must not fire. + if (closeHandler) closeHandler(); + await vi.advanceTimersByTimeAsync(30_000); + + expect(abortSpy).not.toHaveBeenCalled(); + + vi.useRealTimers(); }); }); diff --git a/packages/appkit/src/stream/types.ts b/packages/appkit/src/stream/types.ts index bb6f65f6e..0e2421402 100644 --- a/packages/appkit/src/stream/types.ts +++ b/packages/appkit/src/stream/types.ts @@ -49,6 +49,12 @@ export interface StreamEntry { lastAccess: number; abortController: AbortController; traceContext: Context; + /** + * Pending timer that aborts the generator once the disconnect grace window + * elapses with no clients. Cleared if a client reconnects, or when the + * stream completes/errors. Undefined while at least one client is connected. + */ + disconnectGraceTimer?: NodeJS.Timeout; } export interface StreamOperation { diff --git a/packages/shared/src/execute.ts b/packages/shared/src/execute.ts index 62ac45bfb..17abd9071 100644 --- a/packages/shared/src/execute.ts +++ b/packages/shared/src/execute.ts @@ -11,6 +11,13 @@ export interface StreamConfig { maxPersistentBuffers?: number; heartbeatInterval?: number; maxActiveStreams?: number; + /** + * How long (ms) an idle stream's generator is kept alive after the last + * client disconnects, so a reconnecting client can resume. If no client + * reconnects within this window, the abandoned stream's generator is + * aborted (e.g. to stop background polling loops like jobs runAndWait). + */ + disconnectGraceMs?: number; } /** Retry configuration for the RetryInterceptor. Uses exponential backoff with full jitter between attempts. */ From 3aee08beed5e0f4cc368eafc7a9c976cb08d7797 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Fri, 12 Jun 2026 15:38:31 +0200 Subject: [PATCH 2/3] fix(appkit): clear disconnect-grace timer on stream eviction A stream whose last client disconnected arms a disconnect-grace timer and stays in the registry. If it was then chosen as the LRU eviction victim, the timer was never cleared, so its setTimeout closure pinned the removed StreamEntry (and its event buffer) in memory until the grace window elapsed -- defeating eviction exactly when memory is under pressure. Clear the pending grace timer on every registry removal path via a new StreamRegistry._clearGraceTimer helper, called from both _evictOldestStream and clear(). This makes the "timers never leak when the entry is removed" invariant hold everywhere, not just on completion/error/abortAll. With clear() now sweeping grace timers, StreamManager.abortAll()'s inline timer loop (and the StreamRegistry.values() method that fed it) are redundant and removed; _scheduleGraceAbort reuses the manager's _clearGraceTimer helper instead of an inline guard. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- packages/appkit/src/stream/stream-manager.ts | 12 ++---------- packages/appkit/src/stream/stream-registry.ts | 16 +++++++++++----- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index ab2c1e49b..27e2f7135 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -77,13 +77,7 @@ export class StreamManager { // abort all active operations abortAll(): void { - // clear any pending disconnect-grace timers so they can't fire late - for (const entry of this.streamRegistry.values()) { - if (entry.disconnectGraceTimer) { - clearTimeout(entry.disconnectGraceTimer); - entry.disconnectGraceTimer = undefined; - } - } + // pending disconnect-grace timers are cleared by streamRegistry.clear() below this.activeOperations.forEach((operation) => { if (operation.heartbeat) clearInterval(operation.heartbeat); operation.controller.abort( @@ -428,9 +422,7 @@ export class StreamManager { // stopping background polling loops for genuinely abandoned streams. private _scheduleGraceAbort(streamEntry: StreamEntry): void { // clear any existing grace timer to avoid stacking - if (streamEntry.disconnectGraceTimer) { - clearTimeout(streamEntry.disconnectGraceTimer); - } + this._clearGraceTimer(streamEntry); const timer = setTimeout(() => { streamEntry.disconnectGraceTimer = undefined; diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index bdb5cee82..e4f64c1e7 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -41,21 +41,26 @@ export class StreamRegistry { return this.streams.getSize(); } - // get all streams currently in the registry - values(): StreamEntry[] { - return this.streams.getAll(); - } - clear(): void { const allStreams = this.streams.getAll(); for (const stream of allStreams) { + this._clearGraceTimer(stream); stream.abortController.abort("Server shutdown"); } this.streams.clear(); } + // clear a pending disconnect-grace timer so a removed stream can't keep its + // entry (and event buffer) alive until the timer fires + private _clearGraceTimer(stream: StreamEntry): void { + if (stream.disconnectGraceTimer) { + clearTimeout(stream.disconnectGraceTimer); + stream.disconnectGraceTimer = undefined; + } + } + // evict the oldest stream from the registry private _evictOldestStream(excludeStreamId: string): void { const allStreams = this.streams.getAll(); @@ -88,6 +93,7 @@ export class StreamRegistry { } } } + this._clearGraceTimer(oldestStream); oldestStream.abortController.abort("Stream evicted"); this.streams.remove(oldestStream.streamId); } From 08fa345a3c48fd28dd42d26114368fb463d57597 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Fri, 12 Jun 2026 15:43:45 +0200 Subject: [PATCH 3/3] chore(appkit): tighten verbose grace-window comments Trim the multi-line explanatory comments added with the disconnect-grace work down to single-line comments matching the surrounding house style. No behavior change. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- packages/appkit/src/stream/stream-manager.ts | 22 +++++-------------- packages/appkit/src/stream/stream-registry.ts | 3 +-- packages/appkit/src/stream/types.ts | 6 +---- packages/shared/src/execute.ts | 8 ++----- 4 files changed, 9 insertions(+), 30 deletions(-) diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index 27e2f7135..14741d64e 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -119,8 +119,7 @@ export class StreamManager { } } - // A client is (re)attaching: cancel any pending disconnect-grace abort so - // a reconnection within the grace window resumes the live generator. + // a reconnecting client cancels the pending disconnect-grace abort this._clearGraceTimer(streamEntry); // add client to stream entry @@ -148,10 +147,7 @@ export class StreamManager { streamEntry.clients.delete(res); this.activeOperations.delete(streamOperation); - // When no clients remain, don't abort immediately — start a grace timer - // so a reconnecting client can resume the stream. Only truly-abandoned - // streams (no reconnect within the window) get their generator aborted, - // which stops background polling loops (e.g. jobs runAndWait). + // grace-abort instead of aborting now, so a reconnect can resume if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { this._scheduleGraceAbort(streamEntry); } @@ -237,11 +233,7 @@ export class StreamManager { this.activeOperations.delete(streamOperation); streamEntry.clients.delete(res); - // When no clients remain, don't abort the generator immediately — start a - // grace timer so a reconnecting client (e.g. SSE resume with - // Last-Event-ID) can keep the stream alive. Polling loops (e.g. jobs - // runAndWait) of truly-abandoned streams are still stopped, just - // `disconnectGraceMs` later instead of instantly. + // grace-abort instead of aborting now, so a reconnect can resume if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { this._scheduleGraceAbort(streamEntry); } @@ -415,13 +407,9 @@ export class StreamManager { } } - // Schedule a delayed abort for a stream whose last client just disconnected. - // If a client reconnects within the grace window, the timer is cleared in - // `_attachToExistingStream` and the generator keeps running so reconnection - // can resume. Otherwise the generator is aborted once the window expires, - // stopping background polling loops for genuinely abandoned streams. + // abort the generator after the grace window unless a client reconnects first private _scheduleGraceAbort(streamEntry: StreamEntry): void { - // clear any existing grace timer to avoid stacking + // clear any existing timer to avoid stacking this._clearGraceTimer(streamEntry); const timer = setTimeout(() => { diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index e4f64c1e7..129f53e5f 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -52,8 +52,7 @@ export class StreamRegistry { this.streams.clear(); } - // clear a pending disconnect-grace timer so a removed stream can't keep its - // entry (and event buffer) alive until the timer fires + // clear a pending grace timer so a removed stream isn't pinned until it fires private _clearGraceTimer(stream: StreamEntry): void { if (stream.disconnectGraceTimer) { clearTimeout(stream.disconnectGraceTimer); diff --git a/packages/appkit/src/stream/types.ts b/packages/appkit/src/stream/types.ts index 0e2421402..e9a6bbc0f 100644 --- a/packages/appkit/src/stream/types.ts +++ b/packages/appkit/src/stream/types.ts @@ -49,11 +49,7 @@ export interface StreamEntry { lastAccess: number; abortController: AbortController; traceContext: Context; - /** - * Pending timer that aborts the generator once the disconnect grace window - * elapses with no clients. Cleared if a client reconnects, or when the - * stream completes/errors. Undefined while at least one client is connected. - */ + // pending grace-window abort, set while the last client is disconnected disconnectGraceTimer?: NodeJS.Timeout; } diff --git a/packages/shared/src/execute.ts b/packages/shared/src/execute.ts index 17abd9071..708a892be 100644 --- a/packages/shared/src/execute.ts +++ b/packages/shared/src/execute.ts @@ -11,12 +11,8 @@ export interface StreamConfig { maxPersistentBuffers?: number; heartbeatInterval?: number; maxActiveStreams?: number; - /** - * How long (ms) an idle stream's generator is kept alive after the last - * client disconnects, so a reconnecting client can resume. If no client - * reconnects within this window, the abandoned stream's generator is - * aborted (e.g. to stop background polling loops like jobs runAndWait). - */ + // ms to keep a generator alive after the last client disconnects, so a + // reconnecting client can resume before the stream is aborted disconnectGraceMs?: number; }