Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ function fetchStream(
```
- Merges `EXTERNAL_LINKS` into a single binary stream.
- Preserves the original format (`JSON_ARRAY`, `CSV`, `ARROW_STREAM`).
- Throws if the result is `INLINE`.
- Ends as an empty stream when no external links exist.
- `forceMerge: true` forces merge even when there is only a single external link.

### mergeExternalLinks(statementResult, auth, options)
```ts
Expand All @@ -196,7 +198,8 @@ function mergeExternalLinks(
- Creates a merged stream from `EXTERNAL_LINKS`, uploads it via
`options.mergeStreamToExternalLink`, then returns a `StatementResult`
with a single external link.
- Returns the original result unchanged when input is `INLINE`.
- Returns the original result unchanged when input is `INLINE` or already a
single external link (unless `forceMerge: true`).

### Options (Summary)
```ts
Expand Down Expand Up @@ -228,10 +231,12 @@ type FetchAllOptions = {

type FetchStreamOptions = {
signal?: AbortSignal
forceMerge?: boolean
}

type MergeExternalLinksOptions = {
signal?: AbortSignal
forceMerge?: boolean
mergeStreamToExternalLink: (stream: Readable) => Promise<{
externalLink: string
byte_count: number
Expand All @@ -243,6 +248,7 @@ type MergeExternalLinksOptions = {
## Notes
- Databricks requires `INLINE` results to use `JSON_ARRAY` format. `INLINE + CSV` is rejected by the API.
- `EXTERNAL_LINKS` are merged using `@bitofsky/merge-streams`.
- Requires Node.js >= 20 for global `fetch` and Web streams.

## Development
```bash
Expand Down
8 changes: 4 additions & 4 deletions src/api/fetchRow.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import type { Readable } from 'node:stream'
import { getChunk } from '../databricks-api.js'
import { DatabricksSqlError, AbortError } from '../errors.js'
import type {
AuthInfo,
FetchRowsOptions,
RowArray,
RowObject,
StatementResult,
} from '../types.js'
import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'
import { getChunk } from '../databricks-api.js'
import { DatabricksSqlError, AbortError } from '../errors.js'
import { validateSucceededResult } from '../util.js'
import { createRowMapper } from '../createRowMapper.js'
import { fetchStream } from './fetchStream.js'
Expand Down
100 changes: 66 additions & 34 deletions src/api/fetchStream.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { PassThrough, type Readable } from 'node:stream'
import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams'
import type {
AuthInfo,
ExternalLinkInfo,
StatementResult,
FetchStreamOptions,
StatementManifest,
} from '../types.js'
import { PassThrough, Readable } from 'node:stream'
import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams'
import { getChunk } from '../databricks-api.js'
import { AbortError } from '../errors.js'
import { validateSucceededResult } from '../util.js'
import { AbortError, DatabricksSqlError } from '../errors.js'
import { pipeUrlToOutput, validateSucceededResult } from '../util.js'

/**
* Create a readable stream from statement result.
Expand All @@ -20,31 +21,32 @@ export function fetchStream(
auth: AuthInfo,
options: FetchStreamOptions = {}
): Readable {
const { signal } = options
const { signal, forceMerge } = options
const manifest = validateSucceededResult(statementResult)
const format = manifest.format as MergeFormat

if (statementResult.result?.data_array) {
throw new DatabricksSqlError(
'fetchStream only supports EXTERNAL_LINKS results',
'UNSUPPORTED_FORMAT',
statementResult.statement_id
)
}

// Create PassThrough as output (readable by consumer)
const output = new PassThrough()

// Handle AbortSignal
if (signal) {
const onAbort = () => {
output.destroy(new AbortError('Stream aborted'))
}
const onAbort = () => output.destroy(new AbortError('Stream aborted'))
signal.addEventListener('abort', onAbort, { once: true })
output.once('close', () => {
signal.removeEventListener('abort', onAbort)
})
output.once('close', () => signal.removeEventListener('abort', onAbort))
}

// Start async merge process
// Errors are forwarded to the stream consumer via destroy.
mergeChunksToStream(statementResult, auth, manifest, format, output, signal).catch(
(err) => {
output.destroy(err as Error)
}
)
mergeChunksToStream(statementResult, auth, manifest, format, output, signal, forceMerge)
.catch((err) => output.destroy(err as Error))

return output
}
Expand All @@ -58,29 +60,59 @@ async function mergeChunksToStream(
manifest: StatementManifest,
format: MergeFormat,
output: PassThrough,
signal?: AbortSignal
signal?: AbortSignal,
forceMerge?: boolean
): Promise<void> {
const result = statementResult.result

// Collect all external link URLs
let urls = result?.external_links?.map((link) => link.external_link) ?? []

// If no URLs in initial result, fetch from chunks
if (urls.length === 0 && manifest.total_chunk_count > 0) {
for (let i = 0; i < manifest.total_chunk_count; i++) {
if (signal?.aborted) throw new AbortError('Aborted while collecting URLs')

// Chunk metadata contains external link URLs when results are chunked.
const chunkData = await getChunk(auth, statementResult.statement_id, i, signal)
const chunkUrls = chunkData.external_links?.map((link) => link.external_link) ?? []
urls.push(...chunkUrls)
}
}
const urls = await collectExternalUrls(statementResult, auth, manifest, signal)

// No external links - close the stream
if (urls.length === 0)
return void output.end()

// Single URL - pipe directly to output unless forcing merge
if (urls.length === 1 && !forceMerge)
// Avoid merge-streams overhead for a single URL unless forced.
return pipeUrlToOutput(urls[0]!, output, signal)

// Merge all URLs using merge-streams
await mergeStreamsFromUrls(format, signal ? { urls, output, signal } : { urls, output })
return mergeStreamsFromUrls(format, signal ? { urls, output, signal } : { urls, output })
}

/**
 * Gather every external link URL for a statement result.
 *
 * Prefers the links embedded in the initial result; when those are absent,
 * walks each chunk's metadata (chunk metadata carries the external link
 * URLs when results are chunked).
 *
 * @throws AbortError when the signal fires while chunk metadata is being fetched.
 */
async function collectExternalUrls(
  statementResult: StatementResult,
  auth: AuthInfo,
  manifest: StatementManifest,
  signal?: AbortSignal
): Promise<string[]> {
  const inlineLinks = extractExternalLinks(statementResult.result?.external_links)
  if (inlineLinks.length > 0)
    return inlineLinks

  if (!manifest.total_chunk_count)
    return []

  const collected: string[] = []
  for (let chunkIndex = 0; chunkIndex < manifest.total_chunk_count; chunkIndex++) {
    if (signal?.aborted)
      throw new AbortError('Aborted while collecting URLs')

    // Chunk metadata contains external link URLs when results are chunked.
    const chunk = await getChunk(auth, statementResult.statement_id, chunkIndex, signal)
    collected.push(...extractExternalLinks(chunk.external_links))
  }

  return collected
}

/**
 * Pull the non-empty `external_link` URLs out of a list of link records.
 * Returns an empty array when the list is missing.
 */
function extractExternalLinks(externalLinks?: ExternalLinkInfo[]): string[] {
  const urls: string[] = []
  for (const link of externalLinks ?? []) {
    const url = link.external_link
    if (isNonEmptyString(url))
      urls.push(url)
  }
  return urls
}

/** Type guard: value is a string with at least one character. */
function isNonEmptyString(value: unknown): value is string {
  return typeof value === 'string' && value.length > 0
}
17 changes: 15 additions & 2 deletions src/api/mergeExternalLinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,27 @@ export async function mergeExternalLinks(
auth: AuthInfo,
options: MergeExternalLinksOptions
): Promise<StatementResult> {
const { signal, mergeStreamToExternalLink } = options
const { signal, mergeStreamToExternalLink, forceMerge } = options

// If there are no external links, return the original result as-is
if (!statementResult.result?.external_links)
return statementResult

if (!forceMerge) {
const totalChunks = statementResult.manifest?.total_chunk_count
const externalLinks = statementResult.result.external_links
const isSingleChunk = totalChunks === undefined ? externalLinks.length <= 1 : totalChunks <= 1

// Skip merging when a single external link already exists unless forced.
if (isSingleChunk && externalLinks.length <= 1)
return statementResult
}

// Get merged stream via fetchStream
const stream = fetchStream(statementResult, auth, signal ? { signal } : {})
const stream = fetchStream(statementResult, auth, {
...signal ? { signal } : {},
...forceMerge !== undefined ? { forceMerge } : {},
})

// Upload via callback
const uploadResult = await mergeStreamToExternalLink(stream)
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ export type * from './types.js'
export * from './errors.js'

// Core functions
export * from './api'
export * from './api/index.js'
7 changes: 6 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ export type RowObject = Record<string, unknown>
export type FetchRowFormat = 'JSON_ARRAY' | 'JSON_OBJECT'

/** Options for fetchStream */
export type FetchStreamOptions = SignalOptions
export type FetchStreamOptions = SignalOptions & {
/** Force merge even when there is only a single external link */
forceMerge?: boolean
}

/** Options for fetchRow */
export type FetchRowsOptions = SignalOptions & {
Expand Down Expand Up @@ -180,6 +183,8 @@ export type MergeExternalLinksResult = {
export type MergeExternalLinksOptions = SignalOptions & {
/** Callback to upload merged stream to external link */
mergeStreamToExternalLink: (stream: Readable) => Promise<MergeExternalLinksResult>
/** Force merge even when there is only a single external link chunk */
forceMerge?: boolean
}

/**
Expand Down
34 changes: 34 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'
import type { ReadableStream as WebReadableStream } from 'node:stream/web'
import type { StatementResult, StatementManifest } from './types.js'
import { AbortError, DatabricksSqlError } from './errors.js'

Expand Down Expand Up @@ -80,3 +83,34 @@ export function validateSucceededResult(

return statementResult.manifest
}

/**
 * Narrow an unknown fetch body to a Web ReadableStream by duck-typing on
 * `getReader`. Safe for null/undefined/primitive inputs (the original
 * property access would throw a TypeError on nullish values).
 */
function isWebReadableStream(body: unknown): body is WebReadableStream {
  return (
    typeof body === 'object' &&
    body !== null &&
    typeof (body as WebReadableStream).getReader === 'function'
  )
}

/**
 * Stream the contents of a single external link URL into `output`.
 *
 * Uses Node 20+ global fetch with Web streams; the response body is
 * converted to a Node Readable before piping. `output` ends when the body
 * finishes (or immediately when the response has no body).
 *
 * @param url    External link to download.
 * @param output Destination writable stream.
 * @param signal Optional abort signal; cancels the fetch and tears down the pipe.
 * @throws AbortError when the signal is already aborted before the fetch starts.
 * @throws Error when the HTTP response is not ok.
 */
export async function pipeUrlToOutput(
  url: string,
  output: NodeJS.WritableStream,
  signal?: AbortSignal
): Promise<void> {
  // Uses Node 20+ global fetch with Web streams.
  if (signal?.aborted)
    throw new AbortError('Aborted while streaming')

  const response = await fetch(url, signal ? { signal } : undefined)
  if (!response.ok) {
    // Cancel the unread body so the underlying connection is released.
    try { await response.body?.cancel() } catch { /* best-effort cleanup */ }
    throw new Error(
      `Failed to fetch external link: ${response.status} ${response.statusText}`
    )
  }

  if (!response.body)
    return void output.end()

  const body = response.body
  // Global fetch yields a Web stream; the cast fallback covers runtimes
  // that polyfill fetch with a Node stream body — TODO confirm a polyfill
  // path is actually possible here.
  const input = isWebReadableStream(body)
    ? Readable.fromWeb(body)
    : (body as unknown as NodeJS.ReadableStream)

  // Forward the signal so an abort during streaming destroys both sides.
  await pipeline(input, output, signal ? { signal } : {})
}
16 changes: 16 additions & 0 deletions test/mergeExternalLinks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ describe('mergeExternalLinks', () => {

const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
mergeStreamToExternalLink: mockCallback,
forceMerge: true,
})

// Verify callback was called
Expand Down Expand Up @@ -113,6 +114,7 @@ describe('mergeExternalLinks', () => {

await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
mergeStreamToExternalLink: mockCallback,
forceMerge: true,
})

// Verify callback received the merged data
Expand Down Expand Up @@ -154,6 +156,7 @@ describe('mergeExternalLinks', () => {
mergeExternalLinks(mockExternalLinksResult, mockAuth, {
signal: controller.signal,
mergeStreamToExternalLink: mockCallback,
forceMerge: true,
})
).rejects.toThrow(/abort/i)
})
Expand All @@ -170,6 +173,7 @@ describe('mergeExternalLinks', () => {
await expect(
mergeExternalLinks(mockExternalLinksResult, mockAuth, {
mergeStreamToExternalLink: mockCallback,
forceMerge: true,
})
).rejects.toThrow('Upload failed')
})
Expand All @@ -189,6 +193,7 @@ describe('mergeExternalLinks', () => {

const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
mergeStreamToExternalLink: vi.fn().mockResolvedValue(uploadResult),
forceMerge: true,
})

// Verify original manifest properties are preserved
Expand All @@ -198,4 +203,15 @@ describe('mergeExternalLinks', () => {
mockExternalLinksResult.manifest?.total_row_count
)
})

it('should return original for single external link by default', async () => {
const mockCallback = vi.fn()

const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
mergeStreamToExternalLink: mockCallback,
})

expect(result).toBe(mockExternalLinksResult)
expect(mockCallback).not.toHaveBeenCalled()
})
})
4 changes: 4 additions & 0 deletions test/s3.integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {

// Merge and upload to S3
const mergedResult = await mergeExternalLinks(result, auth, {
forceMerge: true,
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'),
})

Expand Down Expand Up @@ -296,6 +297,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {
}

const mergedResult = await mergeExternalLinks(result, auth, {
forceMerge: true,
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'JSON_ARRAY'),
})

Expand Down Expand Up @@ -332,6 +334,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {
}

const mergedResult = await mergeExternalLinks(result, auth, {
forceMerge: true,
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'ARROW_STREAM'),
})

Expand Down Expand Up @@ -368,6 +371,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {
}

const mergedResult = await mergeExternalLinks(result, auth, {
forceMerge: true,
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'),
})

Expand Down