diff --git a/package.json b/package.json index 9e3f78113..30e5bb3eb 100644 --- a/package.json +++ b/package.json @@ -675,6 +675,8 @@ ], "dependencies": { "@abraham/reflection": "^0.13.0", + "@opentelemetry/api": "^1.9.1", + "@opentelemetry/api-logs": "^0.218.0", "@peculiar/x509": "^2.0.0", "@repo/shared": "workspace:*", "axios": "^1.16.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4b2288751..f5bb93131 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -78,6 +78,12 @@ importers: '@abraham/reflection': specifier: ^0.13.0 version: 0.13.0 + '@opentelemetry/api': + specifier: ^1.9.1 + version: 1.9.1 + '@opentelemetry/api-logs': + specifier: ^0.218.0 + version: 0.218.0 '@peculiar/x509': specifier: ^2.0.0 version: 2.0.0 @@ -303,7 +309,7 @@ importers: version: 8.0.13(@types/node@22.19.19)(esbuild@0.28.0) vitest: specifier: ^4.1.6 - version: 4.1.6(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) + version: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) packages/chat: dependencies: @@ -1307,6 +1313,14 @@ packages: resolution: {integrity: sha512-oGB+UxlgWcgQkgwo8GcEGwemoTFt3FIO9ababBmaGwXIoBKZ+GTy0pP185beGg7Llih/NSHSV2XAs1lnznocSg==} engines: {node: '>= 8'} + '@opentelemetry/api-logs@0.218.0': + resolution: {integrity: sha512-fmEWp5kXlGEc3i/lR698Hz41DfGyN4Tbe4g7L1AxSc7fF8Xeh/FQ9Quqpa9dVA413Q1Ad43QOLzU4JoXgbFPWw==} + engines: {node: '>=8.0.0'} + + '@opentelemetry/api@1.9.1': + resolution: {integrity: sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==} + engines: {node: '>=8.0.0'} + '@oxc-parser/binding-android-arm-eabi@0.127.0': resolution: {integrity: sha512-0LC7ye4hvqbIKxAzThzvswgHLFu2AURKzYLeSVvLdu2TBOYWQDmHnTqPLeA597BcUCxiLqLsS4CJ5uoI5WYWCQ==} engines: {node: ^20.19.0 || >=22.12.0} @@ -5999,6 +6013,12 @@ snapshots: '@nodelib/fs.scandir': 2.1.5 fastq: 1.20.1 + '@opentelemetry/api-logs@0.218.0': + dependencies: + '@opentelemetry/api': 1.9.1 + + '@opentelemetry/api@1.9.1': {} + '@oxc-parser/binding-android-arm-eabi@0.127.0': optional: true @@ -6857,7 +6877,7 @@ snapshots: obug: 2.1.1 std-env: 4.1.0 tinyrainbow: 3.1.0 - vitest: 4.1.6(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) + vitest: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13) '@vitest/expect@3.2.4': dependencies: @@ -10121,7 +10141,7 @@ snapshots: esbuild: 0.28.0 fsevents: 2.3.3 - vitest@4.1.6(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13): + vitest@4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(@vitest/coverage-v8@4.1.6)(jsdom@29.1.1)(vite@8.0.13): dependencies: '@vitest/expect': 4.1.6 '@vitest/mocker': 4.1.6(vite@8.0.13) @@ -10144,6 +10164,7 @@ snapshots: vite: 8.0.13(@types/node@22.19.19)(esbuild@0.28.0) why-is-node-running: 2.3.0 optionalDependencies: + '@opentelemetry/api': 1.9.1 '@types/node': 22.19.19 '@vitest/coverage-v8': 4.1.6(vitest@4.1.6) jsdom: 29.1.1 diff --git a/src/telemetry/export/metrics.ts b/src/telemetry/export/metrics.ts new file mode 100644 index 000000000..6b7f34daf --- /dev/null +++ b/src/telemetry/export/metrics.ts @@ -0,0 +1,82 @@ +import type { TelemetryEvent } from "../event"; + +/** One measurement, classified for export. */ +export interface MetricMeasurement { + readonly name: string; + readonly value: number; + readonly kind: "gauge" | "counter"; + /** OTel/UCUM unit (e.g. "ms", "Mbit/s", "{request}", "1"). */ + readonly unit: string; +} + +/** + * Typed view of a metric event. `windowSeconds` is set on windowed events + * (`http.requests`) and absent on point-in-time samples; exporters use it to + * stamp gauge start times and anchor cumulative counters. + */ +export interface MetricDescriptor { + readonly windowSeconds?: number; + readonly measurements: readonly MetricMeasurement[]; +} + +// Single source of truth for which event names are metric series. +const METRIC_EVENT_NAMES: ReadonlySet = new Set([ + "http.requests", + "ssh.network.info", + "ssh.network.sampled", +]); + +export function isMetricEvent(event: TelemetryEvent): boolean { + return METRIC_EVENT_NAMES.has(event.eventName); +} + +/** Typed layout for a metric event, or `undefined` if it isn't a metric. */ +export function describeMetricEvent( + event: TelemetryEvent, +): MetricDescriptor | undefined { + if (!isMetricEvent(event)) { + return undefined; + } + if (event.eventName === "http.requests") { + return describeHttpRequests(event); + } + return { + measurements: Object.entries(event.measurements).map(([name, value]) => ({ + name, + value, + kind: "gauge", + unit: measurementUnit(name), + })), + }; +} + +// `window_seconds` is metadata, `count_*` are cumulative counters, the rest gauges. +function describeHttpRequests(event: TelemetryEvent): MetricDescriptor { + let windowSeconds = 0; + const measurements: MetricMeasurement[] = []; + for (const [name, value] of Object.entries(event.measurements)) { + if (name === "window_seconds") { + windowSeconds = value; + } else if (name.startsWith("count_")) { + measurements.push({ name, value, kind: "counter", unit: "{request}" }); + } else { + measurements.push({ + name, + value, + kind: "gauge", + unit: measurementUnit(name), + }); + } + } + return { windowSeconds, measurements }; +} + +function measurementUnit(name: string): string { + if (name.endsWith("_ms") || name.endsWith("Ms")) { + return "ms"; + } + if (name.endsWith("Mbits")) { + return "Mbit/s"; + } + return "1"; +} diff --git a/src/telemetry/export/writers/otlp/envelope.ts b/src/telemetry/export/writers/otlp/envelope.ts new file mode 100644 index 000000000..82f2b9797 --- /dev/null +++ b/src/telemetry/export/writers/otlp/envelope.ts @@ -0,0 +1,97 @@ +import { createWriteStream } from "node:fs"; + +import { toError } from "../../../../error/errorUtils"; + +/** Append-only writer for one OTLP/JSON envelope file. `append` is not re-entrant. */ +export interface EnvelopeFile { + append(value: unknown): Promise; + close(): Promise; +} + +/** Streams `v1,v2,...` JSON into `filePath`. */ +export async function openEnvelopeFile( + filePath: string, + prefix: string, + suffix: string, +): Promise { + const stream = createWriteStream(filePath, { encoding: "utf8" }); + // Open failures (ENOENT/EACCES) surface as 'error' events, not write + // callbacks; capture them so pending writes reject instead of hanging. + let asyncError: Error | undefined; + stream.once("error", (err) => { + asyncError ??= err; + }); + + await write(stream, prefix, filePath, () => asyncError); + let written = 0; + let closed = false; + return { + async append(value) { + await write( + stream, + (written === 0 ? "" : ",") + JSON.stringify(value), + filePath, + () => asyncError, + ); + written += 1; + }, + async close() { + if (closed) { + return; + } + closed = true; + try { + await write(stream, suffix, filePath, () => asyncError); + } catch (err) { + // Re-label suffix-write failures as a close failure. + const inner = (err as { cause?: unknown }).cause; + const msg = + inner instanceof Error ? inner.message : toError(err).message; + throw new Error(`Failed to close ${filePath}: ${msg}`, { cause: err }); + } + await new Promise((resolve, reject) => { + stream.end((err?: Error | null) => { + const failure = err ?? asyncError; + if (failure) { + reject( + new Error(`Failed to close ${filePath}: ${failure.message}`, { + cause: failure, + }), + ); + } else { + resolve(); + } + }); + }); + }, + }; +} + +function write( + stream: NodeJS.WritableStream, + chunk: string, + filePath: string, + asyncError: () => Error | undefined, +): Promise { + return new Promise((resolve, reject) => { + const reject_ = (err: unknown) => + reject( + new Error(`Failed to write ${filePath}: ${toError(err).message}`, { + cause: err, + }), + ); + const existing = asyncError(); + if (existing) { + reject_(existing); + return; + } + stream.write(chunk, "utf8", (err) => { + const failure = err ?? asyncError(); + if (failure) { + reject_(failure); + } else { + resolve(); + } + }); + }); +} diff --git a/src/telemetry/export/writers/otlp/records.ts b/src/telemetry/export/writers/otlp/records.ts new file mode 100644 index 000000000..94c58f51e --- /dev/null +++ b/src/telemetry/export/writers/otlp/records.ts @@ -0,0 +1,269 @@ +import { SpanKind, SpanStatusCode } from "@opentelemetry/api"; +import { SeverityNumber } from "@opentelemetry/api-logs"; + +import { type MetricDescriptor, type MetricMeasurement } from "../../metrics"; +import { parseTelemetryTimestampMs } from "../../range"; + +import type { TelemetryContext, TelemetryEvent } from "../../../event"; + +import type { + OtlpKeyValue, + OtlpLogRecord, + OtlpMetric, + OtlpSpan, + OtlpStatus, +} from "./types"; + +/** Per-export state for cumulative HTTP counters. */ +export interface ExportState { + cumulativeStart: string | undefined; + readonly cumulativeTotals: Map; +} + +export function newExportState(): ExportState { + return { cumulativeStart: undefined, cumulativeTotals: new Map() }; +} + +/** OTLP `Resource`, one per export. */ +export function otlpResource(context: TelemetryContext) { + return { + attributes: keyValues({ + "service.name": "coder-vscode-extension", + "service.version": context.extensionVersion, + "service.instance.id": context.sessionId, + "host.id": context.machineId, + "host.arch": context.hostArch, + "os.type": context.osType, + "os.version": context.osVersion, + "vscode.platform.name": context.platformName, + "vscode.platform.version": context.platformVersion, + "coder.deployment.url": context.deploymentUrl, + }), + }; +} + +/** OTLP `InstrumentationScope`, shared by every record. */ +export function otlpScope(version: string) { + return { name: "coder.vscode-coder.telemetry.export", version }; +} + +export function logRecord(event: TelemetryEvent): OtlpLogRecord { + const timeUnixNano = toUnixNano(event.timestamp); + const errored = event.error !== undefined; + return { + timeUnixNano, + observedTimeUnixNano: timeUnixNano, + severityNumber: errored ? SeverityNumber.ERROR : SeverityNumber.INFO, + severityText: errored ? "ERROR" : "INFO", + body: { stringValue: event.eventName }, + attributes: keyValues({ + ...event.properties, + ...event.measurements, + ...(event.error && exceptionAttributes(event.error)), + }), + }; +} + +export function spanRecord(event: TelemetryEvent): OtlpSpan { + const endTimeUnixNano = toUnixNano(event.timestamp); + const startTimeUnixNano = String( + BigInt(endTimeUnixNano) - nanosFromMs(event.measurements.durationMs ?? 0), + ); + // durationMs is encoded as start/end times; don't repeat it as an attribute. + const { durationMs: _durationMs, ...measurements } = event.measurements; + return { + traceId: event.traceId ?? "", + spanId: event.eventId, + ...(event.parentEventId !== undefined && { + parentSpanId: event.parentEventId, + }), + name: event.eventName, + // OTLP proto SpanKind reserves 0 for UNSPECIFIED; api values shift by 1. + kind: SpanKind.INTERNAL + 1, + startTimeUnixNano, + endTimeUnixNano, + attributes: keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + ...measurements, + }), + status: spanStatus(event), + ...(event.error && { + events: [ + { + name: "exception", + timeUnixNano: endTimeUnixNano, + attributes: keyValues(exceptionAttributes(event.error)), + }, + ], + }), + }; +} + +function spanStatus(event: TelemetryEvent): OtlpStatus { + if (event.properties.result === "success") { + return { code: SpanStatusCode.OK }; + } + if (event.properties.result === "error" || event.error !== undefined) { + return { + code: SpanStatusCode.ERROR, + ...(event.error && { message: event.error.message }), + }; + } + return { code: SpanStatusCode.UNSET }; +} + +/** Gauge and cumulative-sum records for one classified metric event. */ +export function metricRecords( + event: TelemetryEvent, + descriptor: MetricDescriptor, + state: ExportState, +): OtlpMetric[] { + const timeUnixNano = toUnixNano(event.timestamp); + const attributes = keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + }); + const windowStart = + descriptor.windowSeconds !== undefined + ? String( + BigInt(timeUnixNano) - nanosFromSeconds(descriptor.windowSeconds), + ) + : undefined; + // Anchor cumulative series on the first event seen; reused across the export. + state.cumulativeStart ??= windowStart ?? timeUnixNano; + + const records: OtlpMetric[] = []; + for (const m of descriptor.measurements) { + if (m.kind === "counter") { + const sum = cumulativeSum(event, m, attributes, timeUnixNano, state); + if (sum) { + records.push(sum); + } + } else { + records.push( + gaugeRecord(event.eventName, m, attributes, timeUnixNano, windowStart), + ); + } + } + return records; +} + +function gaugeRecord( + eventName: string, + measurement: MetricMeasurement, + attributes: readonly OtlpKeyValue[], + timeUnixNano: string, + startTimeUnixNano?: string, +): OtlpMetric { + return { + name: `${eventName}.${measurement.name}`, + description: eventName, + unit: measurement.unit, + gauge: { + dataPoints: [ + { + attributes, + ...(startTimeUnixNano !== undefined && { startTimeUnixNano }), + timeUnixNano, + asDouble: measurement.value, + }, + ], + }, + }; +} + +// No enum in @opentelemetry/api; 2 == CUMULATIVE. +const AGGREGATION_TEMPORALITY_CUMULATIVE = 2; + +function cumulativeSum( + event: TelemetryEvent, + measurement: MetricMeasurement, + attributes: readonly OtlpKeyValue[], + timeUnixNano: string, + state: ExportState, +): OtlpMetric | undefined { + // Clamp the anchor so out-of-order events can't emit startTime > timeTime. + const anchor = state.cumulativeStart ?? timeUnixNano; + const startTimeUnixNano = + BigInt(anchor) <= BigInt(timeUnixNano) ? anchor : timeUnixNano; + const key = `${event.eventName}|${measurement.name}|${seriesKey(event.properties)}`; + const total = + (state.cumulativeTotals.get(key) ?? 0n) + + toIntegerBigInt(measurement.value); + state.cumulativeTotals.set(key, total); + // Suppress zero counters; absence reads as "no events". + if (total === 0n) { + return undefined; + } + return { + name: `${event.eventName}.${measurement.name}`, + description: event.eventName, + unit: measurement.unit, + sum: { + aggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE, + isMonotonic: true, + dataPoints: [ + { attributes, startTimeUnixNano, timeUnixNano, asInt: String(total) }, + ], + }, + }; +} + +/** Stable key for property labels so identical labels share a series. */ +function seriesKey(properties: Readonly>): string { + return Object.entries(properties) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([k, v]) => `${k}=${v}`) + .join("|"); +} + +function exceptionAttributes( + error: NonNullable, +): Record { + return { + "exception.message": error.message, + ...(error.type !== undefined && { "exception.type": error.type }), + ...(error.code !== undefined && { "exception.code": error.code }), + }; +} + +function keyValues( + values: Readonly>, +): OtlpKeyValue[] { + return Object.entries(values).map(([key, value]) => ({ + key, + value: + typeof value === "number" + ? { doubleValue: value } + : { stringValue: value }, + })); +} + +function toUnixNano(timestamp: string): string { + return String(BigInt(parseTelemetryTimestampMs(timestamp)) * 1_000_000n); +} + +function nanosFromMs(ms: number): bigint { + return toNonNegativeBigInt(ms * 1e6); +} + +function nanosFromSeconds(seconds: number): bigint { + return toNonNegativeBigInt(seconds * 1e9); +} + +// Coerce non-finite/negative to 0n; round the rest. +function toNonNegativeBigInt(n: number): bigint { + if (!Number.isFinite(n) || n <= 0) { + return 0n; + } + return BigInt(Math.round(n)); +} + +// Counter increments must be integers; coerce NaN/Infinity to 0n. +function toIntegerBigInt(n: number): bigint { + if (!Number.isFinite(n)) { + return 0n; + } + return BigInt(Math.round(n)); +} diff --git a/src/telemetry/export/writers/otlp/types.ts b/src/telemetry/export/writers/otlp/types.ts new file mode 100644 index 000000000..697a5ddf9 --- /dev/null +++ b/src/telemetry/export/writers/otlp/types.ts @@ -0,0 +1,74 @@ +/** + * Local TypeScript shapes for the OTLP/JSON wire format. Upstream TS types + * live in `@opentelemetry/otlp-transformer`, which pulls in the OTel SDK; we + * mirror the proto schema instead so the writer stays SDK-free. + * + * Spec (opentelemetry-proto v1.10.0): + * https://github.com/open-telemetry/opentelemetry-proto/tree/ca839c51f706f5d53bfb46f06c3e90c3af3a52c6/opentelemetry/proto + */ + +export interface OtlpKeyValue { + readonly key: string; + readonly value: + | { readonly stringValue: string } + | { readonly doubleValue: number }; +} + +export interface OtlpLogRecord { + readonly timeUnixNano: string; + readonly observedTimeUnixNano: string; + readonly severityNumber: number; + readonly severityText: string; + readonly body: { readonly stringValue: string }; + readonly attributes: readonly OtlpKeyValue[]; +} + +export interface OtlpStatus { + readonly code: number; + readonly message?: string; +} + +export interface OtlpSpanEvent { + readonly name: string; + readonly timeUnixNano: string; + readonly attributes: readonly OtlpKeyValue[]; +} + +export interface OtlpSpan { + readonly traceId: string; + readonly spanId: string; + readonly parentSpanId?: string; + readonly name: string; + readonly kind: number; + readonly startTimeUnixNano: string; + readonly endTimeUnixNano: string; + readonly attributes: readonly OtlpKeyValue[]; + readonly status: OtlpStatus; + readonly events?: readonly OtlpSpanEvent[]; +} + +export interface OtlpGaugePoint { + readonly attributes: readonly OtlpKeyValue[]; + readonly startTimeUnixNano?: string; + readonly timeUnixNano: string; + readonly asDouble: number; +} + +export interface OtlpSumPoint { + readonly attributes: readonly OtlpKeyValue[]; + readonly startTimeUnixNano: string; + readonly timeUnixNano: string; + readonly asInt: string; +} + +export interface OtlpMetric { + readonly name: string; + readonly description: string; + readonly unit: string; + readonly gauge?: { readonly dataPoints: readonly OtlpGaugePoint[] }; + readonly sum?: { + readonly aggregationTemporality: number; + readonly isMonotonic: boolean; + readonly dataPoints: readonly OtlpSumPoint[]; + }; +} diff --git a/src/telemetry/export/writers/otlp/writer.ts b/src/telemetry/export/writers/otlp/writer.ts new file mode 100644 index 000000000..bc16a97dc --- /dev/null +++ b/src/telemetry/export/writers/otlp/writer.ts @@ -0,0 +1,185 @@ +import { zip } from "fflate"; +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import { promisify } from "node:util"; + +import { toError } from "../../../../error/errorUtils"; +import { writeAtomically } from "../../../../util/fs"; +import { describeMetricEvent } from "../../metrics"; + +import { openEnvelopeFile, type EnvelopeFile } from "./envelope"; +import { + type ExportState, + logRecord, + metricRecords, + newExportState, + otlpResource, + otlpScope, + spanRecord, +} from "./records"; + +import type { TelemetryContext, TelemetryEvent } from "../../../event"; + +export interface OtlpExportCounts { + readonly logs: number; + readonly traces: number; + readonly metrics: number; +} + +interface Envelope { + readonly file: string; + readonly resourceKey: string; + readonly scopeKey: string; + readonly recordsKey: string; +} + +const ENVELOPES = { + logs: { + file: "logs.json", + resourceKey: "resourceLogs", + scopeKey: "scopeLogs", + recordsKey: "logRecords", + }, + traces: { + file: "traces.json", + resourceKey: "resourceSpans", + scopeKey: "scopeSpans", + recordsKey: "spans", + }, + metrics: { + file: "metrics.json", + resourceKey: "resourceMetrics", + scopeKey: "scopeMetrics", + recordsKey: "metrics", + }, +} as const satisfies Record; + +const OTLP_SCHEMA_URL = "https://opentelemetry.io/schemas/1.24.0"; +const ENVELOPE_SUFFIX = "]}]}]}\n"; + +const zipAsync = promisify(zip); + +/** + * Writes `events` as an OTLP/JSON zip (`logs.json`, `traces.json`, + * `metrics.json`) to `outputPath`. Records stream into a staging directory + * then get packed in-memory; the zip is atomically renamed at the end. + */ +export async function writeOtlpZipExport( + outputPath: string, + events: AsyncIterable, + context: TelemetryContext, + onCleanupError: (err: unknown, tempPath: string) => void, +): Promise { + return writeAtomically( + outputPath, + async (zipPath) => { + const stagingDir = await fs.mkdtemp(`${outputPath}.staging-`); + try { + const counts = await writeStagedFiles(stagingDir, events, context); + await packZip(zipPath, stagingDir); + return counts; + } finally { + await fs.rm(stagingDir, { recursive: true, force: true }); + } + }, + onCleanupError, + ); +} + +async function writeStagedFiles( + dir: string, + events: AsyncIterable, + context: TelemetryContext, +): Promise { + const resource = JSON.stringify(otlpResource(context)); + const scope = JSON.stringify(otlpScope(context.extensionVersion)); + const open = (e: Envelope) => + openEnvelopeFile( + path.join(dir, e.file), + envelopePrefix(e, resource, scope), + ENVELOPE_SUFFIX, + ); + const [logs, traces, metrics] = await Promise.all([ + open(ENVELOPES.logs), + open(ENVELOPES.traces), + open(ENVELOPES.metrics), + ]); + const state = newExportState(); + const counts = { logs: 0, traces: 0, metrics: 0 }; + + try { + for await (const event of events) { + await routeEvent(event, { logs, traces, metrics }, counts, state); + } + // Success path: surface close failures. + await Promise.all([logs.close(), traces.close(), metrics.close()]); + } catch (loopError) { + // Failure path: close quietly so the original error isn't masked. + await Promise.allSettled([logs.close(), traces.close(), metrics.close()]); + throw loopError; + } + + return counts; +} + +async function routeEvent( + event: TelemetryEvent, + files: { logs: EnvelopeFile; traces: EnvelopeFile; metrics: EnvelopeFile }, + counts: { logs: number; traces: number; metrics: number }, + state: ExportState, +): Promise { + try { + const metric = describeMetricEvent(event); + if (metric) { + counts.metrics += 1; + for (const record of metricRecords(event, metric, state)) { + await files.metrics.append(record); + } + } else if (event.traceId !== undefined) { + counts.traces += 1; + await files.traces.append(spanRecord(event)); + } else { + counts.logs += 1; + await files.logs.append(logRecord(event)); + } + } catch (err) { + throw new Error( + `Failed to export event ${event.eventId} (${event.eventName}): ${toError(err).message}`, + { cause: err }, + ); + } +} + +async function packZip(outputPath: string, sourceDir: string): Promise { + const files = await Promise.all( + Object.values(ENVELOPES).map(async (e) => { + try { + return [ + e.file, + await fs.readFile(path.join(sourceDir, e.file)), + ] as const; + } catch (err) { + throw new Error( + `Failed to read staged ${e.file}: ${toError(err).message}`, + { cause: err }, + ); + } + }), + ); + try { + await fs.writeFile(outputPath, await zipAsync(Object.fromEntries(files))); + } catch (err) { + throw new Error( + `Failed to pack OTLP zip ${path.basename(outputPath)}: ${toError(err).message}`, + { cause: err }, + ); + } +} + +function envelopePrefix( + envelope: Envelope, + resource: string, + scope: string, +): string { + return `{"${envelope.resourceKey}":[{"resource":${resource},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.scopeKey}":[{"scope":${scope},"schemaUrl":"${OTLP_SCHEMA_URL}","${envelope.recordsKey}":[`; +} diff --git a/test/unit/remote/sshOverrides.test.ts b/test/unit/remote/sshOverrides.test.ts index 0c60117dd..08d7f78f7 100644 --- a/test/unit/remote/sshOverrides.test.ts +++ b/test/unit/remote/sshOverrides.test.ts @@ -12,12 +12,7 @@ import { createMockLogger, } from "../../mocks/testHelpers"; -import type * as fs from "node:fs"; - -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); /** Helper to extract a single override by key from the result. */ function findOverride( diff --git a/test/unit/remote/sshProcess.test.ts b/test/unit/remote/sshProcess.test.ts index 44139b8e0..3ab3a61a1 100644 --- a/test/unit/remote/sshProcess.test.ts +++ b/test/unit/remote/sshProcess.test.ts @@ -32,10 +32,7 @@ function makeNetworkJson(overrides: Partial = {}): string { vi.mock("find-process", () => ({ default: vi.fn() })); -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); describe("SshProcessMonitor", () => { let activeMonitors: SshProcessMonitor[] = []; diff --git a/test/unit/telemetry/export/metrics.test.ts b/test/unit/telemetry/export/metrics.test.ts new file mode 100644 index 000000000..f6a3f33cf --- /dev/null +++ b/test/unit/telemetry/export/metrics.test.ts @@ -0,0 +1,95 @@ +import { describe, expect, it } from "vitest"; + +import { describeMetricEvent, isMetricEvent } from "@/telemetry/export/metrics"; + +import { createTelemetryEventFactory } from "../../../mocks/telemetry"; + +const makeEvent = createTelemetryEventFactory(); + +describe("isMetricEvent", () => { + it.each([ + ["http.requests", true], + ["ssh.network.info", true], + ["ssh.network.sampled", true], + ["log.something", false], + ["remote.setup.workspace_ready", false], + ])("returns %p for %s", (name, expected) => { + expect(isMetricEvent(makeEvent({ eventName: name }))).toBe(expected); + }); +}); + +describe("describeMetricEvent", () => { + it("returns undefined for non-metric events", () => { + expect( + describeMetricEvent(makeEvent({ eventName: "log.info" })), + ).toBeUndefined(); + }); + + it("classifies all measurements as gauges for non-http metric events", () => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "ssh.network.sampled", + measurements: { latencyMs: 35, downloadMbits: 10, custom: 1 }, + }), + ); + expect(descriptor).toEqual({ + measurements: [ + { name: "latencyMs", value: 35, kind: "gauge", unit: "ms" }, + { name: "downloadMbits", value: 10, kind: "gauge", unit: "Mbit/s" }, + { name: "custom", value: 1, kind: "gauge", unit: "1" }, + ], + }); + }); + + it("partitions http.requests into counters and gauges and extracts the window", () => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "http.requests", + measurements: { + window_seconds: 60, + count_2xx: 5, + count_5xx: 1, + p95_duration_ms: 42, + }, + }), + ); + expect(descriptor).toEqual({ + windowSeconds: 60, + measurements: [ + { name: "count_2xx", value: 5, kind: "counter", unit: "{request}" }, + { name: "count_5xx", value: 1, kind: "counter", unit: "{request}" }, + { + name: "p95_duration_ms", + value: 42, + kind: "gauge", + unit: "ms", + }, + ], + }); + }); + + it("defaults http.requests windowSeconds to 0 when absent", () => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "http.requests", + measurements: { count_2xx: 1 }, + }), + ); + expect(descriptor?.windowSeconds).toBe(0); + }); + + it.each([ + ["latency_ms", "ms"], + ["durationMs", "ms"], + ["downloadMbits", "Mbit/s"], + ["something_else", "1"], + ])("derives unit for measurement '%s' -> '%s'", (name, unit) => { + const descriptor = describeMetricEvent( + makeEvent({ + eventName: "ssh.network.sampled", + measurements: { [name]: 1 }, + }), + ); + expect(descriptor?.measurements[0].unit).toBe(unit); + }); +}); diff --git a/test/unit/telemetry/export/writers/otlp/envelope.test.ts b/test/unit/telemetry/export/writers/otlp/envelope.test.ts new file mode 100644 index 000000000..f45a25cc9 --- /dev/null +++ b/test/unit/telemetry/export/writers/otlp/envelope.test.ts @@ -0,0 +1,120 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const { stream } = vi.hoisted(() => ({ + stream: { + chunks: [] as string[], + writeFailMsg: null as string | null, + endFailMsg: null as string | null, + listeners: new Map void>(), + write(chunk: string, _enc: string, cb: (err?: Error | null) => void) { + if (stream.writeFailMsg) { + cb(new Error(stream.writeFailMsg)); + return; + } + stream.chunks.push(chunk); + cb(null); + }, + end(cb: (err?: Error | null) => void) { + if (stream.endFailMsg) { + cb(new Error(stream.endFailMsg)); + return; + } + cb(null); + }, + once(event: string, listener: (err: Error) => void) { + stream.listeners.set(event, listener); + return stream; + }, + emit(event: string, err: Error) { + stream.listeners.get(event)?.(err); + }, + }, +})); + +vi.mock("node:fs", () => ({ + createWriteStream: () => stream, +})); + +const { openEnvelopeFile } = + await import("@/telemetry/export/writers/otlp/envelope"); + +beforeEach(() => { + stream.chunks = []; + stream.writeFailMsg = null; + stream.endFailMsg = null; + stream.listeners = new Map(); +}); + +afterEach(() => vi.clearAllMocks()); + +describe("openEnvelopeFile", () => { + it("writes only the prefix and suffix when no values are appended", async () => { + const env = await openEnvelopeFile("/x.json", "PRE", "SUF"); + await env.close(); + expect(stream.chunks.join("")).toBe("PRESUF"); + }); + + it("serializes appended values as JSON, comma-separated", async () => { + const env = await openEnvelopeFile("/x.json", "[", "]"); + await env.append({ a: 1 }); + await env.append("two"); + await env.append([3, 4]); + await env.close(); + expect(stream.chunks.join("")).toBe('[{"a":1},"two",[3,4]]'); + }); + + it("inserts commas between values but not before the first", async () => { + const env = await openEnvelopeFile("/x.json", "[", "]"); + await env.append(1); + await env.append(2); + await env.close(); + expect(stream.chunks).toEqual(["[", "1", ",2", "]"]); + }); + + it("wraps prefix-write failures with the file path", async () => { + stream.writeFailMsg = "disk full"; + await expect(openEnvelopeFile("/foo.json", "[", "]")).rejects.toThrow( + "Failed to write /foo.json: disk full", + ); + }); + + it("wraps append-time write failures with the file path", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.writeFailMsg = "disk full"; + await expect(env.append({ a: 1 })).rejects.toThrow( + "Failed to write /foo.json: disk full", + ); + }); + + it("wraps suffix-write failures during close as a close failure", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.writeFailMsg = "disk full"; + await expect(env.close()).rejects.toThrow( + "Failed to close /foo.json: disk full", + ); + }); + + it("wraps stream.end failures with the file path", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.endFailMsg = "stream gone"; + await expect(env.close()).rejects.toThrow( + "Failed to close /foo.json: stream gone", + ); + }); + + it("rejects subsequent writes after the stream emits 'error'", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + stream.emit("error", new Error("ENOENT")); + await expect(env.append({ a: 1 })).rejects.toThrow( + "Failed to write /foo.json: ENOENT", + ); + }); + + it("is idempotent: calling close() twice is safe", async () => { + const env = await openEnvelopeFile("/foo.json", "[", "]"); + await env.close(); + // Second close is a no-op and does not double-write the suffix. + await env.close(); + expect(stream.chunks).toEqual(["[", "]"]); + }); +}); diff --git a/test/unit/telemetry/export/writers/otlp/records.test.ts b/test/unit/telemetry/export/writers/otlp/records.test.ts new file mode 100644 index 000000000..414dbf498 --- /dev/null +++ b/test/unit/telemetry/export/writers/otlp/records.test.ts @@ -0,0 +1,347 @@ +import { describe, expect, it } from "vitest"; + +import { type MetricDescriptor } from "@/telemetry/export/metrics"; +import { + type ExportState, + logRecord, + metricRecords, + newExportState, + otlpResource, + otlpScope, + spanRecord, +} from "@/telemetry/export/writers/otlp/records"; + +import { createTelemetryEventFactory } from "../../../../../mocks/telemetry"; + +const TRACE_ID = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + +const makeEvent = createTelemetryEventFactory(); + +/** Flatten OTLP `[{key, value: {stringValue|doubleValue}}]` to `{key: value}`. */ +function attrs(raw: unknown): Record { + const list = raw as Array<{ + key: string; + value: { stringValue?: string; doubleValue?: number }; + }>; + return Object.fromEntries( + list.map((a) => [a.key, a.value.doubleValue ?? a.value.stringValue!]), + ); +} + +describe("otlpResource", () => { + it("maps the session context onto OTel-standard semconv keys", () => { + const { context } = makeEvent(); + expect(attrs(otlpResource(context).attributes)).toEqual({ + "service.name": "coder-vscode-extension", + "service.version": "1.14.5", + "service.instance.id": "session-id", + "host.id": "machine-id", + "host.arch": "x64", + "os.type": "linux", + "os.version": "6.0.0", + "vscode.platform.name": "Visual Studio Code", + "vscode.platform.version": "1.106.0", + "coder.deployment.url": "https://coder.example.com", + }); + }); +}); + +describe("otlpScope", () => { + it("names the scope and stamps the extension version", () => { + expect(otlpScope("9.9.9")).toEqual({ + name: "coder.vscode-coder.telemetry.export", + version: "9.9.9", + }); + }); +}); + +describe("logRecord", () => { + it("emits INFO records merging properties and measurements", () => { + const record = logRecord( + makeEvent({ + eventName: "log.info", + properties: { source: "unit" }, + measurements: { count: 3 }, + }), + ); + + expect(record).toMatchObject({ + severityNumber: 9, + severityText: "INFO", + body: { stringValue: "log.info" }, + }); + expect(record.timeUnixNano).toBe(record.observedTimeUnixNano); + expect(attrs(record.attributes)).toEqual({ source: "unit", count: 3 }); + }); + + it("emits ERROR records and omits optional exception fields when unset", () => { + const full = logRecord( + makeEvent({ + error: { message: "boom", type: "RangeError", code: "E_RANGE" }, + }), + ); + const minimal = logRecord(makeEvent({ error: { message: "boom" } })); + + expect(full).toMatchObject({ severityNumber: 17, severityText: "ERROR" }); + expect(attrs(full.attributes)).toMatchObject({ + "exception.message": "boom", + "exception.type": "RangeError", + "exception.code": "E_RANGE", + }); + expect(attrs(minimal.attributes)).toEqual({ "exception.message": "boom" }); + }); +}); + +describe("spanRecord", () => { + it("encodes an INTERNAL span with derived start time and parent linkage", () => { + const span = spanRecord( + makeEvent({ + eventName: "remote.setup.workspace_ready", + traceId: TRACE_ID, + parentEventId: "parent-span-id", + properties: { result: "success", route: "/api" }, + measurements: { durationMs: 250, retries: 2 }, + }), + ); + + expect(span).toMatchObject({ + traceId: TRACE_ID, + parentSpanId: "parent-span-id", + name: "remote.setup.workspace_ready", + kind: 1, // OTel SpanKind.INTERNAL (0) + 1 OTLP proto offset. + status: { code: 1 }, + }); + expect(BigInt(span.endTimeUnixNano) - BigInt(span.startTimeUnixNano)).toBe( + 250_000_000n, + ); + expect(attrs(span.attributes)).toEqual({ + "coder.event_name": "remote.setup.workspace_ready", + result: "success", + route: "/api", + retries: 2, + }); + }); + + it("collapses start to end and omits parentSpanId on a minimal span", () => { + const span = spanRecord(makeEvent({ traceId: TRACE_ID })); + + expect(span).not.toHaveProperty("parentSpanId"); + expect(span.startTimeUnixNano).toBe(span.endTimeUnixNano); + }); + + it.each([ + [{ properties: { result: "success" } }, { code: 1 }], + [{ properties: { result: "error" } }, { code: 2 }], + [{ error: { message: "boom" } }, { code: 2, message: "boom" }], + [{}, { code: 0 }], + ])("maps span status: %j -> %j", (overrides, expected) => { + const span = spanRecord(makeEvent({ traceId: TRACE_ID, ...overrides })); + expect(span.status).toEqual(expected); + }); + + it("attaches an `exception` event to errored spans", () => { + const span = spanRecord( + makeEvent({ + traceId: TRACE_ID, + error: { message: "boom", type: "Error" }, + }), + ); + const [exception] = span.events ?? []; + + expect(exception.name).toBe("exception"); + expect(exception.timeUnixNano).toBe(span.endTimeUnixNano); + expect(attrs(exception.attributes)).toEqual({ + "exception.message": "boom", + "exception.type": "Error", + }); + }); +}); + +describe("metricRecords", () => { + const gauge = (name: string, value: number, unit = "1") => + ({ name, value, kind: "gauge", unit }) as const; + const counter = (name: string, value: number) => + ({ name, value, kind: "counter", unit: "{request}" }) as const; + + it("emits one gauge per measurement when the descriptor has no window", () => { + const event = makeEvent({ + eventName: "ssh.network.sampled", + properties: { p2p: "true" }, + }); + const descriptor: MetricDescriptor = { + measurements: [ + gauge("latencyMs", 35, "ms"), + gauge("downloadMbits", 10, "Mbit/s"), + ], + }; + + const records = metricRecords(event, descriptor, newExportState()); + + expect(records.map((r) => [r.name, r.unit])).toEqual([ + ["ssh.network.sampled.latencyMs", "ms"], + ["ssh.network.sampled.downloadMbits", "Mbit/s"], + ]); + const point = records[0].gauge!.dataPoints[0]; + expect(point).not.toHaveProperty("startTimeUnixNano"); + expect(point).toMatchObject({ asDouble: 35 }); + expect(attrs(point.attributes)).toMatchObject({ + "coder.event_name": "ssh.network.sampled", + p2p: "true", + }); + }); + + it("accumulates counter values into cumulative monotonic sums anchored at the first window", () => { + const state = newExportState(); + const first = metricRecords( + makeEvent({ + eventName: "http.requests", + properties: { method: "GET", route: "/a" }, + timestamp: "2026-05-04T12:01:00.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 2)] }, + state, + ); + const second = metricRecords( + makeEvent({ + eventName: "http.requests", + properties: { method: "GET", route: "/a" }, + timestamp: "2026-05-04T12:02:00.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 3)] }, + state, + ); + // 2026-05-04T12:00:00.000Z (window start = first event time − 60s) in ns: + const expectedStart = String( + BigInt(Date.parse("2026-05-04T12:00:00.000Z")) * 1_000_000n, + ); + + expect([ + first[0].sum!.dataPoints[0].asInt, + second[0].sum!.dataPoints[0].asInt, + ]).toEqual(["2", "5"]); + expect(first[0].sum!.aggregationTemporality).toBe(2); + expect(first[0].sum!.isMonotonic).toBe(true); + expect(first[0].sum!.dataPoints[0].startTimeUnixNano).toBe(expectedStart); + expect(second[0].sum!.dataPoints[0].startTimeUnixNano).toBe(expectedStart); + }); + + it("clamps startTimeUnixNano <= timeUnixNano for events that arrive before the anchor", () => { + const state = newExportState(); + // First event lands at T=12:03 with a 60s window → anchor = 12:02. + metricRecords( + makeEvent({ + eventName: "http.requests", + timestamp: "2026-05-04T12:03:00.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 1)] }, + state, + ); + // Out-of-order event at T=12:01:30 (earlier than the anchor). + const records = metricRecords( + makeEvent({ + eventName: "http.requests", + timestamp: "2026-05-04T12:01:30.000Z", + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 1)] }, + state, + ); + const point = records[0].sum!.dataPoints[0]; + + expect(BigInt(point.startTimeUnixNano)).toBeLessThanOrEqual( + BigInt(point.timeUnixNano), + ); + }); + + it("keeps cumulative totals separate by eventName when measurement names collide", () => { + const state = newExportState(); + const first = metricRecords( + makeEvent({ + eventName: "http.requests", + properties: { route: "/a" }, + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 3)] }, + state, + ); + // A second metric event type that happens to share a counter name and + // matching properties must not accumulate into the first series. + const second = metricRecords( + makeEvent({ + eventName: "ssh.network.info", + properties: { route: "/a" }, + }), + { windowSeconds: 60, measurements: [counter("count_2xx", 5)] }, + state, + ); + + expect(first[0].sum!.dataPoints[0].asInt).toBe("3"); + expect(second[0].sum!.dataPoints[0].asInt).toBe("5"); + }); + + it("coerces NaN/Infinity inputs to safe zeros instead of throwing", () => { + // durationMs=NaN in a span; counter=Infinity in a metric; windowSeconds=NaN. + expect(() => + spanRecord( + makeEvent({ + traceId: TRACE_ID, + measurements: { durationMs: NaN, retries: 1 }, + }), + ), + ).not.toThrow(); + + const state = newExportState(); + const records = metricRecords( + makeEvent({ eventName: "http.requests" }), + { windowSeconds: NaN, measurements: [counter("count_2xx", Infinity)] }, + state, + ); + + // Infinity counter coerces to 0 → suppressed. + expect(records).toEqual([]); + }); + + it("stamps gauges with a window start when the descriptor declares one", () => { + const event = makeEvent({ eventName: "http.requests" }); + const descriptor: MetricDescriptor = { + windowSeconds: 60, + measurements: [gauge("p95_duration_ms", 42, "ms")], + }; + + const [record] = metricRecords(event, descriptor, newExportState()); + const point = record.gauge!.dataPoints[0]; + + expect(BigInt(point.timeUnixNano) - BigInt(point.startTimeUnixNano!)).toBe( + 60_000_000_000n, + ); + }); + + it("suppresses zero-valued cumulative counters", () => { + const state: ExportState = newExportState(); + const records = metricRecords( + makeEvent({ eventName: "http.requests" }), + { + windowSeconds: 60, + measurements: [ + counter("count_2xx", 0), + counter("count_5xx", 0), + gauge("p95_duration_ms", 10, "ms"), + ], + }, + state, + ); + + expect(records.map((r) => r.name)).toEqual([ + "http.requests.p95_duration_ms", + ]); + }); + + it("treats windowSeconds=0 as a zero-width window", () => { + const [record] = metricRecords( + makeEvent({ eventName: "http.requests" }), + { windowSeconds: 0, measurements: [counter("count_2xx", 1)] }, + newExportState(), + ); + const point = record.sum!.dataPoints[0]; + + expect(point.startTimeUnixNano).toBe(point.timeUnixNano); + }); +}); diff --git a/test/unit/telemetry/export/writers/otlp/writer.test.ts b/test/unit/telemetry/export/writers/otlp/writer.test.ts new file mode 100644 index 000000000..fcc6d24b1 --- /dev/null +++ b/test/unit/telemetry/export/writers/otlp/writer.test.ts @@ -0,0 +1,198 @@ +import { unzipSync } from "fflate"; +import { vol } from "memfs"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { writeOtlpZipExport } from "@/telemetry/export/writers/otlp/writer"; + +import { asyncIterable } from "../../../../../mocks/asyncIterable"; +import { createTelemetryEventFactory } from "../../../../../mocks/telemetry"; + +import type { TelemetryContext, TelemetryEvent } from "@/telemetry/event"; + +vi.mock("node:fs", async () => (await import("memfs")).fs); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); + +const OUT = "/exports/telemetry.otlp.zip"; +const TRACE_ID = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + +let makeEvent: ReturnType; +let context: TelemetryContext; + +beforeEach(() => { + vol.reset(); + vol.mkdirSync("/exports", { recursive: true }); + makeEvent = createTelemetryEventFactory(); + context = makeEvent().context; +}); + +afterEach(() => vol.reset()); + +type Rec = Record; + +interface ParsedEnvelope { + resource: { attributes: unknown }; + schemaUrl: unknown; + scope: { name: string; version: string }; + records: unknown[]; +} + +function parseEnvelope( + files: Record, + name: string, + resourceKey: string, + scopeKey: string, + recordsKey: string, +): ParsedEnvelope { + const env = JSON.parse(new TextDecoder().decode(files[name])) as Rec; + const wrapper = (env[resourceKey] as Rec[])[0]; + const scopeWrapper = (wrapper[scopeKey] as Rec[])[0]; + return { + resource: wrapper.resource as { attributes: unknown }, + schemaUrl: wrapper.schemaUrl, + scope: scopeWrapper.scope as { name: string; version: string }, + records: scopeWrapper[recordsKey] as unknown[], + }; +} + +/** Reads the zip and returns the parsed envelope for each signal. */ +async function exportAndRead(events: readonly TelemetryEvent[]) { + const counts = await writeOtlpZipExport( + OUT, + asyncIterable(events), + context, + () => {}, + ); + const files = unzipSync(vol.readFileSync(OUT) as Uint8Array); + return { + counts, + logs: parseEnvelope( + files, + "logs.json", + "resourceLogs", + "scopeLogs", + "logRecords", + ), + traces: parseEnvelope( + files, + "traces.json", + "resourceSpans", + "scopeSpans", + "spans", + ), + metrics: parseEnvelope( + files, + "metrics.json", + "resourceMetrics", + "scopeMetrics", + "metrics", + ), + }; +} + +/** Flatten OTLP `[{key, value: {stringValue|doubleValue}}]` to `{key: value}`. */ +function attrs(raw: unknown): Record { + const list = raw as Array<{ + key: string; + value: { stringValue?: string; doubleValue?: number }; + }>; + return Object.fromEntries( + list.map((a) => [a.key, a.value.doubleValue ?? a.value.stringValue!]), + ); +} + +describe("writeOtlpZipExport", () => { + it("packs logs.json, traces.json, and metrics.json into the zip", async () => { + await writeOtlpZipExport( + OUT, + asyncIterable([makeEvent()]), + context, + () => {}, + ); + const files = unzipSync(vol.readFileSync(OUT) as Uint8Array); + expect(Object.keys(files).sort()).toEqual([ + "logs.json", + "metrics.json", + "traces.json", + ]); + }); + + it("counts events by signal even when a metric event fans out into multiple records", async () => { + const { counts, logs, traces, metrics } = await exportAndRead([ + makeEvent({ eventName: "log.info" }), + makeEvent({ eventName: "log.warn" }), + makeEvent({ eventName: "trace.x", traceId: TRACE_ID }), + makeEvent({ + eventName: "http.requests", + measurements: { window_seconds: 60, count_2xx: 1, p95_duration_ms: 5 }, + }), + ]); + + expect(counts).toEqual({ logs: 2, traces: 1, metrics: 1 }); + expect([ + logs.records.length, + traces.records.length, + metrics.records.length, + ]).toEqual([2, 1, 2]); + }); + + it("writes the same resource and scope into every envelope file", async () => { + const { logs, traces, metrics } = await exportAndRead([ + makeEvent({ eventName: "log.info" }), + makeEvent({ eventName: "trace.x", traceId: TRACE_ID }), + makeEvent({ + eventName: "http.requests", + measurements: { window_seconds: 60, count_2xx: 1 }, + }), + ]); + + const expectedResource = { + "service.name": "coder-vscode-extension", + "service.version": "1.14.5", + "service.instance.id": "session-id", + "host.id": "machine-id", + "host.arch": "x64", + "os.type": "linux", + "os.version": "6.0.0", + "vscode.platform.name": "Visual Studio Code", + "vscode.platform.version": "1.106.0", + "coder.deployment.url": "https://coder.example.com", + }; + const expectedScope = { + name: "coder.vscode-coder.telemetry.export", + version: "1.14.5", + }; + + for (const env of [logs, traces, metrics]) { + expect(attrs(env.resource.attributes)).toEqual(expectedResource); + expect(env.scope).toEqual(expectedScope); + expect(env.schemaUrl).toBe("https://opentelemetry.io/schemas/1.24.0"); + } + }); + + it("propagates midstream iterator errors", async () => { + const failing = (async function* () { + yield makeEvent(); + await Promise.resolve(); + throw new Error("boom"); + })(); + + await expect( + writeOtlpZipExport(OUT, failing, context, () => {}), + ).rejects.toThrow(/boom/); + }); + + it("wraps per-event mapping failures with the event identity", async () => { + await expect( + writeOtlpZipExport( + OUT, + asyncIterable([ + makeEvent({ eventId: "id-bad", timestamp: "not-a-date" }), + ]), + context, + () => {}, + ), + ).rejects.toThrow( + /Failed to export event id-bad .*Invalid telemetry timestamp/, + ); + }); +}); diff --git a/test/unit/telemetry/sinks/localJsonlSink.test.ts b/test/unit/telemetry/sinks/localJsonlSink.test.ts index c2191f4a2..9c0c31e4a 100644 --- a/test/unit/telemetry/sinks/localJsonlSink.test.ts +++ b/test/unit/telemetry/sinks/localJsonlSink.test.ts @@ -14,12 +14,7 @@ import { MockConfigurationProvider, } from "../../../mocks/testHelpers"; -import type * as fs from "node:fs"; - -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); const BASE_DIR = "/telemetry"; const SESSION_ID = "12345678-aaaa-bbbb-cccc-dddddddddddd"; diff --git a/test/unit/util/fileCleanup.test.ts b/test/unit/util/fileCleanup.test.ts index a765669b7..b363f3ef1 100644 --- a/test/unit/util/fileCleanup.test.ts +++ b/test/unit/util/fileCleanup.test.ts @@ -6,12 +6,7 @@ import { cleanupFiles } from "@/util/fileCleanup"; import { createMockLogger } from "../../mocks/testHelpers"; -import type * as fs from "node:fs"; - -vi.mock("node:fs/promises", async () => { - const memfs: { fs: typeof fs } = await vi.importActual("memfs"); - return memfs.fs.promises; -}); +vi.mock("node:fs/promises", async () => (await import("memfs")).fs.promises); describe("cleanupFiles", () => { beforeEach(() => {