diff --git a/src/api/executeStatement.ts b/src/api/executeStatement.ts index 44d5d70..36fcdae 100644 --- a/src/api/executeStatement.ts +++ b/src/api/executeStatement.ts @@ -4,8 +4,9 @@ import type { ExecuteStatementRequest, StatementResult, StatementState, + QueryMetrics, } from '../types.js' -import { postStatement, getStatement, cancelStatement } from '../databricks-api.js' +import { postStatement, getStatement, cancelStatement, getQueryMetrics } from '../databricks-api.js' import { extractWarehouseId, throwIfAborted, delay } from '../util.js' import { DatabricksSqlError, @@ -19,8 +20,21 @@ const TERMINAL_STATES = new Set<StatementState>([ 'CANCELED', 'CLOSED', ]) -const POLL_INTERVAL_MS = 500 -const MAX_POLL_INTERVAL_MS = 5000 +const POLL_INTERVAL_MS = 5000 + +async function fetchMetrics( + auth: AuthInfo, + statementId: string, + signal?: AbortSignal +): Promise<QueryMetrics | undefined> { + try { + const queryInfo = await getQueryMetrics(auth, statementId, signal) + return queryInfo.metrics + } catch { + // Ignore metrics fetch errors - non-critical + return undefined + } +} /** * Execute SQL statement and poll until completion @@ -31,13 +45,17 @@ export async function executeStatement( options: ExecuteStatementOptions = {} ): Promise<StatementResult> { const warehouseId = options.warehouse_id ?? extractWarehouseId(auth.httpPath) - const { signal, onProgress } = options + const { signal, onProgress, enableMetrics } = options // Check if already aborted throwIfAborted(signal, 'executeStatement') + // Helper to call onProgress with optional metrics + const emitProgress = onProgress + ? async (statementId: string) => onProgress(result.status, enableMetrics ? await fetchMetrics(auth, statementId, signal) : undefined) + : undefined + // 1. Build request (filter out undefined values) - // Keep payload small and aligned with the REST API contract. 
const request = Object.fromEntries( Object.entries({ warehouse_id: warehouseId, @@ -45,8 +63,8 @@ export async function executeStatement( byte_limit: options.byte_limit, disposition: options.disposition, format: options.format, - on_wait_timeout: options.on_wait_timeout, - wait_timeout: options.wait_timeout, + on_wait_timeout: options.on_wait_timeout ?? 'CONTINUE', + wait_timeout: options.wait_timeout ?? '50s', row_limit: options.row_limit, catalog: options.catalog, schema: options.schema, @@ -58,31 +76,19 @@ export async function executeStatement( let result = await postStatement(auth, request, signal) // 3. Poll until terminal state - let pollInterval = POLL_INTERVAL_MS - while (!TERMINAL_STATES.has(result.status.state)) { - // Check abort signal if (signal?.aborted) { - // Try to cancel on server - await cancelStatement(auth, result.statement_id).catch(() => { - // Ignore cancel errors - }) + await cancelStatement(auth, result.statement_id).catch(() => { }) throw new AbortError('Aborted during polling') } - // Call progress callback - onProgress?.(result.status) - - // Wait before next poll (exponential backoff) - await delay(pollInterval, signal) - pollInterval = Math.min(pollInterval * 1.5, MAX_POLL_INTERVAL_MS) - - // Get current status + await emitProgress?.(result.statement_id) + await delay(POLL_INTERVAL_MS, signal) result = await getStatement(auth, result.statement_id, signal) } // 4. Final progress callback - onProgress?.(result.status) + await emitProgress?.(result.statement_id) // 5. 
Handle terminal states if (result.status.state === 'SUCCEEDED') diff --git a/src/api/fetchAll.ts b/src/api/fetchAll.ts index f092d79..f5119d1 100644 --- a/src/api/fetchAll.ts +++ b/src/api/fetchAll.ts @@ -6,6 +6,7 @@ import type { RowObject, StatementResult, } from '../types.js' + import { fetchRow } from './fetchRow.js' /** diff --git a/src/api/fetchRow.ts b/src/api/fetchRow.ts index ef8d2af..968e992 100644 --- a/src/api/fetchRow.ts +++ b/src/api/fetchRow.ts @@ -6,12 +6,14 @@ import type { RowObject, StatementResult, } from '../types.js' + import { parser } from 'stream-json' import { streamArray } from 'stream-json/streamers/StreamArray' + import { getChunk } from '../databricks-api.js' -import { DatabricksSqlError, AbortError } from '../errors.js' -import { validateSucceededResult } from '../util.js' import { createRowMapper } from '../createRowMapper.js' +import { AbortError, DatabricksSqlError } from '../errors.js' +import { validateSucceededResult } from '../util.js' import { fetchStream } from './fetchStream.js' /** diff --git a/src/api/fetchStream.ts b/src/api/fetchStream.ts index 0f2cfac..89bf9d9 100644 --- a/src/api/fetchStream.ts +++ b/src/api/fetchStream.ts @@ -1,12 +1,16 @@ +import type { MergeFormat } from '@bitofsky/merge-streams' import type { AuthInfo, ExternalLinkInfo, - StatementResult, FetchStreamOptions, StatementManifest, + StatementResult, } from '../types.js' + import { PassThrough, Readable } from 'node:stream' -import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams' + +import { mergeStreamsFromUrls } from '@bitofsky/merge-streams' + import { getChunk } from '../databricks-api.js' import { AbortError, DatabricksSqlError } from '../errors.js' import { pipeUrlToOutput, validateSucceededResult } from '../util.js' diff --git a/src/api/mergeExternalLinks.ts b/src/api/mergeExternalLinks.ts index c670eb8..d0c725b 100644 --- a/src/api/mergeExternalLinks.ts +++ b/src/api/mergeExternalLinks.ts @@ -1,10 +1,11 @@ import type { 
AuthInfo, - StatementResult, MergeExternalLinksOptions, + StatementResult, } from '../types.js' -import { fetchStream } from './fetchStream.js' + import { validateSucceededResult } from '../util.js' +import { fetchStream } from './fetchStream.js' /** * Merge external links from StatementResult into a single stream, diff --git a/src/databricks-api.ts b/src/databricks-api.ts index 5ddcb43..3d0ca72 100644 --- a/src/databricks-api.ts +++ b/src/databricks-api.ts @@ -3,11 +3,14 @@ import type { ExecuteStatementRequest, StatementResult, GetChunkResponse, + QueryInfo, } from './types.js' import { httpRequest } from './http.js' // Base path for Databricks SQL Statement Execution API. const BASE_PATH = '/api/2.0/sql/statements' +// Base path for Query History API. +const HISTORY_BASE_PATH = '/api/2.0/sql/history/queries' /** * Execute SQL statement @@ -74,3 +77,19 @@ export async function getChunk( ...(signal ? { signal } : {}), }) } + +/** + * Get query metrics from Query History API + * GET /api/2.0/sql/history/queries/{query_id}?include_metrics=true + */ +export async function getQueryMetrics( + auth: AuthInfo, + queryId: string, + signal?: AbortSignal +): Promise<QueryInfo> { + return httpRequest<QueryInfo>(auth, { + method: 'GET', + path: `${HISTORY_BASE_PATH}/${queryId}?include_metrics=true`, + ...(signal ? 
{ signal } : {}), + }) +} diff --git a/src/types.ts b/src/types.ts index d50fa93..35a8f84 100644 --- a/src/types.ts +++ b/src/types.ts @@ -28,6 +28,85 @@ export type StatementStatus = { } } +/** + * Query execution metrics from Query History API + * @see https://docs.databricks.com/api/workspace/queryhistory/list + */ +export type QueryMetrics = { + /** Total time in milliseconds */ + total_time_ms?: number + /** Compilation time in milliseconds */ + compilation_time_ms?: number + /** Execution time in milliseconds */ + execution_time_ms?: number + /** Result fetch time in milliseconds */ + result_fetch_time_ms?: number + /** Query execution time in milliseconds */ + query_execution_time_ms?: number + /** Metadata time in milliseconds */ + metadata_time_ms?: number + /** Task total time in milliseconds */ + task_total_time_ms?: number + /** Photon total time in milliseconds */ + photon_total_time_ms?: number + /** Query compilation start timestamp */ + query_compilation_start_timestamp?: number + /** Bytes read */ + read_bytes?: number + /** Remote bytes read */ + read_remote_bytes?: number + /** Remote bytes written */ + write_remote_bytes?: number + /** Cache bytes read */ + read_cache_bytes?: number + /** Bytes spilled to disk */ + spill_to_disk_bytes?: number + /** Network bytes sent */ + network_sent_bytes?: number + /** Pruned bytes */ + pruned_bytes?: number + /** Rows produced count */ + rows_produced_count?: number + /** Rows read count */ + rows_read_count?: number + /** Files read count */ + read_files_count?: number + /** Partitions read count */ + read_partitions_count?: number + /** Pruned files count */ + pruned_files_count?: number + /** Whether result is from cache */ + result_from_cache?: boolean + /** Percentage of bytes read from cache */ + bytes_read_from_cache_percentage?: number + /** Remote rows written */ + write_remote_rows?: number + /** Remote files written */ + write_remote_files?: number +} + +/** + * Query info from Query History API 
+ * @see https://docs.databricks.com/api/workspace/queryhistory/list + */ +export type QueryInfo = { + query_id: string + status: string + query_text: string + query_start_time_ms: number + execution_end_time_ms?: number + query_end_time_ms?: number + user_id: number + user_name: string + endpoint_id: string + warehouse_id: string + rows_produced?: number + metrics?: QueryMetrics + is_final: boolean + duration?: number + statement_type?: string +} + /** Column schema information */ export type ColumnInfo = { name: string @@ -109,7 +188,9 @@ export type StatementParameter = { */ export type ExecuteStatementOptions = { /** Progress callback (called on each poll) */ - onProgress?: (status: StatementStatus) => void + onProgress?: (status: StatementStatus, metrics?: QueryMetrics) => void + /** Enable query metrics fetching during polling (default: false) */ + enableMetrics?: boolean /** Abort signal for cancellation */ signal?: AbortSignal /** Result byte limit */ diff --git a/test/executeStatement.spec.ts b/test/executeStatement.spec.ts index fd82a59..13f9c0c 100644 --- a/test/executeStatement.spec.ts +++ b/test/executeStatement.spec.ts @@ -7,6 +7,7 @@ import { mockRunningResult, mockSucceededAfterPolling, mockFailedResult, + mockQueryInfo, } from './mocks.js' describe('executeStatement', () => { @@ -53,9 +54,9 @@ describe('executeStatement', () => { const resultPromise = executeStatement('SELECT 42', mockAuth) - // Advance timers for polling delays - await vi.advanceTimersByTimeAsync(500) // First poll delay - await vi.advanceTimersByTimeAsync(750) // Second poll delay (500 * 1.5) + // Advance timers for polling delays (fixed 5000ms interval) + await vi.advanceTimersByTimeAsync(5000) // First poll delay + await vi.advanceTimersByTimeAsync(5000) // Second poll delay const result = await resultPromise @@ -79,12 +80,134 @@ describe('executeStatement', () => { const onProgress = vi.fn() const resultPromise = executeStatement('SELECT 42', mockAuth, { onProgress }) - await 
vi.advanceTimersByTimeAsync(500) + await vi.advanceTimersByTimeAsync(5000) await resultPromise - expect(onProgress).toHaveBeenCalledWith({ state: 'PENDING' }) - expect(onProgress).toHaveBeenCalledWith({ state: 'SUCCEEDED' }) + expect(onProgress).toHaveBeenCalledWith({ state: 'PENDING' }, undefined) + expect(onProgress).toHaveBeenCalledWith({ state: 'SUCCEEDED' }, undefined) + }) + + it('should not fetch metrics when enableMetrics is false', async () => { + const mockFetch = vi + .fn() + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockPendingResult), + }) + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockSucceededAfterPolling), + }) + vi.stubGlobal('fetch', mockFetch) + + const onProgress = vi.fn() + const resultPromise = executeStatement('SELECT 42', mockAuth, { onProgress }) + + await vi.advanceTimersByTimeAsync(5000) + await resultPromise + + // Only 2 calls: postStatement + getStatement (no metrics calls) + expect(mockFetch).toHaveBeenCalledTimes(2) + // onProgress should be called without metrics + expect(onProgress).toHaveBeenCalledWith({ state: 'PENDING' }, undefined) + }) + + it('should fetch metrics when enableMetrics is true', async () => { + const mockFetch = vi + .fn() + // postStatement + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockPendingResult), + }) + // getQueryMetrics (during polling) + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockQueryInfo), + }) + // getStatement + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockSucceededAfterPolling), + }) + // getQueryMetrics (final) + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockQueryInfo), + }) + vi.stubGlobal('fetch', mockFetch) + + const onProgress = vi.fn() + const resultPromise = executeStatement('SELECT 42', mockAuth, { + onProgress, + enableMetrics: true, + }) + + await vi.advanceTimersByTimeAsync(5000) + await resultPromise + + // 4 calls: postStatement + 
getQueryMetrics + getStatement + getQueryMetrics + expect(mockFetch).toHaveBeenCalledTimes(4) + + // Check that onProgress was called with metrics + expect(onProgress).toHaveBeenCalledWith( + { state: 'PENDING' }, + expect.objectContaining({ + total_time_ms: 959, + execution_time_ms: 642, + }) + ) + expect(onProgress).toHaveBeenCalledWith( + { state: 'SUCCEEDED' }, + expect.objectContaining({ + total_time_ms: 959, + }) + ) + }) + + it('should handle metrics API failure gracefully', async () => { + const mockFetch = vi + .fn() + // postStatement + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockPendingResult), + }) + // getQueryMetrics fails (401 - no retry) + .mockResolvedValueOnce({ + ok: false, + status: 401, + statusText: 'Unauthorized', + text: () => Promise.resolve('Unauthorized'), + }) + // getStatement + .mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve(mockSucceededAfterPolling), + }) + // getQueryMetrics fails again (401 - no retry) + .mockResolvedValueOnce({ + ok: false, + status: 401, + statusText: 'Unauthorized', + text: () => Promise.resolve('Unauthorized'), + }) + vi.stubGlobal('fetch', mockFetch) + + const onProgress = vi.fn() + const resultPromise = executeStatement('SELECT 42', mockAuth, { + onProgress, + enableMetrics: true, + }) + + await vi.advanceTimersByTimeAsync(5000) + const result = await resultPromise + + // Statement should still succeed even if metrics fail + expect(result.status.state).toBe('SUCCEEDED') + // onProgress should be called without metrics (undefined) + expect(onProgress).toHaveBeenCalledWith({ state: 'PENDING' }, undefined) }) it('should throw DatabricksSqlError when statement fails', async () => { diff --git a/test/mocks.ts b/test/mocks.ts index 8803178..40f74e5 100644 --- a/test/mocks.ts +++ b/test/mocks.ts @@ -1,4 +1,4 @@ -import type { AuthInfo, StatementResult } from '../src/types.js' +import type { AuthInfo, StatementResult, QueryInfo } from '../src/types.js' export const 
mockAuth: AuthInfo = { token: 'test-token', @@ -116,3 +116,30 @@ export const mockExternalLinkData = [ ['3', '6'], ['4', '8'], ] + +// Mock query info with metrics +export const mockQueryInfo: QueryInfo = { + query_id: '01f0e17f-pending-test', + status: 'FINISHED', + query_text: 'SELECT 42', + query_start_time_ms: 1766720174230, + execution_end_time_ms: 1766720175189, + query_end_time_ms: 1766720175189, + user_id: 12345, + user_name: 'test@example.com', + endpoint_id: 'abc123def456', + warehouse_id: 'abc123def456', + rows_produced: 1, + is_final: true, + duration: 959, + statement_type: 'SELECT', + metrics: { + total_time_ms: 959, + read_bytes: 0, + rows_produced_count: 1, + compilation_time_ms: 213, + execution_time_ms: 642, + result_fetch_time_ms: 43, + result_from_cache: false, + }, +} diff --git a/test/s3.integration.spec.ts b/test/s3.integration.spec.ts index e3cf6e3..7b0ef5e 100644 --- a/test/s3.integration.spec.ts +++ b/test/s3.integration.spec.ts @@ -119,7 +119,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => { { name: 'big_int', type_text: 'BIGINT', - type_name: 'BIGINT', + type_name: 'LONG', position: 2, }, {