diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 59a6d0f4a..9f7808b9f 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -14,10 +14,7 @@ export async function postEvent( }>, reply: FastifyReply ) { - const { timestamp, isTimestampFromThePast } = getTimestamp( - request.timestamp, - request.body - ); + const { timestamp } = getTimestamp(request.timestamp, request.body); const ip = request.clientIp; const ua = request.headers['user-agent'] ?? 'unknown/1.0'; const projectId = request.client?.projectId; @@ -34,6 +31,7 @@ export async function postEvent( ip, ua, salts, + eventMs: new Date(timestamp).getTime(), }); const uaInfo = parseUserAgent(ua, request.body?.properties); @@ -48,7 +46,6 @@ export async function postEvent( event: { ...request.body, timestamp, - isTimestampFromThePast, }, uaInfo, geo, diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index bf6b2d4e3..fc0e180b6 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -22,14 +22,32 @@ import type { IIdentifyPayload, IIncrementPayload, IReplayPayload, + ITrackBatchBody, ITrackHandlerPayload, ITrackPayload, } from '@openpanel/validation'; +import { zTrackHandlerPayload } from '@openpanel/validation'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { assocPath, pathOr, pick } from 'ramda'; import { HttpError } from '@/utils/errors'; import { getDeviceId } from '@/utils/ids'; +type Salts = Awaited>; + +/** + * Per-request data that is identical for every event in a batch. + * Computed once in the batch handler so we don't re-fetch salts/geo + * or re-parse headers N times. + */ +interface SharedRequestContext { + projectId: string; + requestIp: string; + requestUa: string; + requestHeaders: Record; + requestGeo: GeoLocation; + salts: Salts; +} + export function getStringHeaders(headers: FastifyRequest['headers']) { return Object.entries( pick( @@ -82,7 +100,7 @@ export function getTimestamp( : undefined; if (!userDefinedTimestamp) { - return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + return { timestamp: safeTimestamp }; } const clientTimestamp = new Date(userDefinedTimestamp); @@ -90,24 +108,26 @@ export function getTimestamp( // Constants for time validation const ONE_MINUTE_MS = 60 * 1000; - const FIFTEEN_MINUTES_MS = 15 * ONE_MINUTE_MS; + // Hard floor for accepted historical events. Public contract for /track + // and /track/batch, hard-coded (not per-project configurable). + const FIVE_DAYS_MS = 5 * 24 * 60 * 60 * 1000; // Use safeTimestamp if invalid or more than 1 minute in the future if ( Number.isNaN(clientTimestampNumber) || clientTimestampNumber > safeTimestamp + ONE_MINUTE_MS ) { - return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + return { timestamp: safeTimestamp }; } - // isTimestampFromThePast is true only if timestamp is older than 15 minutes - const isTimestampFromThePast = - clientTimestampNumber < safeTimestamp - FIFTEEN_MINUTES_MS; + // Reject events older than 5 days. In /track/batch this surfaces as a + // per-row { reason: 'validation' } entry in rejected[]; in single-event + // /track it returns 400 to the caller. + if (clientTimestampNumber < safeTimestamp - FIVE_DAYS_MS) { + throw new HttpError('event timestamp older than 5 days', { status: 400 }); + } - return { - timestamp: clientTimestampNumber, - isTimestampFromThePast, - }; + return { timestamp: clientTimestampNumber }; } interface TrackContext { @@ -115,35 +135,72 @@ interface TrackContext { ip: string; ua?: string; headers: Record; - timestamp: { value: number; isFromPast: boolean }; + timestamp: { value: number }; identity?: IIdentifyPayload; deviceId: string; sessionId: string; geo: GeoLocation; } -async function buildContext( - request: FastifyRequest<{ - Body: ITrackHandlerPayload; - }>, - validatedBody: ITrackHandlerPayload -): Promise { +/** + * Build the per-request shared context. Done once per HTTP request — for + * single-event /track this is just an extra struct; for /track/batch it + * lets N events share one salts + one geo lookup. + */ +async function buildSharedRequestContext( + request: FastifyRequest, +): Promise { const projectId = request.client?.projectId; if (!projectId) { throw new HttpError('Missing projectId', { status: 400 }); } - const timestamp = getTimestamp(request.timestamp, validatedBody.payload); - const ip = + const requestIp = request.clientIp; + const requestUa = request.headers['user-agent'] ?? 'unknown/1.0'; + const requestHeaders = getStringHeaders(request.headers); + + const [requestGeo, salts] = await Promise.all([ + getGeoLocation(requestIp), + getSalts(), + ]); + + return { + projectId, + requestIp, + requestUa, + requestHeaders, + requestGeo, + salts, + }; +} + +/** + * Build a per-event TrackContext from already-fetched shared data. + * Per-event work: timestamp, identity, ip override, deviceId, and a + * second geo lookup *only* if the event overrides __ip. + */ +async function buildEventContext( + shared: SharedRequestContext, + requestTimestamp: FastifyRequest['timestamp'], + validatedBody: ITrackHandlerPayload, +): Promise { + const timestamp = getTimestamp(requestTimestamp, validatedBody.payload); + + const overrideIp = validatedBody.type === 'track' && validatedBody.payload.properties?.__ip ? (validatedBody.payload.properties.__ip as string) - : request.clientIp; - const ua = request.headers['user-agent'] ?? 'unknown/1.0'; + : undefined; + const ip = overrideIp ?? shared.requestIp; + + // Only re-fetch geo when the event overrode the IP — the common case + // (browser SDK, no __ip) reuses the request-level geo computed once. + const geo = + overrideIp && overrideIp !== shared.requestIp + ? await getGeoLocation(overrideIp) + : shared.requestGeo; - const headers = getStringHeaders(request.headers); const identity = getIdentity(validatedBody); const profileId = identity?.profileId; - if (profileId && validatedBody.type === 'track') { validatedBody.payload.profileId = profileId; } @@ -154,25 +211,25 @@ async function buildContext( ? validatedBody.payload?.properties.__deviceId : undefined; - // Get geo location (needed for track and identify) - const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); - const deviceIdResult = await getDeviceId({ - projectId, + projectId: shared.projectId, ip, - ua, - salts, + ua: shared.requestUa, + salts: shared.salts, overrideDeviceId, + // Bucket the deterministic session_id by the event's own __timestamp, + // not the wall-clock moment the request arrived. Critical for + // /track/batch where one request can contain events spanning days. + eventMs: timestamp.timestamp, }); return { - projectId, + projectId: shared.projectId, ip, - ua, - headers, + ua: shared.requestUa, + headers: shared.requestHeaders, timestamp: { value: timestamp.timestamp, - isFromPast: timestamp.isTimestampFromThePast, }, identity, deviceId: deviceIdResult.deviceId, @@ -181,6 +238,16 @@ async function buildContext( }; } +async function buildContext( + request: FastifyRequest<{ + Body: ITrackHandlerPayload; + }>, + validatedBody: ITrackHandlerPayload, +): Promise { + const shared = await buildSharedRequestContext(request); + return buildEventContext(shared, request.timestamp, validatedBody); +} + async function handleTrack( payload: ITrackPayload, context: TrackContext @@ -208,7 +275,6 @@ async function handleTrack( ...payload, groups: payload.groups ?? [], timestamp: timestamp.value, - isTimestampFromThePast: timestamp.isFromPast, }, uaInfo, geo, @@ -361,15 +427,61 @@ async function handleAssignGroup( }); } +/** + * Dispatch a validated event to the matching per-type handler. Shared by + * /track and /track/batch. Throws HttpError(400) for the unsupported `alias` + * type so single-event and batch can both surface it consistently. + */ +async function dispatchEvent( + body: ITrackHandlerPayload, + context: TrackContext, +): Promise { + switch (body.type) { + case 'alias': + throw new HttpError('Alias is not supported', { status: 400 }); + case 'track': + await handleTrack(body.payload, context); + return; + case 'identify': + await handleIdentify(body.payload, context); + return; + case 'increment': + await handleIncrement(body.payload, context); + return; + case 'decrement': + await handleDecrement(body.payload, context); + return; + case 'replay': + await handleReplay(body.payload, context); + return; + case 'group': + await handleGroup(body.payload, context); + return; + case 'assign_group': + await handleAssignGroup(body.payload, context); + return; + default: { + // Exhaustiveness check: `body` narrows to `never` when every variant + // of ITrackHandlerPayload['type'] is handled. Adding a new variant + // makes this assignment fail to compile. + const exhaustive: never = body; + throw new HttpError(`Unhandled event type: ${exhaustive}`, { + status: 400, + }); + } + } +} + export async function handler( request: FastifyRequest<{ Body: ITrackHandlerPayload; }>, - reply: FastifyReply + reply: FastifyReply, ) { const validatedBody = request.body; - // Handle alias (not supported) + // Reject `alias` before building context — saves the salts/geo/deviceId work + // for a request that's going to fail anyway. if (validatedBody.type === 'alias') { return reply.status(400).send({ status: 400, @@ -378,39 +490,8 @@ export async function handler( }); } - // Build request context const context = await buildContext(request, validatedBody); - - // Dispatch to appropriate handler - switch (validatedBody.type) { - case 'track': - await handleTrack(validatedBody.payload, context); - break; - case 'identify': - await handleIdentify(validatedBody.payload, context); - break; - case 'increment': - await handleIncrement(validatedBody.payload, context); - break; - case 'decrement': - await handleDecrement(validatedBody.payload, context); - break; - case 'replay': - await handleReplay(validatedBody.payload, context); - break; - case 'group': - await handleGroup(validatedBody.payload, context); - break; - case 'assign_group': - await handleAssignGroup(validatedBody.payload, context); - break; - default: - return reply.status(400).send({ - status: 400, - error: 'Bad Request', - message: 'Invalid type', - }); - } + await dispatchEvent(validatedBody, context); reply.status(200).send({ deviceId: context.deviceId, @@ -418,6 +499,109 @@ export async function handler( }); } +type BatchItemResult = + | { index: number; status: 'accepted' } + | { + index: number; + status: 'rejected'; + reason: 'validation' | 'internal'; + error: string; + }; + +/** + * POST /track/batch — accepts up to TRACK_BATCH_MAX_EVENTS payloads in one + * request and dispatches each through the same per-event pipeline as /track. + * + * Per-event validation failures do NOT fail the whole batch: the response is + * always 202 (assuming envelope + auth pass) with `{ accepted, rejected[] }` + * so callers can fix and retry just the bad indices. + * + * Optimization: salts + request-IP geo are fetched once and shared across + * all events. Events that override `__ip` still get their own geo lookup. + */ +// Bounded concurrency for per-event processing inside a batch. Each event +// hits Redis (session lookup + groupmq queue add) and may trigger a geo +// lookup if it overrides `__ip`, so an unbounded `Promise.all` over 2000 +// events can spike Redis pool usage and the geo provider's rate budget on +// smaller self-hosted instances. 50 keeps the pipeline saturated without +// turning a single big batch into a thundering herd. +const BATCH_CONCURRENCY = 50; + +export async function batchHandler( + request: FastifyRequest<{ + Body: ITrackBatchBody; + }>, + reply: FastifyReply, +) { + const { events } = request.body; + const shared = await buildSharedRequestContext(request); + + const processOne = async ( + raw: unknown, + index: number, + ): Promise => { + const parsed = zTrackHandlerPayload.safeParse(raw); + if (!parsed.success) { + const issue = parsed.error.issues[0]; + const path = issue?.path?.join('.') ?? ''; + const error = path + ? `${path}: ${issue?.message}` + : issue?.message ?? 'invalid payload'; + return { index, status: 'rejected', reason: 'validation', error }; + } + + try { + const context = await buildEventContext( + shared, + request.timestamp, + parsed.data, + ); + await dispatchEvent(parsed.data, context); + return { index, status: 'accepted' }; + } catch (err) { + // HttpError with 4xx → caller's fault (validation-style: alias, + // unknown type, replay without session). Anything else → ours. + const isClientError = + err instanceof HttpError && err.status >= 400 && err.status < 500; + const reason: 'validation' | 'internal' = isClientError + ? 'validation' + : 'internal'; + const message = err instanceof Error ? err.message : 'unknown error'; + if (!isClientError) { + request.log.error( + { err, index }, + 'Batch event dispatch failed', + ); + } + return { index, status: 'rejected', reason, error: message }; + } + }; + + // Process in chunks of BATCH_CONCURRENCY. We keep results aligned with + // input indices via the `index` field on each BatchItemResult. + const results: BatchItemResult[] = new Array(events.length); + for (let start = 0; start < events.length; start += BATCH_CONCURRENCY) { + const end = Math.min(start + BATCH_CONCURRENCY, events.length); + const chunk = await Promise.all( + events.slice(start, end).map((raw, i) => processOne(raw, start + i)), + ); + for (const r of chunk) { + results[r.index] = r; + } + } + + const accepted = results.filter((r) => r.status === 'accepted').length; + const rejected = results.filter( + (r): r is Extract => + r.status === 'rejected', + ); + + reply.status(202).send({ + accepted, + rejected, + }); +} + export async function fetchDeviceId( request: FastifyRequest, reply: FastifyReply diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts new file mode 100644 index 000000000..adcd4c462 --- /dev/null +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -0,0 +1,455 @@ +/** + * Integration tests for POST /track/batch. + * + * Side effects (queue, db, geo, redis) are mocked so the test runs without + * Docker. Auth uses the same getClientByIdCached mock as the insights + * router tests, except here we don't need real fixtures — we never read + * from PG/CH, we only verify the controller dispatches each item correctly. + */ + +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; + +// ─── Module mocks (hoisted before imports) ──────────────────────────────────── +// +// `vi.mock` is hoisted above all top-level statements, so any spies the factory +// references must be created via `vi.hoisted(...)` (also hoisted) — otherwise +// the factory runs first and hits a temporal-dead-zone ReferenceError. + +const { queueAdd, upsertProfileMock } = vi.hoisted(() => ({ + queueAdd: vi.fn().mockResolvedValue(undefined), + upsertProfileMock: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('@openpanel/db', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getClientByIdCached: vi.fn(), + getSalts: vi.fn().mockResolvedValue({ current: 'salt-a', previous: 'salt-b' }), + getProfileById: vi.fn().mockResolvedValue(null), + upsertProfile: upsertProfileMock, + groupBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + replayBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + }; +}); + +vi.mock('@openpanel/queue', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getEventsGroupQueueShard: vi.fn(() => ({ add: queueAdd })), + }; +}); + +vi.mock('@openpanel/geo', () => ({ + getGeoLocation: vi.fn().mockResolvedValue({ + country: 'US', + city: 'San Francisco', + region: 'CA', + longitude: -122.4, + latitude: 37.77, + }), +})); + +vi.mock('@openpanel/common/server', async (importOriginal) => { + const actual = + await importOriginal(); + return { + ...actual, + verifyPassword: vi.fn().mockResolvedValue(true), + generateDeviceId: vi.fn().mockReturnValue('device-test'), + }; +}); + +vi.mock('@openpanel/redis', async (importOriginal) => { + const actual = await importOriginal(); + const fakeRedisClient = new Proxy( + {}, + { + get: (_t, p) => { + if (p === 'status') return 'ready'; + if (p === 'multi') { + return () => ({ + hget: vi.fn().mockReturnThis(), + exec: vi.fn().mockResolvedValue([]), + }); + } + return vi.fn().mockResolvedValue(null); + }, + }, + ); + return { + ...actual, + getCache: async (_key: string, _ttl: number, fn: () => Promise) => + fn(), + getLock: vi.fn().mockResolvedValue(true), + getRedisCache: vi.fn().mockReturnValue(fakeRedisClient), + }; +}); + +// ─── Imports (after mocks) ──────────────────────────────────────────────────── + +import { ClientType, getClientByIdCached } from '@openpanel/db'; +import type { FastifyInstance } from 'fastify'; +import { buildApp } from '../app'; + +// ─── Test client constants ──────────────────────────────────────────────────── + +const CLIENT_ID = '00000000-0000-0000-0000-0000000000aa'; +const CLIENT_SECRET = 'test-secret'; +const PROJECT_ID = 'test-project'; +const ORG_ID = 'test-org'; + +const AUTH = { + 'openpanel-client-id': CLIENT_ID, + 'openpanel-client-secret': CLIENT_SECRET, + 'user-agent': 'Mozilla/5.0 (Macintosh) Chrome/120.0.0.0', + 'content-type': 'application/json', +}; + +const WRITE_CLIENT = { + id: CLIENT_ID, + type: ClientType.write, + projectId: PROJECT_ID, + organizationId: ORG_ID, + secret: 'hashed-secret', + name: 'Batch Test Client', + cors: ['*'], + description: '', + ignoreCorsAndSecret: true, + createdAt: new Date(), + updatedAt: new Date(), + project: { + id: PROJECT_ID, + organizationId: ORG_ID, + cors: ['*'], + filters: [], + allowUnsafeRevenueTracking: true, + }, +}; + +// ─── Lifecycle ──────────────────────────────────────────────────────────────── + +let app: FastifyInstance; + +beforeAll(async () => { + vi.mocked(getClientByIdCached).mockResolvedValue(WRITE_CLIENT as any); + app = await buildApp({ testing: true }); + await app.ready(); +}, 30_000); + +afterAll(async () => { + await app.close(); +}, 10_000); + +beforeEach(() => { + queueAdd.mockClear(); + upsertProfileMock.mockClear(); +}); + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function postBatch(body: unknown, headers: Record = AUTH) { + return app.inject({ + method: 'POST', + url: '/track/batch', + headers, + payload: body as any, + }); +} + +const validTrack = (name = 'page_view') => ({ + type: 'track' as const, + payload: { name, properties: { __path: '/home' } }, +}); + +const validIdentify = (profileId = 'user-1') => ({ + type: 'identify' as const, + payload: { profileId, email: 'a@b.com' }, +}); + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +describe('POST /track/batch — auth & envelope', () => { + it('returns 401 without client-id', async () => { + const res = await postBatch({ events: [validTrack()] }, { + 'content-type': 'application/json', + }); + expect(res.statusCode).toBe(401); + }); + + it('returns 400 on empty events array', async () => { + const res = await postBatch({ events: [] }); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 on missing events field', async () => { + const res = await postBatch({}); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 when array exceeds the per-request cap', async () => { + const events = Array.from({ length: 2001 }, () => validTrack()); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('POST /track/batch — happy path', () => { + it('accepts a single track event and queues it', async () => { + const res = await postBatch({ events: [validTrack()] }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('accepts a mixed batch (track + identify) and dispatches each', async () => { + const res = await postBatch({ + events: [validTrack('signup'), validIdentify('alice'), validTrack('purchase')], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 3, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); // two `track` events + expect(upsertProfileMock).toHaveBeenCalledTimes(1); // one `identify` + }); + + it('treats each event as if sent one by one (per-event queue add)', async () => { + const events = Array.from({ length: 5 }, (_, i) => validTrack(`event_${i}`)); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 5, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(5); + }); +}); + +describe('POST /track/batch — per-item validation', () => { + it('rejects bad rows by index without failing the batch', async () => { + const res = await postBatch({ + events: [ + validTrack('good_1'), + { type: 'track', payload: { name: '' } }, // empty name → invalid + validTrack('good_2'), + { type: 'wrong-type', payload: {} }, // unknown discriminator + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(2); + expect(body.rejected).toHaveLength(2); + expect(body.rejected.map((r: { index: number }) => r.index).sort()).toEqual([1, 3]); + expect(body.rejected.every((r: { reason: string }) => r.reason === 'validation')).toBe(true); + expect(queueAdd).toHaveBeenCalledTimes(2); + }); + + it('rejects alias as per-item validation (does not 400 the whole batch)', async () => { + const res = await postBatch({ + events: [ + validTrack(), + { type: 'alias', payload: { profileId: 'user-1', alias: 'u1' } }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(body.rejected[0].error).toMatch(/alias/i); + }); + + it('populates sessionId for events with __deviceId override', async () => { + // When a client supplies __deviceId, the API still resolves a + // sessionId — first via the live-session Redis lookup, then a + // deterministic 30-min bucket keyed on the event's __timestamp. + // Both deviceId and sessionId reach the queue. + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_device_override', + properties: { + __deviceId: 'mobile-device-abc', + __path: '/home', + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + const queuedJob = queueAdd.mock.calls[0]?.[0]; + expect(queuedJob.data.deviceId).toBe('mobile-device-abc'); + expect(queuedJob.data.sessionId).toBeTruthy(); + expect(queuedJob.data.sessionId.length).toBeGreaterThan(0); + }); + + it('buckets historical events by __timestamp, not request time', async () => { + // Two events on the same device, 1h apart in __timestamp. They + // should land in different deterministic 30-min buckets and thus + // get different sessionIds, even though they arrive in the same + // request. Anchored to "now" so the events stay inside the 5-day + // acceptance window when the test runs. + const baseMs = Date.now() - 2 * 60 * 60 * 1000; // 2h ago + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_bucket_a', + properties: { + __deviceId: 'shared-device', + __timestamp: new Date(baseMs).toISOString(), + }, + }, + }, + { + type: 'track' as const, + payload: { + name: 'probe_bucket_b', + properties: { + __deviceId: 'shared-device', + __timestamp: new Date(baseMs + 60 * 60 * 1000).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 2, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); + const sessionIdA = queueAdd.mock.calls[0]?.[0].data.sessionId; + const sessionIdB = queueAdd.mock.calls[1]?.[0].data.sessionId; + expect(sessionIdA).toBeTruthy(); + expect(sessionIdB).toBeTruthy(); + expect(sessionIdA).not.toBe(sessionIdB); + }); + + it('shares sessionId across events in the same 30-min bucket', async () => { + // Two events on the same device, 5 min apart inside the same + // wall-clock 30-min bucket. They should share a sessionId. Anchor + // to the bucket that closed ~1 hour ago so both timestamps are in + // the past (avoiding the future-timestamp guard) and well within + // the 5-day acceptance window. + const WINDOW_MS = 30 * 60 * 1000; + const oneHourAgoBucket = + Math.floor((Date.now() - 60 * 60 * 1000) / WINDOW_MS) * WINDOW_MS; + const baseMs = oneHourAgoBucket + 60_000; // 1 min past bucket start (past the grace) + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_same_bucket_a', + properties: { + __deviceId: 'same-bucket-device', + __timestamp: new Date(baseMs).toISOString(), + }, + }, + }, + { + type: 'track' as const, + payload: { + name: 'probe_same_bucket_b', + properties: { + __deviceId: 'same-bucket-device', + __timestamp: new Date(baseMs + 5 * 60 * 1000).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 2, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); + const sessionIdA = queueAdd.mock.calls[0]?.[0].data.sessionId; + const sessionIdB = queueAdd.mock.calls[1]?.[0].data.sessionId; + expect(sessionIdA).toBe(sessionIdB); + }); + + it('rejects events with __timestamp older than 5 days', async () => { + // Older events should be rejected per-row with a clear reason. + const sixDaysAgo = Date.now() - 6 * 24 * 60 * 60 * 1000; + const fourDaysAgo = Date.now() - 4 * 24 * 60 * 60 * 1000; + const res = await postBatch({ + events: [ + // valid (4 days old, within window) + { + type: 'track' as const, + payload: { + name: 'probe_within_window', + properties: { + __deviceId: 'd-1', + __timestamp: new Date(fourDaysAgo).toISOString(), + }, + }, + }, + // too old (6 days) + { + type: 'track' as const, + payload: { + name: 'probe_too_old', + properties: { + __deviceId: 'd-2', + __timestamp: new Date(sixDaysAgo).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(body.rejected[0].error).toMatch(/5 days/i); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('returns 202 with accepted=0 when every event fails validation', async () => { + const res = await postBatch({ + events: [ + { type: 'track', payload: { name: '' } }, + { type: 'track', payload: {} }, + { type: 'identify', payload: {} }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(0); + expect(body.rejected).toHaveLength(3); + expect(queueAdd).not.toHaveBeenCalled(); + }); + + // Regression: per-event processing is chunked (BATCH_CONCURRENCY = 50). + // A 200-event batch spans 4 chunks. Verifies that rejected indices land in + // the right positions across chunk boundaries — including the very first + // event in chunk 1, the last event in chunk 2, and one in chunk 4 — which + // would catch off-by-one slicing or out-of-order result accumulation. + it('preserves per-index results across chunk boundaries', async () => { + const SIZE = 200; + const badIndices = new Set([0, 50, 99, 100, 149, 199]); + const events = Array.from({ length: SIZE }, (_, i) => + badIndices.has(i) + ? { type: 'track', payload: { name: '' } } // invalid + : validTrack(`chunked_${i}`), + ); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(SIZE - badIndices.size); + expect(body.rejected).toHaveLength(badIndices.size); + const rejectedIndices = new Set( + body.rejected.map((r: { index: number }) => r.index), + ); + expect(rejectedIndices).toEqual(badIndices); + expect(queueAdd).toHaveBeenCalledTimes(SIZE - badIndices.size); + }); +}); diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index 07afbcdc2..9d1d73b5a 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -1,25 +1,40 @@ -import { zTrackHandlerPayload } from '@openpanel/validation'; +import { + TRACK_BATCH_MAX_EVENTS, + zTrackBatchBody, + zTrackHandlerPayload, +} from '@openpanel/validation'; import type { FastifyPluginAsyncZodOpenApi } from 'fastify-zod-openapi'; import { z } from 'zod'; -import { fetchDeviceId, handler } from '@/controllers/track.controller'; +import { + batchHandler, + fetchDeviceId, + handler, +} from '@/controllers/track.controller'; import { clientHook } from '@/hooks/client.hook'; import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; +// Per-route body limit for /track/batch: 10 MB uncompressed, matching the +// stated public contract ("up to 2000 events and 10 MB per request"). +const TRACK_BATCH_BODY_LIMIT_BYTES = 10 * 1024 * 1024; + const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { - fastify.addHook('preValidation', duplicateHook); fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', isBotHook); await fastify.route({ method: 'POST', url: '/', + // The 100 ms body-hash dedup only runs on the single-event endpoint — + // applying it batch-wide would drop a whole 1000-event retry on hash + // collision, which is the opposite of what we want. + preValidation: duplicateHook, schema: { body: zTrackHandlerPayload.and( z.object({ clientId: z.string().optional(), clientSecret: z.string().optional(), - }) + }), ), tags: ['Track'], description: @@ -34,6 +49,30 @@ const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { handler, }); + await fastify.route({ + method: 'POST', + url: '/batch', + bodyLimit: TRACK_BATCH_BODY_LIMIT_BYTES, + schema: { + body: zTrackBatchBody, + tags: ['Track'], + description: `We accept up to ${TRACK_BATCH_MAX_EVENTS} events and 10MB uncompressed per request. Events are part of the request body. Each event is dispatched through the same pipeline as POST /track. Per-event validation failures are returned in the rejected[] array — the whole batch does not fail on a single bad row.`, + response: { + 202: z.object({ + accepted: z.number().int().min(0), + rejected: z.array( + z.object({ + index: z.number().int().min(0), + reason: z.enum(['validation', 'internal']), + error: z.string(), + }), + ), + }), + }, + }, + handler: batchHandler, + }); + await fastify.route({ method: 'GET', url: '/device-id', diff --git a/apps/api/src/utils/ids.ts b/apps/api/src/utils/ids.ts index 3614da1b7..d579845fb 100644 --- a/apps/api/src/utils/ids.ts +++ b/apps/api/src/utils/ids.ts @@ -10,15 +10,31 @@ export async function getDeviceId({ ua, salts, overrideDeviceId, + eventMs, }: { projectId: string; ip: string; ua: string | undefined; salts: { current: string; previous: string }; overrideDeviceId?: string; -}) { + /** + * Wall-clock time of the event being processed. Used as the bucket input for + * the deterministic session_id fallback so historical/buffered events bucket + * into the window they actually happened in, not the moment the API + * received them. + */ + eventMs: number; +}): Promise { + // Client-supplied stable device id (mobile/server SDKs). We still need to + // resolve a sessionId — first try the live session lookup keyed on this + // exact deviceId, then fall back to the deterministic 30-min bucket. if (overrideDeviceId) { - return { deviceId: overrideDeviceId, sessionId: '' }; + return getInfoFromSession({ + projectId, + currentDeviceId: overrideDeviceId, + previousDeviceId: overrideDeviceId, + eventMs, + }); } if (!ua) { @@ -42,6 +58,7 @@ export async function getDeviceId({ projectId, currentDeviceId, previousDeviceId, + eventMs, }); } @@ -54,10 +71,12 @@ async function getInfoFromSession({ projectId, currentDeviceId, previousDeviceId, + eventMs, }: { projectId: string; currentDeviceId: string; previousDeviceId: string; + eventMs: number; }): Promise { try { const multi = getRedisCache().multi(); @@ -101,6 +120,7 @@ async function getInfoFromSession({ sessionId: getSessionId({ projectId, deviceId: currentDeviceId, + eventMs, graceMs: 5 * 1000, windowMs: 1000 * 60 * 30, }), diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 5035e3186..758f1839f 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,6 +1,6 @@ import { getTime, isSameDomain, parsePath } from '@openpanel/common'; import { getReferrerWithQuery, parseReferrer } from '@openpanel/common/server'; -import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; +import type { IServiceCreateEventPayload } from '@openpanel/db'; import { checkNotificationRulesForEvent, createEvent, @@ -10,14 +10,44 @@ import { } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; +import { getLock } from '@openpanel/redis'; import { anyPass, isEmpty, isNil, mergeDeepRight, omit, reject } from 'ramda'; import { logger as baseLogger } from '@/utils/logger'; import { createSessionEndJob, extendSessionEndJob, getActiveSessionEndJob, + SESSION_TIMEOUT, } from '@/utils/session-handler'; +/** + * Acquire a Redis-backed lock that prevents duplicate session_start rows for + * the same `(projectId, sessionId)`. Returns true if THIS caller should emit + * the session_start row; false if another worker (or earlier event in the + * same batch) already claimed it. + * + * TTL matches SESSION_TIMEOUT — a session can't extend beyond 30 min of + * inactivity in the live mechanism, and the deterministic bucket is exactly + * 30 min wide. By the time the lock TTL elapses, the session itself has + * rolled. + * + * Keyed on sessionId (not deviceId) so historical events from the same + * device but different 30-min buckets each get their own session_start. + */ +async function acquireSessionStartLock( + projectId: string, + sessionId: string, +): Promise { + if (!sessionId) { + return false; + } + return getLock( + `session_start:${projectId}:${sessionId}`, + '1', + SESSION_TIMEOUT, + ); +} + const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue']; // This function will merge two objects. @@ -113,7 +143,14 @@ export async function incomingEvent( // this will get the profileId from the alias table if it exists const profileId = body.profileId ? String(body.profileId) : ''; const createdAt = new Date(body.timestamp); - const isTimestampFromThePast = body.isTimestampFromThePast; + // "Live" = the event is recent enough that it could plausibly belong to + // an active in-memory session. We use the same window as SESSION_TIMEOUT + // (30 min) so historical events never push the live sessionEnd job + // forward or create new live sessions. Server-side events are always + // treated as non-live (they get session enrichment from sessionBuffer + // when a profile is supplied; otherwise they keep the API-computed id). + const isLiveEvent = + !uaInfo.isServer && Date.now() - createdAt.getTime() <= SESSION_TIMEOUT; const url = getProperty('__path'); const { path, hash, query, origin } = parsePath(url); const referrer = isSameDomain(getProperty('__referrer'), url) @@ -162,40 +199,42 @@ export async function incomingEvent( : undefined, }; - // if timestamp is from the past we dont want to create a new session - if (uaInfo.isServer || isTimestampFromThePast) { - const session = - profileId && !isTimestampFromThePast - ? await sessionBuffer.getExistingSession({ - profileId, - projectId, - }) - : null; + // Server-side events: when a profileId is supplied, enrich from the + // user's most recent browser session (deviceId, sessionId, geo, UA, path, + // referrer). Without a session, fall back to the API-computed identity. + // Server events never create or extend live sessions in Redis. + if (uaInfo.isServer) { + const enrichment = profileId + ? await sessionBuffer.getExistingSession({ profileId, projectId }) + : null; - const payload = { - ...baseEvent, - deviceId: session?.device_id ?? '', - sessionId: session?.id ?? '', - referrer: session?.referrer ?? undefined, - referrerName: session?.referrer_name ?? undefined, - referrerType: session?.referrer_type ?? undefined, - path: session?.exit_path ?? baseEvent.path, - origin: session?.exit_origin ?? baseEvent.origin, - os: session?.os ?? baseEvent.os, - osVersion: session?.os_version ?? baseEvent.osVersion, - browserVersion: session?.browser_version ?? baseEvent.browserVersion, - browser: session?.browser ?? baseEvent.browser, - device: session?.device ?? baseEvent.device, - brand: session?.brand ?? baseEvent.brand, - model: session?.model ?? baseEvent.model, - city: session?.city ?? baseEvent.city, - country: session?.country ?? baseEvent.country, - region: session?.region ?? baseEvent.region, - longitude: session?.longitude ?? baseEvent.longitude, - latitude: session?.latitude ?? baseEvent.latitude, - }; + const payload: IServiceCreateEventPayload = enrichment + ? { + ...baseEvent, + deviceId: enrichment.device_id, + sessionId: enrichment.id, + referrer: enrichment.referrer ?? undefined, + referrerName: enrichment.referrer_name ?? undefined, + referrerType: enrichment.referrer_type ?? undefined, + path: enrichment.exit_path ?? baseEvent.path, + origin: enrichment.exit_origin ?? baseEvent.origin, + os: enrichment.os ?? baseEvent.os, + osVersion: enrichment.os_version ?? baseEvent.osVersion, + browserVersion: + enrichment.browser_version ?? baseEvent.browserVersion, + browser: enrichment.browser ?? baseEvent.browser, + device: enrichment.device ?? baseEvent.device, + brand: enrichment.brand ?? baseEvent.brand, + model: enrichment.model ?? baseEvent.model, + city: enrichment.city ?? baseEvent.city, + country: enrichment.country ?? baseEvent.country, + region: enrichment.region ?? baseEvent.region, + longitude: enrichment.longitude ?? baseEvent.longitude, + latitude: enrichment.latitude ?? baseEvent.latitude, + } + : baseEvent; - return createEventAndNotify(payload as IServiceEvent, logger, projectId); + return createEventAndNotify(payload, logger, projectId); } const activeSessionEndJob = await getActiveSessionEndJob( @@ -219,6 +258,34 @@ export async function incomingEvent( return null; } + // Historical (buffered) events: the API has already computed a + // deterministic sessionId for them. Write the event and emit one + // session_start per bucket (Redis lock dedups across batches and + // workers). Do NOT touch live session state — historical events + // must not extend the user's current session or schedule a 30-min + // sessionEnd timer. + if (!isLiveEvent) { + if (await acquireSessionStartLock(projectId, sessionId)) { + await createEventAndNotify( + { + ...payload, + name: 'session_start', + createdAt: new Date(getTime(payload.createdAt) - 100), + }, + logger, + projectId, + ).catch((error) => { + logger.error( + { err: error, event: payload }, + 'Error creating historical session start event', + ); + // Don't throw — historical session_start is best-effort. The + // event itself should still land. + }); + } + return createEventAndNotify(payload, logger, projectId); + } + if (activeSessionEndJob) { await extendSessionEndJob({ projectId, @@ -227,7 +294,11 @@ export async function incomingEvent( }).catch((error) => { logger.warn({ err: error }, 'Failed to extend session end job'); }); - } else { + } else if (await acquireSessionStartLock(projectId, sessionId)) { + // Lock prevents the previously-observed batch race: when N events for + // the same device land in the API in parallel, all see no Redis + // sessionEnd key yet, all queue with session: undefined, and would + // each try to emit session_start. The lock collapses them to one. await createEventAndNotify( { ...payload, @@ -251,6 +322,17 @@ export async function incomingEvent( ); throw error; }); + } else { + // Another worker (or earlier event in this batch) claimed the + // session_start. Still ensure a sessionEnd is scheduled so the + // session closes cleanly. createSessionEndJob is idempotent on + // jobId, so this is a no-op when the job already exists. + await createSessionEndJob({ payload }).catch((error) => { + logger.warn( + { err: error, event: payload }, + 'Failed to schedule session end job (lock not acquired)', + ); + }); } return createEventAndNotify(payload, logger, projectId); diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 30547d7dc..345b57fd7 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -26,6 +26,16 @@ vi.mock('@openpanel/db', async () => { }, }; }); +// Mock the session_start dedup lock so tests don't need a live Redis. By +// default the lock is acquired (true) so existing tests' session_start +// expectations still hold; individual tests can override per-call. +vi.mock('@openpanel/redis', async () => { + const actual = await vi.importActual('@openpanel/redis'); + return { + ...actual, + getLock: vi.fn().mockResolvedValue(true), + }; +}); // 30 minutes const SESSION_TIMEOUT = 30 * 60 * 1000; @@ -79,7 +89,6 @@ describe('incomingEvent', () => { event: { name: 'test_event', timestamp: timestamp.toISOString(), - isTimestampFromThePast: false, properties: { __path: 'https://example.com/test' }, }, uaInfo, @@ -174,7 +183,6 @@ describe('incomingEvent', () => { name: 'test_event', timestamp: timestamp.toISOString(), properties: { __path: 'https://example.com/test' }, - isTimestampFromThePast: false, }, headers: { 'request-id': '123', @@ -256,7 +264,6 @@ describe('incomingEvent', () => { timestamp: timestamp.toISOString(), properties: { custom_property: 'test_value' }, profileId: 'profile-123', - isTimestampFromThePast: false, }, headers: { 'user-agent': 'OpenPanel Server/1.0', @@ -361,7 +368,6 @@ describe('incomingEvent', () => { timestamp: timestamp.toISOString(), properties: { custom_property: 'test_value' }, profileId: 'profile-123', - isTimestampFromThePast: false, }, headers: { 'user-agent': 'OpenPanel Server/1.0', @@ -379,6 +385,9 @@ describe('incomingEvent', () => { expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({ name: 'server_event', + // Server event with profileId but no existing session: keep the + // API-computed identity instead of blanking deviceId/sessionId. + // The fixture sends '' for both so that's what we expect here. deviceId: '', sessionId: '', profileId: 'profile-123', @@ -405,9 +414,12 @@ describe('incomingEvent', () => { duration: 0, path: '', origin: '', - referrer: undefined, - referrerName: undefined, - referrerType: undefined, + // baseEvent fields fall through uniformly when there's no + // session enrichment available — empty strings for all referrer + // fields rather than the previous mix of undefined/''. + referrer: '', + referrerName: '', + referrerType: '', sdkName: 'server', sdkVersion: '1.0.0', groups: [], @@ -434,7 +446,6 @@ describe('incomingEvent', () => { event: { name: eventName, timestamp: new Date().toISOString(), - isTimestampFromThePast: false, properties: { __path: 'https://example.com/test' }, }, uaInfo, @@ -485,4 +496,102 @@ describe('incomingEvent', () => { // events extend the existing one via changeDelay. expect(spySessionsQueueAdd).toHaveBeenCalledTimes(1); }); + + it('does not emit duplicate session_start when lock is held', async () => { + const { getLock } = await import('@openpanel/redis'); + // Simulate "another worker already claimed session_start" by failing + // the lock acquisition. Live event still fires; sessionEnd job is + // still scheduled (it's idempotent on jobId). + vi.mocked(getLock).mockResolvedValueOnce(false); + // No active session-end job exists yet — force getJob to undefined so the + // worker falls into the "no active session" branch where the lock check + // matters. (vi.clearAllMocks resets call history but keeps implementations + // set via mockResolvedValue in previous tests.) + vi.spyOn(sessionsQueue, 'getJob').mockResolvedValue(undefined); + const spySessionsQueueAdd = vi + .spyOn(sessionsQueue, 'add') + .mockResolvedValue({} as Job); + + const timestamp = new Date(); + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'live_event', + timestamp: timestamp.toISOString(), + properties: { __path: 'https://example.com/test' }, + }, + uaInfo, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/91.0.4472.124', + 'openpanel-sdk-name': 'web', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + deviceId, + sessionId: newSessionId, + }; + (createEvent as Mock).mockReturnValue({}); + + await incomingEvent(jobData); + + // No session_start emission (lock not acquired) + const startCalls = (createEvent as Mock).mock.calls.filter( + (call) => call[0]?.name === 'session_start', + ); + expect(startCalls).toHaveLength(0); + // Live event itself still gets created + const liveCalls = (createEvent as Mock).mock.calls.filter( + (call) => call[0]?.name === 'live_event', + ); + expect(liveCalls).toHaveLength(1); + // sessionEnd is still scheduled even when lock not acquired (idempotent) + expect(spySessionsQueueAdd).toHaveBeenCalled(); + }); + + it('historical event preserves API-computed deviceId/sessionId', async () => { + // Event with __timestamp older than SESSION_TIMEOUT (30 min). Worker + // should write it with the deviceId/sessionId the API computed, + // without scheduling sessionEnd (live state untouched). + const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000); + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'historical_event', + timestamp: oneHourAgo.toISOString(), + properties: { __path: 'https://example.com/replay' }, + }, + uaInfo, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15', + 'openpanel-sdk-name': 'react-native', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + deviceId: 'mobile-device-xyz', + sessionId: 'deterministic-bucket-id', + }; + + (createEvent as Mock).mockReturnValue({}); + await incomingEvent(jobData); + + // Live state untouched: no sessionEnd job scheduled + expect(sessionsQueue.add).not.toHaveBeenCalled(); + // Two createEvent calls: one for the historical session_start (lock + // acquired by default in the redis mock), one for the event itself + expect((createEvent as Mock).mock.calls).toHaveLength(2); + const startCall = (createEvent as Mock).mock.calls.find( + (call) => call[0]?.name === 'session_start', + ); + const eventCall = (createEvent as Mock).mock.calls.find( + (call) => call[0]?.name === 'historical_event', + ); + expect(startCall).toBeDefined(); + expect(eventCall).toBeDefined(); + expect(eventCall![0].deviceId).toBe('mobile-device-xyz'); + expect(eventCall![0].sessionId).toBe('deterministic-bucket-id'); + }); }); diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index bb7b8cb45..8858bb052 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -33,7 +33,6 @@ export interface EventsQueuePayloadIncomingEvent { projectId: string; event: ITrackPayload & { timestamp: string | number; - isTimestampFromThePast: boolean; }; uaInfo: | { diff --git a/packages/validation/src/track.validation.ts b/packages/validation/src/track.validation.ts index c97b51a08..116777cc3 100644 --- a/packages/validation/src/track.validation.ts +++ b/packages/validation/src/track.validation.ts @@ -220,6 +220,19 @@ export const zTrackHandlerPayload = z.discriminatedUnion('type', [ .meta({ title: 'Assign Group' }), ]) satisfies z.ZodType; +// Batch ingestion: envelope is validated strictly (array length only); per-event +// validation runs inside the controller via `safeParse(zTrackHandlerPayload)` so +// invalid items can be rejected per-index without failing the whole batch. +// +// Per-request caps: up to 2000 events and 10 MB uncompressed body. +export const TRACK_BATCH_MAX_EVENTS = 2000; + +export const zTrackBatchBody = z.object({ + events: z.array(z.unknown()).min(1).max(TRACK_BATCH_MAX_EVENTS), +}); + +export type ITrackBatchBody = z.infer; + // Deprecated types for beta version of the SDKs export interface DeprecatedOpenpanelEventOptions {