Skip to content

Commit fa0034d

Browse files
committed
fix(webapp): sanitize OTel attributes on ClickHouse JSON parse rejection
ClickHouse's JSONEachRow parser rejects rows containing unpaired UTF-16 surrogates (`Cannot parse JSON object here ... ParallelParsingBlockInputFormat`), losing the whole 5–10k-row batch through the scheduler's retry path. Locally reproduced with ~10 KB rows; the 100 MB size-stress error is distinct (`Size of JSON object is extremely large`), so the root cause is content quality, not size. `ClickhouseEventRepository.#flushBatch` and `#flushLlmMetricsBatch` now retry once after sanitizing every row in the batch — any string with a lone surrogate is replaced with `"[invalid-utf16]"`. ClickHouse's `at row N` hint is logged for observability but not used to slice; its semantics under `input_format_parallel_parsing` aren't reliable, and a whole-batch scan catches multi-row poisoning in one pass. If the retry also fails: loud error log with sample row, `permanentlyDroppedBatches` increments, return normally — deterministic parse failures don't benefit from the scheduler's transient-retry backoff. Non-parse errors propagate unchanged. Detection reuses `detectBadJsonStrings` via `JSON.stringify(value)`, with a latent regex bug fixed: the low-surrogate nibble matched `[cd]` instead of `[c-f]`, missing U+DE00–U+DFFF and false-flagging common emoji pairs (e.g. 😀). Healthy batches pay zero scan cost — the check only runs when ClickHouse has already rejected.
1 parent 906d5fa commit fa0034d

8 files changed

Lines changed: 716 additions & 20 deletions
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Recover from ClickHouse `JSONEachRow` parse failures caused by lone
7+
UTF-16 surrogates in OTel attribute strings (`Cannot parse JSON object
8+
here ... ParallelParsingBlockInputFormat`).
9+
10+
`ClickhouseEventRepository.#flushBatch` and `#flushLlmMetricsBatch` now
11+
retry once after sanitizing every row in the batch: any string value
12+
containing a lone surrogate is replaced with `"[invalid-utf16]"`.
13+
If the retry still fails, the batch is dropped,
14+
`permanentlyDroppedBatches` increments, and an error log with a 1KB
15+
sample row is emitted. Non-parse errors propagate unchanged.
16+
17+
Detection reuses `detectBadJsonStrings` via `JSON.stringify(value)`,
18+
with a latent regex bug fixed: the low-surrogate hex nibble matched
19+
`[cd]` instead of `[c-f]`, missing the U+DE00–U+DFFF half of the range
20+
and false-flagging common emoji pairs. Healthy batches pay zero scan
21+
cost — the check only runs when ClickHouse has already rejected.

apps/webapp/app/utils/detectBadJsonStrings.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
/**
2+
* Detects unpaired UTF-16 surrogate escape sequences in JSON-encoded text.
3+
*
4+
* Returns true if the input contains a `\uD8XX`/`\uD9XX`/`\uDAXX`/`\uDBXX`
5+
* high-surrogate escape not immediately followed by a `\uDC..`–`\uDF..` low
6+
* surrogate, or a `\uDC..`–`\uDF..` low surrogate not immediately preceded by
7+
* a high surrogate. Strict JSON parsers (e.g. ClickHouse `JSONEachRow`)
8+
* reject input containing such sequences.
9+
*
10+
* Surrogate hex ranges (matched in lowercase only, which is what
11+
* `JSON.stringify` emits; uppercase escapes in the input are not detected):
12+
* - High surrogate (U+D800–U+DBFF): `\uD[8-B][0-9A-F][0-9A-F]`
13+
* - Low surrogate (U+DC00–U+DFFF): `\uD[C-F][0-9A-F][0-9A-F]`
14+
*/
115
export function detectBadJsonStrings(jsonString: string): boolean {
216
// Fast path: skip everything if no \u
317
let idx = jsonString.indexOf("\\u");
@@ -13,7 +27,7 @@ export function detectBadJsonStrings(jsonString: string): boolean {
1327
if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") {
1428
const third = jsonString[idx + 3];
1529

16-
// High surrogate check
30+
// High surrogate check — third nibble is 8, 9, a, or b (U+D800–U+DBFF)
1731
if (
1832
/[89ab]/.test(third) &&
1933
/[0-9a-f]/.test(jsonString[idx + 4]) &&
@@ -28,17 +42,17 @@ export function detectBadJsonStrings(jsonString: string): boolean {
2842
jsonString[idx + 6] !== "\\" ||
2943
jsonString[idx + 7] !== "u" ||
3044
jsonString[idx + 8] !== "d" ||
31-
!/[cd]/.test(jsonString[idx + 9]) ||
45+
!/[c-f]/.test(jsonString[idx + 9]) ||
3246
!/[0-9a-f]/.test(jsonString[idx + 10]) ||
3347
!/[0-9a-f]/.test(jsonString[idx + 11])
3448
) {
3549
return true; // Incomplete high surrogate
3650
}
3751
}
3852

39-
// Low surrogate check
53+
// Low surrogate check — third nibble is c, d, e, or f (U+DC00–U+DFFF)
4054
if (
41-
(third === "c" || third === "d") &&
55+
/[c-f]/.test(third) &&
4256
/[0-9a-f]/.test(jsonString[idx + 4]) &&
4357
/[0-9a-f]/.test(jsonString[idx + 5])
4458
) {

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 135 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ import {
4545
removePrivateProperties,
4646
isEmptyObject,
4747
} from "./common.server";
48+
import {
49+
isClickHouseJsonParseError,
50+
parseRowNumberFromError,
51+
sanitizeRows,
52+
} from "./sanitizeRowsOnParseError.server";
4853
import type {
4954
CompleteableTaskRun,
5055
CreateEventInput,
@@ -104,6 +109,13 @@ export class ClickhouseEventRepository implements IEventRepository {
104109
private readonly _llmMetricsFlushScheduler: DynamicFlushScheduler<LlmMetricsV1Input>;
105110
private _tracer: Tracer;
106111
private _version: "v1" | "v2";
112+
/**
113+
* Counts batches that hit a ClickHouse JSON parse failure that survived
114+
* one sanitize-retry. These batches are dropped on the floor (the scheduler
115+
* is told the flush "succeeded" so its queue counter doesn't leak), and we
116+
* track the drop count for observability.
117+
*/
118+
private _permanentlyDroppedBatches = 0;
107119

108120
constructor(config: ClickhouseEventRepositoryConfig) {
109121
this._clickhouse = config.clickhouse;
@@ -147,6 +159,11 @@ export class ClickhouseEventRepository implements IEventRepository {
147159
return this._config.maximumLiveReloadingSetting ?? 1000;
148160
}
149161

162+
/** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */
163+
get permanentlyDroppedBatches() {
164+
return this._permanentlyDroppedBatches;
165+
}
166+
150167
/**
151168
* Clamps a start time (in nanoseconds) to now if it's too far in the past.
152169
* Returns the clamped value as a bigint.
@@ -215,19 +232,32 @@ export class ClickhouseEventRepository implements IEventRepository {
215232
? this._clickhouse.taskEventsV2.insert
216233
: this._clickhouse.taskEvents.insert;
217234

218-
const [insertError, insertResult] = await insertFn(events, {
219-
params: {
220-
clickhouse_settings: this.#getClickhouseInsertSettings(),
221-
},
222-
});
235+
const doInsert = async () => {
236+
const [insertError, insertResult] = await insertFn(events, {
237+
params: {
238+
clickhouse_settings: this.#getClickhouseInsertSettings(),
239+
},
240+
});
241+
if (insertError) throw insertError;
242+
return insertResult;
243+
};
244+
245+
const outcome = await this.#insertWithJsonParseRecovery(
246+
flushId,
247+
events,
248+
doInsert,
249+
`task_events_${this._version}`
250+
);
223251

224-
if (insertError) {
225-
throw insertError;
252+
if (outcome.kind === "dropped") {
253+
// Loud log already emitted; nothing landed in ClickHouse — don't publish to Redis.
254+
return;
226255
}
227256

228257
logger.info("ClickhouseEventRepository.flushBatch Inserted batch into clickhouse", {
229258
events: events.length,
230-
insertResult,
259+
insertResult: outcome.insertResult,
260+
sanitized: outcome.kind === "sanitized",
231261
version: this._version,
232262
});
233263

@@ -236,22 +266,112 @@ export class ClickhouseEventRepository implements IEventRepository {
236266
}
237267

238268
async #flushLlmMetricsBatch(flushId: string, rows: LlmMetricsV1Input[]) {
269+
const doInsert = async () => {
270+
const [insertError, insertResult] = await this._clickhouse.llmMetrics.insert(rows, {
271+
params: {
272+
clickhouse_settings: this.#getClickhouseInsertSettings(),
273+
},
274+
});
275+
if (insertError) throw insertError;
276+
return insertResult;
277+
};
239278

240-
const [insertError] = await this._clickhouse.llmMetrics.insert(rows, {
241-
params: {
242-
clickhouse_settings: this.#getClickhouseInsertSettings(),
243-
},
244-
});
279+
const outcome = await this.#insertWithJsonParseRecovery(
280+
flushId,
281+
rows,
282+
doInsert,
283+
"llm_metrics_v1"
284+
);
245285

246-
if (insertError) {
247-
throw insertError;
286+
if (outcome.kind === "dropped") {
287+
return;
248288
}
249289

250290
logger.info("ClickhouseEventRepository.flushLlmMetricsBatch Inserted LLM metrics batch", {
251291
rows: rows.length,
292+
sanitized: outcome.kind === "sanitized",
252293
});
253294
}
254295

296+
/**
297+
* Wraps a ClickHouse insert callable with reactive UTF-16 sanitization.
298+
*
299+
* On a `Cannot parse JSON object` failure:
300+
* 1. Sanitize every row in the batch in place (ClickHouse's `at row N`
301+
* hint is only logged, never used to skip rows).
302+
* 2. Retry the insert once with the sanitized batch.
303+
* 3. If the retry still fails with the same error class, log loudly,
304+
* increment `permanentlyDroppedBatches`, and return without
305+
* throwing — the scheduler's transient-retry path would just repeat
306+
* the same deterministic failure.
307+
*
308+
* Non-parse errors propagate unchanged so the scheduler's existing
309+
* backoff/retry behaviour still handles transient network or CH issues.
310+
*/
311+
async #insertWithJsonParseRecovery<T extends object>(
312+
flushId: string,
313+
rows: T[],
314+
doInsert: () => Promise<unknown>,
315+
contextLabel: string
316+
): Promise<
317+
| { kind: "inserted"; insertResult: unknown }
318+
| { kind: "sanitized"; insertResult: unknown }
319+
| { kind: "dropped" }
320+
> {
321+
try {
322+
return { kind: "inserted", insertResult: await doInsert() };
323+
} catch (firstError) {
324+
if (!isClickHouseJsonParseError(firstError)) throw firstError;
325+
326+
const firstMessage =
327+
typeof firstError === "object" && firstError !== null && "message" in firstError
328+
? String((firstError as { message?: unknown }).message ?? "")
329+
: String(firstError);
330+
331+
// Sanitize the whole batch. ClickHouse's `at row N` index is logged
332+
// for observability but not used to slice — its semantics under
333+
// parallel parsing are not stable enough to safely skip rows.
334+
const rowHint = parseRowNumberFromError(firstMessage);
335+
const { rowsTouched, fieldsSanitized } = sanitizeRows(rows);
336+
337+
logger.warn("Sanitizing batch after ClickHouse JSON parse error", {
338+
flushId,
339+
contextLabel,
340+
batchSize: rows.length,
341+
clickhouseRowHint: rowHint,
342+
rowsTouched,
343+
fieldsSanitized,
344+
clickhouseError: firstMessage.split("\n")[0],
345+
});
346+
347+
try {
348+
return { kind: "sanitized", insertResult: await doInsert() };
349+
} catch (retryError) {
350+
if (!isClickHouseJsonParseError(retryError)) throw retryError;
351+
352+
this._permanentlyDroppedBatches += 1;
353+
const retryMessage =
354+
typeof retryError === "object" && retryError !== null && "message" in retryError
355+
? String((retryError as { message?: unknown }).message ?? "")
356+
: String(retryError);
357+
logger.error(
358+
"Dropped batch after sanitize-retry still hit ClickHouse JSON parse error",
359+
{
360+
flushId,
361+
contextLabel,
362+
batchSize: rows.length,
363+
permanentlyDroppedBatches: this._permanentlyDroppedBatches,
364+
sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024),
365+
firstError: firstMessage.split("\n")[0],
366+
retryError: retryMessage.split("\n")[0],
367+
}
368+
);
369+
370+
return { kind: "dropped" };
371+
}
372+
}
373+
}
374+
255375
#createLlmMetricsInput(event: CreateEventInput): LlmMetricsV1Input {
256376
const llmMetrics = event._llmMetrics!;
257377

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
2+
3+
/**
4+
* Replacement string we substitute for any attribute value that contains
5+
* a lone UTF-16 surrogate. JSON-safe, distinctly recognisable in logs and
6+
* the dashboard so operators can spot affected rows.
7+
*/
8+
export const INVALID_UTF16_SENTINEL = "[invalid-utf16]";
9+
10+
export type SanitizeResult = {
11+
/** How many rows had at least one string field replaced. */
12+
rowsTouched: number;
13+
/** Total count of string fields replaced across all sanitized rows. */
14+
fieldsSanitized: number;
15+
};
16+
17+
/**
18+
* Recognises ClickHouse's "Cannot parse JSON object" rejection — the
19+
* deterministic-failure class our sanitizer is designed for. Bubbles up
20+
* from `@clickhouse/client` as an `InsertError` whose `.message` retains
21+
* the original ClickHouse error text.
22+
*/
23+
export function isClickHouseJsonParseError(err: unknown): boolean {
24+
if (!err) return false;
25+
const message =
26+
typeof err === "object" && err !== null && "message" in err
27+
? String((err as { message?: unknown }).message ?? "")
28+
: String(err);
29+
return message.includes("Cannot parse JSON object");
30+
}
31+
32+
/**
33+
* Extracts the row index ClickHouse reported as the first to fail
34+
* (`(at row N)`). Returns `null` if the message doesn't include one. The
35+
* value is a logging hint only; sanitization always covers the whole batch.
36+
*/
37+
export function parseRowNumberFromError(errorMessage: string): number | null {
38+
const match = errorMessage.match(/at row (\d+)/);
39+
return match ? Number.parseInt(match[1], 10) : null;
40+
}
41+
42+
/**
43+
* Walks `value` recursively and replaces any string leaf that contains a
44+
* lone UTF-16 surrogate with `INVALID_UTF16_SENTINEL`. Mutates objects
45+
* and arrays in place; primitives are returned unchanged.
46+
*
47+
* Caller passes anything: a row object, a single field, an unknown JSON
48+
* payload. The walker doesn't depend on the row's schema — it sanitizes
49+
* every string in the structure, which is exactly what ClickHouse cares
50+
* about when parsing the row's JSON form.
51+
*/
52+
export function sanitizeUnknownInPlace(value: unknown): { value: unknown; fixed: number } {
53+
if (typeof value === "string") {
54+
// `detectBadJsonStrings` works on JSON-escaped text — feed it the
55+
// serialized form so any lone UTF-16 surrogate in the JS string is
56+
// emitted as a `\uXXXX` escape it can spot. Valid surrogate pairs
57+
// (e.g. emoji) are emitted as raw characters by JSON.stringify and
58+
// exit at the function's fast path.
59+
if (detectBadJsonStrings(JSON.stringify(value))) {
60+
return { value: INVALID_UTF16_SENTINEL, fixed: 1 };
61+
}
62+
return { value, fixed: 0 };
63+
}
64+
65+
if (Array.isArray(value)) {
66+
let fixed = 0;
67+
for (let i = 0; i < value.length; i++) {
68+
const result = sanitizeUnknownInPlace(value[i]);
69+
value[i] = result.value;
70+
fixed += result.fixed;
71+
}
72+
return { value, fixed };
73+
}
74+
75+
if (value !== null && typeof value === "object") {
76+
let fixed = 0;
77+
const obj = value as Record<string, unknown>;
78+
for (const k of Object.keys(obj)) {
79+
const result = sanitizeUnknownInPlace(obj[k]);
80+
obj[k] = result.value;
81+
fixed += result.fixed;
82+
}
83+
return { value, fixed };
84+
}
85+
86+
return { value, fixed: 0 };
87+
}
88+
89+
/**
90+
* Sanitizes every row in `rows`, mutating each in place so callers can
91+
* hand the same array to the retry insert.
92+
*
93+
* Rationale for scanning the whole batch (instead of starting from the
94+
* row index ClickHouse reports): `at row N` semantics under
95+
* `input_format_parallel_parsing` aren't well-defined — N can be
96+
* chunk-relative rather than batch-global, and 0-vs-1 indexing differs
97+
* between formats. Whole-batch scanning is robust to those quirks and
98+
* also catches multiple bad rows in one pass (so a single retry covers
99+
* the entire failure even if more than one row is poisoned).
100+
*
101+
* The cost is bounded: this only runs on the rare ClickHouse-rejection
102+
* path, and for clean strings `detectBadJsonStrings` stops after its
103+
* `indexOf("\\u")` fast path (one linear scan, not O(1)), so healthy
104+
* attributes stay cheap even when included in the walk.
105+
*/
106+
export function sanitizeRows<T extends object>(rows: T[]): SanitizeResult {
107+
const result: SanitizeResult = { rowsTouched: 0, fieldsSanitized: 0 };
108+
109+
for (let i = 0; i < rows.length; i++) {
110+
const { fixed } = sanitizeUnknownInPlace(rows[i]);
111+
if (fixed > 0) {
112+
result.rowsTouched++;
113+
result.fieldsSanitized += fixed;
114+
}
115+
}
116+
117+
return result;
118+
}

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import { startSpan } from "./tracing.server";
3939
import { enrichCreatableEvents } from "./utils/enrichCreatableEvents.server";
4040
import { waitForLlmPricingReady } from "./llmPricingRegistry.server";
4141
import { env } from "~/env.server";
42-
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
4342
import { singleton } from "~/utils/singleton";
4443

4544
class OTLPExporter {

0 commit comments

Comments
 (0)