Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 29 additions & 23 deletions src/api/executeStatement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import type {
ExecuteStatementRequest,
StatementResult,
StatementState,
QueryMetrics,
} from '../types.js'
import { postStatement, getStatement, cancelStatement } from '../databricks-api.js'
import { postStatement, getStatement, cancelStatement, getQueryMetrics } from '../databricks-api.js'
import { extractWarehouseId, throwIfAborted, delay } from '../util.js'
import {
DatabricksSqlError,
Expand All @@ -19,8 +20,21 @@ const TERMINAL_STATES = new Set<StatementState>([
'CANCELED',
'CLOSED',
])
const POLL_INTERVAL_MS = 500
const MAX_POLL_INTERVAL_MS = 5000
const POLL_INTERVAL_MS = 5000

/**
 * Best-effort lookup of execution metrics for a statement via the
 * Query History API.
 *
 * Metrics are supplementary: any failure to fetch them (network error,
 * query not yet indexed by the history service, etc.) resolves to
 * `undefined` instead of propagating to the caller.
 */
async function fetchMetrics(
  auth: AuthInfo,
  statementId: string,
  signal?: AbortSignal
): Promise<QueryMetrics | undefined> {
  const queryInfo = await getQueryMetrics(auth, statementId, signal).catch(
    () => undefined
  )
  return queryInfo?.metrics
}

/**
* Execute SQL statement and poll until completion
Expand All @@ -31,22 +45,26 @@ export async function executeStatement(
options: ExecuteStatementOptions = {}
): Promise<StatementResult> {
const warehouseId = options.warehouse_id ?? extractWarehouseId(auth.httpPath)
const { signal, onProgress } = options
const { signal, onProgress, enableMetrics } = options

// Check if already aborted
throwIfAborted(signal, 'executeStatement')

// Helper to call onProgress with optional metrics
const emitProgress = onProgress
? async (statementId: string) => onProgress(result.status, enableMetrics ? await fetchMetrics(auth, statementId, signal) : undefined)
: undefined

// 1. Build request (filter out undefined values)
// Keep payload small and aligned with the REST API contract.
const request = Object.fromEntries(
Object.entries({
warehouse_id: warehouseId,
statement: query,
byte_limit: options.byte_limit,
disposition: options.disposition,
format: options.format,
on_wait_timeout: options.on_wait_timeout,
wait_timeout: options.wait_timeout,
on_wait_timeout: options.on_wait_timeout ?? 'CONTINUE',
wait_timeout: options.wait_timeout ?? '50s',
row_limit: options.row_limit,
catalog: options.catalog,
schema: options.schema,
Expand All @@ -58,31 +76,19 @@ export async function executeStatement(
let result = await postStatement(auth, request, signal)

// 3. Poll until terminal state
let pollInterval = POLL_INTERVAL_MS

while (!TERMINAL_STATES.has(result.status.state)) {
// Check abort signal
if (signal?.aborted) {
// Try to cancel on server
await cancelStatement(auth, result.statement_id).catch(() => {
// Ignore cancel errors
})
await cancelStatement(auth, result.statement_id).catch(() => { })
throw new AbortError('Aborted during polling')
}

// Call progress callback
onProgress?.(result.status)

// Wait before next poll (exponential backoff)
await delay(pollInterval, signal)
pollInterval = Math.min(pollInterval * 1.5, MAX_POLL_INTERVAL_MS)

// Get current status
await emitProgress?.(result.statement_id)
await delay(POLL_INTERVAL_MS, signal)
result = await getStatement(auth, result.statement_id, signal)
}

// 4. Final progress callback
onProgress?.(result.status)
await emitProgress?.(result.statement_id)

// 5. Handle terminal states
if (result.status.state === 'SUCCEEDED')
Expand Down
1 change: 1 addition & 0 deletions src/api/fetchAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
RowObject,
StatementResult,
} from '../types.js'

import { fetchRow } from './fetchRow.js'

/**
Expand Down
6 changes: 4 additions & 2 deletions src/api/fetchRow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import type {
RowObject,
StatementResult,
} from '../types.js'

import { parser } from 'stream-json'
import { streamArray } from 'stream-json/streamers/StreamArray'

import { getChunk } from '../databricks-api.js'
import { DatabricksSqlError, AbortError } from '../errors.js'
import { validateSucceededResult } from '../util.js'
import { createRowMapper } from '../createRowMapper.js'
import { AbortError, DatabricksSqlError } from '../errors.js'
import { validateSucceededResult } from '../util.js'
import { fetchStream } from './fetchStream.js'

/**
Expand Down
8 changes: 6 additions & 2 deletions src/api/fetchStream.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import type { MergeFormat } from '@bitofsky/merge-streams'
import type {
AuthInfo,
ExternalLinkInfo,
StatementResult,
FetchStreamOptions,
StatementManifest,
StatementResult,
} from '../types.js'

import { PassThrough, Readable } from 'node:stream'
import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams'

import { mergeStreamsFromUrls } from '@bitofsky/merge-streams'

import { getChunk } from '../databricks-api.js'
import { AbortError, DatabricksSqlError } from '../errors.js'
import { pipeUrlToOutput, validateSucceededResult } from '../util.js'
Expand Down
5 changes: 3 additions & 2 deletions src/api/mergeExternalLinks.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type {
AuthInfo,
StatementResult,
MergeExternalLinksOptions,
StatementResult,
} from '../types.js'
import { fetchStream } from './fetchStream.js'

import { validateSucceededResult } from '../util.js'
import { fetchStream } from './fetchStream.js'

/**
* Merge external links from StatementResult into a single stream,
Expand Down
19 changes: 19 additions & 0 deletions src/databricks-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ import type {
ExecuteStatementRequest,
StatementResult,
GetChunkResponse,
QueryInfo,
} from './types.js'
import { httpRequest } from './http.js'

// Base path for Databricks SQL Statement Execution API.
const BASE_PATH = '/api/2.0/sql/statements'
// Base path for Query History API.
const HISTORY_BASE_PATH = '/api/2.0/sql/history/queries'

/**
* Execute SQL statement
Expand Down Expand Up @@ -74,3 +77,19 @@ export async function getChunk(
...(signal ? { signal } : {}),
})
}

/**
 * Get query metrics from Query History API
 * GET /api/2.0/sql/history/queries/{query_id}?include_metrics=true
 *
 * @param auth - Workspace auth/connection info consumed by httpRequest
 * @param queryId - Statement/query ID to look up in query history
 * @param signal - Optional AbortSignal to cancel the HTTP request
 * @returns QueryInfo for the query; `metrics` is present when available
 */
export async function getQueryMetrics(
  auth: AuthInfo,
  queryId: string,
  signal?: AbortSignal
): Promise<QueryInfo> {
  return httpRequest<QueryInfo>(auth, {
    method: 'GET',
    // Encode the ID so an unexpected character can never break out of the
    // URL path segment or corrupt the query string.
    path: `${HISTORY_BASE_PATH}/${encodeURIComponent(queryId)}?include_metrics=true`,
    ...(signal ? { signal } : {}),
  })
}
83 changes: 82 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,85 @@ export type StatementStatus = {
}
}

/**
 * Query execution metrics reported by the Query History API.
 * All fields are optional; the service omits metrics it did not record.
 * @see https://docs.databricks.com/api/workspace/queryhistory/list
 */
export type QueryMetrics = {
  // --- Timings (milliseconds unless noted) ---
  /** Wall-clock total for the query */
  total_time_ms?: number
  /** Time spent compiling the query plan */
  compilation_time_ms?: number
  /** Time spent executing */
  execution_time_ms?: number
  /** Time spent fetching results */
  result_fetch_time_ms?: number
  /** Query execution time */
  query_execution_time_ms?: number
  /** Time spent on metadata operations */
  metadata_time_ms?: number
  /** Sum of time across all tasks */
  task_total_time_ms?: number
  /** Sum of time spent in Photon */
  photon_total_time_ms?: number
  /** Epoch timestamp at which compilation started */
  query_compilation_start_timestamp?: number

  // --- I/O volume (bytes) ---
  /** Bytes read in total */
  read_bytes?: number
  /** Bytes read from remote storage */
  read_remote_bytes?: number
  /** Bytes written to remote storage */
  write_remote_bytes?: number
  /** Bytes read from cache */
  read_cache_bytes?: number
  /** Bytes spilled to disk */
  spill_to_disk_bytes?: number
  /** Bytes sent over the network */
  network_sent_bytes?: number
  /** Bytes skipped via pruning */
  pruned_bytes?: number

  // --- Row / file / partition counts ---
  /** Rows produced by the query */
  rows_produced_count?: number
  /** Rows read by the query */
  rows_read_count?: number
  /** Files read */
  read_files_count?: number
  /** Partitions read */
  read_partitions_count?: number
  /** Files skipped via pruning */
  pruned_files_count?: number
  /** Rows written to remote storage */
  write_remote_rows?: number
  /** Files written to remote storage */
  write_remote_files?: number

  // --- Cache behavior ---
  /** True when the result was served from cache */
  result_from_cache?: boolean
  /** Percentage of read bytes served from cache */
  bytes_read_from_cache_percentage?: number
}

/**
 * Query info from Query History API
 * @see https://docs.databricks.com/api/workspace/queryhistory/list
 */
export type QueryInfo = {
  /** Unique ID of the query in query history */
  query_id: string
  /** Query lifecycle state as reported by the service (e.g. running/finished — see API docs for exact values) */
  status: string
  /** SQL text of the query */
  query_text: string
  /** Epoch ms when the query started */
  query_start_time_ms: number
  /** Epoch ms when execution finished (absent while still running) */
  execution_end_time_ms?: number
  /** Epoch ms when the query fully ended (absent while still running) */
  query_end_time_ms?: number
  /** Numeric ID of the user who ran the query */
  user_id: number
  /** Display/login name of the user who ran the query */
  user_name: string
  // NOTE(review): endpoint_id vs warehouse_id — presumably legacy vs current
  // identifier for the same SQL warehouse; confirm against API docs.
  endpoint_id: string
  warehouse_id: string
  /** Number of rows the query produced, when reported */
  rows_produced?: number
  /** Execution metrics; only present when requested with include_metrics */
  metrics?: QueryMetrics
  /** True when the record will receive no further updates */
  is_final: boolean
  /** Total duration, when reported — assumed milliseconds; TODO confirm */
  duration?: number
  /** Statement kind (e.g. SELECT) as classified by the service */
  statement_type?: string
}

/** Column schema information */
export type ColumnInfo = {
name: string
Expand Down Expand Up @@ -109,7 +188,9 @@ export type StatementParameter = {
*/
export type ExecuteStatementOptions = {
/** Progress callback (called on each poll) */
onProgress?: (status: StatementStatus) => void
onProgress?: (status: StatementStatus, metrics?: QueryMetrics) => void
/** Enable query metrics fetching during polling (default: false) */
enableMetrics?: boolean
/** Abort signal for cancellation */
signal?: AbortSignal
/** Result byte limit */
Expand Down
Loading