From 52a5a1fc3b4174d6a5f3369fc810832dd2a9d852 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Wed, 20 May 2026 15:07:58 +0300 Subject: [PATCH 1/2] =?UTF-8?q?feat(0.12.0):=20production=20trace-sink=20?= =?UTF-8?q?=E2=80=94=20close=20the=20data-leak?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every production chat session has been emitting zero replayable trace data. Eval runs capture everything; production captures nothing. RL training, research analyses, and the self-improvement loop all run on synthetic personas. This primitive turns every real user conversation into data the downstream channels (Prime Intellect, GEPA, research, canaries, analyst loop) can consume. `createProductionTraceSink(opts)` returns: - `traceStore` — the in-memory store the agent's TraceEmitter writes to during a chat session (built on agent-eval's existing InMemoryTraceStore; no reinvention) - `onRunComplete` — RunCompleteHook the agent registers; on endRun composes a canonical ProductionRunRecord, persists to a durable store, and POSTs the run as OTLP to a configured collector (Langfuse, etc.) - `recordFeedback(input)` — appends a FeedbackLabel to the run's FeedbackTrajectory; creates the trajectory anchored to runId on first feedback Wiring is ~10 lines in each agent's production chat handler: const sink = createProductionTraceSink({ projectId: 'tax-agent', otlp: { endpoint: env.LANGFUSE_OTEL_ENDPOINT, authHeader: env.LANGFUSE_OTEL_AUTH }, runRecordStore: drizzleRunRecordStore(db), feedbackStore: drizzleFeedbackStore(db), }) const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete], }) Fail-loud everywhere it matters; fail-quiet only at the IO boundary: - runRecordStore failures → logged, not thrown (chat handler stays up) - OTLP POST failures (network/non-2xx) → logged, not thrown - feedbackStore failures → null returned, logged 13 new tests in `tests/production-trace-sink.test.ts` cover: - RunRecord composition for completed / failed / aborted - failureClass + notes propagation - runRecordStore throwing (hook stays alive) - OTLP POST shape (service.name in resource attrs, authorization header) - OTLP failure modes (network throw, non-2xx) - omitted otlp / omitted authHeader paths - recordFeedback create-then-append semantics - explicit trajectoryId honour - explicit trajectoryId honored 144/144 pass. Cloudflare Worker semantics intended: `ctx.waitUntil` the hook from the chat handler so the worker stays alive long enough for the OTLP POST + DB write to flush. Bumps agent-runtime to 0.12.0. --- package.json | 2 +- src/agent/index.ts | 9 + src/agent/production-trace-sink.ts | 310 ++++++++++++++++++++++++++++ tests/production-trace-sink.test.ts | 293 ++++++++++++++++++++++++++ 4 files changed, 613 insertions(+), 1 deletion(-) create mode 100644 src/agent/production-trace-sink.ts create mode 100644 tests/production-trace-sink.test.ts diff --git a/package.json b/package.json index c728f1f..08dcb89 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tangle-network/agent-runtime", - "version": "0.11.1", + "version": "0.12.0", "description": "Reusable runtime lifecycle for domain-specific agents.", "homepage": "https://github.com/tangle-network/agent-runtime#readme", "repository": { diff --git a/src/agent/index.ts b/src/agent/index.ts index 96fd519..75ad0ff 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -39,5 +39,14 @@ export type { export { createSurfaceKnowledgeAdapter } from './knowledge-adapter' export type { OutcomeMeasurement, OutcomeMeasurementOpts } from './outcome' export { measureOutcome } from './outcome' +export type { + ProductionRunRecord, + ProductionRunRecordStore, + ProductionTraceSink, + ProductionTraceSinkOpts, + RecordFeedbackInput, +} from './production-trace-sink' +export { createProductionTraceSink } from './production-trace-sink' + export type { AgentSurfaces, ResolvedSurface, SurfaceValidationIssue } from './surfaces' export { renderSurfaceIssues, resolveSubjectPath, validateSurfaces } from './surfaces' diff --git a/src/agent/production-trace-sink.ts b/src/agent/production-trace-sink.ts new file mode 100644 index 0000000..f3728d2 --- /dev/null +++ b/src/agent/production-trace-sink.ts @@ -0,0 +1,310 @@ +/** + * `createProductionTraceSink` — the production-side capture primitive + * every vertical agent's chat handler wires in once. + * + * Closes the data-leak: until now, every production chat session emitted + * zero replayable trace data. Eval runs captured everything; production + * captured nothing. RL training corpora, research analyses, and the + * self-improvement loop all ran on synthetic personas. This primitive + * makes every real user conversation a piece of data the downstream + * channels (Prime Intellect, GEPA, research, canaries, analyst loop) + * can consume. + * + * Wiring (per agent, ~10 lines in the production chat handler): + * + * ```ts + * const sink = createProductionTraceSink({ + * projectId: 'tax-agent', + * otlp: { endpoint: env.LANGFUSE_OTEL_ENDPOINT, authHeader: env.LANGFUSE_OTEL_AUTH }, + * runRecordStore: drizzleRunRecordStore(db), + * feedbackStore: drizzleFeedbackStore(db), + * }) + * + * const emitter = new TraceEmitter(sink.traceStore, { + * onRunComplete: [sink.onRunComplete], + * }) + * await emitter.startRun({ + * scenarioId: sessionId, + * projectId: 'tax-agent', + * layer: 'app-runtime', + * }) + * // ... existing chat flow, with LLM/tool spans emitted ... + * await emitter.endRun({ pass, score }) + * // sink.onRunComplete fires automatically: + * // 1. composes RunRecord, persists to runRecordStore + * // 2. exports run as OTLP, POSTs to Langfuse + * // 3. logs failures (does NOT throw — never crashes the chat handler) + * ``` + * + * Separately, the agent's feedback endpoint calls `sink.recordFeedback` + * to write user thumbs-up/thumbs-down (or richer labels) into the + * FeedbackTrajectory store — the corpus DPO/KTO trainers consume. + * + * Cloudflare Worker semantics: the sink buffers spans in memory through + * the request lifetime (via agent-eval's `InMemoryTraceStore`). + * `onRunComplete` is awaited (typically inside `ctx.waitUntil`) so the + * worker stays alive long enough to flush. The OTLP POST is fire-and- + * forget — failures are logged but never surface to the chat user. + */ + +import { + exportRunAsOtlp, + type FeedbackLabel, + type FeedbackTrajectoryStore, + InMemoryTraceStore, + type RunCompleteHook, + type RunCompleteHookContext, + type TraceStore, +} from '@tangle-network/agent-eval' + +// ── public API ─────────────────────────────────────────────────────── + +export interface ProductionTraceSinkOpts { + /** + * Stable agent identifier — appears in OTLP `service.name`, every + * RunRecord row, every FeedbackTrajectory row. MUST match the + * agent's repo name to keep cross-repo telemetry joinable. + */ + projectId: string + + /** + * OTLP forwarding target. Typically Langfuse's HTTP receiver. Omit to + * disable OTLP export (RunRecord persistence still works). + * + * `authHeader` is the full header value (e.g. `Basic `); the + * sink does NOT base64-encode for you. + */ + otlp?: { + endpoint: string + authHeader?: string + /** Optional resource attributes merged into every span batch. */ + resourceAttributes?: Record + } + + /** + * Durable RunRecord persistence. Per-vertical agents implement this + * over their own DB (Drizzle / D1 / Postgres). Optional — when omitted, + * RunRecords stay in-memory and are lost when the worker isolate ends. + */ + runRecordStore?: ProductionRunRecordStore + + /** + * Durable feedback persistence. Used by `recordFeedback`; agents wire + * their thumbs-up/down + free-text feedback endpoints to call into the + * sink, which writes a `FeedbackLabel` into a `FeedbackTrajectory`. + * + * Optional — when omitted, `recordFeedback` is a no-op. + */ + feedbackStore?: FeedbackTrajectoryStore + + /** + * Pluggable fetch — defaults to globalThis.fetch. Tests inject a + * mocked fetch. + */ + fetch?: typeof fetch + + /** + * Pluggable structured logger — defaults to console.warn for failures. + * The sink NEVER throws on flush failure; it logs and returns. + */ + log?: (msg: string, fields?: Record) => void +} + +/** + * Durable per-agent RunRecord persistence. Each vertical implements over + * its own DB. The sink calls `append` once per `endRun`. + */ +export interface ProductionRunRecordStore { + append(record: ProductionRunRecord): Promise +} + +/** + * Minimal canonical row the sink composes on `endRun`. Per-agent DB + * adapters extend with their own fields; the sink only writes what + * the runtime canonically captures. + */ +export interface ProductionRunRecord { + runId: string + projectId: string + scenarioId: string + variantId?: string + startedAt: string + endedAt: string + status: 'completed' | 'failed' | 'aborted' + pass?: boolean + score?: number + failureClass?: string + notes?: string + /** Echoed back from `emitter.startRun({ tags })`. */ + tags?: Record + /** Span row count — useful for diagnostics. */ + spanCount: number +} + +export interface ProductionTraceSink { + /** + * The TraceStore the agent's `TraceEmitter` writes to. In-memory by + * design: spans accumulate through the chat session, flush at + * `onRunComplete`. The runtime never reads from this store directly — + * the sink reads from it during the flush step. + */ + traceStore: TraceStore + + /** + * Hook the agent passes into + * `new TraceEmitter(store, { onRunComplete: [sink.onRunComplete] })`. + * Fires once per chat session at `endRun` time. Composes the + * RunRecord, persists, and ships OTLP. Errors are logged, never thrown. + */ + onRunComplete: RunCompleteHook + + /** + * Append a user feedback label (thumbs-up/down, correction, severity) + * to the FeedbackTrajectory for a completed run. Creates a minimal + * trajectory anchored to the run if one doesn't exist; appends the + * label if it does. No-op when `feedbackStore` is undefined. + * + * Returns the trajectory id (existing or freshly created) for the + * agent's API to link back to the session, or `null` on no-op / + * error. + */ + recordFeedback(input: RecordFeedbackInput): Promise +} + +export interface RecordFeedbackInput { + /** Run id from the original `emitter.startRun`. */ + runId: string + /** The user-supplied feedback label. */ + label: FeedbackLabel + /** Optional scenario id (mirrors the run's). */ + scenarioId?: string + /** Optional pre-existing trajectory id if the agent tracks them separately. */ + trajectoryId?: string +} + +// ── factory ────────────────────────────────────────────────────────── + +export function createProductionTraceSink( + opts: ProductionTraceSinkOpts, +): ProductionTraceSink { + const log = opts.log ?? defaultLog + const fetchImpl = opts.fetch ?? globalThis.fetch + const traceStore = new InMemoryTraceStore() + + const onRunComplete: RunCompleteHook = async (ctx: RunCompleteHookContext) => { + // 1. Compose canonical RunRecord and persist if a store is wired. + if (opts.runRecordStore) { + try { + const record = await composeRunRecord(traceStore, ctx, opts.projectId) + await opts.runRecordStore.append(record) + } catch (err) { + log('runRecordStore.append failed', { + runId: ctx.runId, + error: err instanceof Error ? err.message : String(err), + }) + } + } + + // 2. Export the run as OTLP and POST to the configured collector. + if (opts.otlp) { + try { + const resourceAttrs: Record = { + 'service.name': opts.projectId, + ...(opts.otlp.resourceAttributes ?? {}), + } + const otlpPayload = await exportRunAsOtlp(traceStore, ctx.runId, resourceAttrs) + const headers: Record = { 'content-type': 'application/json' } + if (opts.otlp.authHeader) headers.authorization = opts.otlp.authHeader + const res = await fetchImpl(opts.otlp.endpoint, { + method: 'POST', + headers, + body: JSON.stringify(otlpPayload), + }) + if (!res.ok) { + log('OTLP POST non-2xx', { + runId: ctx.runId, + status: res.status, + endpoint: opts.otlp.endpoint, + }) + } + } catch (err) { + log('OTLP POST threw', { + runId: ctx.runId, + error: err instanceof Error ? err.message : String(err), + endpoint: opts.otlp.endpoint, + }) + } + } + } + + const recordFeedback = async ( + input: RecordFeedbackInput, + ): Promise => { + if (!opts.feedbackStore) return null + const trajectoryId = input.trajectoryId ?? `traj-${input.runId}` + try { + const existing = await opts.feedbackStore.get(trajectoryId) + if (existing) { + await opts.feedbackStore.appendLabel(trajectoryId, input.label) + return trajectoryId + } + // Create a minimal trajectory anchored to the run; the agent's eval-time + // pipeline can backfill richer task / attempts data when it replays. + await opts.feedbackStore.save({ + id: trajectoryId, + projectId: opts.projectId, + scenarioId: input.scenarioId ?? input.runId, + task: { intent: 'chat', context: { runId: input.runId } }, + attempts: [], + labels: [input.label], + createdAt: new Date().toISOString(), + }) + return trajectoryId + } catch (err) { + log('feedbackStore write failed', { + runId: input.runId, + error: err instanceof Error ? err.message : String(err), + }) + return null + } + } + + return { traceStore, onRunComplete, recordFeedback } +} + +// ── helpers ────────────────────────────────────────────────────────── + +async function composeRunRecord( + store: TraceStore, + ctx: RunCompleteHookContext, + projectId: string, +): Promise { + const run = await store.getRun(ctx.runId) + const spans = await store.spans({ runId: ctx.runId }) + // Run timestamps are epoch ms in the trace schema; consumers downstream + // expect ISO strings (DB columns, dashboards, dataset joins) so we convert + // at the sink boundary. + const now = Date.now() + const startedAtMs = run?.startedAt ?? now + const endedAtMs = run?.endedAt ?? now + return { + runId: ctx.runId, + projectId, + scenarioId: run?.scenarioId ?? ctx.runId, + variantId: run?.variantId, + startedAt: new Date(startedAtMs).toISOString(), + endedAt: new Date(endedAtMs).toISOString(), + status: ctx.status, + pass: ctx.outcome?.pass, + score: ctx.outcome?.score, + failureClass: ctx.outcome?.failureClass, + notes: ctx.outcome?.notes, + tags: run?.tags, + spanCount: spans.length, + } +} + +function defaultLog(msg: string, fields?: Record): void { + if (fields) console.warn(`[production-trace-sink] ${msg}`, fields) + else console.warn(`[production-trace-sink] ${msg}`) +} diff --git a/tests/production-trace-sink.test.ts b/tests/production-trace-sink.test.ts new file mode 100644 index 0000000..73cbfd9 --- /dev/null +++ b/tests/production-trace-sink.test.ts @@ -0,0 +1,293 @@ +import { + type FeedbackLabel, + type FeedbackTrajectory, + type FeedbackTrajectoryStore, + TraceEmitter, +} from '@tangle-network/agent-eval' +import { describe, expect, it, vi } from 'vitest' +import { + createProductionTraceSink, + type ProductionRunRecord, + type ProductionRunRecordStore, +} from '../src/agent/production-trace-sink' + +function memoryRunRecordStore(): ProductionRunRecordStore & { + rows: ProductionRunRecord[] +} { + const rows: ProductionRunRecord[] = [] + return { + rows, + async append(record) { + rows.push(record) + }, + } +} + +function memoryFeedbackStore(): FeedbackTrajectoryStore & { + data: Map +} { + const data = new Map() + return { + data, + async save(t) { + data.set(t.id, t) + }, + async get(id) { + return data.get(id) ?? null + }, + async list() { + return [...data.values()] + }, + async appendAttempt(id, attempt) { + const t = data.get(id) + if (!t) throw new Error(`no trajectory ${id}`) + const next = { ...t, attempts: [...t.attempts, attempt] } + data.set(id, next) + return next + }, + async appendLabel(id, label) { + const t = data.get(id) + if (!t) throw new Error(`no trajectory ${id}`) + const next = { ...t, labels: [...t.labels, label] } + data.set(id, next) + return next + }, + } +} + +describe('createProductionTraceSink — RunRecord persistence', () => { + it('writes a RunRecord on endRun with status / pass / score / scenarioId', async () => { + const runRecordStore = memoryRunRecordStore() + const sink = createProductionTraceSink({ + projectId: 'tax-agent', + runRecordStore, + }) + const emitter = new TraceEmitter(sink.traceStore, { + onRunComplete: [sink.onRunComplete], + }) + await emitter.startRun({ scenarioId: 'sess-1', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ pass: true, score: 0.92 }) + + expect(runRecordStore.rows).toHaveLength(1) + const row = runRecordStore.rows[0]! + expect(row.projectId).toBe('tax-agent') + expect(row.scenarioId).toBe('sess-1') + expect(row.status).toBe('completed') + expect(row.pass).toBe(true) + expect(row.score).toBe(0.92) + }) + + it('captures abortRun as status=aborted with no pass/score', async () => { + const runRecordStore = memoryRunRecordStore() + const sink = createProductionTraceSink({ projectId: 'tax-agent', runRecordStore }) + const emitter = new TraceEmitter(sink.traceStore, { + onRunComplete: [sink.onRunComplete], + }) + await emitter.startRun({ scenarioId: 'sess-abort', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.abortRun('user-cancelled') + + expect(runRecordStore.rows).toHaveLength(1) + expect(runRecordStore.rows[0]!.status).toBe('aborted') + }) + + it('captures failureClass + notes when endRun supplies them', async () => { + const runRecordStore = memoryRunRecordStore() + const sink = createProductionTraceSink({ projectId: 'tax-agent', runRecordStore }) + const emitter = new TraceEmitter(sink.traceStore, { + onRunComplete: [sink.onRunComplete], + }) + await emitter.startRun({ scenarioId: 'sess-fail', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ + pass: false, + score: 0.1, + failureClass: 'TOOL_ERROR', + notes: 'sandbox timed out', + }) + + const row = runRecordStore.rows[0]! + expect(row.status).toBe('failed') // emitter maps pass:false → status:'failed' + expect(row.pass).toBe(false) + expect(row.failureClass).toBe('TOOL_ERROR') + expect(row.notes).toBe('sandbox timed out') + }) + + it('survives runRecordStore throwing (logs, does not crash chat handler)', async () => { + const log = vi.fn() + const sink = createProductionTraceSink({ + projectId: 'tax-agent', + runRecordStore: { async append() { throw new Error('db down') } }, + log, + }) + const emitter = new TraceEmitter(sink.traceStore, { + onRunComplete: [sink.onRunComplete], + hookErrors: 'throw', // assert the hook itself doesn't throw + }) + await emitter.startRun({ scenarioId: 'sess', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ pass: true, score: 1 }) + expect(log).toHaveBeenCalledWith( + 'runRecordStore.append failed', + expect.objectContaining({ error: 'db down' }), + ) + }) +}) + +describe('createProductionTraceSink — OTLP forwarding', () => { + it('POSTs the run to the configured collector with service.name resource attr', async () => { + const fetchMock = vi.fn(async () => new Response('', { status: 200 })) as unknown as typeof fetch + const sink = createProductionTraceSink({ + projectId: 'tax-agent', + otlp: { + endpoint: 'https://langfuse.example/api/otel', + authHeader: 'Basic Zm9vOmJhcg==', + }, + fetch: fetchMock, + }) + const emitter = new TraceEmitter(sink.traceStore, { + onRunComplete: [sink.onRunComplete], + }) + await emitter.startRun({ scenarioId: 'sess-otlp', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ pass: true, score: 0.8 }) + + expect(fetchMock).toHaveBeenCalledOnce() + const [url, init] = (fetchMock as unknown as ReturnType).mock.calls[0]! + expect(url).toBe('https://langfuse.example/api/otel') + expect((init as RequestInit).method).toBe('POST') + const headers = (init as RequestInit).headers as Record + expect(headers.authorization).toBe('Basic Zm9vOmJhcg==') + expect(headers['content-type']).toBe('application/json') + const body = JSON.parse(String((init as RequestInit).body)) + // OtlpExport has `resourceSpans[]` with resource attributes — we just confirm + // our service.name landed somewhere recognisable. + expect(JSON.stringify(body)).toContain('tax-agent') + }) + + it('logs (does not throw) on OTLP fetch failure', async () => { + const log = vi.fn() + const fetchMock = vi.fn(async () => { + throw new Error('network down') + }) as unknown as typeof fetch + const sink = createProductionTraceSink({ + projectId: 'tax-agent', + otlp: { endpoint: 'https://x', authHeader: 'auth' }, + fetch: fetchMock, + log, + }) + const emitter = new TraceEmitter(sink.traceStore, { + onRunComplete: [sink.onRunComplete], + hookErrors: 'throw', + }) + await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ pass: true }) + expect(log).toHaveBeenCalledWith( + 'OTLP POST threw', + expect.objectContaining({ error: 'network down' }), + ) + }) + + it('logs (does not throw) on OTLP non-2xx', async () => { + const log = vi.fn() + const fetchMock = vi.fn(async () => new Response('', { status: 500 })) as unknown as typeof fetch + const sink = createProductionTraceSink({ + projectId: 'tax-agent', + otlp: { endpoint: 'https://x', authHeader: 'auth' }, + fetch: fetchMock, + log, + }) + const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) + await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ pass: false, score: 0 }) + expect(log).toHaveBeenCalledWith( + 'OTLP POST non-2xx', + expect.objectContaining({ status: 500 }), + ) + }) + + it('omits the auth header when authHeader is undefined', async () => { + const fetchMock = vi.fn(async () => new Response('', { status: 200 })) as unknown as typeof fetch + const sink = createProductionTraceSink({ + projectId: 'tax-agent', + otlp: { endpoint: 'https://x' }, + fetch: fetchMock, + }) + const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) + await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ pass: true }) + const [, init] = (fetchMock as unknown as ReturnType).mock.calls[0]! + const headers = (init as RequestInit).headers as Record + expect(headers.authorization).toBeUndefined() + }) + + it('does NOT attempt OTLP when opts.otlp is undefined', async () => { + const fetchMock = vi.fn() as unknown as typeof fetch + const sink = createProductionTraceSink({ projectId: 'tax-agent', fetch: fetchMock }) + const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) + await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.endRun({ pass: true }) + expect(fetchMock).not.toHaveBeenCalled() + }) +}) + +describe('createProductionTraceSink — recordFeedback', () => { + const label: FeedbackLabel = { + source: { kind: 'user' }, + kind: 'thumbs', + value: 1, + } + + it('returns null when no feedbackStore is wired', async () => { + const sink = createProductionTraceSink({ projectId: 'tax-agent' }) + const id = await sink.recordFeedback({ runId: 'r1', label }) + expect(id).toBeNull() + }) + + it('creates a new trajectory on first feedback for a run', async () => { + const feedbackStore = memoryFeedbackStore() + const sink = createProductionTraceSink({ projectId: 'tax-agent', feedbackStore }) + const id = await sink.recordFeedback({ runId: 'r1', label }) + expect(id).toBe('traj-r1') + expect(feedbackStore.data.get('traj-r1')?.labels).toEqual([label]) + expect(feedbackStore.data.get('traj-r1')?.projectId).toBe('tax-agent') + }) + + it('appends to an existing trajectory on subsequent feedback', async () => { + const feedbackStore = memoryFeedbackStore() + const sink = createProductionTraceSink({ projectId: 'tax-agent', feedbackStore }) + await sink.recordFeedback({ runId: 'r1', label }) + const secondLabel: FeedbackLabel = { source: { kind: 'user' }, kind: 'comment', value: 'great' } + await sink.recordFeedback({ runId: 'r1', label: secondLabel }) + expect(feedbackStore.data.get('traj-r1')?.labels).toEqual([label, secondLabel]) + }) + + it('returns null + logs on feedbackStore failure (does not throw)', async () => { + const log = vi.fn() + const sink = createProductionTraceSink({ + projectId: 'tax-agent', + feedbackStore: { + async save() { throw new Error('write fail') }, + async get() { return null }, + async list() { return [] }, + async appendAttempt() { throw new Error('na') }, + async appendLabel() { throw new Error('na') }, + }, + log, + }) + const result = await sink.recordFeedback({ runId: 'r1', label }) + expect(result).toBeNull() + expect(log).toHaveBeenCalledWith( + 'feedbackStore write failed', + expect.objectContaining({ error: 'write fail' }), + ) + }) + + it('honours an explicit trajectoryId when the agent provides one', async () => { + const feedbackStore = memoryFeedbackStore() + const sink = createProductionTraceSink({ projectId: 'tax-agent', feedbackStore }) + const id = await sink.recordFeedback({ + runId: 'r1', + trajectoryId: 'agent-supplied-id', + label, + }) + expect(id).toBe('agent-supplied-id') + expect(feedbackStore.data.get('agent-supplied-id')).toBeDefined() + }) +}) From e9120968334983ba8ab815a7262d4325bf61ac5c Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Wed, 20 May 2026 15:26:04 +0300 Subject: [PATCH 2/2] style: biome formatter --- src/agent/production-trace-sink.ts | 8 +--- tests/production-trace-sink.test.ts | 61 +++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/agent/production-trace-sink.ts b/src/agent/production-trace-sink.ts index f3728d2..7c1e53a 100644 --- a/src/agent/production-trace-sink.ts +++ b/src/agent/production-trace-sink.ts @@ -184,9 +184,7 @@ export interface RecordFeedbackInput { // ── factory ────────────────────────────────────────────────────────── -export function createProductionTraceSink( - opts: ProductionTraceSinkOpts, -): ProductionTraceSink { +export function createProductionTraceSink(opts: ProductionTraceSinkOpts): ProductionTraceSink { const log = opts.log ?? defaultLog const fetchImpl = opts.fetch ?? globalThis.fetch const traceStore = new InMemoryTraceStore() @@ -237,9 +235,7 @@ export function createProductionTraceSink( } } - const recordFeedback = async ( - input: RecordFeedbackInput, - ): Promise => { + const recordFeedback = async (input: RecordFeedbackInput): Promise => { if (!opts.feedbackStore) return null const trajectoryId = input.trajectoryId ?? `traj-${input.runId}` try { diff --git a/tests/production-trace-sink.test.ts b/tests/production-trace-sink.test.ts index 73cbfd9..5465f2d 100644 --- a/tests/production-trace-sink.test.ts +++ b/tests/production-trace-sink.test.ts @@ -83,7 +83,11 @@ describe('createProductionTraceSink — RunRecord persistence', () => { const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete], }) - await emitter.startRun({ scenarioId: 'sess-abort', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.startRun({ + scenarioId: 'sess-abort', + projectId: 'tax-agent', + layer: 'app-runtime', + }) await emitter.abortRun('user-cancelled') expect(runRecordStore.rows).toHaveLength(1) @@ -96,7 +100,11 @@ describe('createProductionTraceSink — RunRecord persistence', () => { const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete], }) - await emitter.startRun({ scenarioId: 'sess-fail', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.startRun({ + scenarioId: 'sess-fail', + projectId: 'tax-agent', + layer: 'app-runtime', + }) await emitter.endRun({ pass: false, score: 0.1, @@ -115,7 +123,11 @@ describe('createProductionTraceSink — RunRecord persistence', () => { const log = vi.fn() const sink = createProductionTraceSink({ projectId: 'tax-agent', - runRecordStore: { async append() { throw new Error('db down') } }, + runRecordStore: { + async append() { + throw new Error('db down') + }, + }, log, }) const emitter = new TraceEmitter(sink.traceStore, { @@ -133,7 +145,9 @@ describe('createProductionTraceSink — RunRecord persistence', () => { describe('createProductionTraceSink — OTLP forwarding', () => { it('POSTs the run to the configured collector with service.name resource attr', async () => { - const fetchMock = vi.fn(async () => new Response('', { status: 200 })) as unknown as typeof fetch + const fetchMock = vi.fn( + async () => new Response('', { status: 200 }), + ) as unknown as typeof fetch const sink = createProductionTraceSink({ projectId: 'tax-agent', otlp: { @@ -145,7 +159,11 @@ describe('createProductionTraceSink — OTLP forwarding', () => { const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete], }) - await emitter.startRun({ scenarioId: 'sess-otlp', projectId: 'tax-agent', layer: 'app-runtime' }) + await emitter.startRun({ + scenarioId: 'sess-otlp', + projectId: 'tax-agent', + layer: 'app-runtime', + }) await emitter.endRun({ pass: true, score: 0.8 }) expect(fetchMock).toHaveBeenCalledOnce() @@ -186,7 +204,9 @@ describe('createProductionTraceSink — OTLP forwarding', () => { it('logs (does not throw) on OTLP non-2xx', async () => { const log = vi.fn() - const fetchMock = vi.fn(async () => new Response('', { status: 500 })) as unknown as typeof fetch + const fetchMock = vi.fn( + async () => new Response('', { status: 500 }), + ) as unknown as typeof fetch const sink = createProductionTraceSink({ projectId: 'tax-agent', otlp: { endpoint: 'https://x', authHeader: 'auth' }, @@ -196,14 +216,13 @@ describe('createProductionTraceSink — OTLP forwarding', () => { const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) await emitter.endRun({ pass: false, score: 0 }) - expect(log).toHaveBeenCalledWith( - 'OTLP POST non-2xx', - expect.objectContaining({ status: 500 }), - ) + expect(log).toHaveBeenCalledWith('OTLP POST non-2xx', expect.objectContaining({ status: 500 })) }) it('omits the auth header when authHeader is undefined', async () => { - const fetchMock = vi.fn(async () => new Response('', { status: 200 })) as unknown as typeof fetch + const fetchMock = vi.fn( + async () => new Response('', { status: 200 }), + ) as unknown as typeof fetch const sink = createProductionTraceSink({ projectId: 'tax-agent', otlp: { endpoint: 'https://x' }, @@ -263,11 +282,21 @@ describe('createProductionTraceSink — recordFeedback', () => { const sink = createProductionTraceSink({ projectId: 'tax-agent', feedbackStore: { - async save() { throw new Error('write fail') }, - async get() { return null }, - async list() { return [] }, - async appendAttempt() { throw new Error('na') }, - async appendLabel() { throw new Error('na') }, + async save() { + throw new Error('write fail') + }, + async get() { + return null + }, + async list() { + return [] + }, + async appendAttempt() { + throw new Error('na') + }, + async appendLabel() { + throw new Error('na') + }, }, log, })