Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions knip.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"packages/appkit/src/core/agent/tools/index.ts",
"packages/appkit/src/core/agent/load-agents.ts",
"packages/appkit/src/connectors/mcp/index.ts",
"packages/appkit/src/stream/buffers.ts",
"packages/appkit/src/typedoc.entry.ts",
"template/**",
"tools/**",
Expand Down
2 changes: 0 additions & 2 deletions packages/appkit/src/stream/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ export const streamDefaults = {
bufferSize: 100,
maxEventSize: 1024 * 1024, // 1MB
bufferTTL: 10 * 60 * 1000, // 10 minutes
cleanupInterval: 5 * 60 * 1000, // 5 minutes
maxPersistentBuffers: 10000, // 10000 buffers
heartbeatInterval: 10 * 1000, // 10 seconds
maxActiveStreams: 1000, // 1000 streams
} as const;
90 changes: 69 additions & 21 deletions packages/appkit/src/stream/stream-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ export class StreamManager {
}
}

// a reconnect cancels any pending registry removal so the entry isn't
// pulled out from under the newly attached client
if (streamEntry.removalTimer) {
clearTimeout(streamEntry.removalTimer);
streamEntry.removalTimer = undefined;
}

// add client to stream entry
streamEntry.clients.add(res);
streamEntry.lastAccess = Date.now();
Expand Down Expand Up @@ -148,13 +155,7 @@ export class StreamManager {
}

// cleanup if stream is completed and no clients are connected
if (streamEntry.isCompleted && streamEntry.clients.size === 0) {
setTimeout(() => {
if (streamEntry.clients.size === 0) {
this.streamRegistry.remove(streamEntry.streamId);
}
}, this.bufferTTL);
}
this._scheduleRemovalAfterTTL(streamEntry);
});

// if stream is completed, close connection
Expand All @@ -163,6 +164,11 @@ export class StreamManager {
// cleanup operation
this.activeOperations.delete(streamOperation);
clearInterval(heartbeat);
// we deliberately ended this client, so drop it from the entry and
// schedule removal now instead of relying on the transport's `close`
// event (the close handler's later delete is a safe no-op)
streamEntry.clients.delete(res);
this._scheduleRemovalAfterTTL(streamEntry);
}
}
private async _createNewStream(
Expand Down Expand Up @@ -235,6 +241,11 @@ export class StreamManager {
new DOMException("Client disconnected", "AbortError"),
);
}

// if the stream already finished (completed or errored), schedule
// registry removal once the buffer TTL elapses so completed streams
// don't accumulate in the registry forever
this._scheduleRemovalAfterTTL(streamEntry);
});

await this._processGeneratorInBackground(streamEntry);
Expand Down Expand Up @@ -285,11 +296,11 @@ export class StreamManager {

streamEntry.isCompleted = true;

// close all clients
// close all clients (this also drops them from the entry, so the
// removal below is scheduled regardless of whether the transport
// ever emits `close`)
this._closeAllClients(streamEntry);

// cleanup if no clients are connected
this._cleanupStream(streamEntry);
this._scheduleRemovalAfterTTL(streamEntry);
} catch (error) {
const errorMsg =
error instanceof Error ? error.message : "Internal server error";
Expand All @@ -301,7 +312,7 @@ export class StreamManager {
logger.info("Stream aborted by client (code=%s)", errorCode);
streamEntry.isCompleted = true;
this._closeAllClients(streamEntry);
this._cleanupStream(streamEntry);
this._scheduleRemovalAfterTTL(streamEntry);
return;
}

Expand All @@ -328,6 +339,12 @@ export class StreamManager {
true,
);
streamEntry.isCompleted = true;

// the broadcast above already ended the connected clients; drop them
// from the entry so removal is scheduled regardless of whether the
// transport ever emits `close`
this._closeAllClients(streamEntry);
this._scheduleRemovalAfterTTL(streamEntry);
}
});
}
Expand Down Expand Up @@ -391,24 +408,55 @@ export class StreamManager {
}
}

// close all connected clients
// close all connected clients and remove them from the stream entry.
// We are deliberately terminating these connections, so cleanup must not
// depend on the transport emitting a `close` event for each client (it may
// never fire). The close handlers' later `clients.delete(...)` calls remain
// safe no-ops.
private _closeAllClients(streamEntry: StreamEntry): void {
for (const client of streamEntry.clients) {
if (!client.writableEnded) {
client.end();
}
}
streamEntry.clients.clear();
}

// cleanup stream if no clients are connected
private _cleanupStream(streamEntry: StreamEntry): void {
if (streamEntry.clients.size === 0) {
setTimeout(() => {
if (streamEntry.clients.size === 0) {
this.streamRegistry.remove(streamEntry.streamId);
}
}, this.bufferTTL);
// schedule registry removal once a finished (completed or errored) stream
// has no connected clients. The event buffer stays available for
// reconnect replay for `bufferTTL` after the last client disconnects;
// after that the stream entry is removed from the registry so it can be
// garbage collected.
private _scheduleRemovalAfterTTL(streamEntry: StreamEntry): void {
if (!streamEntry.isCompleted || streamEntry.clients.size > 0) {
return;
}

// at most one removal timer per stream: rescheduling replaces any
// pending timer instead of stacking a new one (each pending timer pins
// the entry's buffer/generator/trace context for the full TTL)
if (streamEntry.removalTimer) {
clearTimeout(streamEntry.removalTimer);
}

// mark the moment the stream became idle so a reconnect during the TTL
// window (which refreshes lastAccess) makes the pending timer a no-op
streamEntry.lastAccess = Date.now();

streamEntry.removalTimer = setTimeout(() => {
streamEntry.removalTimer = undefined;
// safety net: no-op if a client reconnected during the TTL window;
// a fresh removal is scheduled when that client disconnects
if (
streamEntry.clients.size === 0 &&
Date.now() - streamEntry.lastAccess >= this.bufferTTL
) {
this.streamRegistry.remove(streamEntry.streamId);
}
}, this.bufferTTL);

// don't keep the process alive just to clean up finished streams
streamEntry.removalTimer.unref?.();
}

private _categorizeError(error: unknown): SSEErrorCode {
Expand Down
81 changes: 56 additions & 25 deletions packages/appkit/src/stream/stream-registry.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
import { RingBuffer } from "./buffers";
import { SSEErrorCode, type StreamEntry } from "./types";

export class StreamRegistry {
private streams: RingBuffer<StreamEntry>;
// keyed storage with explicit, policy-driven eviction. A ring buffer is
// unsuitable here: it overwrites by insertion slot, so evicting an entry
// chosen by policy (e.g. a completed stream) and then adding would
// silently clobber an unrelated live stream sitting in the oldest slot.
private streams: Map<string, StreamEntry>;
private maxActiveStreams: number;

constructor(maxActiveStreams: number) {
this.streams = new RingBuffer<StreamEntry>(
maxActiveStreams,
(entry) => entry.streamId,
);
this.streams = new Map();
this.maxActiveStreams = maxActiveStreams;
}

// add a stream to the registry
add(entry: StreamEntry): void {
// enforce hard cap
if (this.streams.getSize() >= this.streams.capacity) {
// enforce hard cap, but only when inserting a genuinely new key:
// re-adding an existing streamId updates in place and doesn't grow the
// map, so evicting another stream for it would destroy an innocent one
if (
!this.streams.has(entry.streamId) &&
this.streams.size >= this.maxActiveStreams
) {
this._evictOldestStream(entry.streamId);
}

this.streams.add(entry);
this.streams.set(entry.streamId, entry);
}

// get a stream from the registry
get(streamId: string): StreamEntry | null {
return this.streams.get(streamId);
return this.streams.get(streamId) ?? null;
}

// check if a stream exists in the registry
Expand All @@ -33,42 +40,56 @@ export class StreamRegistry {

// remove a stream from the registry
remove(streamId: string): void {
this.streams.remove(streamId);
this.streams.delete(streamId);
}

// get the number of streams in the registry
size(): number {
return this.streams.getSize();
return this.streams.size;
}

clear(): void {
const allStreams = this.streams.getAll();

for (const stream of allStreams) {
for (const stream of this.streams.values()) {
stream.abortController.abort("Server shutdown");
if (stream.removalTimer) {
clearTimeout(stream.removalTimer);
stream.removalTimer = undefined;
}
}

this.streams.clear();
}

// evict the oldest stream from the registry
// evict the oldest stream from the registry, preferring completed streams.
// Completed streams waiting out their buffer TTL can look recently
// accessed, so plain LRU could evict a live stream while dead ones
// survive. Prefer the oldest completed stream when one exists and fall
// back to LRU over all streams otherwise.
private _evictOldestStream(excludeStreamId: string): void {
const allStreams = this.streams.getAll();
const allStreams = this.streams.values();
let oldestStream: StreamEntry | null = null;
let oldestAccess = Infinity;
let oldestCompletedStream: StreamEntry | null = null;
let oldestCompletedAccess = Infinity;

// find the least recently accessed stream
// find the least recently accessed stream (overall and completed-only)
for (const stream of allStreams) {
if (
stream.streamId !== excludeStreamId &&
stream.lastAccess < oldestAccess
) {
if (stream.streamId === excludeStreamId) continue;

if (stream.lastAccess < oldestAccess) {
oldestStream = stream;
oldestAccess = stream.lastAccess;
}

if (stream.isCompleted && stream.lastAccess < oldestCompletedAccess) {
oldestCompletedStream = stream;
oldestCompletedAccess = stream.lastAccess;
}
}

// abort the oldest stream
oldestStream = oldestCompletedStream ?? oldestStream;

// abort the evicted stream
if (oldestStream) {
// broadcast stream eviction error to all clients
for (const client of oldestStream.clients) {
Expand All @@ -83,8 +104,18 @@ export class StreamRegistry {
}
}
}
oldestStream.abortController.abort("Stream evicted");
this.streams.remove(oldestStream.streamId);
// abort with a DOMException AbortError so the error categorizer routes
// eviction as an abort (matching the manager's terminal paths) rather
// than a stream failure
oldestStream.abortController.abort(
new DOMException("Stream evicted", "AbortError"),
);
// a pending removal timer would otherwise pin the evicted entry
if (oldestStream.removalTimer) {
clearTimeout(oldestStream.removalTimer);
oldestStream.removalTimer = undefined;
}
this.streams.delete(oldestStream.streamId);
}
}
}
Loading
Loading