diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index a561d990b9..5127b479c0 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -1,4 +1,5 @@ -import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse"; +import type { ClickHouse, TaskRunInsertArray, PayloadInsertArray } from "@internal/clickhouse"; +import { TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse"; import { type RedisOptions } from "@internal/redis"; import { LogicalReplicationClient, @@ -81,7 +82,7 @@ type TaskRunInsert = { export type RunsReplicationServiceEvents = { message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }]; batchFlushed: [ - { flushId: string; taskRunInserts: TaskRunV2[]; payloadInserts: RawTaskRunPayloadV1[] } + { flushId: string; taskRunInserts: TaskRunInsertArray[]; payloadInserts: PayloadInsertArray[] }, ]; }; @@ -171,12 +172,9 @@ export class RunsReplicationService { description: "Insert retry attempts", }); - this._eventsProcessedCounter = this._meter.createCounter( - "runs_replication.events_processed", - { - description: "Replication events processed (inserts, updates, deletes)", - } - ); + this._eventsProcessedCounter = this._meter.createCounter("runs_replication.events_processed", { + description: "Replication events processed (inserts, updates, deletes)", + }); this._flushDurationHistogram = this._meter.createHistogram( "runs_replication.flush_duration_ms", @@ -578,32 +576,46 @@ export class RunsReplicationService { const taskRunInserts = preparedInserts .map(({ taskRunInsert }) => taskRunInsert) - .filter(Boolean) + .filter((x): x is TaskRunInsertArray => Boolean(x)) // batch inserts in clickhouse are more performant if the items // are pre-sorted by the primary key .sort((a, b) => { - if (a.organization_id !== b.organization_id) { - return a.organization_id < b.organization_id ? -1 : 1; + const aOrgId = a[TASK_RUN_INDEX.organization_id] as string; + const bOrgId = b[TASK_RUN_INDEX.organization_id] as string; + if (aOrgId !== bOrgId) { + return aOrgId < bOrgId ? -1 : 1; } - if (a.project_id !== b.project_id) { - return a.project_id < b.project_id ? -1 : 1; + const aProjId = a[TASK_RUN_INDEX.project_id] as string; + const bProjId = b[TASK_RUN_INDEX.project_id] as string; + if (aProjId !== bProjId) { + return aProjId < bProjId ? -1 : 1; } - if (a.environment_id !== b.environment_id) { - return a.environment_id < b.environment_id ? -1 : 1; + const aEnvId = a[TASK_RUN_INDEX.environment_id] as string; + const bEnvId = b[TASK_RUN_INDEX.environment_id] as string; + if (aEnvId !== bEnvId) { + return aEnvId < bEnvId ? -1 : 1; } - if (a.created_at !== b.created_at) { - return a.created_at - b.created_at; + const aCreatedAt = a[TASK_RUN_INDEX.created_at] as number; + const bCreatedAt = b[TASK_RUN_INDEX.created_at] as number; + if (aCreatedAt !== bCreatedAt) { + return aCreatedAt - bCreatedAt; } - return a.run_id < b.run_id ? -1 : 1; + const aRunId = a[TASK_RUN_INDEX.run_id] as string; + const bRunId = b[TASK_RUN_INDEX.run_id] as string; + if (aRunId === bRunId) return 0; + return aRunId < bRunId ? -1 : 1; }); const payloadInserts = preparedInserts .map(({ payloadInsert }) => payloadInsert) - .filter(Boolean) + .filter((x): x is PayloadInsertArray => Boolean(x)) // batch inserts in clickhouse are more performant if the items // are pre-sorted by the primary key .sort((a, b) => { - return a.run_id < b.run_id ? -1 : 1; + const aRunId = a[PAYLOAD_INDEX.run_id] as string; + const bRunId = b[PAYLOAD_INDEX.run_id] as string; + if (aRunId === bRunId) return 0; + return aRunId < bRunId ? -1 : 1; }); span.setAttribute("task_run_inserts", taskRunInserts.length); @@ -633,7 +645,6 @@ export class RunsReplicationService { this.logger.error("Error inserting task run inserts", { error: taskRunError, flushId, - runIds: taskRunInserts.map((r) => r.run_id), }); recordSpanError(span, taskRunError); } @@ -642,7 +653,6 @@ export class RunsReplicationService { this.logger.error("Error inserting payload inserts", { error: payloadError, flushId, - runIds: payloadInserts.map((r) => r.run_id), }); recordSpanError(span, payloadError); } @@ -760,26 +770,24 @@ export class RunsReplicationService { #getClickhouseInsertSettings() { if (this._insertStrategy === "insert") { return {}; - } else if (this._insertStrategy === "insert_async") { - return { - async_insert: 1 as const, - async_insert_max_data_size: "1000000", - async_insert_busy_timeout_ms: 1000, - wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const), - }; } + + return { + async_insert: 1 as const, + async_insert_max_data_size: "1000000", + async_insert_busy_timeout_ms: 1000, + wait_for_async_insert: this.options.waitForAsyncInsert ? (1 as const) : (0 as const), + }; } - async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) { + async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) { return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { - const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert( - taskRunInserts, - { + const [insertError, insertResult] = + await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, { params: { clickhouse_settings: this.#getClickhouseInsertSettings(), }, - } - ); + }); if (insertError) { this.logger.error("Error inserting task run inserts attempt", { @@ -795,16 +803,14 @@ export class RunsReplicationService { }); } - async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) { + async #insertPayloadInserts(payloadInserts: PayloadInsertArray[], attempt: number) { return await startSpan(this._tracer, "insertPayloadInserts", async (span) => { - const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads( - payloadInserts, - { + const [insertError, insertResult] = + await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, { params: { clickhouse_settings: this.#getClickhouseInsertSettings(), }, - } - ); + }); if (insertError) { this.logger.error("Error inserting payload inserts attempt", { @@ -822,25 +828,15 @@ export class RunsReplicationService { async #prepareRunInserts( batchedRun: TaskRunInsert - ): Promise<{ taskRunInsert?: TaskRunV2; payloadInsert?: RawTaskRunPayloadV1 }> { + ): Promise<{ taskRunInsert?: TaskRunInsertArray; payloadInsert?: PayloadInsertArray }> { this.logger.debug("Preparing run", { batchedRun, }); const { run, _version, event } = batchedRun; - if (!run.environmentType) { - return { - taskRunInsert: undefined, - payloadInsert: undefined, - }; - } - - if (!run.organizationId) { - return { - taskRunInsert: undefined, - payloadInsert: undefined, - }; + if (!run.environmentType || !run.organizationId) { + return {}; } if (event === "update" || event === "delete" || this._disablePayloadInsert) { @@ -852,10 +848,7 @@ export class RunsReplicationService { _version ); - return { - taskRunInsert, - payloadInsert: undefined, - }; + return { taskRunInsert }; } const [taskRunInsert, payloadInsert] = await Promise.all([ @@ -863,10 +856,7 @@ export class RunsReplicationService { this.#preparePayloadInsert(run, _version), ]); - return { - taskRunInsert, - payloadInsert, - }; + return { taskRunInsert, payloadInsert }; } async #prepareTaskRunInsert( @@ -875,66 +865,68 @@ export class RunsReplicationService { environmentType: string, event: "insert" | "update" | "delete", _version: bigint - ): Promise { + ): Promise { const output = await this.#prepareJson(run.output, run.outputType); - return { - environment_id: run.runtimeEnvironmentId, - organization_id: organizationId, - project_id: run.projectId, - run_id: run.id, - updated_at: run.updatedAt.getTime(), - created_at: run.createdAt.getTime(), - status: run.status, - environment_type: environmentType, - friendly_id: run.friendlyId, - engine: run.engine, - task_identifier: run.taskIdentifier, - queue: run.queue, - span_id: run.spanId, - trace_id: run.traceId, - error: { data: run.error }, - attempt: run.attemptNumber ?? 1, - schedule_id: run.scheduleId ?? "", - batch_id: run.batchId ?? "", - completed_at: run.completedAt?.getTime(), - started_at: run.startedAt?.getTime(), - executed_at: run.executedAt?.getTime(), - delay_until: run.delayUntil?.getTime(), - queued_at: run.queuedAt?.getTime(), - expired_at: run.expiredAt?.getTime(), - usage_duration_ms: run.usageDurationMs, - cost_in_cents: run.costInCents, - base_cost_in_cents: run.baseCostInCents, - tags: run.runTags ?? [], - task_version: run.taskVersion ?? "", - sdk_version: run.sdkVersion ?? "", - cli_version: run.cliVersion ?? "", - machine_preset: run.machinePreset ?? "", - root_run_id: run.rootTaskRunId ?? "", - parent_run_id: run.parentTaskRunId ?? "", - depth: run.depth, - is_test: run.isTest, - idempotency_key: run.idempotencyKey ?? "", - expiration_ttl: run.ttl ?? "", - output, - concurrency_key: run.concurrencyKey ?? "", - bulk_action_group_ids: run.bulkActionGroupIds ?? [], - worker_queue: run.masterQueue, - max_duration_in_seconds: run.maxDurationInSeconds ?? undefined, - _version: _version.toString(), - _is_deleted: event === "delete" ? 1 : 0, - }; + // Return array matching TASK_RUN_COLUMNS order + return [ + run.runtimeEnvironmentId, // environment_id + organizationId, // organization_id + run.projectId, // project_id + run.id, // run_id + run.updatedAt.getTime(), // updated_at + run.createdAt.getTime(), // created_at + run.status, // status + environmentType, // environment_type + run.friendlyId, // friendly_id + run.attemptNumber ?? 1, // attempt + run.engine, // engine + run.taskIdentifier, // task_identifier + run.queue, // queue + run.scheduleId ?? "", // schedule_id + run.batchId ?? "", // batch_id + run.completedAt?.getTime() ?? null, // completed_at + run.startedAt?.getTime() ?? null, // started_at + run.executedAt?.getTime() ?? null, // executed_at + run.delayUntil?.getTime() ?? null, // delay_until + run.queuedAt?.getTime() ?? null, // queued_at + run.expiredAt?.getTime() ?? null, // expired_at + run.usageDurationMs ?? 0, // usage_duration_ms + run.costInCents ?? 0, // cost_in_cents + run.baseCostInCents ?? 0, // base_cost_in_cents + output, // output + { data: run.error }, // error + run.runTags ?? [], // tags + run.taskVersion ?? "", // task_version + run.sdkVersion ?? "", // sdk_version + run.cliVersion ?? "", // cli_version + run.machinePreset ?? "", // machine_preset + run.rootTaskRunId ?? "", // root_run_id + run.parentTaskRunId ?? "", // parent_run_id + run.depth ?? 0, // depth + run.spanId, // span_id + run.traceId, // trace_id + run.idempotencyKey ?? "", // idempotency_key + run.ttl ?? "", // expiration_ttl + run.isTest ?? false, // is_test + _version.toString(), // _version + event === "delete" ? 1 : 0, // _is_deleted + run.concurrencyKey ?? "", // concurrency_key + run.bulkActionGroupIds ?? [], // bulk_action_group_ids + run.masterQueue ?? "", // worker_queue + run.maxDurationInSeconds ?? null, // max_duration_in_seconds + ]; } - async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise { + async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise { const payload = await this.#prepareJson(run.payload, run.payloadType); - return { - run_id: run.id, - created_at: run.createdAt.getTime(), - payload, - }; + // Return array matching PAYLOAD_COLUMNS order + return [ + run.id, // run_id + run.createdAt.getTime(), // created_at + payload, // payload + ]; } async #prepareJson( diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts index e08b579738..718875491e 100644 --- a/apps/webapp/test/runsReplicationService.part2.test.ts +++ b/apps/webapp/test/runsReplicationService.part2.test.ts @@ -1,4 +1,4 @@ -import { ClickHouse } from "@internal/clickhouse"; +import { ClickHouse, TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse"; import { containerTest } from "@internal/testcontainers"; import { Logger } from "@trigger.dev/core/logger"; import { readFile } from "node:fs/promises"; @@ -889,17 +889,14 @@ describe("RunsReplicationService (part 2/2)", () => { await setTimeout(1000); expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2); - expect(batchFlushedEvents?.[0].taskRunInserts[0]).toEqual( - expect.objectContaining({ - run_id: run.id, - status: "PENDING_VERSION", - }) + // Use TASK_RUN_INDEX for type-safe array access + expect(batchFlushedEvents?.[0].taskRunInserts[0][TASK_RUN_INDEX.run_id]).toEqual(run.id); + expect(batchFlushedEvents?.[0].taskRunInserts[0][TASK_RUN_INDEX.status]).toEqual( + "PENDING_VERSION" ); - expect(batchFlushedEvents?.[0].taskRunInserts[1]).toEqual( - expect.objectContaining({ - run_id: run.id, - status: "COMPLETED_SUCCESSFULLY", - }) + expect(batchFlushedEvents?.[0].taskRunInserts[1][TASK_RUN_INDEX.run_id]).toEqual(run.id); + expect(batchFlushedEvents?.[0].taskRunInserts[1][TASK_RUN_INDEX.status]).toEqual( + "COMPLETED_SUCCESSFULLY" ); await runsReplicationService.stop(); @@ -1070,18 +1067,18 @@ describe("RunsReplicationService (part 2/2)", () => { const curr = batchFlushedEvents[0]?.taskRunInserts[i]; const prevKey = [ - prev.organization_id, - prev.project_id, - prev.environment_id, - prev.created_at, - prev.run_id, + prev[TASK_RUN_INDEX.organization_id], + prev[TASK_RUN_INDEX.project_id], + prev[TASK_RUN_INDEX.environment_id], + prev[TASK_RUN_INDEX.created_at], + prev[TASK_RUN_INDEX.run_id], ]; const currKey = [ - curr.organization_id, - curr.project_id, - curr.environment_id, - curr.created_at, - curr.run_id, + curr[TASK_RUN_INDEX.organization_id], + curr[TASK_RUN_INDEX.project_id], + curr[TASK_RUN_INDEX.environment_id], + curr[TASK_RUN_INDEX.created_at], + curr[TASK_RUN_INDEX.run_id], ]; const keysAreEqual = prevKey.every((val, idx) => val === currKey[idx]); @@ -1111,7 +1108,7 @@ describe("RunsReplicationService (part 2/2)", () => { for (let i = 1; i < batchFlushedEvents[0]?.payloadInserts.length; i++) { const prev = batchFlushedEvents[0]?.payloadInserts[i - 1]; const curr = batchFlushedEvents[0]?.payloadInserts[i]; - expect(prev.run_id <= curr.run_id).toBeTruthy(); + expect(prev[PAYLOAD_INDEX.run_id] <= curr[PAYLOAD_INDEX.run_id]).toBeTruthy(); } await runsReplicationService.stop(); diff --git a/internal-packages/clickhouse/src/client/client.ts b/internal-packages/clickhouse/src/client/client.ts index 0842665c14..f87251fceb 100644 --- a/internal-packages/clickhouse/src/client/client.ts +++ b/internal-packages/clickhouse/src/client/client.ts @@ -6,9 +6,11 @@ import { createClient, type ResultSet, type Row, + type BaseQueryParams, + type InsertResult, } from "@clickhouse/client"; import { recordSpanError, Span, startSpan, trace, Tracer } from "@internal/tracing"; -import { flattenAttributes, tryCatch } from "@trigger.dev/core/v3"; +import { flattenAttributes, tryCatch, type Result } from "@trigger.dev/core/v3"; import { z } from "zod"; import { InsertError, QueryError } from "./errors.js"; import type { @@ -645,6 +647,76 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { }; } + public insertCompact>(req: { + name: string; + table: string; + columns: readonly string[]; + toArray: (record: TRecord) => any[]; + settings?: ClickHouseSettings; + }): ClickhouseInsertFunction { + return async (events, options) => { + const queryId = randomUUID(); + + return await startSpan(this.tracer, "insert", async (span) => { + const eventsArray = Array.isArray(events) ? events : [events]; + + this.logger.debug("Inserting into clickhouse (compact)", { + clientName: this.name, + name: req.name, + table: req.table, + events: eventsArray.length, + settings: req.settings, + attributes: options?.attributes, + options, + queryId, + }); + + span.setAttributes({ + "clickhouse.clientName": this.name, + "clickhouse.tableName": req.table, + "clickhouse.operationName": req.name, + "clickhouse.queryId": queryId, + "clickhouse.format": "JSONCompactEachRowWithNames", + ...flattenAttributes(req.settings, "clickhouse.settings"), + ...flattenAttributes(options?.attributes), + }); + + // Build compact format: [columns, ...rows] + const compactData: any[] = [Array.from(req.columns)]; + for (let i = 0; i < eventsArray.length; i++) { + compactData.push(req.toArray(eventsArray[i])); + } + + const [clickhouseError, result] = await tryCatch( + this.client.insert({ + table: req.table, + format: "JSONCompactEachRowWithNames", + values: compactData, + query_id: queryId, + ...options?.params, + clickhouse_settings: { + ...req.settings, + ...options?.params?.clickhouse_settings, + }, + }) + ); + + if (clickhouseError) { + this.logger.error("Error inserting into clickhouse", { + name: req.name, + error: clickhouseError, + table: req.table, + }); + + recordSpanError(span, clickhouseError); + return [new InsertError(clickhouseError.message), null]; + } + + return [null, result]; + }); + }; + } + public insertUnsafe>(req: { name: string; table: string; @@ -654,11 +726,13 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { const queryId = randomUUID(); return await startSpan(this.tracer, "insert", async (span) => { + const eventsArray = Array.isArray(events) ? events : [events]; + this.logger.debug("Inserting into clickhouse", { clientName: this.name, name: req.name, table: req.table, - events: Array.isArray(events) ? events.length : 1, + events: eventsArray.length, settings: req.settings, attributes: options?.attributes, options, @@ -678,7 +752,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { this.client.insert({ table: req.table, format: "JSONEachRow", - values: Array.isArray(events) ? events : [events], + values: eventsArray, query_id: queryId, ...options?.params, clickhouse_settings: { @@ -725,26 +799,119 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { }); }; } + + public insertCompactRaw(req: { + name: string; + table: string; + columns: readonly string[]; + settings?: ClickHouseSettings; + }): ( + events: readonly any[][] | any[], + options?: { + attributes?: Record; + params?: BaseQueryParams; + } + ) => Promise> { + return async (events, options) => { + const queryId = randomUUID(); + + return await startSpan(this.tracer, "insert", async (span) => { + // Check if events is a single row (array) or multiple rows (array of arrays) + // If first element is not an array, treat as single row + const isSingleRow = events.length > 0 && !Array.isArray(events[0]); + const eventsArray: readonly any[][] = isSingleRow + ? [events as any[]] + : (events as readonly any[][]); + + this.logger.debug("Inserting into clickhouse (compact raw)", { + clientName: this.name, + name: req.name, + table: req.table, + events: eventsArray.length, + settings: req.settings, + attributes: options?.attributes, + options, + queryId, + }); + + span.setAttributes({ + "clickhouse.clientName": this.name, + "clickhouse.tableName": req.table, + "clickhouse.operationName": req.name, + "clickhouse.queryId": queryId, + "clickhouse.format": "JSONCompactEachRowWithNames", + ...flattenAttributes(req.settings, "clickhouse.settings"), + ...flattenAttributes(options?.attributes), + }); + + // Build compact format: [columns, ...rows] + // Data is already in array format, no conversion needed + const compactData: any[] = [Array.from(req.columns), ...eventsArray]; + + const [clickhouseError, result] = await tryCatch( + this.client.insert({ + table: req.table, + format: "JSONCompactEachRowWithNames", + values: compactData, + query_id: queryId, + ...options?.params, + clickhouse_settings: { + ...req.settings, + ...options?.params?.clickhouse_settings, + }, + }) + ); + + if (clickhouseError) { + this.logger.error("Error inserting into clickhouse", { + name: req.name, + error: clickhouseError, + table: req.table, + }); + + recordClickhouseError(span, clickhouseError); + return [new InsertError(clickhouseError.message), null]; + } + + this.logger.debug("Inserted into clickhouse", { + clientName: this.name, + name: req.name, + table: req.table, + result, + queryId, + }); + + span.setAttributes({ + "clickhouse.query_id": result.query_id, + "clickhouse.executed": result.executed, + "clickhouse.summary.read_rows": result.summary?.read_rows, + "clickhouse.summary.read_bytes": result.summary?.read_bytes, + "clickhouse.summary.written_rows": result.summary?.written_rows, + "clickhouse.summary.written_bytes": result.summary?.written_bytes, + "clickhouse.summary.total_rows_to_read": result.summary?.total_rows_to_read, + "clickhouse.summary.result_rows": result.summary?.result_rows, + "clickhouse.summary.result_bytes": result.summary?.result_bytes, + "clickhouse.summary.elapsed_ns": result.summary?.elapsed_ns, + }); + + return [null, result]; + }); + }; + } } -function recordClickhouseError(span: Span, error: Error) { +function recordClickhouseError(span: Span, error: Error): void { if (error instanceof ClickHouseError) { span.setAttributes({ "clickhouse.error.code": error.code, "clickhouse.error.message": error.message, "clickhouse.error.type": error.type, }); - recordSpanError(span, error); - } else { - recordSpanError(span, error); } + recordSpanError(span, error); } -function convertLogLevelToClickhouseLogLevel(logLevel?: LogLevel) { - if (!logLevel) { - return ClickHouseLogLevel.INFO; - } - +function convertLogLevelToClickhouseLogLevel(logLevel?: LogLevel): ClickHouseLogLevel { switch (logLevel) { case "debug": return ClickHouseLogLevel.DEBUG; diff --git a/internal-packages/clickhouse/src/client/noop.ts b/internal-packages/clickhouse/src/client/noop.ts index 3509297f9f..e0872cada6 100644 --- a/internal-packages/clickhouse/src/client/noop.ts +++ b/internal-packages/clickhouse/src/client/noop.ts @@ -159,4 +159,61 @@ export class NoopClient implements ClickhouseReader, ClickhouseWriter { ]; }; } + + public insertCompact>(req: { + name: string; + table: string; + columns: readonly string[]; + toArray: (record: TRecord) => any[]; + settings?: ClickHouseSettings; + }): (events: TRecord | TRecord[]) => Promise> { + return async (events: TRecord | TRecord[]) => { + return [ + null, + { + executed: true, + query_id: "noop", + summary: { + read_rows: "0", + read_bytes: "0", + written_rows: "0", + written_bytes: "0", + total_rows_to_read: "0", + result_rows: "0", + result_bytes: "0", + elapsed_ns: "0", + }, + response_headers: {}, + }, + ]; + }; + } + + public insertCompactRaw(req: { + name: string; + table: string; + columns: readonly string[]; + settings?: ClickHouseSettings; + }): (events: readonly any[][] | any[]) => Promise> { + return async (events: readonly any[][] | any[]) => { + return [ + null, + { + executed: true, + query_id: "noop", + summary: { + read_rows: "0", + read_bytes: "0", + written_rows: "0", + written_bytes: "0", + total_rows_to_read: "0", + result_rows: "0", + result_bytes: "0", + elapsed_ns: "0", + }, + response_headers: {}, + }, + ]; + }; + } } diff --git a/internal-packages/clickhouse/src/client/types.ts b/internal-packages/clickhouse/src/client/types.ts index 25cd2efde0..7120422508 100644 --- a/internal-packages/clickhouse/src/client/types.ts +++ b/internal-packages/clickhouse/src/client/types.ts @@ -220,5 +220,26 @@ export interface ClickhouseWriter { settings?: ClickHouseSettings; }): ClickhouseInsertFunction; + insertCompact>(req: { + name: string; + table: string; + columns: readonly string[]; + toArray: (record: TRecord) => any[]; + settings?: ClickHouseSettings; + }): ClickhouseInsertFunction; + + insertCompactRaw(req: { + name: string; + table: string; + columns: readonly string[]; + settings?: ClickHouseSettings; + }): ( + events: readonly any[][] | any[], + options?: { + attributes?: Record; + params?: BaseQueryParams; + } + ) => Promise>; + close(): Promise; } diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 03b8b81e13..08be47f629 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -3,8 +3,8 @@ import { ClickhouseClient } from "./client/client.js"; import { ClickhouseReader, ClickhouseWriter } from "./client/types.js"; import { NoopClient } from "./client/noop.js"; import { - insertTaskRuns, - insertRawTaskRunPayloads, + insertTaskRunsCompactArrays, + insertRawTaskRunPayloadsCompactArrays, getTaskRunsQueryBuilder, getTaskActivityQueryBuilder, getCurrentRunningStats, @@ -31,6 +31,14 @@ export type * from "./taskRuns.js"; export type * from "./taskEvents.js"; export type * from "./client/queryBuilder.js"; +// Re-export column constants and indices for type-safe array access +export { + TASK_RUN_COLUMNS, + TASK_RUN_INDEX, + PAYLOAD_COLUMNS, + PAYLOAD_INDEX, +} from "./taskRuns.js"; + // TSQL query execution export { executeTSQL, @@ -168,8 +176,8 @@ export class ClickHouse { get taskRuns() { return { - insert: insertTaskRuns(this.writer), - insertPayloads: insertRawTaskRunPayloads(this.writer), + insertCompactArrays: insertTaskRunsCompactArrays(this.writer), + insertPayloadsCompactArrays: insertRawTaskRunPayloadsCompactArrays(this.writer), queryBuilder: getTaskRunsQueryBuilder(this.reader), countQueryBuilder: getTaskRunsCountQueryBuilder(this.reader), tagQueryBuilder: getTaskRunTagsQueryBuilder(this.reader), diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index b51c9f38c0..feecb63a00 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -1,7 +1,13 @@ import { clickhouseTest } from "@internal/testcontainers"; import { z } from "zod"; import { ClickhouseClient } from "./client/client.js"; -import { getTaskRunsQueryBuilder, insertRawTaskRunPayloads, insertTaskRuns } from "./taskRuns.js"; +import { + getTaskRunsQueryBuilder, + insertRawTaskRunPayloadsCompactArrays, + insertTaskRunsCompactArrays, + type TaskRunInsertArray, + type PayloadInsertArray, +} from "./taskRuns.js"; describe("Task Runs V2", () => { clickhouseTest("should be able to insert task runs", async ({ clickhouseContainer }) => { @@ -11,61 +17,64 @@ describe("Task Runs V2", () => { logLevel: "debug", }); - const insert = insertTaskRuns(client, { + const insert = insertTaskRunsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const insertPayloads = insertRawTaskRunPayloads(client, { + const insertPayloads = insertRawTaskRunPayloadsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const [insertError, insertResult] = await insert([ - { - environment_id: "env_1234", - environment_type: "DEVELOPMENT", - organization_id: "org_1234", - project_id: "project_1234", - run_id: "run_1234", - friendly_id: "friendly_1234", - attempt: 1, - engine: "V2", - status: "PENDING", - task_identifier: "my-task", - queue: "my-queue", - schedule_id: "schedule_1234", - batch_id: "batch_1234", - created_at: Date.now(), - updated_at: Date.now(), - completed_at: undefined, - tags: ["tag1", "tag2"], - output: { - key: "value", - }, - error: { - type: "BUILT_IN_ERROR", - name: "Error", - message: "error", - stackTrace: "stack trace", - }, - usage_duration_ms: 1000, - cost_in_cents: 100, - task_version: "1.0.0", - sdk_version: "1.0.0", - cli_version: "1.0.0", - machine_preset: "small-1x", - is_test: true, - span_id: "span_1234", - trace_id: "trace_1234", - idempotency_key: "idempotency_key_1234", - expiration_ttl: "1h", - root_run_id: "root_run_1234", - parent_run_id: "parent_run_1234", - depth: 1, - concurrency_key: "concurrency_key_1234", - bulk_action_group_ids: ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], - _version: "1", - }, - ]); + const now = Date.now(); + const taskRunData: TaskRunInsertArray = [ + "env_1234", // environment_id + "org_1234", // organization_id + "project_1234", // project_id + "run_1234", // run_id + now, // updated_at + now, // created_at + "PENDING", // status + "DEVELOPMENT", // environment_type + "friendly_1234", // friendly_id + 1, // attempt + "V2", // engine + "my-task", // task_identifier + "my-queue", // queue + "schedule_1234", // schedule_id + "batch_1234", // batch_id + null, // completed_at + null, // started_at + null, // executed_at + null, // delay_until + null, // queued_at + null, // expired_at + 1000, // usage_duration_ms + 100, // cost_in_cents + 0, // base_cost_in_cents + { data: { key: "value" } }, // output + { data: { type: "BUILT_IN_ERROR", name: "Error", message: "error", stackTrace: "stack trace" } }, // error + ["tag1", "tag2"], // tags + "1.0.0", // task_version + "1.0.0", // sdk_version + "1.0.0", // cli_version + "small-1x", // machine_preset + "root_run_1234", // root_run_id + "parent_run_1234", // parent_run_id + 1, // depth + "span_1234", // span_id + "trace_1234", // trace_id + "idempotency_key_1234", // idempotency_key + "1h", // expiration_ttl + true, // is_test + "1", // _version + 0, // _is_deleted + "concurrency_key_1234", // concurrency_key + ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + ]; + + const [insertError, insertResult] = await insert([taskRunData]); expect(insertError).toBeNull(); expect(insertResult).toEqual(expect.objectContaining({ executed: true })); @@ -99,15 +108,13 @@ describe("Task Runs V2", () => { ]) ); - const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([ - { - run_id: "run_1234", - created_at: Date.now(), - payload: { - key: "value", - }, - }, - ]); + const payloadData: PayloadInsertArray = [ + "run_1234", // run_id + Date.now(), // created_at + { data: { key: "value" } }, // payload + ]; + + const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([payloadData]); expect(insertPayloadsError).toBeNull(); expect(insertPayloadsResult).toEqual(expect.objectContaining({ executed: true })); @@ -137,96 +144,110 @@ describe("Task Runs V2", () => { url: clickhouseContainer.getConnectionUrl(), }); - const insert = insertTaskRuns(client, { + const insert = insertTaskRunsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const [insertError, insertResult] = await insert([ - { - environment_id: "cm9kddfcs01zqdy88ld9mmrli", - organization_id: "cm8zs78wb0002dy616dg75tv3", - project_id: "cm9kddfbz01zpdy88t9dstecu", - run_id: "cma45oli70002qrdy47w0j4n7", - environment_type: "PRODUCTION", - friendly_id: "run_cma45oli70002qrdy47w0j4n7", - attempt: 1, - engine: "V2", - status: "PENDING", - task_identifier: "retry-task", - queue: "task/retry-task", - schedule_id: "", - batch_id: "", - root_run_id: "", - parent_run_id: "", - depth: 0, - span_id: "538677637f937f54", - trace_id: "20a28486b0b9f50c647b35e8863e36a5", - idempotency_key: "", - created_at: new Date("2025-04-30 16:34:04.312").getTime(), - updated_at: new Date("2025-04-30 16:34:04.312").getTime(), - started_at: null, - executed_at: null, - completed_at: null, - delay_until: null, - queued_at: new Date("2025-04-30 16:34:04.311").getTime(), - expired_at: null, - expiration_ttl: "", - usage_duration_ms: 0, - cost_in_cents: 0, - base_cost_in_cents: 0, - output: null, - error: null, - tags: [], - task_version: "", - sdk_version: "", - cli_version: "", - machine_preset: "", - is_test: true, - _version: "1", - }, - { - environment_id: "cm9kddfcs01zqdy88ld9mmrli", - organization_id: "cm8zs78wb0002dy616dg75tv3", - project_id: "cm9kddfbz01zpdy88t9dstecu", - run_id: "cma45oli70002qrdy47w0j4n7", - environment_type: "PRODUCTION", - friendly_id: "run_cma45oli70002qrdy47w0j4n7", - attempt: 1, - engine: "V2", - status: "COMPLETED_SUCCESSFULLY", - task_identifier: "retry-task", - queue: "task/retry-task", - schedule_id: "", - batch_id: "", - root_run_id: "", - parent_run_id: "", - depth: 0, - span_id: "538677637f937f54", - trace_id: "20a28486b0b9f50c647b35e8863e36a5", - idempotency_key: "", - created_at: new Date("2025-04-30 16:34:04.312").getTime(), - updated_at: new Date("2025-04-30 16:34:04.312").getTime(), - started_at: null, - executed_at: null, - completed_at: null, - delay_until: null, - queued_at: new Date("2025-04-30 16:34:04.311").getTime(), - expired_at: null, - expiration_ttl: "", - usage_duration_ms: 0, - cost_in_cents: 0, - base_cost_in_cents: 0, - output: null, - error: null, - tags: [], - task_version: "", - sdk_version: "", - cli_version: "", - machine_preset: "", - is_test: true, - _version: "2", - }, - ]); + const createdAt = new Date("2025-04-30 16:34:04.312").getTime(); + const queuedAt = new Date("2025-04-30 16:34:04.311").getTime(); + + const run1: TaskRunInsertArray = [ + "cm9kddfcs01zqdy88ld9mmrli", // environment_id + "cm8zs78wb0002dy616dg75tv3", // organization_id + "cm9kddfbz01zpdy88t9dstecu", // project_id + "cma45oli70002qrdy47w0j4n7", // run_id + createdAt, // updated_at + createdAt, // created_at + "PENDING", // status + "PRODUCTION", // environment_type + "run_cma45oli70002qrdy47w0j4n7", // friendly_id + 1, // attempt + "V2", // engine + "retry-task", // task_identifier + "task/retry-task", // queue + "", // schedule_id + "", // batch_id + null, // completed_at + null, // started_at + null, // executed_at + null, // delay_until + queuedAt, // queued_at + null, // expired_at + 0, // usage_duration_ms + 0, // cost_in_cents + 0, // base_cost_in_cents + { data: null }, // output + { data: null }, // error + [], // tags + "", // task_version + "", // sdk_version + "", // cli_version + "", // machine_preset + "", // root_run_id + "", // parent_run_id + 0, // depth + "538677637f937f54", // span_id + "20a28486b0b9f50c647b35e8863e36a5", // trace_id + "", // idempotency_key + "", // expiration_ttl + true, // is_test + "1", // _version + 0, // _is_deleted + "", // concurrency_key + [], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + ]; + + const run2: TaskRunInsertArray = [ + "cm9kddfcs01zqdy88ld9mmrli", // environment_id + "cm8zs78wb0002dy616dg75tv3", // organization_id + "cm9kddfbz01zpdy88t9dstecu", // project_id + "cma45oli70002qrdy47w0j4n7", // run_id + createdAt, // updated_at + createdAt, // created_at + "COMPLETED_SUCCESSFULLY", // status + "PRODUCTION", // environment_type + "run_cma45oli70002qrdy47w0j4n7", // friendly_id + 1, // attempt + "V2", // engine + "retry-task", // task_identifier + "task/retry-task", // queue + "", // schedule_id + "", // batch_id + null, // completed_at + null, // started_at + null, // executed_at + null, // delay_until + queuedAt, // queued_at + null, // expired_at + 0, // usage_duration_ms + 0, // cost_in_cents + 0, // base_cost_in_cents + { data: null }, // output + { data: null }, // error + [], // tags + "", // task_version + "", // sdk_version + "", // cli_version + "", // machine_preset + "", // root_run_id + "", // parent_run_id + 0, // depth + "538677637f937f54", // span_id + "20a28486b0b9f50c647b35e8863e36a5", // trace_id + "", // idempotency_key + "", // expiration_ttl + true, // is_test + "2", // _version + 0, // _is_deleted + "", // concurrency_key + [], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + ]; + + const [insertError, insertResult] = await insert([run1, run2]); expect(insertError).toBeNull(); expect(insertResult).toEqual(expect.objectContaining({ executed: true })); @@ -266,54 +287,62 @@ describe("Task Runs V2", () => { url: clickhouseContainer.getConnectionUrl(), }); - const insert = insertTaskRuns(client, { + const insert = insertTaskRunsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const [insertError, insertResult] = await insert([ - { - environment_id: "cm9kddfcs01zqdy88ld9mmrli", - organization_id: "cm8zs78wb0002dy616dg75tv3", - project_id: "cm9kddfbz01zpdy88t9dstecu", - run_id: "cma45oli70002qrdy47w0j4n7", - environment_type: "PRODUCTION", - friendly_id: "run_cma45oli70002qrdy47w0j4n7", - attempt: 1, - engine: "V2", - status: "PENDING", - task_identifier: "retry-task", - queue: "task/retry-task", - schedule_id: "", - batch_id: "", - root_run_id: "", - parent_run_id: "", - depth: 0, - span_id: "538677637f937f54", - trace_id: "20a28486b0b9f50c647b35e8863e36a5", - idempotency_key: "", - created_at: new Date("2025-04-30 16:34:04.312").getTime(), - updated_at: new Date("2025-04-30 16:34:04.312").getTime(), - started_at: null, - executed_at: null, - completed_at: null, - delay_until: null, - queued_at: new Date("2025-04-30 16:34:04.311").getTime(), - expired_at: null, - expiration_ttl: "", - usage_duration_ms: 0, - cost_in_cents: 0, - base_cost_in_cents: 0, - output: null, - error: null, - tags: [], - task_version: "", - sdk_version: "", - cli_version: "", - machine_preset: "", - is_test: true, - _version: "1", - }, - ]); + const createdAt = new Date("2025-04-30 16:34:04.312").getTime(); + const queuedAt = new Date("2025-04-30 16:34:04.311").getTime(); + + const taskRun: TaskRunInsertArray = [ + "cm9kddfcs01zqdy88ld9mmrli", // environment_id + "cm8zs78wb0002dy616dg75tv3", // organization_id + "cm9kddfbz01zpdy88t9dstecu", // project_id + "cma45oli70002qrdy47w0j4n7", // run_id + createdAt, // updated_at + createdAt, // created_at + "PENDING", // status + "PRODUCTION", // environment_type + "run_cma45oli70002qrdy47w0j4n7", // friendly_id + 1, // attempt + "V2", // engine + "retry-task", // task_identifier + "task/retry-task", // queue + "", // schedule_id + "", // batch_id + null, // completed_at + null, // started_at + null, // executed_at + null, // delay_until + queuedAt, // queued_at + null, // expired_at + 0, // usage_duration_ms + 0, // cost_in_cents + 0, // base_cost_in_cents + { data: null }, // output + { data: null }, // error + [], // tags + "", // task_version + "", // sdk_version + "", // cli_version + "", // machine_preset + "", // root_run_id + "", // parent_run_id + 0, // depth + "538677637f937f54", // span_id + "20a28486b0b9f50c647b35e8863e36a5", // trace_id + "", // idempotency_key + "", // expiration_ttl + true, // is_test + "1", // _version + 0, // _is_deleted + "", // concurrency_key + [], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + ]; + + const [insertError, insertResult] = await insert([taskRun]); const queryBuilder = getTaskRunsQueryBuilder(client)(); queryBuilder.where("environment_id = {environmentId: String}", { @@ -360,15 +389,15 @@ describe("Task Runs V2", () => { url: clickhouseContainer.getConnectionUrl(), }); - const insertPayloads = insertRawTaskRunPayloads(client, { + const insertPayloads = insertRawTaskRunPayloadsCompactArrays(client, { async_insert: 0, // turn off async insert for this test }); - const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([ + const payloadData: PayloadInsertArray = [ + "run_1234", // run_id + Date.now(), // created_at { - run_id: "run_1234", - created_at: Date.now(), - payload: { + data: { data: { title: { id: "123", @@ -376,8 +405,10 @@ describe("Task Runs V2", () => { "title.id": 123, }, }, - }, - ]); + }, // payload + ]; + + const [insertPayloadsError, insertPayloadsResult] = await insertPayloads([payloadData]); expect(insertPayloadsError).toBeNull(); expect(insertPayloadsResult).toEqual(expect.objectContaining({ executed: true })); diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index d57a8b2a3e..f24bd9e45d 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -52,6 +52,77 @@ export const TaskRunV2 = z.object({ export type TaskRunV2 = z.input; +// Column order for compact format - must match ClickHouse table schema +export const TASK_RUN_COLUMNS = [ + "environment_id", + "organization_id", + "project_id", + "run_id", + "updated_at", + "created_at", + "status", + "environment_type", + "friendly_id", + "attempt", + "engine", + "task_identifier", + "queue", + "schedule_id", + "batch_id", + "completed_at", + "started_at", + "executed_at", + "delay_until", + "queued_at", + "expired_at", + "usage_duration_ms", + "cost_in_cents", + "base_cost_in_cents", + "output", + "error", + "tags", + "task_version", + "sdk_version", + "cli_version", + "machine_preset", + "root_run_id", + "parent_run_id", + "depth", + "span_id", + "trace_id", + "idempotency_key", + "expiration_ttl", + "is_test", + "_version", + "_is_deleted", + "concurrency_key", + "bulk_action_group_ids", + "worker_queue", + "max_duration_in_seconds", +] as const; + +export type TaskRunColumnName = (typeof TASK_RUN_COLUMNS)[number]; + +// Type-safe column indices generated from TASK_RUN_COLUMNS +// This ensures indices stay in sync with column order automatically +export const TASK_RUN_INDEX = Object.fromEntries( + TASK_RUN_COLUMNS.map((col, idx) => [col, idx]) +) as { readonly [K in TaskRunColumnName]: number }; + +export function insertTaskRunsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) { + return ch.insertCompactRaw({ + name: "insertTaskRunsCompactArrays", + table: "trigger_dev.task_runs_v2", + columns: TASK_RUN_COLUMNS, + settings: { + enable_json_type: 1, + type_json_skip_duplicated_paths: 1, + ...settings, + }, + }); +} + +// Object-based insert function for tests and non-performance-critical code export function insertTaskRuns(ch: ClickhouseWriter, settings?: ClickHouseSettings) { return ch.insert({ name: "insertTaskRuns", @@ -73,6 +144,98 @@ export const RawTaskRunPayloadV1 = z.object({ export type RawTaskRunPayloadV1 = z.infer; +export const PAYLOAD_COLUMNS = ["run_id", "created_at", "payload"] as const; + +export type PayloadColumnName = (typeof PAYLOAD_COLUMNS)[number]; + +// Type-safe column indices generated from PAYLOAD_COLUMNS +export const PAYLOAD_INDEX = Object.fromEntries( + PAYLOAD_COLUMNS.map((col, idx) => [col, idx]) +) as { readonly [K in PayloadColumnName]: number }; + +/** + * Type-safe tuple representing a task run insert array. + * Order matches TASK_RUN_COLUMNS exactly. + */ +export type TaskRunInsertArray = [ + environment_id: string, + organization_id: string, + project_id: string, + run_id: string, + updated_at: number, + created_at: number, + status: string, + environment_type: string, + friendly_id: string, + attempt: number, + engine: string, + task_identifier: string, + queue: string, + schedule_id: string, + batch_id: string, + completed_at: number | null, + started_at: number | null, + executed_at: number | null, + delay_until: number | null, + queued_at: number | null, + expired_at: number | null, + usage_duration_ms: number, + cost_in_cents: number, + base_cost_in_cents: number, + output: { data: unknown }, + error: { data: unknown }, + tags: string[], + task_version: string, + sdk_version: string, + cli_version: string, + machine_preset: string, + root_run_id: string, + parent_run_id: string, + depth: number, + span_id: string, + trace_id: string, + idempotency_key: string, + expiration_ttl: string, + is_test: boolean, + _version: string, + _is_deleted: number, + concurrency_key: string, + bulk_action_group_ids: string[], + worker_queue: string, + max_duration_in_seconds: number | null, +]; + +/** + * Type-safe tuple representing a payload insert array. + * Order matches PAYLOAD_COLUMNS exactly. + */ +export type PayloadInsertArray = [ + run_id: string, + created_at: number, + payload: { data: unknown }, +]; + +export function insertRawTaskRunPayloadsCompactArrays( + ch: ClickhouseWriter, + settings?: ClickHouseSettings +) { + return ch.insertCompactRaw({ + name: "insertRawTaskRunPayloadsCompactArrays", + table: "trigger_dev.raw_task_runs_payload_v1", + columns: PAYLOAD_COLUMNS, + settings: { + async_insert: 1, + wait_for_async_insert: 0, + async_insert_max_data_size: "1000000", + async_insert_busy_timeout_ms: 1000, + enable_json_type: 1, + type_json_skip_duplicated_paths: 1, + ...settings, + }, + }); +} + +// Object-based insert function for tests and non-performance-critical code export function insertRawTaskRunPayloads(ch: ClickhouseWriter, settings?: ClickHouseSettings) { return ch.insert({ name: "insertRawTaskRunPayloads", diff --git a/internal-packages/replication/src/pgoutput.ts b/internal-packages/replication/src/pgoutput.ts index 0e75a697f4..809ad87758 100644 --- a/internal-packages/replication/src/pgoutput.ts +++ b/internal-packages/replication/src/pgoutput.ts @@ -20,6 +20,18 @@ export type PgoutputMessage = | MessageType | MessageUpdate; +export type PgoutputMessageArray = + | MessageBegin + | MessageCommit + | MessageDeleteArray + | MessageInsertArray + | MessageMessage + | MessageOrigin + | MessageRelation + | MessageTruncate + | MessageType + | MessageUpdateArray; + export interface MessageBegin { tag: "begin"; commitLsn: string | null; @@ -95,6 +107,26 @@ export interface MessageUpdate { new: Record; } +// Array variants for zero-copy performance +export interface MessageInsertArray { + tag: "insert"; + relation: MessageRelation; + new: any[]; +} +export interface MessageUpdateArray { + tag: "update"; + relation: MessageRelation; + key: any[] | null; + old: any[] | null; + new: any[]; +} +export interface MessageDeleteArray { + tag: "delete"; + relation: MessageRelation; + key: any[] | null; + old: any[] | null; +} + class BinaryReader { private offset = 0; constructor(private buf: Buffer) {} @@ -193,6 +225,35 @@ export class PgoutputParser { } } + public parseArray(buf: Buffer): PgoutputMessageArray { + const reader = new BinaryReader(buf); + const tag = reader.readUint8(); + switch (tag) { + case 0x42: + return this.msgBegin(reader); + case 0x4f: + return this.msgOrigin(reader); + case 0x59: + return this.msgType(reader); + case 0x52: + return this.msgRelation(reader); + case 0x49: + return this.msgInsertArray(reader); + case 0x55: + return this.msgUpdateArray(reader); + case 0x44: + return this.msgDeleteArray(reader); + case 0x54: + return this.msgTruncate(reader); + case 0x4d: + return this.msgMessage(reader); + case 0x43: + return this.msgCommit(reader); + default: + throw Error("unknown pgoutput message"); + } + } + private msgBegin(reader: BinaryReader): MessageBegin { return { tag: "begin", @@ -312,6 +373,55 @@ export class PgoutputParser { } return { tag: "delete", relation, key, old }; } + + // Array variants - skip object creation for performance + private msgInsertArray(reader: BinaryReader): MessageInsertArray { + const relation = this._relationCache.get(reader.readInt32()); + if (!relation) throw Error("missing relation"); + reader.readUint8(); // consume the 'N' key + return { + tag: "insert", + relation, + new: this.readTupleAsArray(reader, relation), + }; + } + private msgUpdateArray(reader: BinaryReader): MessageUpdateArray { + const relation = this._relationCache.get(reader.readInt32()); + if (!relation) throw Error("missing relation"); + let key: any[] | null = null; + let old: any[] | null = null; + let new_: any[] | null = null; + const subMsgKey = reader.readUint8(); + if (subMsgKey === 0x4b) { + key = this.readTupleAsArray(reader, relation); + reader.readUint8(); + new_ = this.readTupleAsArray(reader, relation); + } else if (subMsgKey === 0x4f) { + old = this.readTupleAsArray(reader, relation); + reader.readUint8(); + new_ = this.readTupleAsArray(reader, relation, old); + } else if (subMsgKey === 0x4e) { + new_ = this.readTupleAsArray(reader, relation); + } else { + throw Error(`unknown submessage key ${String.fromCharCode(subMsgKey)}`); + } + return { tag: "update", relation, key, old, new: new_ }; + } + private msgDeleteArray(reader: BinaryReader): MessageDeleteArray { + const relation = this._relationCache.get(reader.readInt32()); + if (!relation) throw Error("missing relation"); + let key: any[] | null = null; + let old: any[] | null = null; + const subMsgKey = reader.readUint8(); + if (subMsgKey === 0x4b) { + key = this.readTupleAsArray(reader, relation); + } else if (subMsgKey === 0x4f) { + old = this.readTupleAsArray(reader, relation); + } else { + throw Error(`unknown submessage key ${String.fromCharCode(subMsgKey)}`); + } + return { tag: "delete", relation, key, old }; + } private readKeyTuple(reader: BinaryReader, relation: MessageRelation): Record { const tuple = this.readTuple(reader, relation); const key = Object.create(null); @@ -354,6 +464,40 @@ export class PgoutputParser { } return tuple; } + + private readTupleAsArray( + reader: BinaryReader, + { columns }: MessageRelation, + unchangedToastFallback?: any[] | null + ): any[] { + const nfields = reader.readInt16(); + const tuple = new Array(nfields); + for (let i = 0; i < nfields; i++) { + const { parser } = columns[i]; + const kind = reader.readUint8(); + switch (kind) { + case 0x62: // 'b' binary + const bsize = reader.readInt32(); + tuple[i] = reader.read(bsize); + break; + case 0x74: // 't' text + const valsize = reader.readInt32(); + const valbuf = reader.read(valsize); + const valtext = reader.decodeText(valbuf); + tuple[i] = parser(valtext); + break; + case 0x6e: // 'n' null + tuple[i] = null; + break; + case 0x75: // 'u' unchanged toast datum + tuple[i] = unchangedToastFallback?.[i]; + break; + default: + throw Error(`unknown attribute kind ${String.fromCharCode(kind)}`); + } + } + return tuple; + } private msgTruncate(reader: BinaryReader): MessageTruncate { const nrels = reader.readInt32(); const flags = reader.readUint8(); diff --git a/packages/core/src/v3/imports/superjson-cjs.cts b/packages/core/src/v3/imports/superjson-cjs.cts new file mode 100644 index 0000000000..a7f1466e7c --- /dev/null +++ b/packages/core/src/v3/imports/superjson-cjs.cts @@ -0,0 +1,15 @@ +// @ts-ignore +const { default: superjson } = require("superjson"); + +// @ts-ignore +superjson.registerCustom( + { + isApplicable: (v: unknown): v is Buffer => typeof Buffer === "function" && Buffer.isBuffer(v), + serialize: (v: Buffer) => [...v], + deserialize: (v: number[]) => Buffer.from(v), + }, + "buffer" +); + +// @ts-ignore +module.exports.default = superjson; diff --git a/packages/core/src/v3/imports/superjson.ts b/packages/core/src/v3/imports/superjson.ts new file mode 100644 index 0000000000..aa29250523 --- /dev/null +++ b/packages/core/src/v3/imports/superjson.ts @@ -0,0 +1,14 @@ +// @ts-ignore +import superjson from "superjson"; + +superjson.registerCustom( + { + isApplicable: (v): v is Buffer => typeof Buffer === "function" && Buffer.isBuffer(v), + serialize: (v) => [...v], + deserialize: (v) => Buffer.from(v), + }, + "buffer" +); + +// @ts-ignore +export default superjson; diff --git a/packages/core/src/v3/utils/ioSerialization.ts b/packages/core/src/v3/utils/ioSerialization.ts index 9bacc41422..ed4d2a0895 100644 --- a/packages/core/src/v3/utils/ioSerialization.ts +++ b/packages/core/src/v3/utils/ioSerialization.ts @@ -13,6 +13,7 @@ import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { TriggerTracer } from "../tracer.js"; import { zodfetch } from "../zodfetch.js"; import { flattenAttributes } from "./flattenAttributes.js"; +import superjson from "../imports/superjson.js"; export type IOPacket = { data?: string | undefined; @@ -32,9 +33,7 @@ export async function parsePacket(value: IOPacket, options?: ParsePacketOptions) case "application/json": return JSON.parse(value.data, makeSafeReviver(options)); case "application/super+json": - const { parse } = await loadSuperJSON(); - - return parse(value.data); + return superjson.parse(value.data); case "text/plain": return value.data; case "application/store": @@ -58,11 +57,9 @@ export async function parsePacketAsJson( case "application/json": return JSON.parse(value.data, makeSafeReviver(options)); case "application/super+json": - const { parse, serialize } = await loadSuperJSON(); - - const superJsonResult = parse(value.data); + const superJsonResult = superjson.parse(value.data); - const { json } = serialize(superJsonResult); + const { json } = superjson.serialize(superJsonResult); return json; case "text/plain": @@ -95,8 +92,7 @@ export async function stringifyIO(value: any): Promise { } try { - const { stringify } = await loadSuperJSON(); - const data = stringify(value); + const data = superjson.stringify(value); return { data, dataType: "application/super+json" }; } catch { @@ -302,14 +298,12 @@ export async function createPacketAttributes( [dataTypeKey]: packet.dataType, }; case "application/super+json": - const { parse } = await loadSuperJSON(); - if (typeof packet.data === "undefined" || packet.data === null) { return; } try { - const parsed = parse(packet.data) as any; + const parsed = superjson.parse(packet.data) as any; const jsonified = JSON.parse(JSON.stringify(parsed, makeSafeReplacer())); const result = { @@ -358,9 +352,7 @@ export async function createPacketAttributesAsJson( ); } case "application/super+json": { - const { deserialize } = await loadSuperJSON(); - - const deserialized = deserialize(data) as any; + const deserialized = superjson.deserialize(data) as any; const jsonify = safeJsonParse(JSON.stringify(deserialized, makeSafeReplacer())); return imposeAttributeLimits( @@ -390,18 +382,16 @@ export async function prettyPrintPacket( rawData = safeJsonParse(rawData); } - const { deserialize } = await loadSuperJSON(); - const hasCircularReferences = rawData && rawData.meta && hasCircularReference(rawData.meta); if (hasCircularReferences) { - return await prettyPrintPacket(deserialize(rawData), "application/json", { + return await prettyPrintPacket(superjson.deserialize(rawData), "application/json", { ...options, cloneReferences: false, }); } - return await prettyPrintPacket(deserialize(rawData), "application/json", { + return await prettyPrintPacket(superjson.deserialize(rawData), "application/json", { ...options, cloneReferences: true, }); @@ -512,21 +502,6 @@ function getPacketExtension(outputType: string): string { } } -async function loadSuperJSON() { - const superjson = await import("superjson"); - - superjson.registerCustom( - { - isApplicable: (v): v is Buffer => typeof Buffer === "function" && Buffer.isBuffer(v), - serialize: (v) => [...v], - deserialize: (v) => Buffer.from(v), - }, - "buffer" - ); - - return superjson; -} - function safeJsonParse(value: string): any { try { return JSON.parse(value); @@ -554,7 +529,6 @@ function safeJsonParse(value: string): any { * @throws {Error} If the newPayload is not valid JSON */ export async function replaceSuperJsonPayload(original: string, newPayload: string) { - const superjson = await loadSuperJSON(); const originalObject = superjson.parse(original); const newPayloadObject = JSON.parse(newPayload); const { meta } = superjson.serialize(originalObject); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 185ae798c3..57989c3f7f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -5594,9 +5594,6 @@ packages: resolution: {integrity: sha512-R8gLRTZeyp03ymzP/6Lil/28tGeGEzhx1q2k703KGWRAI1VdvPIXdG70VJc2pAMw3NA6JKL5hhFu1sJX0Mnn/A==} engines: {node: '>=6.0.0'} - '@jridgewell/source-map@0.3.11': - resolution: {integrity: sha512-ZMp1V8ZFcPG5dIWnQLr3NSI1MiCU7UETdS/A0G8V/XWHvJv3ZsFqutJn1Y5RPmAPX6F3BiE397OqveU/9NCuIA==} - '@jridgewell/source-map@0.3.3': resolution: {integrity: sha512-b+fsZXeLYi9fEULmfBrhxn4IrPlINf8fiNarzTof004v3lFdntdwa9PF7vFJqm3mg7s+ScJMxXaE3Acp1irZcg==} @@ -17273,9 +17270,6 @@ packages: pump@2.0.1: resolution: {integrity: sha512-ruPMNRkN3MHP1cWJc9OWr+T/xDP0jhXYCLfJcBuX54hhfIBnaQmAUMfDcG4DM5UMWByBbJY69QSphm3jtDKIkA==} - pump@3.0.0: - resolution: {integrity: sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==} - pump@3.0.2: resolution: {integrity: sha512-tUPXtzlGM8FE3P0ZL6DVs/3P58k9nk8/jZeQCurTJylQA8qFYzHFfhBJkuqyE0FifOsQ0uKWekiZ5g8wtr28cw==} @@ -18770,11 +18764,6 @@ packages: uglify-js: optional: true - terser@5.17.1: - resolution: {integrity: sha512-hVl35zClmpisy6oaoKALOpS0rDYLxRFLHhRuDlEGTKey9qHjS1w9GMORjuwIMt70Wan4lwsLYyWDVnWgF+KUEw==} - engines: {node: '>=10'} - hasBin: true - terser@5.44.1: resolution: {integrity: sha512-t/R3R/n0MSwnnazuPpPNVO60LX0SKL45pyl9YlvxIdkH0Of7D5qM2EVe+yASRIlY5pZ73nclYJfNANGWPwFDZw==} engines: {node: '>=10'} @@ -23906,11 +23895,6 @@ snapshots: '@jridgewell/set-array@1.2.1': {} - '@jridgewell/source-map@0.3.11': - dependencies: - '@jridgewell/gen-mapping': 0.3.13 - '@jridgewell/trace-mapping': 0.3.31 - '@jridgewell/source-map@0.3.3': dependencies: '@jridgewell/gen-mapping': 0.3.8 @@ -24304,7 +24288,7 @@ snapshots: json-parse-even-better-errors: 3.0.0 normalize-package-data: 5.0.0 proc-log: 3.0.0 - semver: 7.7.2 + semver: 7.7.3 transitivePeerDependencies: - bluebird @@ -24902,7 +24886,7 @@ snapshots: '@types/shimmer': 1.2.0 import-in-the-middle: 1.11.0 require-in-the-middle: 7.1.1(supports-color@10.0.0) - semver: 7.7.2 + semver: 7.7.3 shimmer: 1.2.1 transitivePeerDependencies: - supports-color @@ -24914,7 +24898,7 @@ snapshots: '@types/shimmer': 1.2.0 import-in-the-middle: 1.11.0 require-in-the-middle: 7.1.1(supports-color@10.0.0) - semver: 7.7.2 + semver: 7.7.3 shimmer: 1.2.1 transitivePeerDependencies: - supports-color @@ -31047,7 +31031,7 @@ snapshots: debug: 4.4.1(supports-color@10.0.0) globby: 11.1.0 is-glob: 4.0.3 - semver: 7.7.2 + semver: 7.7.3 tsutils: 3.21.0(typescript@5.9.3) optionalDependencies: typescript: 5.9.3 @@ -31064,7 +31048,7 @@ snapshots: '@typescript-eslint/typescript-estree': 5.59.6(typescript@5.9.3) eslint: 8.31.0 eslint-scope: 5.1.1 - semver: 7.7.2 + semver: 7.7.3 transitivePeerDependencies: - supports-color - typescript @@ -31553,17 +31537,17 @@ snapshots: mime-types: 3.0.0 negotiator: 1.0.0 - acorn-import-assertions@1.9.0(acorn@8.14.1): + acorn-import-assertions@1.9.0(acorn@8.15.0): dependencies: - acorn: 8.14.1 + acorn: 8.15.0 acorn-import-attributes@1.9.5(acorn@8.12.1): dependencies: acorn: 8.12.1 - acorn-import-attributes@1.9.5(acorn@8.14.1): + acorn-import-attributes@1.9.5(acorn@8.15.0): dependencies: - acorn: 8.14.1 + acorn: 8.15.0 acorn-import-phases@1.0.4(acorn@8.15.0): dependencies: @@ -31573,9 +31557,9 @@ snapshots: dependencies: acorn: 8.12.1 - acorn-jsx@5.3.2(acorn@8.14.1): + acorn-jsx@5.3.2(acorn@8.15.0): dependencies: - acorn: 8.14.1 + acorn: 8.15.0 acorn-node@1.8.2: dependencies: @@ -32094,7 +32078,7 @@ snapshots: dependencies: buffer: 5.7.1 inherits: 2.0.4 - readable-stream: 3.6.0 + readable-stream: 3.6.2 body-parser@1.20.3: dependencies: @@ -34166,8 +34150,8 @@ snapshots: espree@9.6.0: dependencies: - acorn: 8.14.1 - acorn-jsx: 5.3.2(acorn@8.14.1) + acorn: 8.15.0 + acorn-jsx: 5.3.2(acorn@8.15.0) eslint-visitor-keys: 3.4.2 esprima@4.0.1: {} @@ -34498,7 +34482,7 @@ snapshots: process-warning: 5.0.0 rfdc: 1.4.1 secure-json-parse: 4.0.0 - semver: 7.7.2 + semver: 7.7.3 toad-cache: 3.7.0 fastq@1.15.0: @@ -34991,7 +34975,7 @@ snapshots: chalk: 4.1.2 debug: 4.4.1(supports-color@10.0.0) interpret: 3.1.1 - semver: 7.7.2 + semver: 7.7.3 tslib: 2.8.1 yargs: 17.7.2 transitivePeerDependencies: @@ -35337,8 +35321,8 @@ snapshots: import-in-the-middle@1.14.2: dependencies: - acorn: 8.14.1 - acorn-import-attributes: 1.9.5(acorn@8.14.1) + acorn: 8.15.0 + acorn-import-attributes: 1.9.5(acorn@8.15.0) cjs-module-lexer: 1.2.3 module-details-from-path: 1.0.3 @@ -36714,8 +36698,8 @@ snapshots: micromark-extension-mdxjs@1.0.0: dependencies: - acorn: 8.14.1 - acorn-jsx: 5.3.2(acorn@8.14.1) + acorn: 8.15.0 + acorn-jsx: 5.3.2(acorn@8.15.0) micromark-extension-mdx-expression: 1.0.3 micromark-extension-mdx-jsx: 1.0.3 micromark-extension-mdx-md: 1.0.0 @@ -38405,7 +38389,7 @@ snapshots: mkdirp-classic: 0.5.3 napi-build-utils: 2.0.0 node-abi: 3.75.0 - pump: 3.0.0 + pump: 3.0.2 rc: 1.2.8 simple-get: 4.0.1 tar-fs: 2.1.3 @@ -38601,11 +38585,6 @@ snapshots: end-of-stream: 1.4.4 once: 1.4.0 - pump@3.0.0: - dependencies: - end-of-stream: 1.4.4 - once: 1.4.0 - pump@3.0.2: dependencies: end-of-stream: 1.4.4 @@ -40724,7 +40703,7 @@ snapshots: end-of-stream: 1.4.4 fs-constants: 1.0.0 inherits: 2.0.4 - readable-stream: 3.6.0 + readable-stream: 3.6.2 tar-stream@3.1.7: dependencies: @@ -40785,22 +40764,15 @@ snapshots: jest-worker: 27.5.1 schema-utils: 3.3.0 serialize-javascript: 6.0.1 - terser: 5.17.1 + terser: 5.44.1 webpack: 5.88.2(@swc/core@1.3.101(@swc/helpers@0.5.15))(esbuild@0.19.11) optionalDependencies: '@swc/core': 1.3.101(@swc/helpers@0.5.15) esbuild: 0.19.11 - terser@5.17.1: - dependencies: - '@jridgewell/source-map': 0.3.3 - acorn: 8.14.1 - commander: 2.20.3 - source-map-support: 0.5.21 - terser@5.44.1: dependencies: - '@jridgewell/source-map': 0.3.11 + '@jridgewell/source-map': 0.3.3 acorn: 8.15.0 commander: 2.20.3 source-map-support: 0.5.21 @@ -41794,7 +41766,7 @@ snapshots: webpack-bundle-analyzer@4.10.1(bufferutil@4.0.9): dependencies: '@discoveryjs/json-ext': 0.5.7 - acorn: 8.14.1 + acorn: 8.15.0 acorn-walk: 8.3.2 commander: 7.2.0 debounce: 1.2.1 @@ -41853,8 +41825,8 @@ snapshots: '@webassemblyjs/ast': 1.11.5 '@webassemblyjs/wasm-edit': 1.11.5 '@webassemblyjs/wasm-parser': 1.11.5 - acorn: 8.14.1 - acorn-import-assertions: 1.9.0(acorn@8.14.1) + acorn: 8.15.0 + acorn-import-assertions: 1.9.0(acorn@8.15.0) browserslist: 4.24.4 chrome-trace-event: 1.0.3 enhanced-resolve: 5.18.3