diff --git a/crates/bindings-typescript/src/sdk/connection_manager.ts b/crates/bindings-typescript/src/sdk/connection_manager.ts index 7f43c274104..5e6f439efb8 100644 --- a/crates/bindings-typescript/src/sdk/connection_manager.ts +++ b/crates/bindings-typescript/src/sdk/connection_manager.ts @@ -46,12 +46,16 @@ export type ConnectionState = { type Listener = () => void; +export const CONNECTION_MANAGER_RECONNECT_DELAY_MS = 1000; + type ManagedConnection = { connection?: DbConnectionImpl; + builder?: DbConnectionBuilder; refCount: number; state: ConnectionState; listeners: Set; pendingRelease: ReturnType | null; + reconnectTimer: ReturnType | null; onConnect?: (conn: DbConnectionImpl) => void; onDisconnect?: (ctx: ErrorContextInterface, error?: Error) => void; onConnectError?: (ctx: ErrorContextInterface, error: Error) => void; @@ -91,10 +95,12 @@ class ConnectionManagerImpl { } const managed: ManagedConnection = { connection: undefined, + builder: undefined, refCount: 0, state: defaultState(), listeners: new Set(), pendingRelease: null, + reconnectTimer: null, }; this.#connections.set(key, managed); return managed; @@ -106,47 +112,24 @@ class ConnectionManagerImpl { } } - /** - * Retains a connection, incrementing its reference count. - * Creates the connection on first call; returns existing connection on subsequent calls. - * Cancels any pending release if the connection was about to be cleaned up. - * - * @param key - Unique identifier for the connection (use getKey to generate) - * @param builder - Connection builder to create the connection if needed - * @returns The managed connection instance - */ - retain>( - key: string, - builder: DbConnectionBuilder - ): T { - const managed = this.#ensureEntry(key); - if (managed.pendingRelease) { - clearTimeout(managed.pendingRelease); - managed.pendingRelease = null; - } - managed.refCount += 1; - if (managed.connection) { - return managed.connection as T; - } - - const connection = builder.build(); - managed.connection = connection; - - const updateState = (updates: Partial) => { - managed.state = { ...managed.state, ...updates }; - this.#notify(managed); - }; + #updateState( + managed: ManagedConnection, + updates: Partial + ): void { + managed.state = { ...managed.state, ...updates }; + this.#notify(managed); + } - updateState({ - isActive: connection.isActive, - identity: connection.identity, - token: connection.token, - connectionId: connection.connectionId, - connectionError: undefined, - }); + #ensureCallbacks(managed: ManagedConnection): void { + if (managed.onConnect) { + return; + } managed.onConnect = conn => { - updateState({ + if (conn !== managed.connection) { + return; + } + this.#updateState(managed, { isActive: conn.isActive, identity: conn.identity, token: conn.token, @@ -156,26 +139,136 @@ class ConnectionManagerImpl { }; managed.onDisconnect = (ctx, error) => { - updateState({ - isActive: ctx.isActive, + if (ctx !== managed.connection) { + return; + } + this.#updateState(managed, { + isActive: false, connectionError: error ?? undefined, }); + this.#scheduleReconnect(managed); }; managed.onConnectError = (ctx, error) => { - updateState({ - isActive: ctx.isActive, + if (ctx !== managed.connection) { + return; + } + this.#updateState(managed, { + isActive: false, connectionError: error, }); + this.#scheduleReconnect(managed); }; + } + + #attachCallbacks>( + managed: ManagedConnection, + builder: DbConnectionBuilder + ): void { + this.#ensureCallbacks(managed); + builder.onConnect(managed.onConnect!); + builder.onDisconnect(managed.onDisconnect!); + builder.onConnectError(managed.onConnectError!); + } + + #detachCallbacks( + managed: ManagedConnection, + connection: DbConnectionImpl + ): void { + if (managed.onConnect) { + connection.removeOnConnect(managed.onConnect as any); + } + if (managed.onDisconnect) { + connection.removeOnDisconnect(managed.onDisconnect as any); + } + if (managed.onConnectError) { + connection.removeOnConnectError(managed.onConnectError as any); + } + } - builder.onConnect(managed.onConnect); - builder.onDisconnect(managed.onDisconnect); - builder.onConnectError(managed.onConnectError); + #buildManagedConnection>( + managed: ManagedConnection, + builder: DbConnectionBuilder + ): T { + managed.builder = builder; + const connection = builder.build(); + managed.connection = connection; + this.#attachCallbacks(managed, builder); + + this.#updateState(managed, { + isActive: connection.isActive, + identity: connection.identity, + token: connection.token, + connectionId: connection.connectionId, + connectionError: undefined, + }); return connection as T; } + #scheduleReconnect(managed: ManagedConnection): void { + if ( + managed.refCount <= 0 || + managed.pendingRelease || + managed.reconnectTimer || + !managed.builder + ) { + return; + } + + const connection = managed.connection; + if (connection) { + this.#detachCallbacks(managed, connection); + } + managed.connection = undefined; + managed.reconnectTimer = setTimeout(() => { + managed.reconnectTimer = null; + if ( + managed.refCount <= 0 || + managed.pendingRelease || + managed.connection || + !managed.builder + ) { + return; + } + + this.#buildManagedConnection(managed, managed.builder); + }, CONNECTION_MANAGER_RECONNECT_DELAY_MS); + } + + /** + * Retains a connection, incrementing its reference count. + * Creates the connection on first call; returns existing connection on subsequent calls. + * Cancels any pending release if the connection was about to be cleaned up. + * + * @param key - Unique identifier for the connection (use getKey to generate) + * @param builder - Connection builder to create the connection if needed + * @returns The managed connection instance + */ + retain>( + key: string, + builder: DbConnectionBuilder + ): T { + const managed = this.#ensureEntry(key); + if (managed.pendingRelease) { + clearTimeout(managed.pendingRelease); + managed.pendingRelease = null; + } + if (managed.reconnectTimer) { + clearTimeout(managed.reconnectTimer); + managed.reconnectTimer = null; + } + + managed.refCount += 1; + managed.builder = builder; + + if (managed.connection) { + return managed.connection as T; + } + + return this.#buildManagedConnection(managed, builder); + } + release(key: string): void { const managed = this.#connections.get(key); if (!managed) { @@ -187,24 +280,21 @@ class ConnectionManagerImpl { return; } + if (managed.reconnectTimer) { + clearTimeout(managed.reconnectTimer); + managed.reconnectTimer = null; + } + managed.pendingRelease = setTimeout(() => { managed.pendingRelease = null; if (managed.refCount > 0) { return; } - if (managed.connection) { - if (managed.onConnect) { - managed.connection.removeOnConnect(managed.onConnect as any); - } - if (managed.onDisconnect) { - managed.connection.removeOnDisconnect(managed.onDisconnect as any); - } - if (managed.onConnectError) { - managed.connection.removeOnConnectError( - managed.onConnectError as any - ); - } - managed.connection.disconnect(); + const connection = managed.connection; + managed.connection = undefined; + if (connection) { + this.#detachCallbacks(managed, connection); + connection.disconnect(); } this.#connections.delete(key); }, 0); diff --git a/crates/bindings-typescript/src/sdk/db_connection_impl.ts b/crates/bindings-typescript/src/sdk/db_connection_impl.ts index 1e604349f66..df5221a0aa7 100644 --- a/crates/bindings-typescript/src/sdk/db_connection_impl.ts +++ b/crates/bindings-typescript/src/sdk/db_connection_impl.ts @@ -321,12 +321,12 @@ export class DbConnectionImpl this.ws = v; this.ws.onclose = () => { - this.#emitter.emit('disconnect', this); this.isActive = false; + this.#emitter.emit('disconnect', this); }; this.ws.onerror = (e: ErrorEvent) => { - this.#emitter.emit('connectError', this, e); this.isActive = false; + this.#emitter.emit('connectError', this, e); }; this.ws.onopen = this.#handleOnOpen.bind(this); this.ws.onmessage = this.#handleOnMessage.bind(this); diff --git a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts index b32e7744a00..e04f411c7cc 100644 --- a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts @@ -19,6 +19,7 @@ class WebsocketTestAdapter implements WebSocketAdapter { #onclose: (ev: CloseEvent) => void = () => {}; #onopen: () => void = () => {}; #onmessage: (msg: { data: Uint8Array }) => void = () => {}; + #onerror: (msg: ErrorEvent) => void = () => {}; constructor() { this.messageQueue = []; @@ -39,7 +40,13 @@ class WebsocketTestAdapter implements WebSocketAdapter { this.#onmessage = handler; } - set onerror(_handler: (msg: ErrorEvent) => void) {} + set onerror(handler: (msg: ErrorEvent) => void) { + this.#onerror = handler; + } + + error(error: Error): void { + this.#onerror(error as unknown as ErrorEvent); + } send(message: Uint8Array): void { const rawMessage = message.slice(); diff --git a/crates/bindings-typescript/tests/connection_manager_reconnect.test.ts b/crates/bindings-typescript/tests/connection_manager_reconnect.test.ts new file mode 100644 index 00000000000..27d2ed62d95 --- /dev/null +++ b/crates/bindings-typescript/tests/connection_manager_reconnect.test.ts @@ -0,0 +1,322 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; +import { ConnectionId } from '../src'; +import { + CONNECTION_MANAGER_RECONNECT_DELAY_MS, + ConnectionManager, +} from '../src/sdk/connection_manager.ts'; + +type ErrorContextInterface = { + isActive: boolean; +}; + +class MockConnection { + isActive = false; + identity = undefined; + token = undefined; + connectionId = ConnectionId.random(); + disconnected = false; + + #onConnectCallbacks = new Set<(conn: MockConnection) => void>(); + #onDisconnectCallbacks = new Set< + (ctx: ErrorContextInterface, error?: Error) => void + >(); + #onConnectErrorCallbacks = new Set< + (ctx: ErrorContextInterface, error: Error) => void + >(); + + disconnect(): void { + if (this.disconnected) { + return; + } + this.disconnected = true; + this.isActive = false; + for (const cb of this.#onDisconnectCallbacks) { + cb(this as unknown as ErrorContextInterface); + } + } + + removeOnConnect(cb: (conn: MockConnection) => void): void { + this.#onConnectCallbacks.delete(cb); + } + + removeOnDisconnect( + cb: (ctx: ErrorContextInterface, error?: Error) => void + ): void { + this.#onDisconnectCallbacks.delete(cb); + } + + removeOnConnectError( + cb: (ctx: ErrorContextInterface, error: Error) => void + ): void { + this.#onConnectErrorCallbacks.delete(cb); + } + + callbackCounts(): { + connect: number; + disconnect: number; + connectError: number; + } { + return { + connect: this.#onConnectCallbacks.size, + disconnect: this.#onDisconnectCallbacks.size, + connectError: this.#onConnectErrorCallbacks.size, + }; + } + + simulateConnect(): void { + this.isActive = true; + for (const cb of this.#onConnectCallbacks) { + cb(this); + } + } + + simulateDisconnect(error?: Error): void { + this.isActive = false; + for (const cb of this.#onDisconnectCallbacks) { + cb(this as unknown as ErrorContextInterface, error); + } + } + + simulateConnectError(error: Error): void { + this.isActive = false; + for (const cb of this.#onConnectErrorCallbacks) { + cb(this as unknown as ErrorContextInterface, error); + } + } + + registerOnConnect(cb: (conn: MockConnection) => void): void { + this.#onConnectCallbacks.add(cb); + } + + registerOnDisconnect( + cb: (ctx: ErrorContextInterface, error?: Error) => void + ): void { + this.#onDisconnectCallbacks.add(cb); + } + + registerOnConnectError( + cb: (ctx: ErrorContextInterface, error: Error) => void + ): void { + this.#onConnectErrorCallbacks.add(cb); + } +} + +class MockBuilder { + buildCount = 0; + connections: MockConnection[] = []; + + #onConnectCallbacks = new Set<(conn: MockConnection) => void>(); + #onDisconnectCallbacks = new Set< + (ctx: ErrorContextInterface, error?: Error) => void + >(); + #onConnectErrorCallbacks = new Set< + (ctx: ErrorContextInterface, error: Error) => void + >(); + + build(): MockConnection { + const connection = new MockConnection(); + this.buildCount += 1; + this.connections.push(connection); + + for (const cb of this.#onConnectCallbacks) { + connection.registerOnConnect(cb); + } + for (const cb of this.#onDisconnectCallbacks) { + connection.registerOnDisconnect(cb); + } + for (const cb of this.#onConnectErrorCallbacks) { + connection.registerOnConnectError(cb); + } + + return connection; + } + + onConnect(cb: (conn: MockConnection) => void): MockBuilder { + this.#onConnectCallbacks.add(cb); + for (const connection of this.connections) { + connection.registerOnConnect(cb); + } + return this; + } + + onDisconnect( + cb: (ctx: ErrorContextInterface, error?: Error) => void + ): MockBuilder { + this.#onDisconnectCallbacks.add(cb); + for (const connection of this.connections) { + connection.registerOnDisconnect(cb); + } + return this; + } + + onConnectError( + cb: (ctx: ErrorContextInterface, error: Error) => void + ): MockBuilder { + this.#onConnectErrorCallbacks.add(cb); + for (const connection of this.connections) { + connection.registerOnConnectError(cb); + } + return this; + } +} + +let keyCounter = 0; + +function nextKey(): string { + keyCounter += 1; + return `connection-manager-reconnect-${keyCounter}`; +} + +function retainMock(key: string, builder: MockBuilder): MockConnection { + return ConnectionManager.retain( + key, + builder as any + ) as unknown as MockConnection; +} + +describe('ConnectionManager retained reconnect behavior', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.runOnlyPendingTimers(); + vi.useRealTimers(); + }); + + test('rebuilds a retained connection after disconnect', () => { + const key = nextKey(); + const builder = new MockBuilder(); + + const first = retainMock(key, builder); + expect(builder.buildCount).toBe(1); + + first.simulateDisconnect(); + + expect(ConnectionManager.getSnapshot(key)?.isActive).toBe(false); + expect(ConnectionManager.getConnection(key)).toBeNull(); + + vi.advanceTimersByTime(CONNECTION_MANAGER_RECONNECT_DELAY_MS - 1); + expect(builder.buildCount).toBe(1); + + vi.advanceTimersByTime(1); + expect(builder.buildCount).toBe(2); + + const second = ConnectionManager.getConnection( + key + ) as unknown as MockConnection; + expect(second).toBe(builder.connections[1]); + expect(second).not.toBe(first); + + ConnectionManager.release(key); + }); + + test('rebuilds a retained connection after connectError', () => { + const key = nextKey(); + const builder = new MockBuilder(); + const error = new Error('network unavailable'); + + const first = retainMock(key, builder); + first.simulateConnectError(error); + + expect(ConnectionManager.getSnapshot(key)?.isActive).toBe(false); + expect(ConnectionManager.getSnapshot(key)?.connectionError).toBe(error); + expect(ConnectionManager.getConnection(key)).toBeNull(); + + vi.advanceTimersByTime(CONNECTION_MANAGER_RECONNECT_DELAY_MS); + + expect(builder.buildCount).toBe(2); + expect(ConnectionManager.getSnapshot(key)?.connectionError).toBeUndefined(); + expect(ConnectionManager.getConnection(key)).toBe(builder.connections[1]); + + ConnectionManager.release(key); + }); + + test('same-key retain after disconnect returns a fresh connection immediately', () => { + const key = nextKey(); + const builder = new MockBuilder(); + + const first = retainMock(key, builder); + first.simulateDisconnect(); + + const second = retainMock(key, builder); + + expect(builder.buildCount).toBe(2); + expect(second).not.toBe(first); + expect(ConnectionManager.getConnection(key)).toBe(second); + + vi.advanceTimersByTime(CONNECTION_MANAGER_RECONNECT_DELAY_MS); + expect(builder.buildCount).toBe(2); + + ConnectionManager.release(key); + ConnectionManager.release(key); + }); + + test('reconnect uses callbacks from a replacement same-key builder', () => { + const key = nextKey(); + const firstBuilder = new MockBuilder(); + const secondBuilder = new MockBuilder(); + + const first = retainMock(key, firstBuilder); + first.simulateConnect(); + + ConnectionManager.release(key); + const retained = retainMock(key, secondBuilder); + + expect(retained).toBe(first); + expect(firstBuilder.buildCount).toBe(1); + expect(secondBuilder.buildCount).toBe(0); + + first.simulateDisconnect(); + vi.advanceTimersByTime(CONNECTION_MANAGER_RECONNECT_DELAY_MS); + + expect(secondBuilder.buildCount).toBe(1); + const second = secondBuilder.connections[0]; + expect(ConnectionManager.getConnection(key)).toBe(second); + + second.simulateConnect(); + + expect(ConnectionManager.getSnapshot(key)?.isActive).toBe(true); + expect(ConnectionManager.getSnapshot(key)?.connectionId).toBe( + second.connectionId + ); + + ConnectionManager.release(key); + }); + + test('disconnect removes manager callbacks from the old connection before pending reconnect', () => { + const key = nextKey(); + const builder = new MockBuilder(); + + const first = retainMock(key, builder); + expect(first.callbackCounts()).toEqual({ + connect: 1, + disconnect: 1, + connectError: 1, + }); + + first.simulateDisconnect(); + + expect(first.callbackCounts()).toEqual({ + connect: 0, + disconnect: 0, + connectError: 0, + }); + + ConnectionManager.release(key); + }); + + test('release cancels a pending reconnect', () => { + const key = nextKey(); + const builder = new MockBuilder(); + + const first = retainMock(key, builder); + first.simulateDisconnect(); + + ConnectionManager.release(key); + vi.advanceTimersByTime(CONNECTION_MANAGER_RECONNECT_DELAY_MS); + + expect(builder.buildCount).toBe(1); + expect(ConnectionManager.getConnection(key)).toBeNull(); + }); +}); diff --git a/crates/bindings-typescript/tests/db_connection.test.ts b/crates/bindings-typescript/tests/db_connection.test.ts index 0580518e280..7ac0440cd5d 100644 --- a/crates/bindings-typescript/tests/db_connection.test.ts +++ b/crates/bindings-typescript/tests/db_connection.test.ts @@ -166,6 +166,58 @@ describe('DbConnection', () => { expect(connectCalled).toBeFalsy(); }); + test('marks connection inactive before invoking onDisconnect callback', async () => { + const onDisconnectPromise = new Deferred(); + const wsAdapter = new WebsocketTestAdapter(); + let callbackIsActive: boolean | undefined; + + const client = DbConnection.builder() + .withUri('ws://127.0.0.1:1234') + .withDatabaseName('db') + .withWSFn(wsAdapter.openWebSocket) + .onDisconnect(ctx => { + callbackIsActive = ctx.isActive; + onDisconnectPromise.resolve(); + }) + .build(); + + await client['wsPromise']; + wsAdapter.acceptConnection(); + wsAdapter.close(); + + await onDisconnectPromise.promise; + + expect(callbackIsActive).toBe(false); + expect(client.isActive).toBe(false); + }); + + test('marks connection inactive before invoking onConnectError callback from websocket error', async () => { + const onConnectErrorPromise = new Deferred(); + const wsAdapter = new WebsocketTestAdapter(); + let callbackIsActive: boolean | undefined; + const error = new Error('websocket failed'); + + const client = DbConnection.builder() + .withUri('ws://127.0.0.1:1234') + .withDatabaseName('db') + .withWSFn(wsAdapter.openWebSocket) + .onConnectError(ctx => { + callbackIsActive = ctx.isActive; + onConnectErrorPromise.resolve(); + }) + .build(); + + await client['wsPromise']; + wsAdapter.acceptConnection(); + client.isActive = true; + wsAdapter.error(error); + + await onConnectErrorPromise.promise; + + expect(callbackIsActive).toBe(false); + expect(client.isActive).toBe(false); + }); + test('call onConnect callback after getting an identity', async () => { const onConnectPromise = new Deferred(); diff --git a/docs/docs/00200-core-concepts/00600-clients/00700-typescript-reference.md b/docs/docs/00200-core-concepts/00600-clients/00700-typescript-reference.md index 6ea9107c9af..1ed638137cd 100644 --- a/docs/docs/00200-core-concepts/00600-clients/00700-typescript-reference.md +++ b/docs/docs/00200-core-concepts/00600-clients/00700-typescript-reference.md @@ -1020,6 +1020,8 @@ The SpacetimeDB TypeScript SDK includes React bindings under the `spacetimedb/re The React integration is fully compatible with React StrictMode and correctly handles the double-mount behavior (only one WebSocket connection is created). +While a `SpacetimeDBProvider` is mounted, the React connection manager also replaces the managed `DbConnection` if the underlying WebSocket closes or reports a connection error. Hooks such as `useTable` observe the provider state, receive the fresh connection, and establish their subscriptions again. This provider-level recovery does not change the lower-level `DbConnection` contract: applications that create a `DbConnection` directly are still responsible for creating a new connection if they need reconnection behavior. + | Name | Description | | ----------------------------------------------------------- | --------------------------------------------------------- | | [`SpacetimeDBProvider` component](#component-spacetimedbprovider) | Context provider that manages the database connection. |