diff --git a/README.md b/README.md index 4e0268b..92ad67f 100644 --- a/README.md +++ b/README.md @@ -183,7 +183,9 @@ function fetchStream( ``` - Merges `EXTERNAL_LINKS` into a single binary stream. - Preserves the original format (`JSON_ARRAY`, `CSV`, `ARROW_STREAM`). +- Throws if the result is `INLINE`. - Ends as an empty stream when no external links exist. +- `forceMerge: true` forces merge even when there is only a single external link. ### mergeExternalLinks(statementResult, auth, options) ```ts @@ -196,7 +198,8 @@ function mergeExternalLinks( - Creates a merged stream from `EXTERNAL_LINKS`, uploads it via `options.mergeStreamToExternalLink`, then returns a `StatementResult` with a single external link. -- Returns the original result unchanged when input is `INLINE`. +- Returns the original result unchanged when input is `INLINE` or already a + single external link (unless `forceMerge: true`). ### Options (Summary) ```ts @@ -228,10 +231,12 @@ type FetchAllOptions = { type FetchStreamOptions = { signal?: AbortSignal + forceMerge?: boolean } type MergeExternalLinksOptions = { signal?: AbortSignal + forceMerge?: boolean mergeStreamToExternalLink: (stream: Readable) => Promise<{ externalLink: string byte_count: number @@ -243,6 +248,7 @@ type MergeExternalLinksOptions = { ## Notes - Databricks requires `INLINE` results to use `JSON_ARRAY` format. `INLINE + CSV` is rejected by the API. - `EXTERNAL_LINKS` are merged using `@bitofsky/merge-streams`. +- Requires Node.js >= 20 for global `fetch` and Web streams. ## Development ```bash diff --git a/src/api/fetchRow.ts b/src/api/fetchRow.ts index ef2aaea..ef8d2af 100644 --- a/src/api/fetchRow.ts +++ b/src/api/fetchRow.ts @@ -1,8 +1,4 @@ -import { parser } from 'stream-json' -import { streamArray } from 'stream-json/streamers/StreamArray' import type { Readable } from 'node:stream' -import { getChunk } from '../databricks-api.js' -import { DatabricksSqlError, AbortError } from '../errors.js' import type { AuthInfo, FetchRowsOptions, @@ -10,6 +6,10 @@ import type { RowObject, StatementResult, } from '../types.js' +import { parser } from 'stream-json' +import { streamArray } from 'stream-json/streamers/StreamArray' +import { getChunk } from '../databricks-api.js' +import { DatabricksSqlError, AbortError } from '../errors.js' import { validateSucceededResult } from '../util.js' import { createRowMapper } from '../createRowMapper.js' import { fetchStream } from './fetchStream.js' diff --git a/src/api/fetchStream.ts b/src/api/fetchStream.ts index 38dc22c..0f2cfac 100644 --- a/src/api/fetchStream.ts +++ b/src/api/fetchStream.ts @@ -1,14 +1,15 @@ -import { PassThrough, type Readable } from 'node:stream' -import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams' import type { AuthInfo, + ExternalLinkInfo, StatementResult, FetchStreamOptions, StatementManifest, } from '../types.js' +import { PassThrough, Readable } from 'node:stream' +import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams' import { getChunk } from '../databricks-api.js' -import { AbortError } from '../errors.js' -import { validateSucceededResult } from '../util.js' +import { AbortError, DatabricksSqlError } from '../errors.js' +import { pipeUrlToOutput, validateSucceededResult } from '../util.js' /** * Create a readable stream from statement result. @@ -20,31 +21,32 @@ export function fetchStream( auth: AuthInfo, options: FetchStreamOptions = {} ): Readable { - const { signal } = options + const { signal, forceMerge } = options const manifest = validateSucceededResult(statementResult) const format = manifest.format as MergeFormat + if (statementResult.result?.data_array) { + throw new DatabricksSqlError( + 'fetchStream only supports EXTERNAL_LINKS results', + 'UNSUPPORTED_FORMAT', + statementResult.statement_id + ) + } + // Create PassThrough as output (readable by consumer) const output = new PassThrough() // Handle AbortSignal if (signal) { - const onAbort = () => { - output.destroy(new AbortError('Stream aborted')) - } + const onAbort = () => output.destroy(new AbortError('Stream aborted')) signal.addEventListener('abort', onAbort, { once: true }) - output.once('close', () => { - signal.removeEventListener('abort', onAbort) - }) + output.once('close', () => signal.removeEventListener('abort', onAbort)) } // Start async merge process // Errors are forwarded to the stream consumer via destroy. - mergeChunksToStream(statementResult, auth, manifest, format, output, signal).catch( - (err) => { - output.destroy(err as Error) - } - ) + mergeChunksToStream(statementResult, auth, manifest, format, output, signal, forceMerge) + .catch((err) => output.destroy(err as Error)) return output } @@ -58,29 +60,59 @@ async function mergeChunksToStream( manifest: StatementManifest, format: MergeFormat, output: PassThrough, - signal?: AbortSignal + signal?: AbortSignal, + forceMerge?: boolean ): Promise { - const result = statementResult.result - - // Collect all external link URLs - let urls = result?.external_links?.map((link) => link.external_link) ?? [] - - // If no URLs in initial result, fetch from chunks - if (urls.length === 0 && manifest.total_chunk_count > 0) { - for (let i = 0; i < manifest.total_chunk_count; i++) { - if (signal?.aborted) throw new AbortError('Aborted while collecting URLs') - - // Chunk metadata contains external link URLs when results are chunked. - const chunkData = await getChunk(auth, statementResult.statement_id, i, signal) - const chunkUrls = chunkData.external_links?.map((link) => link.external_link) ?? [] - urls.push(...chunkUrls) - } - } + const urls = await collectExternalUrls(statementResult, auth, manifest, signal) // No external links - close the stream if (urls.length === 0) return void output.end() + // Single URL - pipe directly to output unless forcing merge + if (urls.length === 1 && !forceMerge) + // Avoid merge-streams overhead for a single URL unless forced. + return pipeUrlToOutput(urls[0]!, output, signal) + // Merge all URLs using merge-streams - await mergeStreamsFromUrls(format, signal ? { urls, output, signal } : { urls, output }) + return mergeStreamsFromUrls(format, signal ? { urls, output, signal } : { urls, output }) +} + +async function collectExternalUrls( + statementResult: StatementResult, + auth: AuthInfo, + manifest: StatementManifest, + signal?: AbortSignal +): Promise { + const urls = extractExternalLinks(statementResult.result?.external_links) + if (urls.length > 0) + return urls + + if (!manifest.total_chunk_count) + return [] + + const chunkUrls: string[] = [] + for (let i = 0; i < manifest.total_chunk_count; i++) { + if (signal?.aborted) + throw new AbortError('Aborted while collecting URLs') + + // Chunk metadata contains external link URLs when results are chunked. + const chunkData = await getChunk(auth, statementResult.statement_id, i, signal) + chunkUrls.push(...extractExternalLinks(chunkData.external_links)) + } + + return chunkUrls +} + +function extractExternalLinks(externalLinks?: ExternalLinkInfo[]): string[] { + if (!externalLinks) + return [] + + return externalLinks + .map((link) => link.external_link) + .filter(isNonEmptyString) +} + +function isNonEmptyString(value: unknown): value is string { + return typeof value === 'string' && value.length > 0 } diff --git a/src/api/mergeExternalLinks.ts b/src/api/mergeExternalLinks.ts index 0a16111..c670eb8 100644 --- a/src/api/mergeExternalLinks.ts +++ b/src/api/mergeExternalLinks.ts @@ -17,14 +17,27 @@ export async function mergeExternalLinks( auth: AuthInfo, options: MergeExternalLinksOptions ): Promise { - const { signal, mergeStreamToExternalLink } = options + const { signal, mergeStreamToExternalLink, forceMerge } = options // If not external links, return original as-is if (!statementResult.result?.external_links) return statementResult + if (!forceMerge) { + const totalChunks = statementResult.manifest?.total_chunk_count + const externalLinks = statementResult.result.external_links + const isSingleChunk = totalChunks === undefined ? externalLinks.length <= 1 : totalChunks <= 1 + + // Skip merging when a single external link already exists unless forced. + if (isSingleChunk && externalLinks.length <= 1) + return statementResult + } + // Get merged stream via fetchStream - const stream = fetchStream(statementResult, auth, signal ? { signal } : {}) + const stream = fetchStream(statementResult, auth, { + ...signal ? { signal } : {}, + ...forceMerge !== undefined ? { forceMerge } : {}, + }) // Upload via callback const uploadResult = await mergeStreamToExternalLink(stream) diff --git a/src/index.ts b/src/index.ts index 513d0f8..03757b8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,4 +5,4 @@ export type * from './types.js' export * from './errors.js' // Core functions -export * from './api' \ No newline at end of file +export * from './api/index.js' diff --git a/src/types.ts b/src/types.ts index 76b5d21..d50fa93 100644 --- a/src/types.ts +++ b/src/types.ts @@ -150,7 +150,10 @@ export type RowObject = Record export type FetchRowFormat = 'JSON_ARRAY' | 'JSON_OBJECT' /** Options for fetchStream */ -export type FetchStreamOptions = SignalOptions +export type FetchStreamOptions = SignalOptions & { + /** Force merge even when there is only a single external link */ + forceMerge?: boolean +} /** Options for fetchRow */ export type FetchRowsOptions = SignalOptions & { @@ -180,6 +183,8 @@ export type MergeExternalLinksResult = { export type MergeExternalLinksOptions = SignalOptions & { /** Callback to upload merged stream to external link */ mergeStreamToExternalLink: (stream: Readable) => Promise + /** Force merge even when there is only a single external link chunk */ + forceMerge?: boolean } /** diff --git a/src/util.ts b/src/util.ts index 43deb3f..d905c31 100644 --- a/src/util.ts +++ b/src/util.ts @@ -1,3 +1,6 @@ +import { Readable } from 'node:stream' +import { pipeline } from 'node:stream/promises' +import type { ReadableStream as WebReadableStream } from 'node:stream/web' import type { StatementResult, StatementManifest } from './types.js' import { AbortError, DatabricksSqlError } from './errors.js' @@ -80,3 +83,34 @@ export function validateSucceededResult( return statementResult.manifest } + +function isWebReadableStream(body: unknown): body is WebReadableStream { + return typeof (body as WebReadableStream).getReader === 'function' +} + +export async function pipeUrlToOutput( + url: string, + output: NodeJS.WritableStream, + signal?: AbortSignal +): Promise { + // Uses Node 20+ global fetch with Web streams. + if (signal?.aborted) + throw new AbortError('Aborted while streaming') + + const response = await fetch(url, signal ? { signal } : undefined) + if (!response.ok) { + throw new Error( + `Failed to fetch external link: ${response.status} ${response.statusText}` + ) + } + + if (!response.body) + return void output.end() + + const body = response.body + const input = isWebReadableStream(body) + ? Readable.fromWeb(body) + : (body as NodeJS.ReadableStream) + + await pipeline(input, output) +} diff --git a/test/mergeExternalLinks.spec.ts b/test/mergeExternalLinks.spec.ts index 9a66f67..5a6379a 100644 --- a/test/mergeExternalLinks.spec.ts +++ b/test/mergeExternalLinks.spec.ts @@ -62,6 +62,7 @@ describe('mergeExternalLinks', () => { const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, { mergeStreamToExternalLink: mockCallback, + forceMerge: true, }) // Verify callback was called @@ -113,6 +114,7 @@ describe('mergeExternalLinks', () => { await mergeExternalLinks(mockExternalLinksResult, mockAuth, { mergeStreamToExternalLink: mockCallback, + forceMerge: true, }) // Verify callback received the merged data @@ -154,6 +156,7 @@ describe('mergeExternalLinks', () => { mergeExternalLinks(mockExternalLinksResult, mockAuth, { signal: controller.signal, mergeStreamToExternalLink: mockCallback, + forceMerge: true, }) ).rejects.toThrow(/abort/i) }) @@ -170,6 +173,7 @@ describe('mergeExternalLinks', () => { await expect( mergeExternalLinks(mockExternalLinksResult, mockAuth, { mergeStreamToExternalLink: mockCallback, + forceMerge: true, }) ).rejects.toThrow('Upload failed') }) @@ -189,6 +193,7 @@ describe('mergeExternalLinks', () => { const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, { mergeStreamToExternalLink: vi.fn().mockResolvedValue(uploadResult), + forceMerge: true, }) // Verify original manifest properties are preserved @@ -198,4 +203,15 @@ describe('mergeExternalLinks', () => { mockExternalLinksResult.manifest?.total_row_count ) }) + + it('should return original for single external link by default', async () => { + const mockCallback = vi.fn() + + const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, { + mergeStreamToExternalLink: mockCallback, + }) + + expect(result).toBe(mockExternalLinksResult) + expect(mockCallback).not.toHaveBeenCalled() + }) }) diff --git a/test/s3.integration.spec.ts b/test/s3.integration.spec.ts index ed9882e..e3cf6e3 100644 --- a/test/s3.integration.spec.ts +++ b/test/s3.integration.spec.ts @@ -248,6 +248,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { // Merge and upload to S3 const mergedResult = await mergeExternalLinks(result, auth, { + forceMerge: true, mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'), }) @@ -296,6 +297,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { } const mergedResult = await mergeExternalLinks(result, auth, { + forceMerge: true, mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'JSON_ARRAY'), }) @@ -332,6 +334,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { } const mergedResult = await mergeExternalLinks(result, auth, { + forceMerge: true, mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'ARROW_STREAM'), }) @@ -368,6 +371,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { } const mergedResult = await mergeExternalLinks(result, auth, { + forceMerge: true, mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'), })