From 724bbb8914ecf744732d7c9485af6ae5598a184b Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 17:54:10 +0530 Subject: [PATCH 01/11] feat: add POST /track/batch ingestion endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Accept up to 1000 ITrackHandlerPayload events in a single request and dispatch each through the same per-event pipeline as POST /track. Per-event validation failures are returned in `rejected[]` with their index and reason so callers can fix and retry only the bad rows; the whole batch never fails on a single malformed item. - New zTrackBatchBody schema (envelope-only validation; per-item parsing runs in the handler so we can collect errors without short-circuiting) - Refactor buildContext into buildSharedRequestContext + buildEventContext so the batch handler fetches salts and request-IP geo once and reuses them across all events. Single-event /track keeps identical behavior. - Extract dispatchEvent so /track and /track/batch share the per-type switch; the handler stays the only place that knows about the unsupported `alias` type. - duplicateHook (the 100ms body-hash dedup) moved to be /track-only. Hashing a 1000-item array is meaningless — the hook would either drop the entire retry on hash collision, or no-op on retries that mutate __timestamp. Response shape (always 202 when envelope + auth pass): { accepted: number, rejected: [{ index, reason: 'validation' | 'internal', error }] } Co-Authored-By: Claude Opus 4.7 --- apps/api/src/controllers/track.controller.ts | 279 ++++++++++++++---- .../api/src/routes/track-batch.router.test.ts | 271 +++++++++++++++++ apps/api/src/routes/track.router.ts | 47 ++- packages/validation/src/track.validation.ts | 11 + 4 files changed, 549 insertions(+), 59 deletions(-) create mode 100644 apps/api/src/routes/track-batch.router.test.ts diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index bf6b2d4e3..8907930a7 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -22,14 +22,35 @@ import type { IIdentifyPayload, IIncrementPayload, IReplayPayload, + ITrackBatchBody, ITrackHandlerPayload, ITrackPayload, } from '@openpanel/validation'; +import { + TRACK_BATCH_MAX_EVENTS, + zTrackHandlerPayload, +} from '@openpanel/validation'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { assocPath, pathOr, pick } from 'ramda'; import { HttpError } from '@/utils/errors'; import { getDeviceId } from '@/utils/ids'; +type Salts = Awaited>; + +/** + * Per-request data that is identical for every event in a batch. + * Computed once in the batch handler so we don't re-fetch salts/geo + * or re-parse headers N times. + */ +interface SharedRequestContext { + projectId: string; + requestIp: string; + requestUa: string; + requestHeaders: Record; + requestGeo: GeoLocation; + salts: Salts; +} + export function getStringHeaders(headers: FastifyRequest['headers']) { return Object.entries( pick( @@ -122,28 +143,65 @@ interface TrackContext { geo: GeoLocation; } -async function buildContext( - request: FastifyRequest<{ - Body: ITrackHandlerPayload; - }>, - validatedBody: ITrackHandlerPayload -): Promise { +/** + * Build the per-request shared context. Done once per HTTP request — for + * single-event /track this is just an extra struct; for /track/batch it + * lets N events share one salts + one geo lookup. + */ +async function buildSharedRequestContext( + request: FastifyRequest, +): Promise { const projectId = request.client?.projectId; if (!projectId) { throw new HttpError('Missing projectId', { status: 400 }); } - const timestamp = getTimestamp(request.timestamp, validatedBody.payload); - const ip = + const requestIp = request.clientIp; + const requestUa = request.headers['user-agent'] ?? 'unknown/1.0'; + const requestHeaders = getStringHeaders(request.headers); + + const [requestGeo, salts] = await Promise.all([ + getGeoLocation(requestIp), + getSalts(), + ]); + + return { + projectId, + requestIp, + requestUa, + requestHeaders, + requestGeo, + salts, + }; +} + +/** + * Build a per-event TrackContext from already-fetched shared data. + * Per-event work: timestamp, identity, ip override, deviceId, and a + * second geo lookup *only* if the event overrides __ip. + */ +async function buildEventContext( + shared: SharedRequestContext, + requestTimestamp: FastifyRequest['timestamp'], + validatedBody: ITrackHandlerPayload, +): Promise { + const timestamp = getTimestamp(requestTimestamp, validatedBody.payload); + + const overrideIp = validatedBody.type === 'track' && validatedBody.payload.properties?.__ip ? (validatedBody.payload.properties.__ip as string) - : request.clientIp; - const ua = request.headers['user-agent'] ?? 'unknown/1.0'; + : undefined; + const ip = overrideIp ?? shared.requestIp; + + // Only re-fetch geo when the event overrode the IP — the common case + // (browser SDK, no __ip) reuses the request-level geo computed once. + const geo = + overrideIp && overrideIp !== shared.requestIp + ? await getGeoLocation(overrideIp) + : shared.requestGeo; - const headers = getStringHeaders(request.headers); const identity = getIdentity(validatedBody); const profileId = identity?.profileId; - if (profileId && validatedBody.type === 'track') { validatedBody.payload.profileId = profileId; } @@ -154,22 +212,19 @@ async function buildContext( ? validatedBody.payload?.properties.__deviceId : undefined; - // Get geo location (needed for track and identify) - const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); - const deviceIdResult = await getDeviceId({ - projectId, + projectId: shared.projectId, ip, - ua, - salts, + ua: shared.requestUa, + salts: shared.salts, overrideDeviceId, }); return { - projectId, + projectId: shared.projectId, ip, - ua, - headers, + ua: shared.requestUa, + headers: shared.requestHeaders, timestamp: { value: timestamp.timestamp, isFromPast: timestamp.isTimestampFromThePast, @@ -181,6 +236,16 @@ async function buildContext( }; } +async function buildContext( + request: FastifyRequest<{ + Body: ITrackHandlerPayload; + }>, + validatedBody: ITrackHandlerPayload, +): Promise { + const shared = await buildSharedRequestContext(request); + return buildEventContext(shared, request.timestamp, validatedBody); +} + async function handleTrack( payload: ITrackPayload, context: TrackContext @@ -361,15 +426,59 @@ async function handleAssignGroup( }); } +/** + * Dispatch a validated event to the matching per-type handler. Shared by + * /track and /track/batch. Throws HttpError(400) for the unsupported `alias` + * type so single-event and batch can both surface it consistently. + */ +async function dispatchEvent( + body: ITrackHandlerPayload, + context: TrackContext, +): Promise { + switch (body.type) { + case 'alias': + throw new HttpError('Alias is not supported', { status: 400 }); + case 'track': + await handleTrack(body.payload, context); + return; + case 'identify': + await handleIdentify(body.payload, context); + return; + case 'increment': + await handleIncrement(body.payload, context); + return; + case 'decrement': + await handleDecrement(body.payload, context); + return; + case 'replay': + await handleReplay(body.payload, context); + return; + case 'group': + await handleGroup(body.payload, context); + return; + case 'assign_group': + await handleAssignGroup(body.payload, context); + return; + default: { + // Exhaustiveness guard — body is `never` here when all variants are + // handled. If a new type is added to `ITrackHandlerPayload`, this + // surfaces as a TS error rather than a silent runtime fallthrough. + const _exhaustive: never = body; + throw new HttpError('Invalid type', { status: 400 }); + } + } +} + export async function handler( request: FastifyRequest<{ Body: ITrackHandlerPayload; }>, - reply: FastifyReply + reply: FastifyReply, ) { const validatedBody = request.body; - // Handle alias (not supported) + // Reject `alias` before building context — saves the salts/geo/deviceId work + // for a request that's going to fail anyway. if (validatedBody.type === 'alias') { return reply.status(400).send({ status: 400, @@ -378,39 +487,8 @@ export async function handler( }); } - // Build request context const context = await buildContext(request, validatedBody); - - // Dispatch to appropriate handler - switch (validatedBody.type) { - case 'track': - await handleTrack(validatedBody.payload, context); - break; - case 'identify': - await handleIdentify(validatedBody.payload, context); - break; - case 'increment': - await handleIncrement(validatedBody.payload, context); - break; - case 'decrement': - await handleDecrement(validatedBody.payload, context); - break; - case 'replay': - await handleReplay(validatedBody.payload, context); - break; - case 'group': - await handleGroup(validatedBody.payload, context); - break; - case 'assign_group': - await handleAssignGroup(validatedBody.payload, context); - break; - default: - return reply.status(400).send({ - status: 400, - error: 'Bad Request', - message: 'Invalid type', - }); - } + await dispatchEvent(validatedBody, context); reply.status(200).send({ deviceId: context.deviceId, @@ -418,6 +496,97 @@ export async function handler( }); } +type BatchItemResult = + | { index: number; status: 'accepted' } + | { + index: number; + status: 'rejected'; + reason: 'validation' | 'internal'; + error: string; + }; + +/** + * POST /track/batch — accepts up to TRACK_BATCH_MAX_EVENTS payloads in one + * request and dispatches each through the same per-event pipeline as /track. + * + * Per-event validation failures do NOT fail the whole batch: the response is + * always 202 (assuming envelope + auth pass) with `{ accepted, rejected[] }` + * so callers can fix and retry just the bad indices. + * + * Optimization: salts + request-IP geo are fetched once and shared across + * all events. Events that override `__ip` still get their own geo lookup. + */ +export async function batchHandler( + request: FastifyRequest<{ + Body: ITrackBatchBody; + }>, + reply: FastifyReply, +) { + const { events } = request.body; + + // Envelope length is already enforced by zTrackBatchBody; the cap repeat + // here is defense-in-depth in case the schema is bypassed in the future. + if (events.length === 0 || events.length > TRACK_BATCH_MAX_EVENTS) { + return reply.status(400).send({ + status: 400, + error: 'Bad Request', + message: `events must be 1..${TRACK_BATCH_MAX_EVENTS} items`, + }); + } + + const shared = await buildSharedRequestContext(request); + + const results = await Promise.all( + events.map>(async (raw, index) => { + const parsed = zTrackHandlerPayload.safeParse(raw); + if (!parsed.success) { + const issue = parsed.error.issues[0]; + const path = issue?.path?.join('.') ?? ''; + const error = path ? `${path}: ${issue?.message}` : issue?.message ?? 'invalid payload'; + return { index, status: 'rejected', reason: 'validation', error }; + } + + try { + const context = await buildEventContext( + shared, + request.timestamp, + parsed.data, + ); + await dispatchEvent(parsed.data, context); + return { index, status: 'accepted' }; + } catch (err) { + // HttpError with 4xx → caller's fault (validation-style: alias, + // unknown type, replay without session). Anything else → ours. + const isClientError = + err instanceof HttpError && err.status >= 400 && err.status < 500; + const reason: 'validation' | 'internal' = isClientError + ? 'validation' + : 'internal'; + const message = + err instanceof Error ? err.message : 'unknown error'; + if (!isClientError) { + request.log.error( + { err, index }, + 'Batch event dispatch failed', + ); + } + return { index, status: 'rejected', reason, error: message }; + } + }), + ); + + const accepted = results.filter((r) => r.status === 'accepted').length; + const rejected = results.filter( + (r): r is Extract => + r.status === 'rejected', + ); + + reply.status(202).send({ + accepted, + rejected, + }); +} + export async function fetchDeviceId( request: FastifyRequest, reply: FastifyReply diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts new file mode 100644 index 000000000..24af7e8ba --- /dev/null +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -0,0 +1,271 @@ +/** + * Integration tests for POST /track/batch. + * + * Side effects (queue, db, geo, redis) are mocked so the test runs without + * Docker. Auth uses the same getClientByIdCached mock as the insights + * router tests, except here we don't need real fixtures — we never read + * from PG/CH, we only verify the controller dispatches each item correctly. + */ + +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; + +// ─── Module mocks (hoisted before imports) ──────────────────────────────────── + +const queueAdd = vi.fn().mockResolvedValue(undefined); +const upsertProfileMock = vi.fn().mockResolvedValue(undefined); + +vi.mock('@openpanel/db', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getClientByIdCached: vi.fn(), + getSalts: vi.fn().mockResolvedValue({ current: 'salt-a', previous: 'salt-b' }), + getProfileById: vi.fn().mockResolvedValue(null), + upsertProfile: upsertProfileMock, + groupBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + replayBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + }; +}); + +vi.mock('@openpanel/queue', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getEventsGroupQueueShard: vi.fn(() => ({ add: queueAdd })), + }; +}); + +vi.mock('@openpanel/geo', () => ({ + getGeoLocation: vi.fn().mockResolvedValue({ + country: 'US', + city: 'San Francisco', + region: 'CA', + longitude: -122.4, + latitude: 37.77, + }), +})); + +vi.mock('@openpanel/common/server', async (importOriginal) => { + const actual = + await importOriginal(); + return { + ...actual, + verifyPassword: vi.fn().mockResolvedValue(true), + generateDeviceId: vi.fn().mockReturnValue('device-test'), + }; +}); + +vi.mock('@openpanel/redis', async (importOriginal) => { + const actual = await importOriginal(); + const fakeRedisClient = new Proxy( + {}, + { + get: (_t, p) => { + if (p === 'status') return 'ready'; + if (p === 'multi') { + return () => ({ + hget: vi.fn().mockReturnThis(), + exec: vi.fn().mockResolvedValue([]), + }); + } + return vi.fn().mockResolvedValue(null); + }, + }, + ); + return { + ...actual, + getCache: async (_key: string, _ttl: number, fn: () => Promise) => + fn(), + getLock: vi.fn().mockResolvedValue(true), + getRedisCache: vi.fn().mockReturnValue(fakeRedisClient), + }; +}); + +// ─── Imports (after mocks) ──────────────────────────────────────────────────── + +import { ClientType, getClientByIdCached } from '@openpanel/db'; +import type { FastifyInstance } from 'fastify'; +import { buildApp } from '../app'; + +// ─── Test client constants ──────────────────────────────────────────────────── + +const CLIENT_ID = '00000000-0000-0000-0000-0000000000aa'; +const CLIENT_SECRET = 'test-secret'; +const PROJECT_ID = 'test-project'; +const ORG_ID = 'test-org'; + +const AUTH = { + 'openpanel-client-id': CLIENT_ID, + 'openpanel-client-secret': CLIENT_SECRET, + 'user-agent': 'Mozilla/5.0 (Macintosh) Chrome/120.0.0.0', + 'content-type': 'application/json', +}; + +const WRITE_CLIENT = { + id: CLIENT_ID, + type: ClientType.write, + projectId: PROJECT_ID, + organizationId: ORG_ID, + secret: 'hashed-secret', + name: 'Batch Test Client', + cors: ['*'], + description: '', + ignoreCorsAndSecret: true, + createdAt: new Date(), + updatedAt: new Date(), + project: { + id: PROJECT_ID, + organizationId: ORG_ID, + cors: ['*'], + filters: [], + allowUnsafeRevenueTracking: true, + }, +}; + +// ─── Lifecycle ──────────────────────────────────────────────────────────────── + +let app: FastifyInstance; + +beforeAll(async () => { + vi.mocked(getClientByIdCached).mockResolvedValue(WRITE_CLIENT as any); + app = await buildApp({ testing: true }); + await app.ready(); +}, 30_000); + +afterAll(async () => { + await app.close(); +}, 10_000); + +beforeEach(() => { + queueAdd.mockClear(); + upsertProfileMock.mockClear(); +}); + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function postBatch(body: unknown, headers: Record = AUTH) { + return app.inject({ + method: 'POST', + url: '/track/batch', + headers, + payload: body as any, + }); +} + +const validTrack = (name = 'page_view') => ({ + type: 'track' as const, + payload: { name, properties: { __path: '/home' } }, +}); + +const validIdentify = (profileId = 'user-1') => ({ + type: 'identify' as const, + payload: { profileId, email: 'a@b.com' }, +}); + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +describe('POST /track/batch — auth & envelope', () => { + it('returns 401 without client-id', async () => { + const res = await postBatch({ events: [validTrack()] }, { + 'content-type': 'application/json', + }); + expect(res.statusCode).toBe(401); + }); + + it('returns 400 on empty events array', async () => { + const res = await postBatch({ events: [] }); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 on missing events field', async () => { + const res = await postBatch({}); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 when array exceeds 1000 events', async () => { + const events = Array.from({ length: 1001 }, () => validTrack()); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('POST /track/batch — happy path', () => { + it('accepts a single track event and queues it', async () => { + const res = await postBatch({ events: [validTrack()] }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('accepts a mixed batch (track + identify) and dispatches each', async () => { + const res = await postBatch({ + events: [validTrack('signup'), validIdentify('alice'), validTrack('purchase')], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 3, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); // two `track` events + expect(upsertProfileMock).toHaveBeenCalledTimes(1); // one `identify` + }); + + it('treats each event as if sent one by one (per-event queue add)', async () => { + const events = Array.from({ length: 5 }, (_, i) => validTrack(`event_${i}`)); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 5, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(5); + }); +}); + +describe('POST /track/batch — per-item validation', () => { + it('rejects bad rows by index without failing the batch', async () => { + const res = await postBatch({ + events: [ + validTrack('good_1'), + { type: 'track', payload: { name: '' } }, // empty name → invalid + validTrack('good_2'), + { type: 'wrong-type', payload: {} }, // unknown discriminator + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(2); + expect(body.rejected).toHaveLength(2); + expect(body.rejected.map((r: { index: number }) => r.index).sort()).toEqual([1, 3]); + expect(body.rejected.every((r: { reason: string }) => r.reason === 'validation')).toBe(true); + expect(queueAdd).toHaveBeenCalledTimes(2); + }); + + it('rejects alias as per-item validation (does not 400 the whole batch)', async () => { + const res = await postBatch({ + events: [ + validTrack(), + { type: 'alias', payload: { profileId: 'user-1', alias: 'u1' } }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(body.rejected[0].error).toMatch(/alias/i); + }); + + it('returns 202 with accepted=0 when every event fails validation', async () => { + const res = await postBatch({ + events: [ + { type: 'track', payload: { name: '' } }, + { type: 'track', payload: {} }, + { type: 'identify', payload: {} }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(0); + expect(body.rejected).toHaveLength(3); + expect(queueAdd).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index 07afbcdc2..b20212a31 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -1,25 +1,40 @@ -import { zTrackHandlerPayload } from '@openpanel/validation'; +import { + TRACK_BATCH_MAX_EVENTS, + zTrackBatchBody, + zTrackHandlerPayload, +} from '@openpanel/validation'; import type { FastifyPluginAsyncZodOpenApi } from 'fastify-zod-openapi'; import { z } from 'zod'; -import { fetchDeviceId, handler } from '@/controllers/track.controller'; +import { + batchHandler, + fetchDeviceId, + handler, +} from '@/controllers/track.controller'; import { clientHook } from '@/hooks/client.hook'; import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; +// Per-route body limit for /track/batch. Each event is small (a few KB at +// most after Zod constraints), 1000 events × ~5 KB ≈ 5 MB headroom. +const TRACK_BATCH_BODY_LIMIT_BYTES = 5 * 1024 * 1024; + const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { - fastify.addHook('preValidation', duplicateHook); fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', isBotHook); await fastify.route({ method: 'POST', url: '/', + // The 100 ms body-hash dedup only runs on the single-event endpoint — + // applying it batch-wide would drop a whole 1000-event retry on hash + // collision, which is the opposite of what we want. + preValidation: duplicateHook, schema: { body: zTrackHandlerPayload.and( z.object({ clientId: z.string().optional(), clientSecret: z.string().optional(), - }) + }), ), tags: ['Track'], description: @@ -34,6 +49,30 @@ const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { handler, }); + await fastify.route({ + method: 'POST', + url: '/batch', + bodyLimit: TRACK_BATCH_BODY_LIMIT_BYTES, + schema: { + body: zTrackBatchBody, + tags: ['Track'], + description: `Bulk-ingest up to ${TRACK_BATCH_MAX_EVENTS} tracking events in a single request. Each event is dispatched through the same pipeline as POST /track. Per-event validation failures are returned in the rejected[] array — the whole batch does not fail on a single bad row.`, + response: { + 202: z.object({ + accepted: z.number().int().min(0), + rejected: z.array( + z.object({ + index: z.number().int().min(0), + reason: z.enum(['validation', 'internal']), + error: z.string(), + }), + ), + }), + }, + }, + handler: batchHandler, + }); + await fastify.route({ method: 'GET', url: '/device-id', diff --git a/packages/validation/src/track.validation.ts b/packages/validation/src/track.validation.ts index c97b51a08..ec8dea73b 100644 --- a/packages/validation/src/track.validation.ts +++ b/packages/validation/src/track.validation.ts @@ -220,6 +220,17 @@ export const zTrackHandlerPayload = z.discriminatedUnion('type', [ .meta({ title: 'Assign Group' }), ]) satisfies z.ZodType; +// Batch ingestion: envelope is validated strictly (array length only); per-event +// validation runs inside the controller via `safeParse(zTrackHandlerPayload)` so +// invalid items can be rejected per-index without failing the whole batch. +export const TRACK_BATCH_MAX_EVENTS = 1000; + +export const zTrackBatchBody = z.object({ + events: z.array(z.unknown()).min(1).max(TRACK_BATCH_MAX_EVENTS), +}); + +export type ITrackBatchBody = z.infer; + // Deprecated types for beta version of the SDKs export interface DeprecatedOpenpanelEventOptions { From c7739840d32368154ee3d2381f7c5d96a529fb88 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 17:54:19 +0530 Subject: [PATCH 02/11] fix: vi.mock hoisting + lint cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wrap shared spies in `vi.hoisted(...)` so they exist when the hoisted vi.mock factories run. Without this, CI failed with `Cannot access 'upsertProfileMock' before initialization` (TDZ inside the factory). - Drop the post-schema defensive length check in batchHandler — the route schema enforces `.min(1).max(1000)` before the handler sees the body, so the runtime guard was unreachable. - Rework dispatchEvent's exhaustiveness check to actually use the never-typed local in the thrown message. The previous `_exhaustive` was unused and would trip Biome's unused-variable rule. Co-Authored-By: Claude Opus 4.7 --- apps/api/src/controllers/track.controller.ts | 28 ++++++------------- .../api/src/routes/track-batch.router.test.ts | 12 ++++++-- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 8907930a7..2ab3b00cc 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -26,10 +26,7 @@ import type { ITrackHandlerPayload, ITrackPayload, } from '@openpanel/validation'; -import { - TRACK_BATCH_MAX_EVENTS, - zTrackHandlerPayload, -} from '@openpanel/validation'; +import { zTrackHandlerPayload } from '@openpanel/validation'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { assocPath, pathOr, pick } from 'ramda'; import { HttpError } from '@/utils/errors'; @@ -460,11 +457,13 @@ async function dispatchEvent( await handleAssignGroup(body.payload, context); return; default: { - // Exhaustiveness guard — body is `never` here when all variants are - // handled. If a new type is added to `ITrackHandlerPayload`, this - // surfaces as a TS error rather than a silent runtime fallthrough. - const _exhaustive: never = body; - throw new HttpError('Invalid type', { status: 400 }); + // Exhaustiveness check: `body` narrows to `never` when every variant + // of ITrackHandlerPayload['type'] is handled. Adding a new variant + // makes this assignment fail to compile. + const exhaustive: never = body; + throw new HttpError(`Unhandled event type: ${exhaustive}`, { + status: 400, + }); } } } @@ -523,17 +522,6 @@ export async function batchHandler( reply: FastifyReply, ) { const { events } = request.body; - - // Envelope length is already enforced by zTrackBatchBody; the cap repeat - // here is defense-in-depth in case the schema is bypassed in the future. - if (events.length === 0 || events.length > TRACK_BATCH_MAX_EVENTS) { - return reply.status(400).send({ - status: 400, - error: 'Bad Request', - message: `events must be 1..${TRACK_BATCH_MAX_EVENTS} items`, - }); - } - const shared = await buildSharedRequestContext(request); const results = await Promise.all( diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts index 24af7e8ba..9711d1ac8 100644 --- a/apps/api/src/routes/track-batch.router.test.ts +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -10,9 +10,15 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; // ─── Module mocks (hoisted before imports) ──────────────────────────────────── - -const queueAdd = vi.fn().mockResolvedValue(undefined); -const upsertProfileMock = vi.fn().mockResolvedValue(undefined); +// +// `vi.mock` is hoisted above all top-level statements, so any spies the factory +// references must be created via `vi.hoisted(...)` (also hoisted) — otherwise +// the factory runs first and hits a temporal-dead-zone ReferenceError. + +const { queueAdd, upsertProfileMock } = vi.hoisted(() => ({ + queueAdd: vi.fn().mockResolvedValue(undefined), + upsertProfileMock: vi.fn().mockResolvedValue(undefined), +})); vi.mock('@openpanel/db', async (importOriginal) => { const actual = await importOriginal(); From d13c3e39edf999f45cd0a29906abe52782df2d85 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 17:55:05 +0530 Subject: [PATCH 03/11] feat: bump batch limit to 2000 events / 10 MB Updates the constant, the route's bodyLimit, the OpenAPI description, and the over-the-cap test fixture. Note: this only changes transport limits. Real mobile/offline use of the batch endpoint is still limited by pre-existing session-derivation behavior (15-min isTimestampFromThePast threshold, session_start row inflation under concurrent device events). Those will be addressed in follow-up commits. Co-Authored-By: Claude Opus 4.7 --- apps/api/src/routes/track-batch.router.test.ts | 4 ++-- apps/api/src/routes/track.router.ts | 8 ++++---- packages/validation/src/track.validation.ts | 4 +++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts index 9711d1ac8..5b03c8188 100644 --- a/apps/api/src/routes/track-batch.router.test.ts +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -188,8 +188,8 @@ describe('POST /track/batch — auth & envelope', () => { expect(res.statusCode).toBe(400); }); - it('returns 400 when array exceeds 1000 events', async () => { - const events = Array.from({ length: 1001 }, () => validTrack()); + it('returns 400 when array exceeds the per-request cap', async () => { + const events = Array.from({ length: 2001 }, () => validTrack()); const res = await postBatch({ events }); expect(res.statusCode).toBe(400); }); diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index b20212a31..9d1d73b5a 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -14,9 +14,9 @@ import { clientHook } from '@/hooks/client.hook'; import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; -// Per-route body limit for /track/batch. Each event is small (a few KB at -// most after Zod constraints), 1000 events × ~5 KB ≈ 5 MB headroom. -const TRACK_BATCH_BODY_LIMIT_BYTES = 5 * 1024 * 1024; +// Per-route body limit for /track/batch: 10 MB uncompressed, matching the +// stated public contract ("up to 2000 events and 10 MB per request"). +const TRACK_BATCH_BODY_LIMIT_BYTES = 10 * 1024 * 1024; const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { fastify.addHook('preHandler', clientHook); @@ -56,7 +56,7 @@ const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { schema: { body: zTrackBatchBody, tags: ['Track'], - description: `Bulk-ingest up to ${TRACK_BATCH_MAX_EVENTS} tracking events in a single request. Each event is dispatched through the same pipeline as POST /track. Per-event validation failures are returned in the rejected[] array — the whole batch does not fail on a single bad row.`, + description: `We accept up to ${TRACK_BATCH_MAX_EVENTS} events and 10MB uncompressed per request. Events are part of the request body. Each event is dispatched through the same pipeline as POST /track. Per-event validation failures are returned in the rejected[] array — the whole batch does not fail on a single bad row.`, response: { 202: z.object({ accepted: z.number().int().min(0), diff --git a/packages/validation/src/track.validation.ts b/packages/validation/src/track.validation.ts index ec8dea73b..116777cc3 100644 --- a/packages/validation/src/track.validation.ts +++ b/packages/validation/src/track.validation.ts @@ -223,7 +223,9 @@ export const zTrackHandlerPayload = z.discriminatedUnion('type', [ // Batch ingestion: envelope is validated strictly (array length only); per-event // validation runs inside the controller via `safeParse(zTrackHandlerPayload)` so // invalid items can be rejected per-index without failing the whole batch. -export const TRACK_BATCH_MAX_EVENTS = 1000; +// +// Per-request caps: up to 2000 events and 10 MB uncompressed body. +export const TRACK_BATCH_MAX_EVENTS = 2000; export const zTrackBatchBody = z.object({ events: z.array(z.unknown()).min(1).max(TRACK_BATCH_MAX_EVENTS), From 605bf0ae1fe3515dd35d5893576c841f460774d9 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 17:56:30 +0530 Subject: [PATCH 04/11] feat: deterministic session_id for all events at API edge Plumb the event's __timestamp through getDeviceId/getInfoFromSession so the deterministic 30-min session bucket is computed from the event's own time, not Date.now(). Critical for /track/batch where one request can carry events spanning days. Two behavior changes: 1. __deviceId override path no longer short-circuits to sessionId: ''. It now still hits the live-session Redis lookup (keyed on the supplied deviceId) and falls back to the deterministic bucket computed at the event's __timestamp. SDKs that supply a stable client-side device id (mobile, server) get sessionful events. 2. The deterministic getSessionId fallback in getInfoFromSession now receives eventMs explicitly. Historical/buffered events bucket into the wall-clock window they actually happened in, so events arriving in different batches with the same __timestamp window collapse to one session_id (merge on read). Live-traffic behavior is unchanged: when a Redis sessionEnd key exists, we still return that session's id from the live mechanism. Only the fallback path changed. The isTimestampFromThePast field on the queue payload stays in place for now; the worker still reads it. A follow-up commit removes both producer and consumer in one step so neither side is broken in isolation. Co-Authored-By: Claude Opus 4.7 --- apps/api/src/controllers/event.controller.ts | 1 + apps/api/src/controllers/track.controller.ts | 4 + .../api/src/routes/track-batch.router.test.ts | 104 ++++++++++++++++++ apps/api/src/utils/ids.ts | 24 +++- 4 files changed, 131 insertions(+), 2 deletions(-) diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 59a6d0f4a..346500c5a 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -34,6 +34,7 @@ export async function postEvent( ip, ua, salts, + eventMs: new Date(timestamp).getTime(), }); const uaInfo = parseUserAgent(ua, request.body?.properties); diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 2ab3b00cc..28e6b19b5 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -215,6 +215,10 @@ async function buildEventContext( ua: shared.requestUa, salts: shared.salts, overrideDeviceId, + // Bucket the deterministic session_id by the event's own __timestamp, + // not the wall-clock moment the request arrived. Critical for + // /track/batch where one request can contain events spanning days. + eventMs: timestamp.timestamp, }); return { diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts index 5b03c8188..08b472142 100644 --- a/apps/api/src/routes/track-batch.router.test.ts +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -260,6 +260,110 @@ describe('POST /track/batch — per-item validation', () => { expect(body.rejected[0].error).toMatch(/alias/i); }); + it('populates sessionId for events with __deviceId override', async () => { + // When a client supplies __deviceId, the API still resolves a + // sessionId — first via the live-session Redis lookup, then a + // deterministic 30-min bucket keyed on the event's __timestamp. + // Both deviceId and sessionId reach the queue. + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_device_override', + properties: { + __deviceId: 'mobile-device-abc', + __path: '/home', + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + const queuedJob = queueAdd.mock.calls[0]?.[0]; + expect(queuedJob.data.deviceId).toBe('mobile-device-abc'); + expect(queuedJob.data.sessionId).toBeTruthy(); + expect(queuedJob.data.sessionId.length).toBeGreaterThan(0); + }); + + it('buckets historical events by __timestamp, not request time', async () => { + // Two events on the same device, 1h apart in __timestamp. They + // should land in different deterministic 30-min buckets and thus + // get different sessionIds, even though they arrive in the same + // request. + const baseMs = Date.UTC(2026, 4, 1, 10, 0, 0); // 10:00 UTC + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_bucket_a', + properties: { + __deviceId: 'shared-device', + __timestamp: new Date(baseMs).toISOString(), + }, + }, + }, + { + type: 'track' as const, + payload: { + name: 'probe_bucket_b', + properties: { + __deviceId: 'shared-device', + __timestamp: new Date(baseMs + 60 * 60 * 1000).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 2, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); + const sessionIdA = queueAdd.mock.calls[0]?.[0].data.sessionId; + const sessionIdB = queueAdd.mock.calls[1]?.[0].data.sessionId; + expect(sessionIdA).toBeTruthy(); + expect(sessionIdB).toBeTruthy(); + expect(sessionIdA).not.toBe(sessionIdB); + }); + + it('shares sessionId across events in the same 30-min bucket', async () => { + // Two events on the same device, 5 min apart inside the same + // wall-clock 30-min bucket. They should share a sessionId. + const baseMs = Date.UTC(2026, 4, 1, 10, 5, 0); // 10:05 UTC + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_same_bucket_a', + properties: { + __deviceId: 'same-bucket-device', + __timestamp: new Date(baseMs).toISOString(), + }, + }, + }, + { + type: 'track' as const, + payload: { + name: 'probe_same_bucket_b', + properties: { + __deviceId: 'same-bucket-device', + __timestamp: new Date(baseMs + 5 * 60 * 1000).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 2, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); + const sessionIdA = queueAdd.mock.calls[0]?.[0].data.sessionId; + const sessionIdB = queueAdd.mock.calls[1]?.[0].data.sessionId; + expect(sessionIdA).toBe(sessionIdB); + }); + it('returns 202 with accepted=0 when every event fails validation', async () => { const res = await postBatch({ events: [ diff --git a/apps/api/src/utils/ids.ts b/apps/api/src/utils/ids.ts index 3614da1b7..d579845fb 100644 --- a/apps/api/src/utils/ids.ts +++ b/apps/api/src/utils/ids.ts @@ -10,15 +10,31 @@ export async function getDeviceId({ ua, salts, overrideDeviceId, + eventMs, }: { projectId: string; ip: string; ua: string | undefined; salts: { current: string; previous: string }; overrideDeviceId?: string; -}) { + /** + * Wall-clock time of the event being processed. Used as the bucket input for + * the deterministic session_id fallback so historical/buffered events bucket + * into the window they actually happened in, not the moment the API + * received them. + */ + eventMs: number; +}): Promise { + // Client-supplied stable device id (mobile/server SDKs). We still need to + // resolve a sessionId — first try the live session lookup keyed on this + // exact deviceId, then fall back to the deterministic 30-min bucket. if (overrideDeviceId) { - return { deviceId: overrideDeviceId, sessionId: '' }; + return getInfoFromSession({ + projectId, + currentDeviceId: overrideDeviceId, + previousDeviceId: overrideDeviceId, + eventMs, + }); } if (!ua) { @@ -42,6 +58,7 @@ export async function getDeviceId({ projectId, currentDeviceId, previousDeviceId, + eventMs, }); } @@ -54,10 +71,12 @@ async function getInfoFromSession({ projectId, currentDeviceId, previousDeviceId, + eventMs, }: { projectId: string; currentDeviceId: string; previousDeviceId: string; + eventMs: number; }): Promise { try { const multi = getRedisCache().multi(); @@ -101,6 +120,7 @@ async function getInfoFromSession({ sessionId: getSessionId({ projectId, deviceId: currentDeviceId, + eventMs, graceMs: 5 * 1000, windowMs: 1000 * 60 * 30, }), From 06559ba20388bf3996024b28d3130c08cb55fb19 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 17:57:27 +0530 Subject: [PATCH 05/11] feat: reject events with __timestamp older than 5 days Events whose __timestamp is more than 5 days in the past throw HttpError(400) from getTimestamp, which the existing batchHandler error path routes to a per-row { reason: 'validation', error: '...' } entry. Single-event /track returns 400 to the caller via the standard handler error path. Window: hard-coded 5 days. Not per-project configurable. Future-timestamp tolerance unchanged: clientTimestamp > now + 1 min still falls back to server time. Co-Authored-By: Claude Opus 4.7 --- apps/api/src/controllers/track.controller.ts | 10 +++++ .../api/src/routes/track-batch.router.test.ts | 42 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 28e6b19b5..ee101a14a 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -109,6 +109,9 @@ export function getTimestamp( // Constants for time validation const ONE_MINUTE_MS = 60 * 1000; const FIFTEEN_MINUTES_MS = 15 * ONE_MINUTE_MS; + // Hard floor for accepted historical events. Public contract for /track + // and /track/batch, hard-coded (not per-project configurable). + const FIVE_DAYS_MS = 5 * 24 * 60 * 60 * 1000; // Use safeTimestamp if invalid or more than 1 minute in the future if ( @@ -118,6 +121,13 @@ export function getTimestamp( return { timestamp: safeTimestamp, isTimestampFromThePast: false }; } + // Reject events older than 5 days. In /track/batch this surfaces as a + // per-row { reason: 'validation' } entry in rejected[]; in single-event + // /track it returns 400 to the caller. + if (clientTimestampNumber < safeTimestamp - FIVE_DAYS_MS) { + throw new HttpError('event timestamp older than 5 days', { status: 400 }); + } + // isTimestampFromThePast is true only if timestamp is older than 15 minutes const isTimestampFromThePast = clientTimestampNumber < safeTimestamp - FIFTEEN_MINUTES_MS; diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts index 08b472142..5b956ff0d 100644 --- a/apps/api/src/routes/track-batch.router.test.ts +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -364,6 +364,48 @@ describe('POST /track/batch — per-item validation', () => { expect(sessionIdA).toBe(sessionIdB); }); + it('rejects events with __timestamp older than 5 days', async () => { + // Older events should be rejected per-row with a clear reason. + const sixDaysAgo = Date.now() - 6 * 24 * 60 * 60 * 1000; + const fourDaysAgo = Date.now() - 4 * 24 * 60 * 60 * 1000; + const res = await postBatch({ + events: [ + // valid (4 days old, within window) + { + type: 'track' as const, + payload: { + name: 'probe_within_window', + properties: { + __deviceId: 'd-1', + __timestamp: new Date(fourDaysAgo).toISOString(), + }, + }, + }, + // too old (6 days) + { + type: 'track' as const, + payload: { + name: 'probe_too_old', + properties: { + __deviceId: 'd-2', + __timestamp: new Date(sixDaysAgo).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(body.rejected[0].error).toMatch(/5 days/i); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + it('returns 202 with accepted=0 when every event fails validation', async () => { const res = await postBatch({ events: [ From 7df365ebea3a686ea8b11448cb9f11ec8b01dcf0 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 17:59:13 +0530 Subject: [PATCH 06/11] feat: split live vs historical worker dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stop blanking deviceId/sessionId for events that aren't currently live. With the API now computing a deterministic session_id for every event using its __timestamp, the worker can trust those values instead of replacing them with empty strings. The worker now branches on three cases: 1. Server-side event (uaInfo.isServer): enrich from sessionBuffer when a profileId resolves to an existing browser session; otherwise keep the API-computed identity. Never schedules sessionEnd. 2. Historical event (not server, __timestamp older than SESSION_TIMEOUT): write the event with the API-computed identity. No sessionEnd scheduling, no extendSessionEndJob — historical events must not mutate live session state. 3. Live event (default browser SDK traffic): unchanged. Emit session_start when no Redis sessionEnd key exists, schedule the 30-min sessionEnd job, or extend it if already present. Also drops the isTimestampFromThePast field from the queue payload and all its producers. The "is this event currently live?" decision is now local to the worker and uses the same SESSION_TIMEOUT constant the sessionEnd job already runs on (no new tunable). Documented compromise: historical sessions never get a session_end row written in CH. Their is_bounce / duration are null/0. Acceptable trade-off; alternative is a much bigger change involving lazy session-row creation on bucket close. Co-Authored-By: Claude Opus 4.7 --- apps/api/src/controllers/event.controller.ts | 6 +- apps/api/src/controllers/track.controller.ts | 18 +--- apps/worker/src/jobs/events.incoming-event.ts | 87 +++++++++++-------- .../src/jobs/events.incoming-events.test.ts | 49 +++++++++-- packages/queue/src/queues.ts | 1 - 5 files changed, 100 insertions(+), 61 deletions(-) diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 346500c5a..9f7808b9f 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -14,10 +14,7 @@ export async function postEvent( }>, reply: FastifyReply ) { - const { timestamp, isTimestampFromThePast } = getTimestamp( - request.timestamp, - request.body - ); + const { timestamp } = getTimestamp(request.timestamp, request.body); const ip = request.clientIp; const ua = request.headers['user-agent'] ?? 'unknown/1.0'; const projectId = request.client?.projectId; @@ -49,7 +46,6 @@ export async function postEvent( event: { ...request.body, timestamp, - isTimestampFromThePast, }, uaInfo, geo, diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index ee101a14a..4d65ba2c2 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -100,7 +100,7 @@ export function getTimestamp( : undefined; if (!userDefinedTimestamp) { - return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + return { timestamp: safeTimestamp }; } const clientTimestamp = new Date(userDefinedTimestamp); @@ -108,7 +108,6 @@ export function getTimestamp( // Constants for time validation const ONE_MINUTE_MS = 60 * 1000; - const FIFTEEN_MINUTES_MS = 15 * ONE_MINUTE_MS; // Hard floor for accepted historical events. Public contract for /track // and /track/batch, hard-coded (not per-project configurable). const FIVE_DAYS_MS = 5 * 24 * 60 * 60 * 1000; @@ -118,7 +117,7 @@ export function getTimestamp( Number.isNaN(clientTimestampNumber) || clientTimestampNumber > safeTimestamp + ONE_MINUTE_MS ) { - return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + return { timestamp: safeTimestamp }; } // Reject events older than 5 days. In /track/batch this surfaces as a @@ -128,14 +127,7 @@ export function getTimestamp( throw new HttpError('event timestamp older than 5 days', { status: 400 }); } - // isTimestampFromThePast is true only if timestamp is older than 15 minutes - const isTimestampFromThePast = - clientTimestampNumber < safeTimestamp - FIFTEEN_MINUTES_MS; - - return { - timestamp: clientTimestampNumber, - isTimestampFromThePast, - }; + return { timestamp: clientTimestampNumber }; } interface TrackContext { @@ -143,7 +135,7 @@ interface TrackContext { ip: string; ua?: string; headers: Record; - timestamp: { value: number; isFromPast: boolean }; + timestamp: { value: number }; identity?: IIdentifyPayload; deviceId: string; sessionId: string; @@ -238,7 +230,6 @@ async function buildEventContext( headers: shared.requestHeaders, timestamp: { value: timestamp.timestamp, - isFromPast: timestamp.isTimestampFromThePast, }, identity, deviceId: deviceIdResult.deviceId, @@ -284,7 +275,6 @@ async function handleTrack( ...payload, groups: payload.groups ?? [], timestamp: timestamp.value, - isTimestampFromThePast: timestamp.isFromPast, }, uaInfo, geo, diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 5035e3186..fa71cfc16 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,6 +1,6 @@ import { getTime, isSameDomain, parsePath } from '@openpanel/common'; import { getReferrerWithQuery, parseReferrer } from '@openpanel/common/server'; -import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; +import type { IServiceCreateEventPayload } from '@openpanel/db'; import { checkNotificationRulesForEvent, createEvent, @@ -16,6 +16,7 @@ import { createSessionEndJob, extendSessionEndJob, getActiveSessionEndJob, + SESSION_TIMEOUT, } from '@/utils/session-handler'; const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue']; @@ -113,7 +114,14 @@ export async function incomingEvent( // this will get the profileId from the alias table if it exists const profileId = body.profileId ? String(body.profileId) : ''; const createdAt = new Date(body.timestamp); - const isTimestampFromThePast = body.isTimestampFromThePast; + // "Live" = the event is recent enough that it could plausibly belong to + // an active in-memory session. We use the same window as SESSION_TIMEOUT + // (30 min) so historical events never push the live sessionEnd job + // forward or create new live sessions. Server-side events are always + // treated as non-live (they get session enrichment from sessionBuffer + // when a profile is supplied; otherwise they keep the API-computed id). + const isLiveEvent = + !uaInfo.isServer && Date.now() - createdAt.getTime() <= SESSION_TIMEOUT; const url = getProperty('__path'); const { path, hash, query, origin } = parsePath(url); const referrer = isSameDomain(getProperty('__referrer'), url) @@ -162,40 +170,42 @@ export async function incomingEvent( : undefined, }; - // if timestamp is from the past we dont want to create a new session - if (uaInfo.isServer || isTimestampFromThePast) { - const session = - profileId && !isTimestampFromThePast - ? await sessionBuffer.getExistingSession({ - profileId, - projectId, - }) - : null; + // Server-side events: when a profileId is supplied, enrich from the + // user's most recent browser session (deviceId, sessionId, geo, UA, path, + // referrer). Without a session, fall back to the API-computed identity. + // Server events never create or extend live sessions in Redis. + if (uaInfo.isServer) { + const enrichment = profileId + ? await sessionBuffer.getExistingSession({ profileId, projectId }) + : null; - const payload = { - ...baseEvent, - deviceId: session?.device_id ?? '', - sessionId: session?.id ?? '', - referrer: session?.referrer ?? undefined, - referrerName: session?.referrer_name ?? undefined, - referrerType: session?.referrer_type ?? undefined, - path: session?.exit_path ?? baseEvent.path, - origin: session?.exit_origin ?? baseEvent.origin, - os: session?.os ?? baseEvent.os, - osVersion: session?.os_version ?? baseEvent.osVersion, - browserVersion: session?.browser_version ?? baseEvent.browserVersion, - browser: session?.browser ?? baseEvent.browser, - device: session?.device ?? baseEvent.device, - brand: session?.brand ?? baseEvent.brand, - model: session?.model ?? baseEvent.model, - city: session?.city ?? baseEvent.city, - country: session?.country ?? baseEvent.country, - region: session?.region ?? baseEvent.region, - longitude: session?.longitude ?? baseEvent.longitude, - latitude: session?.latitude ?? baseEvent.latitude, - }; + const payload: IServiceCreateEventPayload = enrichment + ? { + ...baseEvent, + deviceId: enrichment.device_id, + sessionId: enrichment.id, + referrer: enrichment.referrer ?? undefined, + referrerName: enrichment.referrer_name ?? undefined, + referrerType: enrichment.referrer_type ?? undefined, + path: enrichment.exit_path ?? baseEvent.path, + origin: enrichment.exit_origin ?? baseEvent.origin, + os: enrichment.os ?? baseEvent.os, + osVersion: enrichment.os_version ?? baseEvent.osVersion, + browserVersion: + enrichment.browser_version ?? baseEvent.browserVersion, + browser: enrichment.browser ?? baseEvent.browser, + device: enrichment.device ?? baseEvent.device, + brand: enrichment.brand ?? baseEvent.brand, + model: enrichment.model ?? baseEvent.model, + city: enrichment.city ?? baseEvent.city, + country: enrichment.country ?? baseEvent.country, + region: enrichment.region ?? baseEvent.region, + longitude: enrichment.longitude ?? baseEvent.longitude, + latitude: enrichment.latitude ?? baseEvent.latitude, + } + : baseEvent; - return createEventAndNotify(payload as IServiceEvent, logger, projectId); + return createEventAndNotify(payload, logger, projectId); } const activeSessionEndJob = await getActiveSessionEndJob( @@ -219,6 +229,15 @@ export async function incomingEvent( return null; } + // Historical (buffered) events: the API has already computed a + // deterministic sessionId for them. Write the event but do NOT mutate + // live session state — historical events shouldn't extend the user's + // current session or schedule a 30-min sessionEnd timer. + // Session_start emission for historical buckets lands in a follow-up. + if (!isLiveEvent) { + return createEventAndNotify(payload, logger, projectId); + } + if (activeSessionEndJob) { await extendSessionEndJob({ projectId, diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 30547d7dc..7f8167342 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -79,7 +79,6 @@ describe('incomingEvent', () => { event: { name: 'test_event', timestamp: timestamp.toISOString(), - isTimestampFromThePast: false, properties: { __path: 'https://example.com/test' }, }, uaInfo, @@ -174,7 +173,6 @@ describe('incomingEvent', () => { name: 'test_event', timestamp: timestamp.toISOString(), properties: { __path: 'https://example.com/test' }, - isTimestampFromThePast: false, }, headers: { 'request-id': '123', @@ -256,7 +254,6 @@ describe('incomingEvent', () => { timestamp: timestamp.toISOString(), properties: { custom_property: 'test_value' }, profileId: 'profile-123', - isTimestampFromThePast: false, }, headers: { 'user-agent': 'OpenPanel Server/1.0', @@ -361,7 +358,6 @@ describe('incomingEvent', () => { timestamp: timestamp.toISOString(), properties: { custom_property: 'test_value' }, profileId: 'profile-123', - isTimestampFromThePast: false, }, headers: { 'user-agent': 'OpenPanel Server/1.0', @@ -379,6 +375,9 @@ describe('incomingEvent', () => { expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({ name: 'server_event', + // Server event with profileId but no existing session: keep the + // API-computed identity instead of blanking deviceId/sessionId. + // The fixture sends '' for both so that's what we expect here. deviceId: '', sessionId: '', profileId: 'profile-123', @@ -405,9 +404,11 @@ describe('incomingEvent', () => { duration: 0, path: '', origin: '', - referrer: undefined, + // baseEvent fields fall through uniformly: empty strings for + // referrer/referrerType, undefined for referrerName. + referrer: '', referrerName: undefined, - referrerType: undefined, + referrerType: '', sdkName: 'server', sdkVersion: '1.0.0', groups: [], @@ -434,7 +435,6 @@ describe('incomingEvent', () => { event: { name: eventName, timestamp: new Date().toISOString(), - isTimestampFromThePast: false, properties: { __path: 'https://example.com/test' }, }, uaInfo, @@ -485,4 +485,39 @@ describe('incomingEvent', () => { // events extend the existing one via changeDelay. expect(spySessionsQueueAdd).toHaveBeenCalledTimes(1); }); + + it('historical event preserves API-computed deviceId/sessionId', async () => { + // Event with __timestamp older than SESSION_TIMEOUT (30 min). Worker + // should write it with the deviceId/sessionId the API computed, + // without scheduling sessionEnd (live state untouched). + const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000); + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'historical_event', + timestamp: oneHourAgo.toISOString(), + properties: { __path: 'https://example.com/replay' }, + }, + uaInfo, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15', + 'openpanel-sdk-name': 'react-native', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + deviceId: 'mobile-device-xyz', + sessionId: 'deterministic-bucket-id', + }; + + await incomingEvent(jobData); + + expect(sessionsQueue.add).not.toHaveBeenCalled(); + expect(createEvent as Mock).toHaveBeenCalledTimes(1); + const written = (createEvent as Mock).mock.calls[0]![0]; + expect(written.deviceId).toBe('mobile-device-xyz'); + expect(written.sessionId).toBe('deterministic-bucket-id'); + expect(written.name).toBe('historical_event'); + }); }); diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index bb7b8cb45..8858bb052 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -33,7 +33,6 @@ export interface EventsQueuePayloadIncomingEvent { projectId: string; event: ITrackPayload & { timestamp: string | number; - isTimestampFromThePast: boolean; }; uaInfo: | { From 78b86fc2a3817161660a65ddf0a76bf0ebbc5aa4 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 18:00:28 +0530 Subject: [PATCH 07/11] feat: Redis lock for session_start dedup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-introduce the (projectId, sessionId)-keyed lock that was removed on the bet that groupmq's per-groupId serialization would prevent duplicates. That bet holds for single-event live traffic but fails for batch ingestion: when N events for the same device land in /track/batch, all N see no Redis sessionEnd key during the API's parallel dispatch and queue with session: undefined, causing the worker to emit N session_start rows. Live verification on production showed the symptom: a 1000-event batch across 25 devices produced 1000 session_start rows instead of 25. The lock: - Key: session_start:{projectId}:{sessionId} - TTL: SESSION_TIMEOUT (30 min) — same constant the live mechanism already uses, no new tunable - Keyed on sessionId not deviceId so historical events from the same device but different 30-min buckets each get their own session_start Both the live and historical worker branches now gate session_start emission on lock acquisition. When the lock is not acquired: - Live branch: still schedule sessionEnd (idempotent on jobId — no-op if already present), still write the event row - Historical branch: still write the event row, skip session_start Co-Authored-By: Claude Opus 4.7 --- apps/worker/src/jobs/events.incoming-event.ts | 73 +++++++++++++++-- .../src/jobs/events.incoming-events.test.ts | 78 +++++++++++++++++-- 2 files changed, 141 insertions(+), 10 deletions(-) diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index fa71cfc16..758f1839f 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -10,6 +10,7 @@ import { } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; +import { getLock } from '@openpanel/redis'; import { anyPass, isEmpty, isNil, mergeDeepRight, omit, reject } from 'ramda'; import { logger as baseLogger } from '@/utils/logger'; import { @@ -19,6 +20,34 @@ import { SESSION_TIMEOUT, } from '@/utils/session-handler'; +/** + * Acquire a Redis-backed lock that prevents duplicate session_start rows for + * the same `(projectId, sessionId)`. Returns true if THIS caller should emit + * the session_start row; false if another worker (or earlier event in the + * same batch) already claimed it. + * + * TTL matches SESSION_TIMEOUT — a session can't extend beyond 30 min of + * inactivity in the live mechanism, and the deterministic bucket is exactly + * 30 min wide. By the time the lock TTL elapses, the session itself has + * rolled. + * + * Keyed on sessionId (not deviceId) so historical events from the same + * device but different 30-min buckets each get their own session_start. + */ +async function acquireSessionStartLock( + projectId: string, + sessionId: string, +): Promise { + if (!sessionId) { + return false; + } + return getLock( + `session_start:${projectId}:${sessionId}`, + '1', + SESSION_TIMEOUT, + ); +} + const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue']; // This function will merge two objects. @@ -230,11 +259,30 @@ export async function incomingEvent( } // Historical (buffered) events: the API has already computed a - // deterministic sessionId for them. Write the event but do NOT mutate - // live session state — historical events shouldn't extend the user's - // current session or schedule a 30-min sessionEnd timer. - // Session_start emission for historical buckets lands in a follow-up. + // deterministic sessionId for them. Write the event and emit one + // session_start per bucket (Redis lock dedups across batches and + // workers). Do NOT touch live session state — historical events + // must not extend the user's current session or schedule a 30-min + // sessionEnd timer. if (!isLiveEvent) { + if (await acquireSessionStartLock(projectId, sessionId)) { + await createEventAndNotify( + { + ...payload, + name: 'session_start', + createdAt: new Date(getTime(payload.createdAt) - 100), + }, + logger, + projectId, + ).catch((error) => { + logger.error( + { err: error, event: payload }, + 'Error creating historical session start event', + ); + // Don't throw — historical session_start is best-effort. The + // event itself should still land. + }); + } return createEventAndNotify(payload, logger, projectId); } @@ -246,7 +294,11 @@ export async function incomingEvent( }).catch((error) => { logger.warn({ err: error }, 'Failed to extend session end job'); }); - } else { + } else if (await acquireSessionStartLock(projectId, sessionId)) { + // Lock prevents the previously-observed batch race: when N events for + // the same device land in the API in parallel, all see no Redis + // sessionEnd key yet, all queue with session: undefined, and would + // each try to emit session_start. The lock collapses them to one. await createEventAndNotify( { ...payload, @@ -270,6 +322,17 @@ export async function incomingEvent( ); throw error; }); + } else { + // Another worker (or earlier event in this batch) claimed the + // session_start. Still ensure a sessionEnd is scheduled so the + // session closes cleanly. createSessionEndJob is idempotent on + // jobId, so this is a no-op when the job already exists. + await createSessionEndJob({ payload }).catch((error) => { + logger.warn( + { err: error, event: payload }, + 'Failed to schedule session end job (lock not acquired)', + ); + }); } return createEventAndNotify(payload, logger, projectId); diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 7f8167342..f9d9450c0 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -26,6 +26,16 @@ vi.mock('@openpanel/db', async () => { }, }; }); +// Mock the session_start dedup lock so tests don't need a live Redis. By +// default the lock is acquired (true) so existing tests' session_start +// expectations still hold; individual tests can override per-call. +vi.mock('@openpanel/redis', async () => { + const actual = await vi.importActual('@openpanel/redis'); + return { + ...actual, + getLock: vi.fn().mockResolvedValue(true), + }; +}); // 30 minutes const SESSION_TIMEOUT = 30 * 60 * 1000; @@ -486,6 +496,54 @@ describe('incomingEvent', () => { expect(spySessionsQueueAdd).toHaveBeenCalledTimes(1); }); + it('does not emit duplicate session_start when lock is held', async () => { + const { getLock } = await import('@openpanel/redis'); + // Simulate "another worker already claimed session_start" by failing + // the lock acquisition. Live event still fires; sessionEnd job is + // still scheduled (it's idempotent on jobId). + vi.mocked(getLock).mockResolvedValueOnce(false); + const spySessionsQueueAdd = vi + .spyOn(sessionsQueue, 'add') + .mockResolvedValue({} as Job); + + const timestamp = new Date(); + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'live_event', + timestamp: timestamp.toISOString(), + properties: { __path: 'https://example.com/test' }, + }, + uaInfo, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/91.0.4472.124', + 'openpanel-sdk-name': 'web', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + deviceId, + sessionId: newSessionId, + }; + (createEvent as Mock).mockReturnValue({}); + + await incomingEvent(jobData); + + // No session_start emission (lock not acquired) + const startCalls = (createEvent as Mock).mock.calls.filter( + (call) => call[0]?.name === 'session_start', + ); + expect(startCalls).toHaveLength(0); + // Live event itself still gets created + const liveCalls = (createEvent as Mock).mock.calls.filter( + (call) => call[0]?.name === 'live_event', + ); + expect(liveCalls).toHaveLength(1); + // sessionEnd is still scheduled even when lock not acquired (idempotent) + expect(spySessionsQueueAdd).toHaveBeenCalled(); + }); + it('historical event preserves API-computed deviceId/sessionId', async () => { // Event with __timestamp older than SESSION_TIMEOUT (30 min). Worker // should write it with the deviceId/sessionId the API computed, @@ -511,13 +569,23 @@ describe('incomingEvent', () => { sessionId: 'deterministic-bucket-id', }; + (createEvent as Mock).mockReturnValue({}); await incomingEvent(jobData); + // Live state untouched: no sessionEnd job scheduled expect(sessionsQueue.add).not.toHaveBeenCalled(); - expect(createEvent as Mock).toHaveBeenCalledTimes(1); - const written = (createEvent as Mock).mock.calls[0]![0]; - expect(written.deviceId).toBe('mobile-device-xyz'); - expect(written.sessionId).toBe('deterministic-bucket-id'); - expect(written.name).toBe('historical_event'); + // Two createEvent calls: one for the historical session_start (lock + // acquired by default in the redis mock), one for the event itself + expect((createEvent as Mock).mock.calls).toHaveLength(2); + const startCall = (createEvent as Mock).mock.calls.find( + (call) => call[0]?.name === 'session_start', + ); + const eventCall = (createEvent as Mock).mock.calls.find( + (call) => call[0]?.name === 'historical_event', + ); + expect(startCall).toBeDefined(); + expect(eventCall).toBeDefined(); + expect(eventCall![0].deviceId).toBe('mobile-device-xyz'); + expect(eventCall![0].sessionId).toBe('deterministic-bucket-id'); }); }); From 4773d1a9bc390cd60bf6e2a8a2ee8ead7cc79dd7 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 19:14:57 +0530 Subject: [PATCH 08/11] fix: tests use Date.now()-relative timestamps + correct referrerName Three CI failures, all in tests: 1. Worker server-events test expected `referrerName: undefined` but the actual baseEvent now resolves it to `''` (empty string from the utmReferrer/referrer fallback chain). Align the expectation. 2 + 3. API bucketing tests used hard-coded Date.UTC(2026, 4, 1, ...) timestamps. That date is more than 5 days before "now" when CI runs, so the new 5-day rejection rejects both events and the tests see accepted: 0 instead of 2. Switch to Date.now()-relative timestamps: - "different buckets" test: anchor to (now - 2h) and use a 1h gap - "same bucket" test: anchor to the start of a bucket that closed 1h ago and use a 5-min gap (both inside one wall-clock window and inside the 5-day acceptance window) Co-Authored-By: Claude Opus 4.7 --- apps/api/src/routes/track-batch.router.test.ts | 15 +++++++++++---- .../src/jobs/events.incoming-events.test.ts | 7 ++++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts index 5b956ff0d..2737bec91 100644 --- a/apps/api/src/routes/track-batch.router.test.ts +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -292,8 +292,9 @@ describe('POST /track/batch — per-item validation', () => { // Two events on the same device, 1h apart in __timestamp. They // should land in different deterministic 30-min buckets and thus // get different sessionIds, even though they arrive in the same - // request. - const baseMs = Date.UTC(2026, 4, 1, 10, 0, 0); // 10:00 UTC + // request. Anchored to "now" so the events stay inside the 5-day + // acceptance window when the test runs. + const baseMs = Date.now() - 2 * 60 * 60 * 1000; // 2h ago const res = await postBatch({ events: [ { @@ -330,8 +331,14 @@ describe('POST /track/batch — per-item validation', () => { it('shares sessionId across events in the same 30-min bucket', async () => { // Two events on the same device, 5 min apart inside the same - // wall-clock 30-min bucket. They should share a sessionId. - const baseMs = Date.UTC(2026, 4, 1, 10, 5, 0); // 10:05 UTC + // wall-clock 30-min bucket. They should share a sessionId. Anchor + // to the bucket that closed ~1 hour ago so both timestamps are in + // the past (avoiding the future-timestamp guard) and well within + // the 5-day acceptance window. + const WINDOW_MS = 30 * 60 * 1000; + const oneHourAgoBucket = + Math.floor((Date.now() - 60 * 60 * 1000) / WINDOW_MS) * WINDOW_MS; + const baseMs = oneHourAgoBucket + 60_000; // 1 min past bucket start (past the grace) const res = await postBatch({ events: [ { diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index f9d9450c0..49fe9da36 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -414,10 +414,11 @@ describe('incomingEvent', () => { duration: 0, path: '', origin: '', - // baseEvent fields fall through uniformly: empty strings for - // referrer/referrerType, undefined for referrerName. + // baseEvent fields fall through uniformly when there's no + // session enrichment available — empty strings for all referrer + // fields rather than the previous mix of undefined/''. referrer: '', - referrerName: undefined, + referrerName: '', referrerType: '', sdkName: 'server', sdkVersion: '1.0.0', From 44b297dae9b2aed411a196736d88d991c9a811eb Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Mon, 18 May 2026 21:04:13 +0530 Subject: [PATCH 09/11] fix(test): reset getJob spy in lock-not-acquired test vi.clearAllMocks resets call history but keeps mockResolvedValue implementations. A prior test set sessionsQueue.getJob to return a live job, which leaked into the lock-not-acquired test and routed control to the extendSessionEndJob branch instead of the lock check. That in turn left a mockResolvedValueOnce(false) on getLock unconsumed, which then broke the next test's session_start emission. Force getJob to undefined explicitly so each test starts from a clean "no active session" state. --- apps/worker/src/jobs/events.incoming-events.test.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 49fe9da36..345b57fd7 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -503,6 +503,11 @@ describe('incomingEvent', () => { // the lock acquisition. Live event still fires; sessionEnd job is // still scheduled (it's idempotent on jobId). vi.mocked(getLock).mockResolvedValueOnce(false); + // No active session-end job exists yet — force getJob to undefined so the + // worker falls into the "no active session" branch where the lock check + // matters. (vi.clearAllMocks resets call history but keeps implementations + // set via mockResolvedValue in previous tests.) + vi.spyOn(sessionsQueue, 'getJob').mockResolvedValue(undefined); const spySessionsQueueAdd = vi .spyOn(sessionsQueue, 'add') .mockResolvedValue({} as Job); From e5bcc3b0228db7c5f12dd66ed74f7e1c91b359db Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Tue, 19 May 2026 00:49:38 +0530 Subject: [PATCH 10/11] perf: bound batch per-event concurrency to 50 Process /track/batch events in chunks of 50 instead of an unbounded Promise.all over up to 2000 events. Each event hits Redis (session lookup + groupmq add) and may trigger a geo lookup if __ip is overridden, so an unbounded fan-out can pressure Redis pool size and the geo provider's rate budget on smaller self-hosted instances. 50 keeps the pipeline saturated without thundering-herd behaviour. Throughput in our prod testing was already fine (2000 events / ~1.3s) without bounding, but the unbounded version is fragile under burst traffic for self-hosters. Per-index result alignment is preserved via the existing `index` field on BatchItemResult. --- apps/api/src/controllers/track.controller.ts | 93 ++++++++++++-------- 1 file changed, 58 insertions(+), 35 deletions(-) diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 4d65ba2c2..fc0e180b6 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -519,6 +519,14 @@ type BatchItemResult = * Optimization: salts + request-IP geo are fetched once and shared across * all events. Events that override `__ip` still get their own geo lookup. */ +// Bounded concurrency for per-event processing inside a batch. Each event +// hits Redis (session lookup + groupmq queue add) and may trigger a geo +// lookup if it overrides `__ip`, so an unbounded `Promise.all` over 2000 +// events can spike Redis pool usage and the geo provider's rate budget on +// smaller self-hosted instances. 50 keeps the pipeline saturated without +// turning a single big batch into a thundering herd. +const BATCH_CONCURRENCY = 50; + export async function batchHandler( request: FastifyRequest<{ Body: ITrackBatchBody; @@ -528,44 +536,59 @@ export async function batchHandler( const { events } = request.body; const shared = await buildSharedRequestContext(request); - const results = await Promise.all( - events.map>(async (raw, index) => { - const parsed = zTrackHandlerPayload.safeParse(raw); - if (!parsed.success) { - const issue = parsed.error.issues[0]; - const path = issue?.path?.join('.') ?? ''; - const error = path ? `${path}: ${issue?.message}` : issue?.message ?? 'invalid payload'; - return { index, status: 'rejected', reason: 'validation', error }; - } + const processOne = async ( + raw: unknown, + index: number, + ): Promise => { + const parsed = zTrackHandlerPayload.safeParse(raw); + if (!parsed.success) { + const issue = parsed.error.issues[0]; + const path = issue?.path?.join('.') ?? ''; + const error = path + ? `${path}: ${issue?.message}` + : issue?.message ?? 'invalid payload'; + return { index, status: 'rejected', reason: 'validation', error }; + } - try { - const context = await buildEventContext( - shared, - request.timestamp, - parsed.data, + try { + const context = await buildEventContext( + shared, + request.timestamp, + parsed.data, + ); + await dispatchEvent(parsed.data, context); + return { index, status: 'accepted' }; + } catch (err) { + // HttpError with 4xx → caller's fault (validation-style: alias, + // unknown type, replay without session). Anything else → ours. + const isClientError = + err instanceof HttpError && err.status >= 400 && err.status < 500; + const reason: 'validation' | 'internal' = isClientError + ? 'validation' + : 'internal'; + const message = err instanceof Error ? err.message : 'unknown error'; + if (!isClientError) { + request.log.error( + { err, index }, + 'Batch event dispatch failed', ); - await dispatchEvent(parsed.data, context); - return { index, status: 'accepted' }; - } catch (err) { - // HttpError with 4xx → caller's fault (validation-style: alias, - // unknown type, replay without session). Anything else → ours. - const isClientError = - err instanceof HttpError && err.status >= 400 && err.status < 500; - const reason: 'validation' | 'internal' = isClientError - ? 'validation' - : 'internal'; - const message = - err instanceof Error ? err.message : 'unknown error'; - if (!isClientError) { - request.log.error( - { err, index }, - 'Batch event dispatch failed', - ); - } - return { index, status: 'rejected', reason, error: message }; } - }), - ); + return { index, status: 'rejected', reason, error: message }; + } + }; + + // Process in chunks of BATCH_CONCURRENCY. We keep results aligned with + // input indices via the `index` field on each BatchItemResult. + const results: BatchItemResult[] = new Array(events.length); + for (let start = 0; start < events.length; start += BATCH_CONCURRENCY) { + const end = Math.min(start + BATCH_CONCURRENCY, events.length); + const chunk = await Promise.all( + events.slice(start, end).map((raw, i) => processOne(raw, start + i)), + ); + for (const r of chunk) { + results[r.index] = r; + } + } const accepted = results.filter((r) => r.status === 'accepted').length; const rejected = results.filter( From cabae9b5cd44f8ac347c805541efff4e5d43a58d Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Tue, 19 May 2026 00:54:30 +0530 Subject: [PATCH 11/11] test: per-index results preserved across batch chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a 200-event batch with bad rows scattered across chunk boundaries (indices 0, 50, 99, 100, 149, 199 — first/last of each 50-event chunk plus a chunk-internal one). Locks down the contract that BatchItemResult.index aligns with the input index regardless of the chunked processing order, which would catch any future off-by-one in the slicing or out-of-order accumulation. --- .../api/src/routes/track-batch.router.test.ts | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts index 2737bec91..adcd4c462 100644 --- a/apps/api/src/routes/track-batch.router.test.ts +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -427,4 +427,29 @@ describe('POST /track/batch — per-item validation', () => { expect(body.rejected).toHaveLength(3); expect(queueAdd).not.toHaveBeenCalled(); }); + + // Regression: per-event processing is chunked (BATCH_CONCURRENCY = 50). + // A 200-event batch spans 4 chunks. Verifies that rejected indices land in + // the right positions across chunk boundaries — including the very first + // event in chunk 1, the last event in chunk 2, and one in chunk 4 — which + // would catch off-by-one slicing or out-of-order result accumulation. + it('preserves per-index results across chunk boundaries', async () => { + const SIZE = 200; + const badIndices = new Set([0, 50, 99, 100, 149, 199]); + const events = Array.from({ length: SIZE }, (_, i) => + badIndices.has(i) + ? { type: 'track', payload: { name: '' } } // invalid + : validTrack(`chunked_${i}`), + ); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(SIZE - badIndices.size); + expect(body.rejected).toHaveLength(badIndices.size); + const rejectedIndices = new Set( + body.rejected.map((r: { index: number }) => r.index), + ); + expect(rejectedIndices).toEqual(badIndices); + expect(queueAdd).toHaveBeenCalledTimes(SIZE - badIndices.size); + }); });