diff --git a/.server-changes/otel-attribute-utf16-sanitization.md b/.server-changes/otel-attribute-utf16-sanitization.md new file mode 100644 index 00000000000..941c7185bc8 --- /dev/null +++ b/.server-changes/otel-attribute-utf16-sanitization.md @@ -0,0 +1,23 @@ +--- +area: webapp +type: fix +--- + +Recover from ClickHouse `JSONEachRow` parse failures caused by lone +UTF-16 surrogates in OTel attribute strings (`Cannot parse JSON object +here ... ParallelParsingBlockInputFormat`). + +`ClickhouseEventRepository.#flushBatch` and `#flushLlmMetricsBatch` now +retry once after sanitizing every row in the batch: any string value +containing a lone surrogate is replaced with `"[invalid-utf16]"`. If +the sanitizer touched no fields (the parse error isn't a surrogate +issue) or the retry still fails, the batch is dropped without further +ClickHouse round-trips, `permanentlyDroppedBatches` increments, and an +error log with a 1KB sample row is emitted. Non-parse errors propagate +unchanged. + +Detection reuses `detectBadJsonStrings` via `JSON.stringify(value)`, +with a latent regex bug fixed: the low-surrogate hex nibble matched +`[cd]` instead of `[c-f]`, missing the U+DE00–U+DFFF half of the range +and false-flagging common emoji pairs. Healthy batches pay zero scan +cost — the check only runs when ClickHouse has already rejected. diff --git a/apps/webapp/app/utils/detectBadJsonStrings.ts b/apps/webapp/app/utils/detectBadJsonStrings.ts index 4a000b54293..99ffec4b9b0 100644 --- a/apps/webapp/app/utils/detectBadJsonStrings.ts +++ b/apps/webapp/app/utils/detectBadJsonStrings.ts @@ -1,3 +1,17 @@ +/** + * Detects unpaired UTF-16 surrogate escape sequences in JSON-encoded text. + * + * Returns true if the input contains a `\uD8XX`/`\uD9XX`/`\uDAXX`/`\uDBXX` + * high-surrogate escape not immediately followed by a `\uDC..`–`\uDF..` low + * surrogate, or a `\uDC..`–`\uDF..` low surrogate not immediately preceded by + * a high surrogate. Strict JSON parsers (e.g. ClickHouse `JSONEachRow`) + * reject input containing such sequences. + * + * Surrogate hex ranges (case-insensitive — inputs from `JSON.stringify` are + * lowercase): + * - High surrogate (U+D800–U+DBFF): `\uD[8-B][0-9A-F][0-9A-F]` + * - Low surrogate (U+DC00–U+DFFF): `\uD[C-F][0-9A-F][0-9A-F]` + */ export function detectBadJsonStrings(jsonString: string): boolean { // Fast path: skip everything if no \u let idx = jsonString.indexOf("\\u"); @@ -13,7 +27,7 @@ export function detectBadJsonStrings(jsonString: string): boolean { if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") { const third = jsonString[idx + 3]; - // High surrogate check + // High surrogate check — third nibble is 8, 9, a, or b (U+D800–U+DBFF) if ( /[89ab]/.test(third) && /[0-9a-f]/.test(jsonString[idx + 4]) && @@ -28,7 +42,7 @@ export function detectBadJsonStrings(jsonString: string): boolean { jsonString[idx + 6] !== "\\" || jsonString[idx + 7] !== "u" || jsonString[idx + 8] !== "d" || - !/[cd]/.test(jsonString[idx + 9]) || + !/[c-f]/.test(jsonString[idx + 9]) || !/[0-9a-f]/.test(jsonString[idx + 10]) || !/[0-9a-f]/.test(jsonString[idx + 11]) ) { @@ -36,9 +50,9 @@ export function detectBadJsonStrings(jsonString: string): boolean { } } - // Low surrogate check + // Low surrogate check — third nibble is c, d, e, or f (U+DC00–U+DFFF) if ( - (third === "c" || third === "d") && + /[c-f]/.test(third) && /[0-9a-f]/.test(jsonString[idx + 4]) && /[0-9a-f]/.test(jsonString[idx + 5]) ) { diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index 27b96ed7d37..67efea6847c 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -45,6 +45,11 @@ import { removePrivateProperties, isEmptyObject, } from "./common.server"; +import { + isClickHouseJsonParseError, + parseRowNumberFromError, + sanitizeRows, +} from "./sanitizeRowsOnParseError.server"; import type { CompleteableTaskRun, CreateEventInput, @@ -104,6 +109,13 @@ export class ClickhouseEventRepository implements IEventRepository { private readonly _llmMetricsFlushScheduler: DynamicFlushScheduler; private _tracer: Tracer; private _version: "v1" | "v2"; + /** + * Counts batches that hit a ClickHouse JSON parse failure that survived + * one sanitize-retry. These batches are dropped on the floor (the scheduler + * is told the flush "succeeded" so its queue counter doesn't leak), and we + * track the drop count for observability. + */ + private _permanentlyDroppedBatches = 0; constructor(config: ClickhouseEventRepositoryConfig) { this._clickhouse = config.clickhouse; @@ -147,6 +159,11 @@ export class ClickhouseEventRepository implements IEventRepository { return this._config.maximumLiveReloadingSetting ?? 1000; } + /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */ + get permanentlyDroppedBatches() { + return this._permanentlyDroppedBatches; + } + /** * Clamps a start time (in nanoseconds) to now if it's too far in the past. * Returns the clamped value as a bigint. @@ -215,19 +232,32 @@ export class ClickhouseEventRepository implements IEventRepository { ? this._clickhouse.taskEventsV2.insert : this._clickhouse.taskEvents.insert; - const [insertError, insertResult] = await insertFn(events, { - params: { - clickhouse_settings: this.#getClickhouseInsertSettings(), - }, - }); + const doInsert = async () => { + const [insertError, insertResult] = await insertFn(events, { + params: { + clickhouse_settings: this.#getClickhouseInsertSettings(), + }, + }); + if (insertError) throw insertError; + return insertResult; + }; + + const outcome = await this.#insertWithJsonParseRecovery( + flushId, + events, + doInsert, + `task_events_${this._version}` + ); - if (insertError) { - throw insertError; + if (outcome.kind === "dropped") { + // Loud log already emitted; nothing landed in ClickHouse — don't publish to Redis. + return; } logger.info("ClickhouseEventRepository.flushBatch Inserted batch into clickhouse", { events: events.length, - insertResult, + insertResult: outcome.insertResult, + sanitized: outcome.kind === "sanitized", version: this._version, }); @@ -236,22 +266,134 @@ export class ClickhouseEventRepository implements IEventRepository { } async #flushLlmMetricsBatch(flushId: string, rows: LlmMetricsV1Input[]) { + const doInsert = async () => { + const [insertError, insertResult] = await this._clickhouse.llmMetrics.insert(rows, { + params: { + clickhouse_settings: this.#getClickhouseInsertSettings(), + }, + }); + if (insertError) throw insertError; + return insertResult; + }; - const [insertError] = await this._clickhouse.llmMetrics.insert(rows, { - params: { - clickhouse_settings: this.#getClickhouseInsertSettings(), - }, - }); + const outcome = await this.#insertWithJsonParseRecovery( + flushId, + rows, + doInsert, + "llm_metrics_v1" + ); - if (insertError) { - throw insertError; + if (outcome.kind === "dropped") { + return; } logger.info("ClickhouseEventRepository.flushLlmMetricsBatch Inserted LLM metrics batch", { rows: rows.length, + sanitized: outcome.kind === "sanitized", }); } + /** + * Wraps a ClickHouse insert callable with reactive UTF-16 sanitization. + * + * On a `Cannot parse JSON object` failure: + * 1. Sanitize the batch from `max(0, parsedRowN - 1)` onwards (rows + * before the failing one parsed fine — known good). + * 2. Retry the insert once with the sanitized batch. + * 3. If the retry still fails with the same error class, log loudly, + * increment `permanentlyDroppedBatches`, and return without + * throwing — the scheduler's transient-retry path would just repeat + * the same deterministic failure. + * + * Non-parse errors propagate unchanged so the scheduler's existing + * backoff/retry behaviour still handles transient network or CH issues. + */ + async #insertWithJsonParseRecovery( + flushId: string, + rows: T[], + doInsert: () => Promise, + contextLabel: string + ): Promise< + | { kind: "inserted"; insertResult: unknown } + | { kind: "sanitized"; insertResult: unknown } + | { kind: "dropped" } + > { + try { + return { kind: "inserted", insertResult: await doInsert() }; + } catch (firstError) { + if (!isClickHouseJsonParseError(firstError)) throw firstError; + + const firstMessage = + typeof firstError === "object" && firstError !== null && "message" in firstError + ? String((firstError as { message?: unknown }).message ?? "") + : String(firstError); + + // Sanitize the whole batch. ClickHouse's `at row N` index is logged + // for observability but not used to slice — its semantics under + // parallel parsing are not stable enough to safely skip rows. + const rowHint = parseRowNumberFromError(firstMessage); + const { rowsTouched, fieldsSanitized } = sanitizeRows(rows); + + // Sanitizer found nothing to fix → retrying the exact same batch is + // guaranteed to hit the same deterministic parse failure. Skip the + // wasted ClickHouse round-trip and drop loudly. Throwing instead would + // hand the failure back to the scheduler's 3× transient-retry loop — + // exactly the retry storm this wrapper is designed to avoid. + if (fieldsSanitized === 0) { + this._permanentlyDroppedBatches += 1; + logger.error( + "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix", + { + flushId, + contextLabel, + batchSize: rows.length, + clickhouseRowHint: rowHint, + permanentlyDroppedBatches: this._permanentlyDroppedBatches, + sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024), + clickhouseError: firstMessage.split("\n")[0], + } + ); + return { kind: "dropped" }; + } + + logger.warn("Sanitizing batch after ClickHouse JSON parse error", { + flushId, + contextLabel, + batchSize: rows.length, + clickhouseRowHint: rowHint, + rowsTouched, + fieldsSanitized, + clickhouseError: firstMessage.split("\n")[0], + }); + + try { + return { kind: "sanitized", insertResult: await doInsert() }; + } catch (retryError) { + if (!isClickHouseJsonParseError(retryError)) throw retryError; + + this._permanentlyDroppedBatches += 1; + const retryMessage = + typeof retryError === "object" && retryError !== null && "message" in retryError + ? String((retryError as { message?: unknown }).message ?? "") + : String(retryError); + logger.error( + "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error", + { + flushId, + contextLabel, + batchSize: rows.length, + permanentlyDroppedBatches: this._permanentlyDroppedBatches, + sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024), + firstError: firstMessage.split("\n")[0], + retryError: retryMessage.split("\n")[0], + } + ); + + return { kind: "dropped" }; + } + } + } + #createLlmMetricsInput(event: CreateEventInput): LlmMetricsV1Input { const llmMetrics = event._llmMetrics!; diff --git a/apps/webapp/app/v3/eventRepository/sanitizeRowsOnParseError.server.ts b/apps/webapp/app/v3/eventRepository/sanitizeRowsOnParseError.server.ts new file mode 100644 index 00000000000..f04f9c02023 --- /dev/null +++ b/apps/webapp/app/v3/eventRepository/sanitizeRowsOnParseError.server.ts @@ -0,0 +1,118 @@ +import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; + +/** + * Replacement string we substitute for any attribute value that contains + * a lone UTF-16 surrogate. JSON-safe, distinctly recognisable in logs and + * the dashboard so operators can spot affected rows. + */ +export const INVALID_UTF16_SENTINEL = "[invalid-utf16]"; + +export type SanitizeResult = { + /** How many rows had at least one string field replaced. */ + rowsTouched: number; + /** Total count of string fields replaced across all sanitized rows. */ + fieldsSanitized: number; +}; + +/** + * Recognises ClickHouse's "Cannot parse JSON object" rejection — the + * deterministic-failure class our sanitizer is designed for. Bubbles up + * from `@clickhouse/client` as an `InsertError` whose `.message` retains + * the original ClickHouse error text. + */ +export function isClickHouseJsonParseError(err: unknown): boolean { + if (!err) return false; + const message = + typeof err === "object" && err !== null && "message" in err + ? String((err as { message?: unknown }).message ?? "") + : String(err); + return message.includes("Cannot parse JSON object"); +} + +/** + * Extracts the row index ClickHouse reported as the first to fail + * (`(at row N)`). Returns `null` if the message doesn't include one — + * caller should treat that as "sanitize from row 0". + */ +export function parseRowNumberFromError(errorMessage: string): number | null { + const match = errorMessage.match(/at row (\d+)/); + return match ? Number.parseInt(match[1], 10) : null; +} + +/** + * Walks `value` recursively and replaces any string leaf that contains a + * lone UTF-16 surrogate with `INVALID_UTF16_SENTINEL`. Mutates objects + * and arrays in place; primitives are returned unchanged. + * + * Caller passes anything: a row object, a single field, an unknown JSON + * payload. The walker doesn't depend on the row's schema — it sanitizes + * every string in the structure, which is exactly what ClickHouse cares + * about when parsing the row's JSON form. + */ +export function sanitizeUnknownInPlace(value: unknown): { value: unknown; fixed: number } { + if (typeof value === "string") { + // `detectBadJsonStrings` works on JSON-escaped text — feed it the + // serialized form so any lone UTF-16 surrogate in the JS string is + // emitted as a `\uXXXX` escape it can spot. Valid surrogate pairs + // (e.g. emoji) are emitted as raw characters by JSON.stringify and + // exit at the function's fast path. + if (detectBadJsonStrings(JSON.stringify(value))) { + return { value: INVALID_UTF16_SENTINEL, fixed: 1 }; + } + return { value, fixed: 0 }; + } + + if (Array.isArray(value)) { + let fixed = 0; + for (let i = 0; i < value.length; i++) { + const result = sanitizeUnknownInPlace(value[i]); + value[i] = result.value; + fixed += result.fixed; + } + return { value, fixed }; + } + + if (value !== null && typeof value === "object") { + let fixed = 0; + const obj = value as Record; + for (const k of Object.keys(obj)) { + const result = sanitizeUnknownInPlace(obj[k]); + obj[k] = result.value; + fixed += result.fixed; + } + return { value, fixed }; + } + + return { value, fixed: 0 }; +} + +/** + * Sanitizes every row in `rows`, mutating each in place so callers can + * hand the same array to the retry insert. + * + * Rationale for scanning the whole batch (instead of starting from the + * row index ClickHouse reports): `at row N` semantics under + * `input_format_parallel_parsing` aren't well-defined — N can be + * chunk-relative rather than batch-global, and 0-vs-1 indexing differs + * between formats. Whole-batch scanning is robust to those quirks and + * also catches multiple bad rows in one pass (so a single retry covers + * the entire failure even if more than one row is poisoned). + * + * The cost is bounded: this only runs on the rare ClickHouse-rejection + * path, and `detectBadJsonStrings` exits in O(1) for clean strings + * (the fast `indexOf("\\u")` check), so healthy attributes are effectively + * free even when included in the walk. + */ +export function sanitizeRows(rows: T[]): SanitizeResult { + const result: SanitizeResult = { rowsTouched: 0, fieldsSanitized: 0 }; + + for (let i = 0; i < rows.length; i++) { + const { fixed } = sanitizeUnknownInPlace(rows[i]); + if (fixed > 0) { + result.rowsTouched++; + result.fieldsSanitized += fixed; + } + } + + return result; +} diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index 7505693e3ab..22dba93f22e 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -39,7 +39,6 @@ import { startSpan } from "./tracing.server"; import { enrichCreatableEvents } from "./utils/enrichCreatableEvents.server"; import { waitForLlmPricingReady } from "./llmPricingRegistry.server"; import { env } from "~/env.server"; -import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { singleton } from "~/utils/singleton"; class OTLPExporter { diff --git a/apps/webapp/test/detectbadJsonStrings.test.ts b/apps/webapp/test/detectbadJsonStrings.test.ts index 7d14bf4aee8..f3d10037c47 100644 --- a/apps/webapp/test/detectbadJsonStrings.test.ts +++ b/apps/webapp/test/detectbadJsonStrings.test.ts @@ -180,6 +180,72 @@ describe("detectBadJsonStrings", () => { // The difference should be reasonable (not more than 5x) expect(noUnicodeTime / withUnicodeTime).toBeLessThan(5); }); + + describe("full UTF-16 low-surrogate range coverage (U+DC00–U+DFFF)", () => { + // Regression guard: a previous version of this scanner used `[cd]` to + // match the low-surrogate nibble, missing the entire U+DE00–U+DFFF + // half of the range. Valid surrogate pairs with low surrogates in that + // upper half (which includes most common emoji) were falsely flagged, + // and lone surrogates in the upper half were falsely passed. + + it("does NOT flag a valid pair with low surrogate in the c range (U+DC00–U+DCFF)", () => { + // 🐍 SNAKE = U+1F40D = 🐍 + expect(detectBadJsonStrings(`{"s":"\\ud83d\\udc0d"}`)).toBe(false); + }); + + it("does NOT flag a valid pair with low surrogate in the d range (U+DD00–U+DDFF)", () => { + // U+1F540 = 🕀 + expect(detectBadJsonStrings(`{"s":"\\ud83d\\udd40"}`)).toBe(false); + }); + + it("does NOT flag a valid pair with low surrogate in the e range (U+DE00–U+DEFF)", () => { + // 😀 GRINNING FACE = U+1F600 = 😀 — previously false-flagged + expect(detectBadJsonStrings(`{"s":"\\ud83d\\ude00"}`)).toBe(false); + }); + + it("does NOT flag a valid pair with low surrogate in the f range (U+DF00–U+DFFF)", () => { + // U+1F700 = 🜀 — previously false-flagged + expect(detectBadJsonStrings(`{"s":"\\ud83d\\udf00"}`)).toBe(false); + }); + + it("flags a lone low surrogate in the e range (\\uDE00)", () => { + // Previously this was NOT flagged because the forward scan only + // recognised low surrogates with third nibble === "c" || "d". + expect(detectBadJsonStrings(`{"s":"prefix \\ude00 suffix"}`)).toBe(true); + }); + + it("flags a lone low surrogate in the f range (\\uDFFF)", () => { + expect(detectBadJsonStrings(`{"s":"prefix \\udfff suffix"}`)).toBe(true); + }); + + it("flags a high surrogate followed by something that looks like a low surrogate but is in the e range with a missing prefix", () => { + // The previous high-surrogate-then-pair check used `[cd]` for the + // matching low surrogate nibble, so any high surrogate followed by + // \uDe.. would be falsely flagged as unpaired. Verify the fix works + // for the valid case AND still flags genuinely broken inputs. + expect(detectBadJsonStrings(`{"s":"\\ud800X"}`)).toBe(true); // truly broken + expect(detectBadJsonStrings(`{"s":"\\ud83d\\ude00"}`)).toBe(false); // valid, but used to flag + }); + }); + + describe("integration with JSON.stringify", () => { + it("does NOT flag JSON.stringify of a valid emoji 😀", () => { + // V8 emits the raw character for valid surrogate pairs, so the + // fast-path returns false without exercising the regex. + expect(detectBadJsonStrings(JSON.stringify("😀"))).toBe(false); + }); + + it("flags JSON.stringify of a lone high surrogate", () => { + expect(detectBadJsonStrings(JSON.stringify("\uD800"))).toBe(true); + }); + + it("flags JSON.stringify of a lone low surrogate in each of c/d/e/f ranges", () => { + expect(detectBadJsonStrings(JSON.stringify("\uDC00"))).toBe(true); + expect(detectBadJsonStrings(JSON.stringify("\uDD00"))).toBe(true); + expect(detectBadJsonStrings(JSON.stringify("\uDE00"))).toBe(true); + expect(detectBadJsonStrings(JSON.stringify("\uDFFF"))).toBe(true); + }); + }); }); function processPacket(data: string): { data?: string; dataType?: string } { diff --git a/apps/webapp/test/otlpUtf16Sanitization.integration.test.ts b/apps/webapp/test/otlpUtf16Sanitization.integration.test.ts new file mode 100644 index 00000000000..7897ecf73fd --- /dev/null +++ b/apps/webapp/test/otlpUtf16Sanitization.integration.test.ts @@ -0,0 +1,197 @@ +import { clickhouseTest } from "@internal/testcontainers"; +import { describe, expect } from "vitest"; +import { + INVALID_UTF16_SENTINEL, + isClickHouseJsonParseError, + parseRowNumberFromError, + sanitizeRows, +} from "~/v3/eventRepository/sanitizeRowsOnParseError.server"; + +/** + * Integration test that proves the reactive sanitize-and-retry flow works + * against a real ClickHouse instance. Boots a CH container (via testcontainers) + * and reproduces the prod failure path end-to-end. + * + * Three contracts are verified: + * + * 1. **Happy retry path** — insert a row with a lone UTF-16 surrogate, observe + * the parse error, recover via `parseRowNumberFromError` + + * `sanitizeRowsFrom`, retry once, and confirm the row lands with the + * sentinel substituted. + * + * 2. **Real CH error shape** — confirm `isClickHouseJsonParseError` correctly + * recognises the error string we get back from a real CH (not just synthetic + * test fixtures) and that `parseRowNumberFromError` extracts the right + * integer from the same string. + * + * 3. **Non-parse errors don't get swallowed** — push a row past the CH per-row + * size cap and confirm the resulting `Size of JSON object ... is extremely + * large` error is NOT misclassified as a JSON parse error by our predicate. + */ + +const HIGH_SURROGATE = "\uD800"; +const LOW_SURROGATE = "\uDC00"; + +// ClickHouse container boot + image pull on first run can take well past +// vitest's 5 s default. Match what `internal-packages/clickhouse/vitest.config.ts` +// uses for its own clickhouseTest specs. +const INTEGRATION_TIMEOUT_MS = 60_000; + +describe("OTel attribute UTF-16 sanitization → ClickHouse insert", () => { + clickhouseTest( + "lone surrogate is rejected by CH, then sanitized and retried successfully", + async ({ clickhouseClient }) => { + const table = "trigger_dev_test.utf16_repro"; + + await clickhouseClient.command({ + query: "CREATE DATABASE IF NOT EXISTS trigger_dev_test", + }); + await clickhouseClient.command({ query: `DROP TABLE IF EXISTS ${table}` }); + await clickhouseClient.command({ + query: ` + CREATE TABLE ${table} ( + id String, + attributes JSON + ) ENGINE = MergeTree() ORDER BY id + SETTINGS allow_experimental_json_type = 1 + `, + }); + + const rows = [ + { + id: "row-clean-prefix", + attributes: { ai: { prompt: { messages: "valid prompt 1" } } }, + }, + { + id: "row-poisoned", + attributes: { + ai: { prompt: { messages: `valid prefix ${HIGH_SURROGATE} broken tail` } }, + }, + }, + { + id: "row-clean-suffix", + attributes: { ai: { prompt: { messages: "valid prompt 3" } } }, + }, + ]; + + // --- Contract 1: real CH rejects the raw insert with our recognisable error --- + const firstError = await clickhouseClient + .insert({ + table, + values: rows, + format: "JSONEachRow", + clickhouse_settings: { async_insert: 0, input_format_parallel_parsing: 1 }, + }) + .then( + () => null, + (e: unknown) => e as Error + ); + + expect(firstError, "first insert must be rejected").not.toBeNull(); + expect( + isClickHouseJsonParseError(firstError), + "our predicate must recognise the real CH parse error" + ).toBe(true); + const rowN = parseRowNumberFromError(firstError!.message); + expect(rowN, "real CH error must include `at row N`").not.toBeNull(); + expect(rowN! >= 0).toBe(true); + + // --- Recovery: sanitize the whole batch, retry --- + // We don't slice on `rowN` even though we logged it — `at row N` + // semantics under parallel parsing aren't stable enough to skip rows. + const { rowsTouched, fieldsSanitized } = sanitizeRows(rows); + expect(fieldsSanitized, "exactly one field should have been replaced").toBe(1); + expect(rowsTouched).toBe(1); + + // Confirm the targeted row was sanitized and the clean ones were left alone. + expect(rows[1].attributes.ai.prompt.messages).toBe(INVALID_UTF16_SENTINEL); + expect(rows[0].attributes.ai.prompt.messages).toBe("valid prompt 1"); + expect(rows[2].attributes.ai.prompt.messages).toBe("valid prompt 3"); + + // --- Contract 1 (cont'd): retry now lands cleanly --- + await clickhouseClient.insert({ + table, + values: rows, + format: "JSONEachRow", + clickhouse_settings: { async_insert: 0, input_format_parallel_parsing: 1 }, + }); + + const result = await clickhouseClient + .query({ + query: ` + SELECT id, toJSONString(attributes) AS attributes_text + FROM ${table} + ORDER BY id + `, + format: "JSONEachRow", + }) + .then((r) => r.json<{ id: string; attributes_text: string }>()); + + expect(result).toHaveLength(3); + const byId = Object.fromEntries(result.map((r) => [r.id, r])); + expect(byId["row-clean-prefix"].attributes_text).toContain("valid prompt 1"); + expect(byId["row-clean-suffix"].attributes_text).toContain("valid prompt 3"); + expect(byId["row-poisoned"].attributes_text).toContain(INVALID_UTF16_SENTINEL); + + // --- Contract 2: lone LOW surrogate also recognised + recoverable --- + const lowSurrogateRow = { + id: "row-low-surrogate", + attributes: { + ai: { prompt: { messages: `valid prefix ${LOW_SURROGATE} broken tail` } }, + }, + }; + const lowSurrogateError = await clickhouseClient + .insert({ + table, + values: [lowSurrogateRow], + format: "JSONEachRow", + clickhouse_settings: { async_insert: 0, input_format_parallel_parsing: 1 }, + }) + .then( + () => null, + (e: unknown) => e as Error + ); + expect(lowSurrogateError).not.toBeNull(); + expect(isClickHouseJsonParseError(lowSurrogateError)).toBe(true); + + sanitizeRows([lowSurrogateRow]); + expect(lowSurrogateRow.attributes.ai.prompt.messages).toBe(INVALID_UTF16_SENTINEL); + + await clickhouseClient.insert({ + table, + values: [lowSurrogateRow], + format: "JSONEachRow", + clickhouse_settings: { async_insert: 0, input_format_parallel_parsing: 1 }, + }); + }, + INTEGRATION_TIMEOUT_MS + ); + + clickhouseTest( + "non-parse-error rejections (e.g. missing table) are NOT misclassified as JSON parse errors", + async ({ clickhouseClient }) => { + // Pick an error class that is unambiguously NOT a JSON parse failure — + // inserting into a table that doesn't exist. CH returns + // `Table doesn't exist` (UNKNOWN_TABLE). If our predicate ever started + // matching it we'd wastefully sanitize-and-retry an unrelated failure. + const error = await clickhouseClient + .insert({ + table: "trigger_dev_test_nonexistent.utf16_does_not_exist", + values: [{ id: "1", attributes: { ok: "yes" } }], + format: "JSONEachRow", + clickhouse_settings: { async_insert: 0 }, + }) + .then( + () => null, + (e: unknown) => e as Error + ); + + expect(error, "missing-table insert should be rejected").not.toBeNull(); + expect( + isClickHouseJsonParseError(error), + "non-parse error must not be misclassified as JSON parse error" + ).toBe(false); + }, + INTEGRATION_TIMEOUT_MS + ); +}); diff --git a/apps/webapp/test/sanitizeRowsOnParseError.test.ts b/apps/webapp/test/sanitizeRowsOnParseError.test.ts new file mode 100644 index 00000000000..fafa6ca4790 --- /dev/null +++ b/apps/webapp/test/sanitizeRowsOnParseError.test.ts @@ -0,0 +1,161 @@ +import { describe, it, expect } from "vitest"; +import { + INVALID_UTF16_SENTINEL, + isClickHouseJsonParseError, + parseRowNumberFromError, + sanitizeRows, + sanitizeUnknownInPlace, +} from "~/v3/eventRepository/sanitizeRowsOnParseError.server"; + +const HIGH_SURROGATE = "\uD800"; +const LOW_SURROGATE = "\uDC00"; + +describe("isClickHouseJsonParseError", () => { + it("recognises ClickHouse's parse-error string", () => { + const err = new Error( + "Cannot parse JSON object here: {...}: (while reading the value of key attributes): (at row 15)\n: While executing ParallelParsingBlockInputFormat. " + ); + expect(isClickHouseJsonParseError(err)).toBe(true); + }); + + it("returns false for unrelated errors", () => { + expect(isClickHouseJsonParseError(new Error("Connection refused"))).toBe(false); + expect( + isClickHouseJsonParseError( + new Error("Size of JSON object at position 999 is extremely large.") + ) + ).toBe(false); + }); + + it("returns false for null / undefined / strings", () => { + expect(isClickHouseJsonParseError(null)).toBe(false); + expect(isClickHouseJsonParseError(undefined)).toBe(false); + expect(isClickHouseJsonParseError("Cannot parse JSON object")).toBe(true); + }); +}); + +describe("parseRowNumberFromError", () => { + it("extracts the row index from a typical ClickHouse error message", () => { + expect( + parseRowNumberFromError( + "Cannot parse JSON object here: { ... }: (while reading the value of key attributes): (at row 1942)\n: While executing ParallelParsingBlockInputFormat." + ) + ).toBe(1942); + }); + + it("returns null when no row index is present", () => { + expect(parseRowNumberFromError("Some other error without a row hint")).toBeNull(); + }); + + it("returns the first match when multiple `at row N` substrings exist", () => { + expect(parseRowNumberFromError("at row 1, oops also at row 2")).toBe(1); + }); +}); + +describe("sanitizeUnknownInPlace", () => { + it("returns the string unchanged when it has no surrogates", () => { + const result = sanitizeUnknownInPlace("hello world"); + expect(result).toEqual({ value: "hello world", fixed: 0 }); + }); + + it("replaces a lone-surrogate string with the sentinel", () => { + const result = sanitizeUnknownInPlace(`prefix ${HIGH_SURROGATE} suffix`); + expect(result.value).toBe(INVALID_UTF16_SENTINEL); + expect(result.fixed).toBe(1); + }); + + it("leaves valid surrogate pairs (emoji) intact", () => { + const result = sanitizeUnknownInPlace("hello 😀 world"); + expect(result.value).toBe("hello 😀 world"); + expect(result.fixed).toBe(0); + }); + + it("walks nested objects and mutates string leaves in place", () => { + const row = { + id: "row-1", + attributes: { + ai: { + prompt: { messages: `bad ${HIGH_SURROGATE} string` }, + usage: { input_tokens: 42 }, + }, + clean: "untouched", + }, + }; + const result = sanitizeUnknownInPlace(row); + expect(result.fixed).toBe(1); + expect((row.attributes.ai.prompt as any).messages).toBe(INVALID_UTF16_SENTINEL); + expect(row.attributes.clean).toBe("untouched"); + expect((row.attributes.ai.usage as any).input_tokens).toBe(42); + expect(row.id).toBe("row-1"); + }); + + it("walks arrays recursively", () => { + const value = ["ok", `bad ${LOW_SURROGATE} value`, "also ok", { nested: `also bad ${HIGH_SURROGATE}` }]; + const result = sanitizeUnknownInPlace(value); + expect(result.fixed).toBe(2); + expect(value[1]).toBe(INVALID_UTF16_SENTINEL); + expect((value[3] as any).nested).toBe(INVALID_UTF16_SENTINEL); + expect(value[0]).toBe("ok"); + expect(value[2]).toBe("also ok"); + }); + + it("leaves non-string primitives untouched", () => { + expect(sanitizeUnknownInPlace(42)).toEqual({ value: 42, fixed: 0 }); + expect(sanitizeUnknownInPlace(true)).toEqual({ value: true, fixed: 0 }); + expect(sanitizeUnknownInPlace(null)).toEqual({ value: null, fixed: 0 }); + expect(sanitizeUnknownInPlace(undefined)).toEqual({ value: undefined, fixed: 0 }); + }); +}); + +describe("sanitizeRows", () => { + function makeRow(suffix: string, badField?: string) { + return { + id: `row-${suffix}`, + attributes: { foo: badField ?? "clean" }, + }; + } + + it("sanitizes every row that has bad strings", () => { + const rows = [ + makeRow("0", `bad-0-${HIGH_SURROGATE}`), + makeRow("1", `bad-1-${HIGH_SURROGATE}`), + makeRow("2", "clean"), + makeRow("3", `bad-3-${HIGH_SURROGATE}`), + ]; + + const result = sanitizeRows(rows); + + expect(rows[0].attributes.foo).toBe(INVALID_UTF16_SENTINEL); + expect(rows[1].attributes.foo).toBe(INVALID_UTF16_SENTINEL); + expect(rows[2].attributes.foo).toBe("clean"); + expect(rows[3].attributes.foo).toBe(INVALID_UTF16_SENTINEL); + expect(result.rowsTouched).toBe(3); + expect(result.fieldsSanitized).toBe(3); + }); + + it("returns zero counts when no row has bad strings", () => { + const rows = [makeRow("0"), makeRow("1"), makeRow("2")]; + const result = sanitizeRows(rows); + expect(result).toEqual({ rowsTouched: 0, fieldsSanitized: 0 }); + }); + + it("returns zero counts for an empty batch", () => { + expect(sanitizeRows([])).toEqual({ rowsTouched: 0, fieldsSanitized: 0 }); + }); + + it("counts multiple sanitized fields on the same row as one rowTouched but multiple fields", () => { + const rows = [ + { + id: "r0", + attributes: { + a: `bad ${HIGH_SURROGATE}`, + b: `also bad ${LOW_SURROGATE}`, + c: "fine", + }, + }, + ]; + const result = sanitizeRows(rows); + expect(result.rowsTouched).toBe(1); + expect(result.fieldsSanitized).toBe(2); + }); +});