Skip to content

Commit 601d7bb

Browse files
committed
fix(websocket): negotiate blob transport for socket.io
1 parent fe88d62 commit 601d7bb

8 files changed

Lines changed: 391 additions & 15 deletions

File tree

package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@sqlitecloud/drivers",
3-
"version": "1.0.837",
3+
"version": "1.0.872",
44
"description": "SQLiteCloud drivers for Typescript/Javascript in edge, web and node clients",
55
"main": "./lib/index.js",
66
"types": "./lib/index.d.ts",
@@ -46,6 +46,7 @@
4646
"eventemitter3": "^5.0.1",
4747
"lz4js": "^0.2.0",
4848
"socket.io-client": "^4.8.1",
49+
"socket.io-parser": "~4.2.4",
4950
"whatwg-url": "^14.2.0"
5051
},
5152
"peerDependencies": {

src/drivers/connection-ws.ts

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,54 @@
33
*/
44

55
import { io, Socket } from 'socket.io-client'
6+
import { Decoder as SocketIODecoder, Encoder as SocketIOEncoder } from 'socket.io-parser'
67
import { SQLiteCloudConnection } from './connection'
78
import { SQLiteCloudRowset } from './rowset'
8-
import { ErrorCallback, ResultsCallback, SQLiteCloudCommand, SQLiteCloudConfig, SQLiteCloudError } from './types'
9-
import { decodeBigIntMarkers, encodeBigIntMarkers } from './utilities'
9+
import {
10+
DEFAULT_WEBSOCKET_BLOB_TRANSFER_FORMAT,
11+
ErrorCallback,
12+
ResultsCallback,
13+
SQLiteCloudCommand,
14+
SQLiteCloudConfig,
15+
SQLiteCloudError,
16+
SQLiteCloudWebsocketBlobTransferFormat
17+
} from './types'
18+
import { decodeBigIntMarkers, decodeWebsocketRowsetData, encodeBigIntMarkers, parseWebsocketBlobTransferFormat, parseWebsocketMaxAttachments } from './utilities'
19+
20+
const SocketIODecoderBase = SocketIODecoder as unknown as new (...args: any[]) => { opts?: { maxAttachments?: number } }
21+
22+
function createSocketIOParser(maxAttachments: number) {
23+
class SQLiteCloudSocketIODecoder extends SocketIODecoderBase {
24+
constructor(opts?: any) {
25+
const decoderOptions = typeof opts === 'function' ? { reviver: opts } : opts
26+
super(decoderOptions?.reviver)
27+
this.opts ||= {}
28+
this.opts.maxAttachments = Math.max(this.opts.maxAttachments ?? decoderOptions?.maxAttachments ?? 0, maxAttachments)
29+
}
30+
}
31+
32+
return {
33+
Encoder: SocketIOEncoder,
34+
Decoder: SQLiteCloudSocketIODecoder
35+
}
36+
}
37+
38+
function getResponseBlobTransferFormat(response: any): SQLiteCloudWebsocketBlobTransferFormat | undefined {
39+
return parseWebsocketBlobTransferFormat(response?.capabilities?.blobTransferFormat || response?.blobTransferFormat, undefined)
40+
}
41+
42+
function getAttachmentLimitError(description: unknown, limit: number): SQLiteCloudError | undefined {
43+
const descriptionMessage = description instanceof Error ? description.message : typeof description === 'string' ? description : ''
44+
if (/illegal attachments/i.test(descriptionMessage)) {
45+
return new SQLiteCloudError(
46+
`WebSocket blob response exceeded the configured Socket.IO attachment limit (${limit}). Use websocketBlobFormat=base64-blobs-v1 or increase websocketMaxAttachments.`,
47+
{
48+
errorCode: 'ERR_WEBSOCKET_MAX_ATTACHMENTS_EXCEEDED',
49+
cause: description as Error | string
50+
}
51+
)
52+
}
53+
}
1054

1155
/**
1256
* Implementation of TransportConnection that connects to the database indirectly
@@ -31,20 +75,33 @@ export class SQLiteCloudWebsocketConnection extends SQLiteCloudConnection {
3175
if (!this.socket) {
3276
this.config = config
3377
const connectionstring = this.config.connectionstring as string
78+
const websocketMaxAttachments = parseWebsocketMaxAttachments(this.config.websocketMaxAttachments)
3479
const gatewayUrl = this.config?.gatewayurl || `${this.config.host === 'localhost' ? 'ws' : 'wss'}://${this.config.host as string}:443`
35-
this.socket = io(gatewayUrl, { auth: { token: connectionstring } })
80+
this.socket = io(gatewayUrl, { auth: { token: connectionstring }, parser: createSocketIOParser(websocketMaxAttachments) })
3681

3782
this.socket.on('connect', () => {
3883
callback?.call(this, null)
3984
})
4085

41-
this.socket.on('disconnect', reason => {
86+
this.socket.on('disconnect', (reason, description) => {
4287
this.close()
43-
callback?.call(this, new SQLiteCloudError('Disconnected', { errorCode: 'ERR_CONNECTION_ENDED', cause: reason }))
88+
callback?.call(
89+
this,
90+
(reason === 'parse error' && getAttachmentLimitError(description, websocketMaxAttachments)) ||
91+
new SQLiteCloudError('Disconnected', { errorCode: 'ERR_CONNECTION_ENDED', cause: reason })
92+
)
4493
})
4594

4695
this.socket.on('connect_error', (error: any) => {
4796
this.close()
97+
if (error?.message === 'parse error' || error?.cause === 'parse error') {
98+
callback?.call(
99+
this,
100+
getAttachmentLimitError(error?.description || error?.data || error?.cause, websocketMaxAttachments) ||
101+
new SQLiteCloudError('Connection error', { errorCode: 'ERR_CONNECTION_ERROR', cause: error })
102+
)
103+
return
104+
}
48105
let message = error.message || 'Connection error'
49106
if (typeof error.context == 'object' && error.context.responseText) {
50107
try {
@@ -87,16 +144,24 @@ export class SQLiteCloudWebsocketConnection extends SQLiteCloudConnection {
87144
sql: commands.query,
88145
bind: encodeBigIntMarkers(commands.parameters),
89146
row: 'array',
90-
safe_integer_mode: this.config.safe_integer_mode
147+
safe_integer_mode: this.config.safe_integer_mode,
148+
capabilities: {
149+
blobTransferFormat: this.config.websocketBlobFormat || DEFAULT_WEBSOCKET_BLOB_TRANSFER_FORMAT
150+
}
91151
},
92152
(response: any) => {
93153
if (response?.error) {
94154
const error = new SQLiteCloudError(response.error.detail, { ...response.error })
95155
callback?.call(this, error)
96156
} else {
97157
const { metadata } = response
98-
const data = decodeBigIntMarkers(response?.data, this.config.safe_integer_mode)
99-
if (data && metadata) {
158+
const blobTransferFormat = getResponseBlobTransferFormat(response)
159+
const data =
160+
metadata && metadata.numberOfRows !== undefined && metadata.numberOfColumns !== undefined && metadata.columns !== undefined
161+
? decodeWebsocketRowsetData(response?.data, metadata, this.config.safe_integer_mode, blobTransferFormat)
162+
: decodeBigIntMarkers(response?.data, this.config.safe_integer_mode)
163+
164+
if (data !== undefined && metadata) {
100165
if (metadata.numberOfRows !== undefined && metadata.numberOfColumns !== undefined && metadata.columns !== undefined) {
101166
console.assert(Array.isArray(data), 'SQLiteCloudWebsocketConnection.transportCommands - data is not an array')
102167
// we can recreate a SQLiteCloudRowset from the response which we know to be an array of arrays

src/drivers/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ export const DEFAULT_PORT = 8860
2020
* mixed - use BigInt and Number types depending on the value size
2121
*/
2222
export type SQLiteCloudSafeIntegerMode = 'number' | 'bigint' | 'mixed'
23+
export type SQLiteCloudWebsocketBlobTransferFormat = 'base64-blobs-v1' | 'socketio-blobs-v1'
2324

2425
export let SAFE_INTEGER_MODE: SQLiteCloudSafeIntegerMode = 'number'
26+
export const DEFAULT_WEBSOCKET_BLOB_TRANSFER_FORMAT: SQLiteCloudWebsocketBlobTransferFormat = 'base64-blobs-v1'
27+
export const DEFAULT_WEBSOCKET_MAX_ATTACHMENTS = 100000
2528
if (typeof process !== 'undefined') {
2629
const mode = process.env['SAFE_INTEGER_MODE']?.toLowerCase()
2730
if (mode === 'bigint' || mode === 'mixed' || mode === 'number') {
@@ -94,6 +97,10 @@ export interface SQLiteCloudConfig {
9497
usewebsocket?: boolean
9598
/** Url where we can connect to a SQLite Cloud Gateway that has a socket.io deamon waiting to connect, eg. wss://host:443 */
9699
gatewayurl?: string
100+
/** Preferred blob transfer format when using websocket transport. Defaults to base64-blobs-v1 for new clients. */
101+
websocketBlobFormat?: SQLiteCloudWebsocketBlobTransferFormat
102+
/** Maximum number of socket.io binary attachments accepted by the websocket parser. */
103+
websocketMaxAttachments?: number
97104

98105
/** Optional identifier used for verbose logging */
99106
clientid?: string

src/drivers/utilities.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,19 @@
33
//
44

55
import {
6+
DEFAULT_WEBSOCKET_BLOB_TRANSFER_FORMAT,
7+
DEFAULT_WEBSOCKET_MAX_ATTACHMENTS,
68
DEFAULT_PORT,
79
DEFAULT_TIMEOUT,
10+
ErrorCallback,
811
SAFE_INTEGER_MODE,
12+
SQLCloudRowsetMetadata,
913
SQLiteCloudArrayType,
1014
SQLiteCloudConfig,
1115
SQLiteCloudDataTypes,
1216
SQLiteCloudError,
13-
SQLiteCloudSafeIntegerMode
17+
SQLiteCloudSafeIntegerMode,
18+
SQLiteCloudWebsocketBlobTransferFormat
1419
} from './types'
1520
import { getSafeURL } from './safe-imports'
1621
import { Buffer } from 'buffer'
@@ -187,6 +192,8 @@ export function validateConfiguration(config: SQLiteCloudConfig): SQLiteCloudCon
187192
config.noblob = parseBoolean(config.noblob)
188193
config.compression = config.compression != undefined && config.compression != null ? parseBoolean(config.compression) : true // default: true
189194
config.safe_integer_mode = parseSafeIntegerMode(config.safe_integer_mode || SAFE_INTEGER_MODE)
195+
config.websocketBlobFormat = parseWebsocketBlobTransferFormat(config.websocketBlobFormat)
196+
config.websocketMaxAttachments = parseWebsocketMaxAttachments(config.websocketMaxAttachments)
190197

191198
config.create = parseBoolean(config.create)
192199
config.non_linearizable = parseBoolean(config.non_linearizable)
@@ -257,6 +264,8 @@ export function parseconnectionstring(connectionstring: string): SQLiteCloudConf
257264
maxrowset: options.maxrowset ? parseInt(options.maxrowset) : undefined,
258265
safe_integer_mode: options.safe_integer_mode ? parseSafeIntegerMode(options.safe_integer_mode) : undefined,
259266
usewebsocket: options.usewebsocket ? parseBoolean(options.usewebsocket) : undefined,
267+
websocketBlobFormat: options.websocket_blob_format ? parseWebsocketBlobTransferFormat(options.websocket_blob_format, undefined) : undefined,
268+
websocketMaxAttachments: options.websocket_max_attachments ? parseWebsocketMaxAttachments(options.websocket_max_attachments) : undefined,
260269
verbose: options.verbose ? parseBoolean(options.verbose) : undefined
261270
}
262271

@@ -302,7 +311,29 @@ export function parseSafeIntegerMode(value: string | SQLiteCloudSafeIntegerMode
302311
return 'number'
303312
}
304313

314+
/** Parse websocket BLOB transport format, falling back to the driver default for new websocket clients. */
315+
export function parseWebsocketBlobTransferFormat(
316+
value: string | SQLiteCloudWebsocketBlobTransferFormat | null | undefined,
317+
fallback: SQLiteCloudWebsocketBlobTransferFormat | undefined = DEFAULT_WEBSOCKET_BLOB_TRANSFER_FORMAT
318+
): SQLiteCloudWebsocketBlobTransferFormat | undefined {
319+
const format = value?.toLowerCase()
320+
if (format === 'base64-blobs-v1' || format === 'socketio-blobs-v1') {
321+
return format
322+
}
323+
return fallback
324+
}
325+
326+
/** Parse the maximum number of socket.io binary attachments allowed for a websocket response. */
327+
export function parseWebsocketMaxAttachments(value: string | number | null | undefined): number {
328+
const parsed = typeof value === 'string' ? Number.parseInt(value, 10) : value
329+
if (Number.isSafeInteger(parsed) && (parsed as number) > 0) {
330+
return parsed as number
331+
}
332+
return DEFAULT_WEBSOCKET_MAX_ATTACHMENTS
333+
}
334+
305335
const BIGINT_MARKER_RE = /^-?\d+n$/
336+
const BLOB_COLUMN_TYPE_RE = /\bblob\b/i
306337

307338
/** Convert values that JSON cannot represent losslessly into sqlitecloud-js bigint markers. */
308339
export function encodeBigIntMarkers(value: any): any {
@@ -349,3 +380,31 @@ export function decodeBigIntMarkers(value: any, safeIntegerMode?: SQLiteCloudSaf
349380

350381
return value
351382
}
383+
384+
/** Decode websocket rowset cells using metadata-aware rules for bigint markers and negotiated BLOB transport. */
385+
export function decodeWebsocketRowsetData(
386+
data: any,
387+
metadata: SQLCloudRowsetMetadata,
388+
safeIntegerMode?: SQLiteCloudSafeIntegerMode,
389+
blobTransferFormat?: SQLiteCloudWebsocketBlobTransferFormat
390+
): any {
391+
if (!Array.isArray(data)) {
392+
return decodeBigIntMarkers(data, safeIntegerMode)
393+
}
394+
395+
const blobColumnIndexes = new Set(
396+
metadata.columns.flatMap((column, index) => (column.type && BLOB_COLUMN_TYPE_RE.test(column.type) ? [index] : []))
397+
)
398+
const decodeCell = (value: any, columnIndex: number) => {
399+
if (blobTransferFormat === 'base64-blobs-v1' && blobColumnIndexes.has(columnIndex) && typeof value === 'string') {
400+
return Buffer.from(value, 'base64')
401+
}
402+
return decodeBigIntMarkers(value, safeIntegerMode)
403+
}
404+
405+
if (data.every(row => !Array.isArray(row))) {
406+
return data.map((value, index) => decodeCell(value, index % metadata.numberOfColumns))
407+
}
408+
409+
return data.map(row => (Array.isArray(row) ? row.map((value, columnIndex) => decodeCell(value, columnIndex)) : decodeBigIntMarkers(row, safeIntegerMode)))
410+
}

test/blob-transport.test.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { describe, expect, it } from '@jest/globals'
2+
import { Database, SQLiteCloudRowset } from '../src'
3+
import { GATEWAY_URL, LONG_TIMEOUT, getChinookDatabase } from './shared'
4+
5+
const BLOB_TRANSPORT_TABLE = 'blob_transport_probe'
6+
const BLOB_ROW_COUNT = 100
7+
const BLOB_SIZE = 32
8+
9+
async function assertBlobRowsRoundTrip(database: Database) {
10+
await database.sql(`DROP TABLE IF EXISTS ${BLOB_TRANSPORT_TABLE};`)
11+
await database.sql(`
12+
CREATE TABLE ${BLOB_TRANSPORT_TABLE} (
13+
id INTEGER PRIMARY KEY,
14+
label TEXT NOT NULL,
15+
payload BLOB NOT NULL
16+
);
17+
`)
18+
await database.sql(
19+
`
20+
WITH RECURSIVE seq(n) AS (
21+
SELECT 1
22+
UNION ALL
23+
SELECT n + 1 FROM seq WHERE n < ?
24+
)
25+
INSERT INTO ${BLOB_TRANSPORT_TABLE} (id, label, payload)
26+
SELECT n, 'row-' || n, randomblob(?)
27+
FROM seq;
28+
`,
29+
BLOB_ROW_COUNT,
30+
BLOB_SIZE
31+
)
32+
33+
const rows = (await database.sql(`
34+
SELECT
35+
id,
36+
label,
37+
length(payload) AS payloadLength,
38+
payload
39+
FROM ${BLOB_TRANSPORT_TABLE}
40+
ORDER BY id
41+
LIMIT ${BLOB_ROW_COUNT};
42+
`)) as SQLiteCloudRowset
43+
44+
expect(rows).toHaveLength(BLOB_ROW_COUNT)
45+
46+
rows.forEach((row, index) => {
47+
expect(row.id).toBe(index + 1)
48+
expect(row.label).toBe(`row-${index + 1}`)
49+
expect(row.payloadLength).toBe(BLOB_SIZE)
50+
expect(row.payload).toBeInstanceOf(Buffer)
51+
expect((row.payload as Buffer).length).toBe(BLOB_SIZE)
52+
})
53+
}
54+
55+
describe('blob rowsets across transports', () => {
56+
it(
57+
'should return more than 12 rows with small blobs over tls',
58+
async () => {
59+
let database: Database | undefined
60+
61+
try {
62+
database = getChinookDatabase()
63+
await assertBlobRowsRoundTrip(database)
64+
} finally {
65+
database?.close()
66+
}
67+
},
68+
LONG_TIMEOUT
69+
)
70+
71+
it(
72+
'should return more than 12 rows with small blobs over websocket',
73+
async () => {
74+
let database: Database | undefined
75+
76+
try {
77+
database = getChinookDatabase(undefined, {
78+
usewebsocket: true,
79+
gatewayurl: GATEWAY_URL
80+
})
81+
await assertBlobRowsRoundTrip(database)
82+
} finally {
83+
database?.close()
84+
}
85+
},
86+
LONG_TIMEOUT
87+
)
88+
})

0 commit comments

Comments
 (0)