From 1c8ddfa70215c3c448f66af2f2233918663227f2 Mon Sep 17 00:00:00 2001 From: BumSeok Hwang Date: Mon, 29 Dec 2025 05:12:27 +0000 Subject: [PATCH] Update API behavior and integration tests --- README.md | 33 ++-- package-lock.json | 161 +++++++++++++++++- package.json | 3 +- src/api/executeStatement.ts | 52 ++++-- src/api/fetchAll.ts | 14 ++ src/api/fetchRow.ts | 34 +++- src/api/fetchStream.ts | 105 +++++++++--- src/api/mergeExternalLinks.ts | 36 +++- src/types.ts | 18 +- test/executeStatement.spec.ts | 48 +++++- test/fetchRow.spec.ts | 146 ++++++++++------ test/fetchStream.spec.ts | 199 +++++++++------------- test/mergeExternalLinks.spec.ts | 53 ++---- test/s3.integration.spec.ts | 285 +++++++++++++++++++------------- test/testUtil.ts | 41 +++++ 15 files changed, 841 insertions(+), 387 deletions(-) diff --git a/README.md b/README.md index 8fb8e22..86863da 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ The goal is simple: stream big results with stable memory usage and without forc - Optimized polling with server-side wait (up to 50s) before falling back to client polling. - Query metrics support via Query History API (`enableMetrics` option). - Efficient external link handling: merge chunks into a single stream. +- Handles partial external link responses by fetching missing chunk metadata. - `mergeExternalLinks` supports streaming uploads and returns a new StatementResult with a presigned URL. - `fetchRow`/`fetchAll` support `JSON_OBJECT` (schema-based row mapping). - External links + JSON_ARRAY are supported for row iteration (streaming JSON parsing). @@ -48,11 +49,12 @@ console.log(rows) // [{ value: 1 }] ``` ## Sample (Streaming + Presigned URL) -Stream external links into S3 with gzip compression, then return a single presigned URL: +Stream external links into S3 with gzip compression, then return a single presigned URL. ```ts import { executeStatement, mergeExternalLinks } from '@bitofsky/databricks-sql' -import { GetObjectCommand, HeadObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3' +import { GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3' +import { Upload } from '@aws-sdk/lib-storage' import { getSignedUrl } from '@aws-sdk/s3-request-presigner' import { createGzip } from 'zlib' import { pipeline } from 'stream/promises' @@ -79,15 +81,17 @@ const merged = await mergeExternalLinks(result, auth, { const gzip = createGzip() // Compress with gzip and upload to S3 const passThrough = new PassThrough() - const uploadPromise = s3.send( - new PutObjectCommand({ + const upload = new Upload({ + client: s3, + params: { Bucket: bucket, Key: key, Body: passThrough, - ContentType: 'text/csv', + ContentType: 'text/csv; charset=utf-8', ContentEncoding: 'gzip', - }) - ) + }, + }) + const uploadPromise = upload.done() await Promise.all([ pipeline(stream, gzip, passThrough), @@ -128,8 +132,8 @@ const result = await executeStatement( auth, { enableMetrics: true, - onProgress: (status, metrics) => { - console.log(`State: ${status.state}`) + onProgress: (result, metrics) => { + console.log(`State: ${result.status.state}`) if (metrics) { // metrics is optional, only present when enableMetrics: true console.log(` Execution time: ${metrics.execution_time_ms}ms`) console.log(` Rows produced: ${metrics.rows_produced_count}`) @@ -190,6 +194,7 @@ function executeStatement( ``` - Calls the Databricks Statement Execution API and polls until completion. - Server waits up to 50s (`wait_timeout`) before client-side polling begins. 
+- Default `wait_timeout` is `50s`, or `0s` when `onProgress` is provided. - Use `options.onProgress` to receive status updates with optional metrics. - Set `enableMetrics: true` to fetch query metrics from Query History API on each poll. - Throws `DatabricksSqlError` on failure, `StatementCancelledError` on cancel, and `AbortError` on abort. @@ -205,6 +210,7 @@ function fetchRow( - Streams each row to `options.onEachRow`. - Use `format: 'JSON_OBJECT'` to map rows into schema-based objects. - Supports `INLINE` results or `JSON_ARRAY` formatted `EXTERNAL_LINKS` only. +- If only a subset of external links is returned, missing chunk metadata is fetched by index. ### fetchAll(statementResult, auth, options?) ```ts @@ -216,6 +222,7 @@ function fetchAll( ``` - Collects all rows into an array. For large results, prefer `fetchRow`/`fetchStream`. - Supports `INLINE` results or `JSON_ARRAY` formatted `EXTERNAL_LINKS` only. +- If only a subset of external links is returned, missing chunk metadata is fetched by index. ### fetchStream(statementResult, auth, options?) ```ts @@ -230,6 +237,7 @@ function fetchStream( - Throws if the result is `INLINE`. - Ends as an empty stream when no external links exist. - `forceMerge: true` forces merge even when there is only a single external link. +- If only a subset of external links is returned, missing chunk metadata is fetched by index. ### mergeExternalLinks(statementResult, auth, options) ```ts @@ -248,8 +256,9 @@ function mergeExternalLinks( ### Options (Summary) ```ts type ExecuteStatementOptions = { - onProgress?: (status: StatementStatus, metrics?: QueryMetrics) => void + onProgress?: (result: StatementResult, metrics?: QueryMetrics) => void enableMetrics?: boolean // Fetch metrics from Query History API (default: false) + logger?: Logger signal?: AbortSignal disposition?: 'INLINE' | 'EXTERNAL_LINKS' format?: 'JSON_ARRAY' | 'ARROW_STREAM' | 'CSV' @@ -267,21 +276,25 @@ type FetchRowsOptions = { signal?: AbortSignal onEachRow?: (row: RowArray | RowObject) => void format?: 'JSON_ARRAY' | 'JSON_OBJECT' + logger?: Logger } type FetchAllOptions = { signal?: AbortSignal format?: 'JSON_ARRAY' | 'JSON_OBJECT' + logger?: Logger } type FetchStreamOptions = { signal?: AbortSignal forceMerge?: boolean + logger?: Logger } type MergeExternalLinksOptions = { signal?: AbortSignal forceMerge?: boolean + logger?: Logger mergeStreamToExternalLink: (stream: Readable) => Promise<{ externalLink: string byte_count: number diff --git a/package-lock.json b/package-lock.json index cc3d155..8e2b71a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@bitofsky/databricks-sql", - "version": "1.0.0", + "version": "1.0.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@bitofsky/databricks-sql", - "version": "1.0.0", + "version": "1.0.2", "license": "MIT", "dependencies": { "@bitofsky/merge-streams": "1.1.0", @@ -14,6 +14,7 @@ }, "devDependencies": { "@aws-sdk/client-s3": "3.958.0", + "@aws-sdk/lib-storage": "3.958.0", "@aws-sdk/s3-request-presigner": "3.958.0", "@types/node": "24.0.2", "@types/stream-json": "1.7.8", @@ -566,6 +567,28 @@ "node": ">=18.0.0" } }, + "node_modules/@aws-sdk/lib-storage": { + "version": "3.958.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/lib-storage/-/lib-storage-3.958.0.tgz", + "integrity": "sha512-cd8CTiJ165ep2DKTc2PHHhVCxDn3byv10BXMGn+lkDY3KwMoatcgZ1uhFWCBuJvsCUnSExqGouJN/Q0qgjkWtg==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@smithy/abort-controller": "^4.2.7", + 
"@smithy/middleware-endpoint": "^4.4.1", + "@smithy/smithy-client": "^4.10.2", + "buffer": "5.6.0", + "events": "3.3.0", + "stream-browserify": "3.0.0", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=18.0.0" + }, + "peerDependencies": { + "@aws-sdk/client-s3": "^3.958.0" + } + }, "node_modules/@aws-sdk/middleware-bucket-endpoint": { "version": "3.957.0", "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-bucket-endpoint/-/middleware-bucket-endpoint-3.957.0.tgz", @@ -2878,6 +2901,27 @@ "node": ">=12" } }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, "node_modules/bowser": { "version": "2.13.1", "resolved": "https://registry.npmjs.org/bowser/-/bowser-2.13.1.tgz", @@ -2885,6 +2929,17 @@ "dev": true, "license": "MIT" }, + "node_modules/buffer": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz", + "integrity": "sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==", + "dev": true, + "license": "MIT", + "dependencies": { + "base64-js": "^1.0.2", + "ieee754": "^1.1.4" + } + }, "node_modules/bundle-require": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/bundle-require/-/bundle-require-5.1.0.tgz", @@ -3151,6 +3206,16 @@ "@types/estree": "^1.0.0" } }, + "node_modules/events": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", + "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=0.8.x" + } + }, "node_modules/expect-type": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/expect-type/-/expect-type-1.3.0.tgz", @@ -3252,6 +3317,34 @@ "node": ">=8" } }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "BSD-3-Clause" + }, + "node_modules/inherits": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", + "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", + "dev": true, + "license": "ISC" + }, "node_modules/joycon": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/joycon/-/joycon-3.1.1.tgz", @@ -3509,6 +3602,21 @@ } } }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dev": true, + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + 
"util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/readdirp": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/readdirp/-/readdirp-4.1.2.tgz", @@ -3575,6 +3683,27 @@ "fsevents": "~2.3.2" } }, + "node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, "node_modules/siginfo": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/siginfo/-/siginfo-2.0.0.tgz", @@ -3616,6 +3745,17 @@ "dev": true, "license": "MIT" }, + "node_modules/stream-browserify": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/stream-browserify/-/stream-browserify-3.0.0.tgz", + "integrity": "sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==", + "dev": true, + "license": "MIT", + "dependencies": { + "inherits": "~2.0.4", + "readable-stream": "^3.5.0" + } + }, "node_modules/stream-chain": { "version": "2.2.5", "resolved": "https://registry.npmjs.org/stream-chain/-/stream-chain-2.2.5.tgz", @@ -3631,6 +3771,16 @@ "stream-chain": "^2.2.5" } }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "dev": true, + "license": "MIT", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, "node_modules/strnum": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/strnum/-/strnum-2.1.2.tgz", @@ -3878,6 +4028,13 @@ "dev": true, "license": "MIT" }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", + "dev": true, + "license": "MIT" + }, "node_modules/vite": { "version": "7.3.0", "resolved": "https://registry.npmjs.org/vite/-/vite-7.3.0.tgz", diff --git a/package.json b/package.json index 76ad70a..4ef7b66 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@bitofsky/databricks-sql", - "version": "1.0.1", + "version": "1.0.2", "description": "Databricks SQL client for Node.js - Direct REST API without SDK", "main": "dist/index.cjs", "module": "dist/index.js", @@ -53,6 +53,7 @@ }, "devDependencies": { "@aws-sdk/client-s3": "3.958.0", + "@aws-sdk/lib-storage": "3.958.0", "@aws-sdk/s3-request-presigner": "3.958.0", "@types/node": "24.0.2", "@types/stream-json": "1.7.8", diff --git a/src/api/executeStatement.ts b/src/api/executeStatement.ts index 36fcdae..78b2046 100644 --- a/src/api/executeStatement.ts +++ b/src/api/executeStatement.ts @@ -45,14 +45,16 @@ export async function executeStatement( options: ExecuteStatementOptions = {} ): Promise { const warehouseId = options.warehouse_id ?? extractWarehouseId(auth.httpPath) - const { signal, onProgress, enableMetrics } = options + const { signal, onProgress, enableMetrics, logger } = options + const waitTimeout = options.wait_timeout ?? (onProgress ? 
'0s' : '50s') + let cancelIssued = false // Check if already aborted throwIfAborted(signal, 'executeStatement') // Helper to call onProgress with optional metrics const emitProgress = onProgress - ? async (statementId: string) => onProgress(result.status, enableMetrics ? await fetchMetrics(auth, statementId, signal) : undefined) + ? async (statementId: string) => onProgress(result, enableMetrics ? await fetchMetrics(auth, statementId, signal) : undefined) : undefined // 1. Build request (filter out undefined values) @@ -64,7 +66,7 @@ export async function executeStatement( disposition: options.disposition, format: options.format, on_wait_timeout: options.on_wait_timeout ?? 'CONTINUE', - wait_timeout: options.wait_timeout ?? '50s', + wait_timeout: waitTimeout, row_limit: options.row_limit, catalog: options.catalog, schema: options.schema, @@ -72,19 +74,47 @@ export async function executeStatement( }).filter(([, v]) => v !== undefined) ) as ExecuteStatementRequest + logger?.info?.(`executeStatement Executing statement on warehouse ${warehouseId}...`) + // 2. Submit statement execution request let result = await postStatement(auth, request, signal) + const cancelStatementSafely = async () => { + if (cancelIssued) return + logger?.info?.('executeStatement Abort signal received during executeStatement.') + cancelIssued = true + await cancelStatement(auth, result.statement_id).catch((err) => { + logger?.error?.('executeStatement Failed to cancel statement after abort.', err) + }) + } + + if (signal?.aborted) { + await cancelStatementSafely() + throw new AbortError('Aborted during polling') + } + + const onAbort = () => cancelStatementSafely().catch(() => { }) - // 3. Poll until terminal state - while (!TERMINAL_STATES.has(result.status.state)) { - if (signal?.aborted) { - await cancelStatement(auth, result.statement_id).catch(() => { }) + try { + signal?.addEventListener('abort', onAbort, { once: true }) + + // 3. Poll until terminal state + while (!TERMINAL_STATES.has(result.status.state)) { + logger?.info?.(`executeStatement Statement ${result.statement_id} in state ${result.status.state}; polling for status...`) + await emitProgress?.(result.statement_id) + await delay(POLL_INTERVAL_MS, signal) + result = await getStatement(auth, result.statement_id, signal) + } + } catch (err) { + if (err instanceof AbortError || signal?.aborted) { + logger?.info?.('executeStatement Abort detected in executeStatement polling loop.') + await cancelStatementSafely() throw new AbortError('Aborted during polling') } - - await emitProgress?.(result.statement_id) - await delay(POLL_INTERVAL_MS, signal) - result = await getStatement(auth, result.statement_id, signal) + logger?.error?.(`executeStatement Error during executeStatement polling: ${String(err)}`) + throw err + } finally { + logger?.info?.(`executeStatement Statement ${result.statement_id} reached final state: ${result.status.state}`) + signal?.removeEventListener('abort', onAbort) } // 4. Final progress callback diff --git a/src/api/fetchAll.ts b/src/api/fetchAll.ts index f5119d1..4cd3e6c 100644 --- a/src/api/fetchAll.ts +++ b/src/api/fetchAll.ts @@ -19,12 +19,18 @@ export async function fetchAll( options: FetchAllOptions = {} ): Promise> { const rows: Array = [] + const statementId = statementResult.statement_id + const manifest = statementResult.manifest + const logContext = { statementId, manifest, requestedFormat: options.format } const fetchOptions: FetchRowsOptions = { // Collect rows as they are streamed in. 
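+    // Note: every row is buffered in this array; prefer fetchRow/fetchStream for large results.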
onEachRow: (row) => { rows.push(row) }, } + const { logger } = options + + logger?.info?.(`fetchAll fetching all rows for statement ${statementId}.`, logContext) if (options.signal) fetchOptions.signal = options.signal @@ -32,6 +38,14 @@ export async function fetchAll( if (options.format) fetchOptions.format = options.format + if (options.logger) + fetchOptions.logger = options.logger + await fetchRow(statementResult, auth, fetchOptions) + logger?.info?.(`fetchAll fetched ${rows.length} rows for statement ${statementId}.`, { + ...logContext, + rowCount: rows.length, + resolvedFormat: options.format ?? manifest?.format, + }) return rows } diff --git a/src/api/fetchRow.ts b/src/api/fetchRow.ts index 968e992..0e3ce3b 100644 --- a/src/api/fetchRow.ts +++ b/src/api/fetchRow.ts @@ -25,22 +25,34 @@ export async function fetchRow( auth: AuthInfo, options: FetchRowsOptions = {} ): Promise { - const { signal, onEachRow, format } = options + const { signal, onEachRow, format, logger } = options const manifest = validateSucceededResult(statementResult) + const statementId = statementResult.statement_id + const logContext = { statementId, manifest, requestedFormat: format } // Map JSON_ARRAY rows to JSON_OBJECT when requested. const mapRow = createRowMapper(manifest, format) + logger?.info?.(`fetchRow fetching rows for statement ${statementId}.`, { + ...logContext, + resultType: statementResult.result?.external_links ? 'EXTERNAL_LINKS' : 'INLINE', + }) + if (statementResult.result?.external_links) { if (manifest.format !== 'JSON_ARRAY') { + logger?.error?.(`fetchRow only supports JSON_ARRAY for external_links; got ${manifest.format}.`, logContext) throw new DatabricksSqlError( `fetchRow only supports JSON_ARRAY for external_links. Received: ${manifest.format}`, 'UNSUPPORTED_FORMAT', - statementResult.statement_id + statementId ) } - const stream = fetchStream(statementResult, auth, signal ? { signal } : {}) - await consumeJsonArrayStream(stream, mapRow, onEachRow, signal) + logger?.info?.(`fetchRow streaming external links for statement ${statementId}.`, logContext) + const stream = fetchStream(statementResult, auth, { + ...signal ? { signal } : {}, + ...logger ? { logger } : {}, + }) + await consumeJsonArrayStream(stream, mapRow, onEachRow, signal, logger, logContext) return } @@ -49,6 +61,10 @@ export async function fetchRow( // Process first chunk (inline data_array) const dataArray = statementResult.result?.data_array if (dataArray) { + logger?.info?.(`fetchRow processing inline rows for statement ${statementId}.`, { + ...logContext, + inlineRows: dataArray.length, + }) for (const row of dataArray) { if (signal?.aborted) throw new AbortError('Aborted') // Convert row to requested shape before callback. @@ -58,7 +74,7 @@ export async function fetchRow( // Process additional chunks if any if (totalChunks > 1) { - const statementId = statementResult.statement_id + logger?.info?.(`fetchRow processing ${totalChunks} chunks for statement ${statementId}.`, logContext) for (let chunkIndex = 1; chunkIndex < totalChunks; chunkIndex++) { if (signal?.aborted) throw new AbortError('Aborted') @@ -87,13 +103,19 @@ async function consumeJsonArrayStream( stream: Readable, mapRow: (row: RowArray) => RowArray | RowObject, onEachRow: ((row: RowArray | RowObject) => void) | undefined, - signal: AbortSignal | undefined + signal: AbortSignal | undefined, + logger: FetchRowsOptions['logger'], + logContext: Record ): Promise { // Stream JSON_ARRAY as individual rows to avoid buffering whole payloads. 
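+  // stream-json: parser() tokenizes the incoming text incrementally and streamArray()
+  // emits one { key, value } pair per top-level array element.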
const jsonStream = stream.pipe(parser()).pipe(streamArray()) for await (const item of jsonStream) { if (signal?.aborted) { + logger?.info?.('fetchRow abort detected while streaming JSON_ARRAY rows.', { + ...logContext, + aborted: signal.aborted, + }) stream.destroy(new AbortError('Aborted')) throw new AbortError('Aborted') } diff --git a/src/api/fetchStream.ts b/src/api/fetchStream.ts index 89bf9d9..1f9711b 100644 --- a/src/api/fetchStream.ts +++ b/src/api/fetchStream.ts @@ -25,32 +25,60 @@ export function fetchStream( auth: AuthInfo, options: FetchStreamOptions = {} ): Readable { - const { signal, forceMerge } = options + const { signal, forceMerge, logger } = options const manifest = validateSucceededResult(statementResult) const format = manifest.format as MergeFormat + const statementId = statementResult.statement_id + const baseLog = { statementId, manifest, format, forceMerge } if (statementResult.result?.data_array) { + logger?.error?.( + `fetchStream only supports EXTERNAL_LINKS results for statement ${statementId}.`, + { ...baseLog, hasDataArray: true } + ) throw new DatabricksSqlError( 'fetchStream only supports EXTERNAL_LINKS results', 'UNSUPPORTED_FORMAT', - statementResult.statement_id + statementId ) } + logger?.info?.(`fetchStream creating stream for statement ${statementId}.`, { + ...baseLog, + hasExternalLinks: Boolean(statementResult.result?.external_links?.length), + }) + // Create PassThrough as output (readable by consumer) const output = new PassThrough() // Handle AbortSignal if (signal) { - const onAbort = () => output.destroy(new AbortError('Stream aborted')) + const onAbort = () => { + logger?.info?.(`fetchStream abort signal received while streaming statement ${statementId}.`, baseLog) + output.destroy(new AbortError('Stream aborted')) + } signal.addEventListener('abort', onAbort, { once: true }) output.once('close', () => signal.removeEventListener('abort', onAbort)) } + // Prevent AbortError from becoming an uncaught exception when no error handler is attached. + output.on('error', (err) => { + if (err instanceof AbortError) + return + if (output.listenerCount('error') === 1) + throw err + }) + // Start async merge process // Errors are forwarded to the stream consumer via destroy. 
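+  // The PassThrough is returned to the caller immediately; chunk collection and
+  // merging continue asynchronously below.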
- mergeChunksToStream(statementResult, auth, manifest, format, output, signal, forceMerge) - .catch((err) => output.destroy(err as Error)) + mergeChunksToStream(statementResult, auth, manifest, format, output, signal, forceMerge, logger) + .catch((err) => { + logger?.error?.(`fetchStream error while streaming statement ${statementId}.`, { + ...baseLog, + error: err, + }) + output.destroy(err as Error) + }) return output } @@ -65,20 +93,35 @@ async function mergeChunksToStream( format: MergeFormat, output: PassThrough, signal?: AbortSignal, - forceMerge?: boolean + forceMerge?: boolean, + logger?: FetchStreamOptions['logger'] ): Promise { + const statementId = statementResult.statement_id + const baseLog = { statementId, manifest, format, forceMerge } + logger?.info?.(`fetchStream collecting external links for statement ${statementId}.`, baseLog) const urls = await collectExternalUrls(statementResult, auth, manifest, signal) // No external links - close the stream - if (urls.length === 0) + if (urls.length === 0) { + logger?.info?.(`fetchStream no external links found for statement ${statementId}.`, baseLog) return void output.end() + } // Single URL - pipe directly to output unless forcing merge - if (urls.length === 1 && !forceMerge) + if (urls.length === 1 && !forceMerge) { + logger?.info?.(`fetchStream piping single external link for statement ${statementId}.`, { + ...baseLog, + urlCount: urls.length, + }) // Avoid merge-streams overhead for a single URL unless forced. return pipeUrlToOutput(urls[0]!, output, signal) + } // Merge all URLs using merge-streams + logger?.info?.(`fetchStream merging ${urls.length} external links for statement ${statementId}.`, { + ...baseLog, + urlCount: urls.length, + }) return mergeStreamsFromUrls(format, signal ? { urls, output, signal } : { urls, output }) } @@ -88,33 +131,57 @@ async function collectExternalUrls( manifest: StatementManifest, signal?: AbortSignal ): Promise { - const urls = extractExternalLinks(statementResult.result?.external_links) - if (urls.length > 0) - return urls + const chunkUrls = new Map() + + addChunkLinks(chunkUrls, statementResult.result?.external_links) if (!manifest.total_chunk_count) - return [] + return flattenChunkUrls(chunkUrls) - const chunkUrls: string[] = [] for (let i = 0; i < manifest.total_chunk_count; i++) { + if (chunkUrls.has(i)) + continue if (signal?.aborted) throw new AbortError('Aborted while collecting URLs') // Chunk metadata contains external link URLs when results are chunked. 
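+      // Only chunk indexes missing from the initial response reach this call;
+      // indexes already present are skipped by the chunkUrls.has(i) guard above.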
const chunkData = await getChunk(auth, statementResult.statement_id, i, signal) - chunkUrls.push(...extractExternalLinks(chunkData.external_links)) + addChunkLinks(chunkUrls, chunkData.external_links) } - return chunkUrls + return flattenChunkUrls(chunkUrls) } -function extractExternalLinks(externalLinks?: ExternalLinkInfo[]): string[] { +function addChunkLinks( + chunkUrls: Map, + externalLinks?: ExternalLinkInfo[] +): void { if (!externalLinks) + return + + for (const link of externalLinks) { + if (!isNonEmptyString(link.external_link)) + continue + + const existing = chunkUrls.get(link.chunk_index) + if (existing) { + existing.push(link.external_link) + } else { + chunkUrls.set(link.chunk_index, [link.external_link]) + } + } +} + +function flattenChunkUrls(chunkUrls: Map): string[] { + if (chunkUrls.size === 0) return [] - return externalLinks - .map((link) => link.external_link) - .filter(isNonEmptyString) + const sorted = [...chunkUrls.entries()].sort(([a], [b]) => a - b) + const urls: string[] = [] + for (const [, links] of sorted) { + urls.push(...links) + } + return urls } function isNonEmptyString(value: unknown): value is string { diff --git a/src/api/mergeExternalLinks.ts b/src/api/mergeExternalLinks.ts index d0c725b..87903f0 100644 --- a/src/api/mergeExternalLinks.ts +++ b/src/api/mergeExternalLinks.ts @@ -18,41 +18,59 @@ export async function mergeExternalLinks( auth: AuthInfo, options: MergeExternalLinksOptions ): Promise { - const { signal, mergeStreamToExternalLink, forceMerge } = options + const { signal, mergeStreamToExternalLink, forceMerge, logger } = options + const statementId = statementResult.statement_id + const manifest = statementResult.manifest + const externalLinks = statementResult.result?.external_links + const totalChunks = manifest?.total_chunk_count ?? 0 + const logContext = { statementId, manifest, totalChunks, forceMerge } // If not external links, return original as-is - if (!statementResult.result?.external_links) + if (!externalLinks) { + logger?.info?.(`mergeExternalLinks no external links to merge for statement ${statementId}.`, logContext) return statementResult + } if (!forceMerge) { - const totalChunks = statementResult.manifest?.total_chunk_count - const externalLinks = statementResult.result.external_links - const isSingleChunk = totalChunks === undefined ? externalLinks.length <= 1 : totalChunks <= 1 + const isSingleChunk = totalChunks <= 1 // Skip merging when a single external link already exists unless forced. - if (isSingleChunk && externalLinks.length <= 1) + if (isSingleChunk) { + logger?.info?.(`mergeExternalLinks skipping merge for single external link in statement ${statementId}.`, { + ...logContext, + totalChunks, + }) return statementResult + } } // Get merged stream via fetchStream + logger?.info?.(`mergeExternalLinks merging external links for statement ${statementId}.`, logContext) const stream = fetchStream(statementResult, auth, { ...signal ? { signal } : {}, ...forceMerge !== undefined ? { forceMerge } : {}, + ...logger ? 
{ logger } : {}, }) // Upload via callback + logger?.info?.(`mergeExternalLinks uploading merged external link for statement ${statementId}.`, logContext) const uploadResult = await mergeStreamToExternalLink(stream) + logger?.info?.(`mergeExternalLinks uploaded merged external link for statement ${statementId}.`, { + ...logContext, + byteCount: uploadResult.byte_count, + expiration: uploadResult.expiration, + }) // Build updated StatementResult // Manifest must exist for external links; validate before constructing new result. - const manifest = validateSucceededResult(statementResult) - const totalRowCount = manifest.total_row_count ?? 0 + const validatedManifest = validateSucceededResult(statementResult) + const totalRowCount = validatedManifest.total_row_count ?? 0 return { statement_id: statementResult.statement_id, status: statementResult.status, manifest: { - ...manifest, + ...validatedManifest, total_chunk_count: 1, total_byte_count: uploadResult.byte_count, chunks: [ diff --git a/src/types.ts b/src/types.ts index 35a8f84..b82106b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -188,7 +188,9 @@ export type StatementParameter = { */ export type ExecuteStatementOptions = { /** Progress callback (called on each poll) */ - onProgress?: (status: StatementStatus, metrics?: QueryMetrics) => void + onProgress?: (result: StatementResult, metrics?: QueryMetrics) => void + /** Optional logger for lifecycle events */ + logger?: Logger /** Enable query metrics fetching during polling (default: false) */ enableMetrics?: boolean /** Abort signal for cancellation */ @@ -215,6 +217,12 @@ export type ExecuteStatementOptions = { warehouse_id?: string } +export type Logger = { + info?: (...args: unknown[]) => void + warn?: (...args: unknown[]) => void + error?: (...args: unknown[]) => void +} + /** Base options with abort signal support */ export type SignalOptions = { /** Abort signal for cancellation */ @@ -234,6 +242,8 @@ export type FetchRowFormat = 'JSON_ARRAY' | 'JSON_OBJECT' export type FetchStreamOptions = SignalOptions & { /** Force merge even when there is only a single external link */ forceMerge?: boolean + /** Optional logger for lifecycle events */ + logger?: Logger } /** Options for fetchRow */ @@ -242,12 +252,16 @@ export type FetchRowsOptions = SignalOptions & { onEachRow?: (row: RowArray | RowObject) => void /** Row format (default: JSON_ARRAY) */ format?: FetchRowFormat + /** Optional logger for lifecycle events */ + logger?: Logger } /** Options for fetchAll */ export type FetchAllOptions = SignalOptions & { /** Row format (default: JSON_ARRAY) */ format?: FetchRowFormat + /** Optional logger for lifecycle events */ + logger?: Logger } /** Result from mergeStreamToExternalLink callback */ @@ -266,6 +280,8 @@ export type MergeExternalLinksOptions = SignalOptions & { mergeStreamToExternalLink: (stream: Readable) => Promise /** Force merge even when there is only a single external link chunk */ forceMerge?: boolean + /** Optional logger for lifecycle events */ + logger?: Logger } /** diff --git a/test/executeStatement.spec.ts b/test/executeStatement.spec.ts index 13f9c0c..daa93d6 100644 --- a/test/executeStatement.spec.ts +++ b/test/executeStatement.spec.ts @@ -84,8 +84,14 @@ describe('executeStatement', () => { await resultPromise - expect(onProgress).toHaveBeenCalledWith({ state: 'PENDING' }, undefined) - expect(onProgress).toHaveBeenCalledWith({ state: 'SUCCEEDED' }, undefined) + expect(mockFetch).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + body: 
expect.stringContaining('"wait_timeout":"0s"'), + }) + ) + expect(onProgress).toHaveBeenCalledWith(expect.objectContaining({ status: { state: 'PENDING' } }), undefined) + expect(onProgress).toHaveBeenCalledWith(expect.objectContaining({ status: { state: 'SUCCEEDED' } }), undefined) }) it('should not fetch metrics when enableMetrics is false', async () => { @@ -110,7 +116,7 @@ describe('executeStatement', () => { // Only 2 calls: postStatement + getStatement (no metrics calls) expect(mockFetch).toHaveBeenCalledTimes(2) // onProgress should be called without metrics - expect(onProgress).toHaveBeenCalledWith({ state: 'PENDING' }, undefined) + expect(onProgress).toHaveBeenCalledWith(expect.objectContaining({ status: { state: 'PENDING' } }), undefined) }) it('should fetch metrics when enableMetrics is true', async () => { @@ -152,14 +158,14 @@ describe('executeStatement', () => { // Check that onProgress was called with metrics expect(onProgress).toHaveBeenCalledWith( - { state: 'PENDING' }, + expect.objectContaining({ status: { state: 'PENDING' } }), expect.objectContaining({ total_time_ms: 959, execution_time_ms: 642, }) ) expect(onProgress).toHaveBeenCalledWith( - { state: 'SUCCEEDED' }, + expect.objectContaining({ status: { state: 'SUCCEEDED' } }), expect.objectContaining({ total_time_ms: 959, }) @@ -207,7 +213,7 @@ describe('executeStatement', () => { // Statement should still succeed even if metrics fail expect(result.status.state).toBe('SUCCEEDED') // onProgress should be called without metrics (undefined) - expect(onProgress).toHaveBeenCalledWith({ state: 'PENDING' }, undefined) + expect(onProgress).toHaveBeenCalledWith(expect.objectContaining({ status: { state: 'PENDING' } }), undefined) }) it('should throw DatabricksSqlError when statement fails', async () => { @@ -237,6 +243,36 @@ describe('executeStatement', () => { expect(mockFetch).not.toHaveBeenCalled() }) + it('should cancel statement when aborted during polling', async () => { + const mockFetch = vi + .fn() + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockPendingResult), + }) + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({}), + }) + vi.stubGlobal('fetch', mockFetch) + + const controller = new AbortController() + const resultPromise = executeStatement('SELECT 42', mockAuth, { + signal: controller.signal, + }) + + await Promise.resolve() + controller.abort() + + await expect(resultPromise).rejects.toThrow('Aborted') + expect(mockFetch).toHaveBeenCalledWith( + expect.stringContaining( + `/api/2.0/sql/statements/${mockPendingResult.statement_id}/cancel` + ), + expect.objectContaining({ method: 'POST' }) + ) + }) + it('should extract warehouse_id from httpPath', async () => { const mockFetch = vi.fn().mockResolvedValueOnce({ ok: true, diff --git a/test/fetchRow.spec.ts b/test/fetchRow.spec.ts index 5c3c614..efe9d44 100644 --- a/test/fetchRow.spec.ts +++ b/test/fetchRow.spec.ts @@ -7,7 +7,11 @@ import { mockExternalLinksResult, mockPendingResult, } from './mocks.js' -import { createMockReadableStream } from './testUtil.js' +import { + createExternalLinkInfo, + createGetChunkResponse, + createStreamResponse, +} from './testUtil.js' describe('fetchRow', () => { afterEach(() => { @@ -63,18 +67,15 @@ describe('fetchRow', () => { expect(rows[2]).toEqual(['3', 'third']) }) - it('should throw error for non-succeeded statement', async () => { - await expect(fetchRow(mockPendingResult, mockAuth)).rejects.toThrow( - 'Cannot fetch from non-succeeded statement: PENDING' - ) - }) - - it('should throw 
error for statement without manifest', async () => { + it('should validate result state and manifest', async () => { const noManifestResult: StatementResult = { statement_id: 'test', status: { state: 'SUCCEEDED' }, } + await expect(fetchRow(mockPendingResult, mockAuth)).rejects.toThrow( + 'Cannot fetch from non-succeeded statement: PENDING' + ) await expect(fetchRow(noManifestResult, mockAuth)).rejects.toThrow( 'Statement result has no manifest' ) @@ -118,10 +119,8 @@ describe('fetchRow', () => { }) it('should process external links data_array', async () => { - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[["1","2"],["3","4"]]'), - }) + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse('[["1","2"],["3","4"]]')) vi.stubGlobal('fetch', mockFetch) const rows: RowArray[] = [] @@ -133,10 +132,8 @@ describe('fetchRow', () => { }) it('should map external links rows to JSON objects', async () => { - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[["1","2"],["3","4"]]'), - }) + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse('[["1","2"],["3","4"]]')) vi.stubGlobal('fetch', mockFetch) const rows: Array> = [] @@ -217,24 +214,22 @@ describe('fetchRow', () => { }, } - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream( - JSON.stringify([ - [ - '42', - 'hello', - '9007199254740993', - '12.34', - '2024-01-02', - '03:04:05', - '2024-01-02T03:04:05.123Z', - '2024-01-02T03:04:05.123', - '{"a":"1","b":{"c":"x","big":"9007199254740993","price":"56.78"}}', - ], - ]) - ), - }) + const payload = JSON.stringify([ + [ + '42', + 'hello', + '9007199254740993', + '12.34', + '2024-01-02', + '03:04:05', + '2024-01-02T03:04:05.123Z', + '2024-01-02T03:04:05.123', + '{"a":"1","b":{"c":"x","big":"9007199254740993","price":"56.78"}}', + ], + ]) + + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse(payload)) vi.stubGlobal('fetch', mockFetch) const rows = await fetchAll(complexExternalLinksResult, mockAuth, { @@ -263,6 +258,61 @@ describe('fetchRow', () => { ]) }) + it('should process external links across multiple chunks when only first link is present', async () => { + const partialLinksResult: StatementResult = { + statement_id: 'external-multi-chunk-test', + status: { state: 'SUCCEEDED' }, + manifest: { + format: 'JSON_ARRAY', + schema: { column_count: 1, columns: [] }, + total_chunk_count: 3, + total_row_count: 6, + }, + result: { + external_links: [ + createExternalLinkInfo({ + chunk_index: 0, + row_offset: 0, + row_count: 2, + byte_count: 10, + external_link: 'https://mock/chunk0.json', + }), + ], + }, + } + + const links = [ + createExternalLinkInfo({ + chunk_index: 1, + row_offset: 2, + row_count: 2, + byte_count: 10, + external_link: 'https://mock/chunk1.json', + }), + createExternalLinkInfo({ + chunk_index: 2, + row_offset: 4, + row_count: 2, + byte_count: 10, + external_link: 'https://mock/chunk2.json', + }), + ] + + const mockFetch = vi.fn() + .mockResolvedValueOnce(createGetChunkResponse([links[0]!], 1)) + .mockResolvedValueOnce(createGetChunkResponse([links[1]!], 2)) + .mockResolvedValueOnce(createStreamResponse('[["0"],["1"]]')) + .mockResolvedValueOnce(createStreamResponse('[["2"],["3"]]')) + .mockResolvedValueOnce(createStreamResponse('[["4"],["5"]]')) + vi.stubGlobal('fetch', mockFetch) + + const rows = await fetchAll(partialLinksResult, mockAuth) + + expect(rows).toHaveLength(6) + 
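+    // Rows are concatenated in chunk-index order: chunk0 rows first, chunk2 rows last.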
expect(rows[0]).toEqual(['0']) + expect(rows[5]).toEqual(['5']) + }) + it('should abort when signal is aborted before processing', async () => { const controller = new AbortController() controller.abort() @@ -319,9 +369,7 @@ describe('fetchRow', () => { } // Mock getChunk API calls - const mockFetch = vi - .fn() - // Chunk 1 + const mockFetch = vi.fn() .mockResolvedValueOnce({ ok: true, json: () => @@ -332,7 +380,6 @@ describe('fetchRow', () => { data_array: [['chunk1-row0'], ['chunk1-row1']], }), }) - // Chunk 2 .mockResolvedValueOnce({ ok: true, json: () => @@ -373,23 +420,14 @@ describe('fetchRow', () => { }, } - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - json: () => - Promise.resolve({ - chunk_index: 1, - external_links: [ - { - chunk_index: 1, - external_link: 'https://mock/chunk1.json', - row_count: 10, - byte_count: 100, - row_offset: 1, - expiration: '2025-12-25T12:00:00Z', - }, - ], - }), + const link = createExternalLinkInfo({ + chunk_index: 1, + row_offset: 1, + row_count: 10, + byte_count: 100, + external_link: 'https://mock/chunk1.json', }) + const mockFetch = vi.fn().mockResolvedValueOnce(createGetChunkResponse([link], 1)) vi.stubGlobal('fetch', mockFetch) await expect(fetchRow(mixedResult, mockAuth)).rejects.toThrow( diff --git a/test/fetchStream.spec.ts b/test/fetchStream.spec.ts index 1c3b51b..e28186b 100644 --- a/test/fetchStream.spec.ts +++ b/test/fetchStream.spec.ts @@ -1,46 +1,45 @@ import { describe, it, expect, vi, afterEach } from 'vitest' import { fetchStream } from '../src/api' import type { StatementResult } from '../src/types.js' +import { buildUrl } from '../src/util.js' import { mockAuth, mockExternalLinksResult, mockExternalLinkData, } from './mocks.js' -import { createMockReadableStream, collectStream } from './testUtil.js' +import { + collectStream, + createExternalLinkInfo, + createGetChunkResponse, + createStreamResponse, +} from './testUtil.js' describe('fetchStream', () => { afterEach(() => { vi.restoreAllMocks() }) - it('should throw error for non-succeeded statement', () => { + it('should validate result state and manifest', () => { const pendingResult: StatementResult = { statement_id: 'test', status: { state: 'PENDING' }, } - - expect(() => fetchStream(pendingResult, mockAuth)).toThrow( - 'Cannot fetch from non-succeeded statement: PENDING' - ) - }) - - it('should throw error for statement without manifest', () => { const noManifestResult: StatementResult = { statement_id: 'test', status: { state: 'SUCCEEDED' }, } + expect(() => fetchStream(pendingResult, mockAuth)).toThrow( + 'Cannot fetch from non-succeeded statement: PENDING' + ) expect(() => fetchStream(noManifestResult, mockAuth)).toThrow( 'Statement result has no manifest' ) }) it('should stream data from external links', async () => { - // Mock fetch for external link - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream(JSON.stringify(mockExternalLinkData)), - }) + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse(JSON.stringify(mockExternalLinkData))) vi.stubGlobal('fetch', mockFetch) const stream = fetchStream(mockExternalLinksResult, mockAuth) @@ -70,15 +69,11 @@ describe('fetchStream', () => { it('should abort stream when signal is aborted', async () => { const controller = new AbortController() - // Mock fetch that delays const mockFetch = vi.fn().mockImplementation( () => new Promise((resolve) => { setTimeout(() => { - resolve({ - ok: true, - body: createMockReadableStream('[]'), - }) + 
resolve(createStreamResponse(JSON.stringify(mockExternalLinkData))) }, 1000) }) ) @@ -106,48 +101,47 @@ describe('fetchStream', () => { }, result: { external_links: [ - { + createExternalLinkInfo({ chunk_index: 0, row_offset: 0, row_count: 5, byte_count: 100, external_link: 'https://mock/chunk0.json', - expiration: '2025-12-25T12:00:00Z', - }, - { - chunk_index: 1, - row_offset: 5, - row_count: 5, - byte_count: 100, - external_link: 'https://mock/chunk1.json', - expiration: '2025-12-25T12:00:00Z', - }, - { - chunk_index: 2, - row_offset: 10, - row_count: 5, - byte_count: 100, - external_link: 'https://mock/chunk2.json', - expiration: '2025-12-25T12:00:00Z', - }, + }), ], }, } - const mockFetch = vi - .fn() - .mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[["a","1"],["b","2"]]'), - }) - .mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[["c","3"],["d","4"]]'), - }) - .mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[["e","5"],["f","6"]]'), - }) + const links = [ + createExternalLinkInfo({ + chunk_index: 0, + row_offset: 0, + row_count: 5, + byte_count: 100, + external_link: 'https://mock/chunk0.json', + }), + createExternalLinkInfo({ + chunk_index: 1, + row_offset: 5, + row_count: 5, + byte_count: 100, + external_link: 'https://mock/chunk1.json', + }), + createExternalLinkInfo({ + chunk_index: 2, + row_offset: 10, + row_count: 5, + byte_count: 100, + external_link: 'https://mock/chunk2.json', + }), + ] + + const mockFetch = vi.fn() + .mockResolvedValueOnce(createGetChunkResponse([links[1]!], 1)) + .mockResolvedValueOnce(createGetChunkResponse([links[2]!], 2)) + .mockResolvedValueOnce(createStreamResponse('[["a","1"],["b","2"]]')) + .mockResolvedValueOnce(createStreamResponse('[["c","3"],["d","4"]]')) + .mockResolvedValueOnce(createStreamResponse('[["e","5"],["f","6"]]')) vi.stubGlobal('fetch', mockFetch) const stream = fetchStream(multiLinkResult, mockAuth) @@ -170,26 +164,18 @@ describe('fetchStream', () => { expect(content).toContain('"e"') expect(content).toContain('"f"') - // Verify fetch was called 3 times in order - expect(mockFetch).toHaveBeenCalledTimes(3) - expect(mockFetch).toHaveBeenNthCalledWith( - 1, - 'https://mock/chunk0.json', - expect.any(Object) - ) - expect(mockFetch).toHaveBeenNthCalledWith( - 2, - 'https://mock/chunk1.json', - expect.any(Object) - ) - expect(mockFetch).toHaveBeenNthCalledWith( - 3, - 'https://mock/chunk2.json', - expect.any(Object) - ) + const expectedChunkUrls = [ + buildUrl(mockAuth.host, `/api/2.0/sql/statements/${multiLinkResult.statement_id}/result/chunks/1`), + buildUrl(mockAuth.host, `/api/2.0/sql/statements/${multiLinkResult.statement_id}/result/chunks/2`), + ] + const expectedExternalUrls = links.map((link) => link.external_link) + const calledUrls = mockFetch.mock.calls.map((call) => call[0] as string) + + expect(calledUrls.slice(0, 2)).toEqual(expectedChunkUrls) + expect(calledUrls.slice(2)).toEqual(expectedExternalUrls) }) - it('should fetch chunks when no external_links in initial result', async () => { + it('should fetch missing chunks when only first external link is present', async () => { const resultWithChunks: StatementResult = { statement_id: 'chunk-test', status: { state: 'SUCCEEDED' }, @@ -198,56 +184,33 @@ describe('fetchStream', () => { schema: { column_count: 1, columns: [] }, total_chunk_count: 2, }, - // No result.external_links - need to fetch chunks - } - - const mockFetch = vi - .fn() - // First chunk API call - .mockResolvedValueOnce({ - ok: true, - json: 
() => - Promise.resolve({ + result: { + external_links: [ + createExternalLinkInfo({ chunk_index: 0, - external_links: [ - { - chunk_index: 0, - external_link: 'https://mock/chunk0.json', - row_count: 50, - byte_count: 500, - row_offset: 0, - expiration: '2025-12-25T12:00:00Z', - }, - ], - }), - }) - // Second chunk API call - .mockResolvedValueOnce({ - ok: true, - json: () => - Promise.resolve({ - chunk_index: 1, - external_links: [ - { - chunk_index: 1, - external_link: 'https://mock/chunk1.json', - row_count: 50, - byte_count: 500, - row_offset: 50, - expiration: '2025-12-25T12:00:00Z', - }, - ], + row_offset: 0, + row_count: 50, + byte_count: 500, + external_link: 'https://mock/chunk0.json', }), - }) - // External link fetches (from merge-streams) - .mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[["a"],["b"]]'), - }) - .mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[["c"],["d"]]'), - }) + ], + }, + } + + const links = [ + createExternalLinkInfo({ + chunk_index: 1, + row_offset: 50, + row_count: 50, + byte_count: 500, + external_link: 'https://mock/chunk1.json', + }), + ] + + const mockFetch = vi.fn() + .mockResolvedValueOnce(createGetChunkResponse([links[0]!], 1)) + .mockResolvedValueOnce(createStreamResponse('[["a"],["b"]]')) + .mockResolvedValueOnce(createStreamResponse('[["c"],["d"]]')) vi.stubGlobal('fetch', mockFetch) const stream = fetchStream(resultWithChunks, mockAuth) diff --git a/test/mergeExternalLinks.spec.ts b/test/mergeExternalLinks.spec.ts index 5a6379a..fe0170e 100644 --- a/test/mergeExternalLinks.spec.ts +++ b/test/mergeExternalLinks.spec.ts @@ -7,25 +7,14 @@ import { mockInlineResult, mockExternalLinkData, } from './mocks.js' -import { createMockReadableStream, collectStream } from './testUtil.js' +import { collectStream, createStreamResponse } from './testUtil.js' describe('mergeExternalLinks', () => { afterEach(() => { vi.restoreAllMocks() }) - it('should return original for inline data', async () => { - const mockCallback = vi.fn() - - const result = await mergeExternalLinks(mockInlineResult, mockAuth, { - mergeStreamToExternalLink: mockCallback, - }) - - expect(result).toBe(mockInlineResult) - expect(mockCallback).not.toHaveBeenCalled() - }) - - it('should return original for empty result', async () => { + it('should return original for inline data and empty result', async () => { const emptyResult: StatementResult = { statement_id: 'empty-test', status: { state: 'SUCCEEDED' }, @@ -37,19 +26,21 @@ describe('mergeExternalLinks', () => { } const mockCallback = vi.fn() - const result = await mergeExternalLinks(emptyResult, mockAuth, { + const inlineResult = await mergeExternalLinks(mockInlineResult, mockAuth, { + mergeStreamToExternalLink: mockCallback, + }) + const emptyResultResponse = await mergeExternalLinks(emptyResult, mockAuth, { mergeStreamToExternalLink: mockCallback, }) - expect(result).toBe(emptyResult) + expect(inlineResult).toBe(mockInlineResult) + expect(emptyResultResponse).toBe(emptyResult) expect(mockCallback).not.toHaveBeenCalled() }) it('should merge external links and return updated StatementResult', async () => { - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream(JSON.stringify(mockExternalLinkData)), - }) + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse(JSON.stringify(mockExternalLinkData))) vi.stubGlobal('fetch', mockFetch) const uploadResult: MergeExternalLinksResult = { @@ -94,10 +85,8 @@ describe('mergeExternalLinks', () => 
{ }) it('should pass merged stream to callback', async () => { - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream(JSON.stringify(mockExternalLinkData)), - }) + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse(JSON.stringify(mockExternalLinkData))) vi.stubGlobal('fetch', mockFetch) let receivedData: string | undefined @@ -125,15 +114,11 @@ describe('mergeExternalLinks', () => { it('should handle abort signal', async () => { const controller = new AbortController() - const mockFetch = vi.fn().mockImplementation( () => new Promise((resolve) => { setTimeout(() => { - resolve({ - ok: true, - body: createMockReadableStream('[]'), - }) + resolve(createStreamResponse(JSON.stringify(mockExternalLinkData))) }, 1000) }) ) @@ -162,10 +147,8 @@ describe('mergeExternalLinks', () => { }) it('should propagate callback errors', async () => { - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[]'), - }) + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse('[]')) vi.stubGlobal('fetch', mockFetch) const mockCallback = vi.fn().mockRejectedValue(new Error('Upload failed')) @@ -179,10 +162,8 @@ describe('mergeExternalLinks', () => { }) it('should preserve original manifest properties', async () => { - const mockFetch = vi.fn().mockResolvedValueOnce({ - ok: true, - body: createMockReadableStream('[]'), - }) + const mockFetch = vi.fn() + .mockResolvedValueOnce(createStreamResponse('[]')) vi.stubGlobal('fetch', mockFetch) const uploadResult: MergeExternalLinksResult = { diff --git a/test/s3.integration.spec.ts b/test/s3.integration.spec.ts index 7b0ef5e..56a6139 100644 --- a/test/s3.integration.spec.ts +++ b/test/s3.integration.spec.ts @@ -1,13 +1,15 @@ -import { Readable } from 'node:stream' +import { Readable, PassThrough } from 'node:stream' import { describe, it, expect, beforeAll, afterAll } from 'vitest' import { DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, - PutObjectCommand, S3Client, } from '@aws-sdk/client-s3' +import { Upload } from '@aws-sdk/lib-storage' import { getSignedUrl } from '@aws-sdk/s3-request-presigner' +import { createGzip } from 'node:zlib' +import { pipeline } from 'node:stream/promises' // Load environment variables import 'dotenv/config' import { executeStatement, fetchAll, mergeExternalLinks } from '../src/index.js' @@ -36,14 +38,6 @@ const shouldSkip = let s3Client: S3Client const uploadedKeys: string[] = [] -async function streamToBuffer(stream: Readable): Promise { - const chunks: Buffer[] = [] - for await (const chunk of stream) { - chunks.push(Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk)) - } - return Buffer.concat(chunks) -} - describe.skipIf(shouldSkip)('S3 Integration Tests', () => { beforeAll(() => { s3Client = new S3Client({ @@ -68,9 +62,10 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { } }) - it.concurrent('captures JSON_ARRAY schema and data for complex types', async () => { - const result = await executeStatement( - `SELECT + describe.concurrent('executeStatement with complex types', () => { + it.concurrent('captures JSON_ARRAY schema and data for complex types', async () => { + const result = await executeStatement( + `SELECT 42 AS num, 'hello' AS str, CAST(9007199254740993 AS BIGINT) AS big_int, @@ -92,118 +87,135 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { CAST(56.78 AS DECIMAL(10, 2)) ) ) AS nested`, - auth, - { disposition: 'INLINE', format: 'JSON_ARRAY' } - ) - - expect(result.status.state).toBe('SUCCEEDED') + auth, + { disposition: 'INLINE', format: 'JSON_ARRAY' } + ) - const schema = result.manifest?.schema - const data = result.result?.data_array + expect(result.status.state).toBe('SUCCEEDED') - expect(schema).toEqual({ - column_count: 9, - columns: [ - { - name: 'num', - type_text: 'INT', - type_name: 'INT', - position: 0, - }, - { - name: 'str', - type_text: 'STRING', - type_name: 'STRING', - position: 1, - }, - { - name: 'big_int', - type_text: 'BIGINT', - type_name: 'LONG', - position: 2, - }, - { - name: 'price', - type_text: 'DECIMAL(10,2)', - type_name: 'DECIMAL', - position: 3, - type_precision: 10, - type_scale: 2, - }, - { - name: 'd', - type_text: 'DATE', - type_name: 'DATE', - position: 4, - }, - { - name: 't', - type_text: 'STRING', - type_name: 'STRING', - position: 5, - }, - { - name: 'dt', - type_text: 'TIMESTAMP', - type_name: 'TIMESTAMP', - position: 6, - }, - { - name: 'dt_ntz', - type_text: 'TIMESTAMP_NTZ', - type_name: 'TIMESTAMP_NTZ', - position: 7, - }, - { - name: 'nested', - type_text: - 'STRUCT NOT NULL>', - type_name: 'STRUCT', - position: 8, - }, - ], + const schema = result.manifest?.schema + const data = result.result?.data_array + + expect(schema).toEqual({ + column_count: 9, + columns: [ + { + name: 'num', + type_text: 'INT', + type_name: 'INT', + position: 0, + }, + { + name: 'str', + type_text: 'STRING', + type_name: 'STRING', + position: 1, + }, + { + name: 'big_int', + type_text: 'BIGINT', + type_name: 'LONG', + position: 2, + }, + { + name: 'price', + type_text: 'DECIMAL(10,2)', + type_name: 'DECIMAL', + position: 3, + type_precision: 10, + type_scale: 2, + }, + { + name: 'd', + type_text: 'DATE', + type_name: 'DATE', + position: 4, + }, + { + name: 't', + type_text: 'STRING', + type_name: 'STRING', + position: 5, + }, + { + name: 'dt', + type_text: 'TIMESTAMP', + type_name: 'TIMESTAMP', + position: 6, + }, + { + name: 'dt_ntz', + type_text: 'TIMESTAMP_NTZ', + type_name: 'TIMESTAMP_NTZ', + position: 7, + }, + { + name: 'nested', + type_text: + 'STRUCT NOT NULL>', + type_name: 'STRUCT', + position: 8, + }, + ], + }) + expect(data).toEqual([ + [ + '42', + 'hello', + '9007199254740993', + '12.34', + '2024-01-02', + '03:04:05', + '2024-01-02T03:04:05.123Z', + '2024-01-02T03:04:05.123', + '{"a":"1","b":{"c":"x","big":"9007199254740993","price":"56.78"}}', + ], + ]) }) - expect(data).toEqual([ - [ - '42', - 'hello', - '9007199254740993', - '12.34', - '2024-01-02', - '03:04:05', - '2024-01-02T03:04:05.123Z', - '2024-01-02T03:04:05.123', - '{"a":"1","b":{"c":"x","big":"9007199254740993","price":"56.78"}}', - ], - ]) }) describe.concurrent('mergeExternalLinks with S3 
upload', () => { + type UploadResult = MergeExternalLinksResult & { key: string } + async function uploadToS3( stream: Readable, format: string - ): Promise { - const buffer = await streamToBuffer(stream) + ): Promise { const timestamp = Date.now() const ext = format === 'JSON_ARRAY' ? 'json' : format.toLowerCase() const key = `${S3_PREFIX}merged-${timestamp}.${ext}` + const gzip = createGzip() + const passThrough = new PassThrough() // Upload to S3 - await s3Client.send( - new PutObjectCommand({ + const upload = new Upload({ + client: s3Client, + params: { Bucket: S3_BUCKET, Key: key, - Body: buffer, + Body: passThrough, ContentType: format === 'CSV' - ? 'text/csv' + ? 'text/csv; charset=utf-8' : format === 'JSON_ARRAY' - ? 'application/json' + ? 'application/json; charset=utf-8' : 'application/vnd.apache.arrow.stream', - }) - ) + ContentEncoding: 'gzip', + }, + }) + + await Promise.all([ + pipeline(stream, gzip, passThrough), + upload.done(), + ]) uploadedKeys.push(key) + const head = await s3Client.send( + new HeadObjectCommand({ + Bucket: S3_BUCKET, + Key: key, + }) + ) // Generate presigned URL (valid for 1 hour) const presignedUrl = await getSignedUrl( @@ -217,12 +229,13 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { const expiration = new Date(Date.now() + 3600 * 1000).toISOString() - console.log(`Uploaded to s3://${S3_BUCKET}/${key} (${buffer.length} bytes)`) + console.log(`Uploaded to s3://${S3_BUCKET}/${key} (${head.ContentLength ?? 0} bytes)`) return { externalLink: presignedUrl, - byte_count: buffer.length, + byte_count: head.ContentLength ?? 0, expiration, + key, } } @@ -247,9 +260,14 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { } // Merge and upload to S3 + let uploadedKey: string | null = null const mergedResult = await mergeExternalLinks(result, auth, { forceMerge: true, - mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'), + mergeStreamToExternalLink: async (stream) => { + const uploadResult = await uploadToS3(stream, 'CSV') + uploadedKey = uploadResult.key + return uploadResult + }, }) // Verify merged result @@ -370,9 +388,14 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { return } + let uploadedKey: string | null = null const mergedResult = await mergeExternalLinks(result, auth, { forceMerge: true, - mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'), + mergeStreamToExternalLink: async (stream) => { + const uploadResult = await uploadToS3(stream, 'CSV') + uploadedKey = uploadResult.key + return uploadResult + }, }) expect(mergedResult.manifest?.total_chunk_count).toBe(1) @@ -382,7 +405,9 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { console.log(`Merged and uploaded ${uploadedSize} bytes to S3`) // Verify the S3 object exists - const key = uploadedKeys[uploadedKeys.length - 1] + const key = uploadedKey + if (!key) + throw new Error('No uploaded key available for head check') const headResponse = await s3Client.send( new HeadObjectCommand({ Bucket: S3_BUCKET, @@ -494,22 +519,54 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { }, ]) }) + + it.concurrent('fetches all rows across multiple external link chunks', async () => { + const result = await executeStatement( + `SELECT + id, + repeat('x', 5000) AS payload + FROM range(10000)`, + auth, + { disposition: 'EXTERNAL_LINKS', format: 'JSON_ARRAY' } + ) + + expect(result.status.state).toBe('SUCCEEDED') + expect(result.manifest?.total_row_count).toBe(10000) + expect(result.manifest?.total_chunk_count).toBeGreaterThan(1) + 
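+      // Only a subset of links may come back inline; fetchAll resolves the
+      // remaining chunk metadata by index before streaming rows.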
+ const rows = await fetchAll(result, auth) + expect(rows).toHaveLength(10000) + + const firstRow = rows[0] as string[] + const lastRow = rows[rows.length - 1] as string[] + expect(firstRow[0]).toBe('0') + expect(lastRow[0]).toBe('9999') + expect(firstRow[1]?.length).toBe(5000) + }, 300000) }) describe.concurrent('S3 client operations', () => { it.concurrent('can upload and download from S3', async () => { const testData = 'Hello, Databricks SQL!' const key = `${S3_PREFIX}test-${Date.now()}.txt` + const passThrough = new PassThrough() + const gzip = createGzip() // Upload - await s3Client.send( - new PutObjectCommand({ + const upload = new Upload({ + client: s3Client, + params: { Bucket: S3_BUCKET, Key: key, - Body: testData, - ContentType: 'text/plain', - }) - ) + Body: passThrough, + ContentType: 'text/plain; charset=utf-8', + ContentEncoding: 'gzip', + }, + }) + await Promise.all([ + pipeline(Readable.from([testData]), gzip, passThrough), + upload.done(), + ]) uploadedKeys.push(key) // Generate presigned URL diff --git a/test/testUtil.ts b/test/testUtil.ts index da7fb43..76d165a 100644 --- a/test/testUtil.ts +++ b/test/testUtil.ts @@ -1,3 +1,5 @@ +import type { ExternalLinkInfo } from '../src/types.js' + import { Writable } from 'node:stream' import { pipeline } from 'node:stream/promises' @@ -10,6 +12,45 @@ export function createMockReadableStream(data: string): ReadableStream & { external_link: string } +): ExternalLinkInfo { + return { + chunk_index: overrides.chunk_index ?? 0, + row_offset: overrides.row_offset ?? 0, + row_count: overrides.row_count ?? 100, + byte_count: overrides.byte_count ?? 1236, + external_link: overrides.external_link, + expiration: overrides.expiration ?? DEFAULT_EXTERNAL_LINK_EXPIRATION, + } +} + +export function createGetChunkResponse( + external_links: ExternalLinkInfo[], + chunk_index?: number +): { ok: true; json: () => Promise<{ chunk_index: number; external_links: ExternalLinkInfo[] }> } { + const resolvedChunkIndex = chunk_index ?? external_links[0]?.chunk_index ?? 0 + return { + ok: true, + json: () => + Promise.resolve({ + chunk_index: resolvedChunkIndex, + external_links, + }), + } +} + +export function createStreamResponse( + data: string +): { ok: true; body: ReadableStream } { + return { + ok: true, + body: createMockReadableStream(data), + } +} + export async function collectStream(stream: NodeJS.ReadableStream): Promise { const chunks: Buffer[] = [] const writable = new Writable({