feat: bulk event ingestion (POST /track/batch) for offline-first SDKs #374
mraj602-tohands wants to merge 11 commits into
Accept up to 1000 ITrackHandlerPayload events in a single request and
dispatch each through the same per-event pipeline as POST /track.
Per-event validation failures are returned in `rejected[]` with their
index and reason so callers can fix and retry only the bad rows; the
whole batch never fails on a single malformed item.
- New zTrackBatchBody schema (envelope-only validation; per-item
parsing runs in the handler so we can collect errors without
short-circuiting)
- Refactor buildContext into buildSharedRequestContext +
buildEventContext so the batch handler fetches salts and request-IP
geo once and reuses them across all events. Single-event /track
keeps identical behavior.
- Extract dispatchEvent so /track and /track/batch share the per-type
switch; the handler stays the only place that knows about the
unsupported `alias` type.
- duplicateHook (the 100ms body-hash dedup) moved to be /track-only.
Hashing a 1000-item array is meaningless — the hook would either
drop the entire retry on hash collision, or no-op on retries that
mutate __timestamp.
Response shape (always 202 when envelope + auth pass):
{ accepted: number,
rejected: [{ index, reason: 'validation' | 'internal', error }] }
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
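The per-item contract described in this commit can be sketched roughly as follows. This is a minimal illustration, not the PR's handler: `BatchEvent`, `validateEvent`, and `processBatch` are hypothetical stand-ins (the real code parses with `zTrackHandlerPayload` and dispatches asynchronously), and the validation rules shown are assumptions.

```typescript
// Hypothetical event shape; the real schema is zTrackHandlerPayload.
type BatchEvent = { type: string; payload?: { name?: unknown } };

type Rejected = {
  index: number;
  reason: 'validation' | 'internal';
  error: string;
};
type BatchResponse = { accepted: number; rejected: Rejected[] };

// Stand-in validator (assumption): returns an error message, or null when valid.
function validateEvent(event: BatchEvent): string | null {
  if (event.type === 'alias') return 'alias is not supported';
  if (event.type !== 'track' && event.type !== 'identify') {
    return `unknown type: ${event.type}`;
  }
  if (event.type === 'track' && typeof event.payload?.name !== 'string') {
    return 'payload.name must be a string';
  }
  return null;
}

// Collects per-item failures instead of failing the whole batch.
function processBatch(
  events: BatchEvent[],
  dispatch: (event: BatchEvent) => void,
): BatchResponse {
  const rejected: Rejected[] = [];
  let accepted = 0;
  events.forEach((event, index) => {
    const validationError = validateEvent(event);
    if (validationError !== null) {
      rejected.push({ index, reason: 'validation', error: validationError });
      return;
    }
    try {
      dispatch(event);
      accepted += 1;
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      rejected.push({ index, reason: 'internal', error: message });
    }
  });
  return { accepted, rejected };
}
```

A caller would read `rejected[].index` to decide which rows to fix and resend; the accepted rows need no retry.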
- Wrap shared spies in `vi.hoisted(...)` so they exist when the hoisted vi.mock factories run. Without this, CI failed with `Cannot access 'upsertProfileMock' before initialization` (TDZ inside the factory).
- Drop the post-schema defensive length check in batchHandler: the route schema enforces `.min(1).max(1000)` before the handler sees the body, so the runtime guard was unreachable.
- Rework dispatchEvent's exhaustiveness check to actually use the never-typed local in the thrown message. The previous `_exhaustive` was unused and would trip Biome's unused-variable rule.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Updates the constant, the route's bodyLimit, the OpenAPI description, and the over-the-cap test fixture.
Note: this only changes transport limits. Real mobile/offline use of the batch endpoint is still limited by pre-existing session-derivation behavior (15-min isTimestampFromThePast threshold, session_start row inflation under concurrent device events). Those will be addressed in follow-up commits.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Plumb the event's __timestamp through getDeviceId/getInfoFromSession so the deterministic 30-min session bucket is computed from the event's own time, not Date.now(). Critical for /track/batch where one request can carry events spanning days.
Two behavior changes:
1. __deviceId override path no longer short-circuits to sessionId: ''. It now still hits the live-session Redis lookup (keyed on the supplied deviceId) and falls back to the deterministic bucket computed at the event's __timestamp. SDKs that supply a stable client-side device id (mobile, server) get sessionful events.
2. The deterministic getSessionId fallback in getInfoFromSession now receives eventMs explicitly. Historical/buffered events bucket into the wall-clock window they actually happened in, so events arriving in different batches with the same __timestamp window collapse to one session_id (merge on read).
Live-traffic behavior is unchanged: when a Redis sessionEnd key exists, we still return that session's id from the live mechanism. Only the fallback path changed.
The isTimestampFromThePast field on the queue payload stays in place for now; the worker still reads it. A follow-up commit removes both producer and consumer in one step so neither side is broken in isolation.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
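To illustrate the idea of bucketing on the event's own time, here is a minimal sketch. The real `getSessionId` derivation is not shown in this PR text, so the `sessionBucketId` helper and its plain-string key format are assumptions made purely for illustration.

```typescript
const SESSION_TIMEOUT_MS = 30 * 60 * 1000; // 30-minute bucket width

// Hypothetical helper: derives a deterministic id from the 30-min wall-clock
// window that contains eventMs. The actual implementation may hash these
// inputs; the string format here is an assumption.
function sessionBucketId(
  projectId: string,
  deviceId: string,
  eventMs: number,
): string {
  const bucket = Math.floor(eventMs / SESSION_TIMEOUT_MS);
  return `${projectId}:${deviceId}:${bucket}`;
}
```

Because the id depends only on `(projectId, deviceId, bucket)`, two buffered events from the same window map to the same id even when they arrive in different batches.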
Events whose __timestamp is more than 5 days in the past throw
HttpError(400) from getTimestamp, which the existing batchHandler
error path routes to a per-row { reason: 'validation', error: '...' }
entry. Single-event /track returns 400 to the caller via the
standard handler error path.
Window: hard-coded 5 days. Not per-project configurable.
Future-timestamp tolerance unchanged: clientTimestamp > now + 1 min
still falls back to server time.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
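The acceptance window described above can be sketched like this. It is an illustration under assumptions: `resolveTimestamp` and this local `HttpError` class are hypothetical stand-ins for the PR's `getTimestamp` and its error type, and the clock is passed in explicitly to keep the example deterministic.

```typescript
const MAX_PAST_MS = 5 * 24 * 60 * 60 * 1000; // hard-coded 5-day floor
const MAX_FUTURE_MS = 60 * 1000; // 1-minute future tolerance

// Stand-in error type (assumption); the real HttpError lives in the API app.
class HttpError extends Error {
  readonly status: number;
  constructor(status: number, message: string) {
    super(message);
    this.status = status;
  }
}

// Returns the timestamp to store for an event, or throws a 400 for events
// that are older than the 5-day window.
function resolveTimestamp(clientMs: number | undefined, nowMs: number): number {
  if (clientMs === undefined || Number.isNaN(clientMs)) return nowMs;
  // Too far in the future: fall back to server time.
  if (clientMs > nowMs + MAX_FUTURE_MS) return nowMs;
  // Too far in the past: reject.
  if (nowMs - clientMs > MAX_PAST_MS) {
    throw new HttpError(400, 'event timestamp older than 5 days');
  }
  return clientMs;
}
```

In the batch path the thrown error would be caught per row and turned into a `rejected[]` entry, while single-event `/track` would surface it as a 400 response.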
Stop blanking deviceId/sessionId for events that aren't currently live. With the API now computing a deterministic session_id for every event using its __timestamp, the worker can trust those values instead of replacing them with empty strings.
The worker now branches on three cases:
1. Server-side event (uaInfo.isServer): enrich from sessionBuffer when a profileId resolves to an existing browser session; otherwise keep the API-computed identity. Never schedules sessionEnd.
2. Historical event (not server, __timestamp older than SESSION_TIMEOUT): write the event with the API-computed identity. No sessionEnd scheduling, no extendSessionEndJob; historical events must not mutate live session state.
3. Live event (default browser SDK traffic): unchanged. Emit session_start when no Redis sessionEnd key exists, schedule the 30-min sessionEnd job, or extend it if already present.
Also drops the isTimestampFromThePast field from the queue payload and all its producers. The "is this event currently live?" decision is now local to the worker and uses the same SESSION_TIMEOUT constant the sessionEnd job already runs on (no new tunable).
Documented compromise: historical sessions never get a session_end row written in CH. Their is_bounce / duration are null/0. Acceptable trade-off; the alternative is a much bigger change involving lazy session-row creation on bucket close.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Re-introduce the (projectId, sessionId)-keyed lock that was removed
on the bet that groupmq's per-groupId serialization would prevent
duplicates. That bet holds for single-event live traffic but fails
for batch ingestion: when N events for the same device land in
/track/batch, all N see no Redis sessionEnd key during the API's
parallel dispatch and queue with session: undefined, causing the
worker to emit N session_start rows.
Live verification on production showed the symptom: a 1000-event
batch across 25 devices produced 1000 session_start rows instead
of 25.
The lock:
- Key: session_start:{projectId}:{sessionId}
- TTL: SESSION_TIMEOUT (30 min) — same constant the live mechanism
already uses, no new tunable
- Keyed on sessionId not deviceId so historical events from the
same device but different 30-min buckets each get their own
session_start
Both the live and historical worker branches now gate session_start
emission on lock acquisition. When the lock is not acquired:
- Live branch: still schedule sessionEnd (idempotent on jobId —
no-op if already present), still write the event row
- Historical branch: still write the event row, skip session_start
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
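The gating behaviour can be sketched with an in-memory stand-in for the Redis lock. This is an assumption-heavy illustration: `InMemoryLockStore` mimics the semantics of a Redis `SET key value NX PX ttl` call, and this `acquireSessionStartLock` is a hypothetical helper rather than the PR's actual lock code.

```typescript
const SESSION_TIMEOUT_MS = 30 * 60 * 1000; // same constant as the live mechanism

// In-memory stand-in (assumption) for a Redis "set if absent with TTL" call.
class InMemoryLockStore {
  private readonly expiries = new Map<string, number>();

  // Returns true only for the first caller while the key is unexpired.
  setIfAbsent(key: string, ttlMs: number, nowMs: number): boolean {
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > nowMs) return false;
    this.expiries.set(key, nowMs + ttlMs);
    return true;
  }
}

// Keyed on sessionId (not deviceId) so each 30-min bucket gets its own lock.
function acquireSessionStartLock(
  store: InMemoryLockStore,
  projectId: string,
  sessionId: string,
  nowMs: number,
): boolean {
  const key = `session_start:${projectId}:${sessionId}`;
  return store.setIfAbsent(key, SESSION_TIMEOUT_MS, nowMs);
}
```

With this gate, a worker would emit `session_start` only when the call returns `true`, and write the event row either way.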
Three CI failures, all in tests:
1. Worker server-events test expected `referrerName: undefined` but
the actual baseEvent now resolves it to `''` (empty string from the
utmReferrer/referrer fallback chain). Align the expectation.
2 + 3. API bucketing tests used hard-coded Date.UTC(2026, 4, 1, ...)
timestamps. That date is more than 5 days before "now" when CI
runs, so the new 5-day rejection rejects both events and the
tests see accepted: 0 instead of 2. Switch to Date.now()-relative
timestamps:
- "different buckets" test: anchor to (now - 2h) and use a 1h gap
- "same bucket" test: anchor to the start of a bucket that closed
1h ago and use a 5-min gap (both inside one wall-clock window
and inside the 5-day acceptance window)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
vi.clearAllMocks resets call history but keeps mockResolvedValue implementations. A prior test set sessionsQueue.getJob to return a live job, which leaked into the lock-not-acquired test and routed control to the extendSessionEndJob branch instead of the lock check. That in turn left a mockResolvedValueOnce(false) on getLock unconsumed, which then broke the next test's session_start emission. Force getJob to undefined explicitly so each test starts from a clean "no active session" state.
📝 Walkthrough
This PR implements POST /track/batch for bulk event ingestion with per-item validation and bounded concurrency, derives device/session IDs deterministically from each event's timestamp (eventMs), removes the old past-flag from queue payloads, and prevents duplicate session_start emissions using a Redis lock strategy in the worker.
Changes
Batch event ingestion and session bucketing
Sequence Diagram(s)
sequenceDiagram
participant Client
participant BatchRoute as POST /track/batch
participant TrackController as batchHandler
participant Dispatch as dispatchEvent
participant QueueDB as Queue/DB
participant Worker as Worker
Client->>BatchRoute: POST /track/batch (auth + events[])
BatchRoute->>TrackController: validate + process
TrackController->>TrackController: buildSharedRequestContext (geo, salts)
loop each event in batch
TrackController->>Dispatch: buildEventContext (eventMs from __timestamp)
Dispatch->>Dispatch: getDeviceId(eventMs) => deterministic sessionId
Dispatch->>QueueDB: dispatch by type (track→queue, identify→profile upsert)
end
TrackController->>Client: 202 { accepted, rejected[] }
QueueDB->>Worker: incoming-event job
Worker->>Worker: isLiveEvent = !uaInfo.isServer && recent(createdAt)
alt isLiveEvent
Worker->>Worker: acquireSessionStartLock(projectId, sessionId)
alt lock acquired
Worker->>QueueDB: emit session_start
else lock not acquired
Worker->>QueueDB: schedule session_end (idempotent) and log
end
else historical
Worker->>Worker: acquireSessionStartLock (best-effort)
alt lock acquired
Worker->>QueueDB: emit historical session_start (best-effort)
end
end
Worker->>QueueDB: createEventAndNotify
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
🧹 Nitpick comments (1)
apps/api/src/utils/ids.ts (1)
32-37: ⚡ Quick win: Avoid duplicate Redis lookup when `overrideDeviceId` is set.
This path passes the same ID as both current and previous, which causes duplicate `hget` calls in `getInfoFromSession`. Make `previousDeviceId` optional (or skip the second fetch when equal).
Suggested change:
-    return getInfoFromSession({
-      projectId,
-      currentDeviceId: overrideDeviceId,
-      previousDeviceId: overrideDeviceId,
-      eventMs,
-    });
+    return getInfoFromSession({
+      projectId,
+      currentDeviceId: overrideDeviceId,
+      eventMs,
+    });
 async function getInfoFromSession({
   projectId,
   currentDeviceId,
   previousDeviceId,
   eventMs,
 }: {
   projectId: string;
   currentDeviceId: string;
-  previousDeviceId: string;
+  previousDeviceId?: string;
   eventMs: number;
 }): Promise<DeviceIdResult> {
   try {
     const multi = getRedisCache().multi();
     multi.hget(
       `bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`,
       'data'
     );
-    multi.hget(
-      `bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`,
-      'data'
-    );
+    if (previousDeviceId && previousDeviceId !== currentDeviceId) {
+      multi.hget(
+        `bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`,
+        'data'
+      );
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/api/src/utils/ids.ts` around lines 32 - 37, The call is passing overrideDeviceId as both currentDeviceId and previousDeviceId causing duplicate Redis hget calls; make previousDeviceId optional and modify getInfoFromSession to skip the second fetch when previousDeviceId is undefined or equal to currentDeviceId (i.e., only perform the hget for previousDeviceId if previousDeviceId && previousDeviceId !== currentDeviceId), or alternatively stop passing previousDeviceId from the caller when it's the same as currentDeviceId so the function's existing optional behavior can be used; update getInfoFromSession signature/usage accordingly and ensure any Redis hget for previousDeviceId is guarded by that condition.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/api/src/controllers/track.controller.ts`:
- Around line 531-568: The current Promise.all(events.map(...)) launches all
event pipelines concurrently (up to ~2000), causing load spikes; replace that
full-parallel map with a bounded-concurrency executor so per-item work (the
logic using zTrackHandlerPayload, buildEventContext, dispatchEvent, and
request.log) runs only N at a time. For example, import or implement a p-limit
style limiter, create const limit = pLimit(CONCURRENCY) (or a simple semaphore),
then map events to limit(() => /* the existing async handler body that returns
BatchItemResult and uses parsed, buildEventContext, dispatchEvent, request.log
*/) and await Promise.all(limitedPromises) so results keep their original index
and status semantics; ensure error handling and returned shape ({ index, status,
reason, error }) are preserved exactly.
📒 Files selected for processing (9)
- apps/api/src/controllers/event.controller.ts
- apps/api/src/controllers/track.controller.ts
- apps/api/src/routes/track-batch.router.test.ts
- apps/api/src/routes/track.router.ts
- apps/api/src/utils/ids.ts
- apps/worker/src/jobs/events.incoming-event.ts
- apps/worker/src/jobs/events.incoming-events.test.ts
- packages/queue/src/queues.ts
- packages/validation/src/track.validation.ts
💤 Files with no reviewable changes (1)
- packages/queue/src/queues.ts
Process /track/batch events in chunks of 50 instead of an unbounded Promise.all over up to 2000 events. Each event hits Redis (session lookup + groupmq add) and may trigger a geo lookup if __ip is overridden, so an unbounded fan-out can pressure Redis pool size and the geo provider's rate budget on smaller self-hosted instances. 50 keeps the pipeline saturated without thundering-herd behaviour. Throughput in our prod testing was already fine (2000 events / ~1.3s) without bounding, but the unbounded version is fragile under burst traffic for self-hosters. Per-index result alignment is preserved via the existing `index` field on BatchItemResult.
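The chunked approach described in this commit can be sketched as below. This is a minimal illustration, assuming a generic `processInChunks` helper (a hypothetical name) and a simplified `BatchItemResult`; the real handler's per-item work and result shape may differ.

```typescript
// Simplified result shape (assumption); the PR's BatchItemResult has more fields.
type BatchItemResult = {
  index: number;
  status: 'accepted' | 'rejected';
  error?: string;
};

// Runs at most chunkSize handlers concurrently: each chunk is awaited in full
// before the next one starts. Results keep the original input index.
async function processInChunks<T>(
  items: T[],
  chunkSize: number,
  handle: (item: T, index: number) => Promise<BatchItemResult>,
): Promise<BatchItemResult[]> {
  const results: BatchItemResult[] = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    const chunk = items.slice(start, start + chunkSize);
    const chunkResults = await Promise.all(
      chunk.map((item, offset) => handle(item, start + offset)),
    );
    results.push(...chunkResults);
  }
  return results;
}
```

Compared with a sliding-window limiter, whole-chunk batching lets one slow item delay the start of the next chunk, but it is simpler and keeps index bookkeeping to a single `start + offset`.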
Thanks @coderabbitai for the review.
Inline comment (concurrency): addressed in e5bcc3b; see the threaded reply for rationale and trade-off discussion.
Pre-merge: docstring coverage 35%: leaving as-is. Most of the missing docstrings are on test fixtures and small internal helpers where a docstring would just restate the next line of code. The load-bearing functions ([…]
Possibly related PRs (#231): looked at it; it touches the queue payload type and ingestion path in adjacent ways but doesn't conflict. If/when #231 lands first I'd reconcile during rebase; if this PR lands first that one would need to update for the removed […]
Branch HEAD is now […]
Adds a 200-event batch with bad rows scattered across chunk boundaries (indices 0, 50, 99, 100, 149, 199 — first/last of each 50-event chunk plus a chunk-internal one). Locks down the contract that BatchItemResult.index aligns with the input index regardless of the chunked processing order, which would catch any future off-by-one in the slicing or out-of-order accumulation.
🧹 Nitpick comments (2)
apps/api/src/routes/track-batch.router.test.ts (2)
226-261: ⚡ Quick win: Cover the per-item `internal` rejection path.
This section only exercises validation failures. The batch contract also returns `reason: 'internal'` when dispatch fails, so a regression in `queueAdd`/`upsertProfile` error handling would currently go uncaught.
🧪 Suggested regression test:
+  it('marks downstream dispatch failures as per-item internal rejections', async () => {
+    queueAdd.mockRejectedValueOnce(new Error('queue exploded'));
+
+    const res = await postBatch({
+      events: [validTrack('bad_dispatch'), validTrack('still_ok')],
+    });
+
+    expect(res.statusCode).toBe(202);
+    expect(res.json()).toMatchObject({
+      accepted: 1,
+      rejected: [{ index: 0, reason: 'internal' }],
+    });
+    expect(queueAdd).toHaveBeenCalledTimes(2);
+  });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/api/src/routes/track-batch.router.test.ts` around lines 226 - 261, Add a test in the same "per-item validation" suite that simulates an internal dispatch failure by having the mocked queueAdd (or upsertProfile if that path is used) throw for one event; call postBatch with multiple events (including at least one that would be accepted) and assert the response is 202, that accepted count reflects only successfully-dispatched items, that the rejected array includes the failing event's index with reason 'internal', and that queueAdd/upsertProfile was called as expected; reference postBatch, queueAdd, upsertProfile, and validTrack to locate the relevant stubbing and assertions to implement this regression test.
291-414: ⚡ Quick win: Add coverage for future `__timestamp` handling.
These timestamp tests cover historical rejection and bucket derivation, but not the `> 1 minute future => clamp to now` rule or the ~1 minute tolerance. That leaves a core acceptance rule unguarded.
🧪 Suggested regression test:
+  it('clamps timestamps that are more than 1 minute in the future', async () => {
+    const before = Date.now();
+    const res = await postBatch({
+      events: [
+        {
+          type: 'track' as const,
+          payload: {
+            name: 'future_event',
+            properties: {
+              __deviceId: 'future-device',
+              __timestamp: new Date(before + 5 * 60 * 1000).toISOString(),
+            },
+          },
+        },
+      ],
+    });
+
+    expect(res.statusCode).toBe(202);
+    const queuedJob = queueAdd.mock.calls[0]?.[0];
+    expect(queuedJob.data.eventMs).toBeGreaterThanOrEqual(before);
+    expect(queuedJob.data.eventMs).toBeLessThanOrEqual(Date.now());
+  });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/api/src/routes/track-batch.router.test.ts` around lines 291 - 414, Add a new test that verifies timestamps more than 1 minute in the future are clamped to now: use postBatch to send two track events for the same device where one has __timestamp = new Date(Date.now() + 2*60*1000).toISOString() (2 minutes in the future) and the other uses Date.now(); assert the response shows both accepted, queueAdd was called twice, extract sessionId from queueAdd.mock.calls[0]?.[0].data.sessionId and [1] and assert they are equal (showing the future timestamp was clamped into the same session/bucket), naming the test something like 'clamps __timestamp >1 minute in future to now'.
📒 Files selected for processing (1)
apps/api/src/routes/track-batch.router.test.ts
Closes #373.
Summary
Adds `POST /track/batch`, which accepts up to 2000 events / 10 MB per request and back-dates each event to its own `__timestamp` (up to 5 days in the past). Built so offline-capable SDKs (mobile, IoT, edge) can drain a local buffer in one round-trip and have those events land in the right historical session, not all stamped with arrival time.
The PR is split into 9 focused commits so each architectural decision is reviewable in isolation. They build on each other:
1. feat: add POST /track/batch ingestion endpoint ([…] `rejected[]`)
2. fix: vi.mock hoisting + lint cleanup
3. feat: bump batch limit to 2000 events / 10 MB
4. feat: deterministic session_id for all events at API edge ([…] `__timestamp` drive session bucketing)
5. feat: reject events with __timestamp older than 5 days
6. feat: split live vs historical worker dispatch
7. feat: Redis lock for session_start dedup
8. fix: tests use Date.now()-relative timestamps + correct referrerName
9. fix(test): reset getJob spy in lock-not-acquired test
Why each architectural choice
Per-row rejection instead of whole-batch failure (commit 1)
A 2000-event batch where one row is malformed shouldn't fail the other 1999. The controller `safeParse`s each row through `zTrackHandlerPayload`; failures become `{ index, reason: 'validation' | 'internal', error }` entries in `rejected[]`. The caller gets 202 with `{ accepted, rejected[] }` and can re-send only the bad indices.
`alias` is rejected per-row with `reason: 'validation'`, matching the existing single-event `/track` 400 for alias. Same behaviour, surfaced consistently in both shapes.
Salts + geo fetched once per request (commit 1)
`buildSharedRequestContext` does the salts query and request-IP geo lookup once. `buildEventContext` then uses those for every event. Only events that override `__ip` trigger a second geo lookup. For a typical 2000-event batch this is the difference between 1 PG round-trip + 1 geo lookup vs. 4000 of each.
2000 events / 10 MB caps (commit 3)
Mixpanel `/import` uses the same numbers. Going higher means a single bad request can OOM a Fastify worker. Going lower forces SDKs to chunk excessively for the IoT use case (a device with a week of 1-min readings has 10080 events; 2000-per-batch = 6 round-trips, which is fine; 1000-per-batch = 11 round-trips, noticeably worse on a flaky cell network).
The 10 MB body cap is enforced via Fastify's existing limit; no new mechanism needed.
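As a quick check of the round-trip arithmetic for the week-of-readings example, here is a small sketch. The `roundTrips` helper is hypothetical, and it counts a final partial batch as its own request.

```typescript
// Number of requests needed to drain totalEvents at batchSize events per request.
function roundTrips(totalEvents: number, batchSize: number): number {
  return Math.ceil(totalEvents / batchSize);
}

// One reading per minute for 7 days.
const weekOfMinuteReadings = 7 * 24 * 60;

const tripsAt2000 = roundTrips(weekOfMinuteReadings, 2000);
const tripsAt1000 = roundTrips(weekOfMinuteReadings, 1000);
console.log({ weekOfMinuteReadings, tripsAt2000, tripsAt1000 });
```

Five full batches of 2000 cover 10000 events, so the remaining 80 readings need one more request.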
Deterministic session_id from `__timestamp` (commit 4)
This is the load-bearing change for the IoT use case. Before this PR, when a client sent `__deviceId` (override path), the server returned `sessionId: ''` and let the worker re-derive a session at processing time. The worker used `Date.now()` for that derivation. Result: a 6-hour-old buffered event landed in whatever session was currently active for that device, not the session it belonged to when captured.
After this PR:
- `getDeviceId` accepts an `eventMs` parameter (the event's own timestamp, not wall-clock now)
- The deterministic session bucket is computed from `eventMs`
- The `__deviceId` override path no longer short-circuits to an empty `sessionId`
- Events are queued with `deviceId` and `sessionId` populated
A batch covering 5 days of buffered readings produces up to 240 distinct sessions per device (5 days × 48 thirty-minute buckets), each with `session_start` back-dated to the actual capture time. Dashboards show the activity in the right hour of the right day.
Hard 5-day floor (commit 5)
Same as Mixpanel's `/import` contract. The deterministic session bucket has finite memory in Redis (TTL = 30 min), and emitting `session_start` rows for events older than the analytics tool will reasonably backfill is wasteful. 5 days is the longest plausible offline window for the use cases this is designed to support (weekly sync devices are an edge case worth a separate discussion).
In single-event `/track` this returns 400. In batch it surfaces as a per-row `{ reason: 'validation', error: 'event timestamp older than 5 days' }`.
Three-way worker dispatch: server / historical / live (commit 6)
Before: the worker had two paths: server events (enrich from `sessionBuffer` if a profileId is supplied) and everything else (extend live session or emit session_start + schedule sessionEnd).
After: a third "historical" path. An event is historical when `Date.now() - createdAt > SESSION_TIMEOUT` (30 min) and it is not server-side. Historical events:
- Keep the API-computed `deviceId`/`sessionId`
- Do not call `extendSessionEndJob` (would push the live timer for whatever current session exists)
- Do not call `createSessionEndJob` (would schedule a 30-min close timer for a 6-hour-old session, which is meaningless)
- Emit at most one `session_start` per bucket via the lock (commit 7)
This keeps live session state and historical backfill in completely separate code paths. A device that's actively online while a batch of historical events is being processed for it doesn't get its current session disturbed.
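The three-way split can be sketched as a small classifier. This is an illustration only: `classifyEvent` and its input shape are hypothetical, and the clock is passed in as `nowMs` so the example stays deterministic (the worker itself compares against `Date.now()`).

```typescript
const SESSION_TIMEOUT_MS = 30 * 60 * 1000;

type WorkerBranch = 'server' | 'historical' | 'live';

// Hypothetical helper mirroring the branch order described above:
// server-side first, then historical, otherwise live.
function classifyEvent(
  event: { isServer: boolean; createdAtMs: number },
  nowMs: number,
): WorkerBranch {
  if (event.isServer) return 'server';
  if (nowMs - event.createdAtMs > SESSION_TIMEOUT_MS) return 'historical';
  return 'live';
}
```

Checking `isServer` first means an old server-side event still takes the server branch rather than the historical one, which matches the "and not server-side" condition.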
Redis lock for session_start dedup (commit 7)
Two scenarios where duplicate `session_start` rows leak in without a lock:
- Live race: N events for the same `(projectId, deviceId)` arrive at the API in parallel. All see no Redis sessionEnd key. All queue with no active session. The worker, even with sequential per-device processing, has all N events trying to emit session_start before any of them has finished writing.
- Historical batch: a single batch contains 50 events spanning the same 30-min bucket. Each one independently tries to emit session_start.
The lock key is `session_start:{projectId}:{sessionId}`, TTL = SESSION_TIMEOUT (30 min; the bucket itself rolls by then). The first caller acquires and emits; the others see the existing key and skip. The lock-not-acquired branch still calls `createSessionEndJob` (idempotent on jobId) so the session still closes cleanly.
Keyed on `sessionId`, not `deviceId`, so two events from the same device but different historical buckets each get their own `session_start`. That's correct behaviour: they ARE different sessions.
What this PR does NOT do
- `session_start` is deduped via the Redis lock, but events themselves are not. This is deliberate so the dedup design (Mixpanel insert_id vs Segment messageId, and the Redis vs CH-side lookup tradeoff) can be discussed in isolation. Listed as out-of-scope in "Bulk event ingestion (POST /track/batch) for offline-first SDKs - IoT, mobile, edge" #373.
Production verification
I deployed this against a self-hosted instance and ran ~80 assertions across 24 scenarios. All passing on the deployed image, with ClickHouse cross-checks for session_id distribution, session_start counts, and bucket boundaries:
- […] `/track`).
- `__ip` override: distinct geo per event in CH (US/IN/RU) within one shared session.
Test fixtures for these scenarios are in `apps/api/src/routes/track-batch.router.test.ts`. The worker-side splits are covered in `apps/worker/src/jobs/events.incoming-events.test.ts`.
Test plan
- `pnpm vitest run` passes (existing tests + new test files)
- `/track` regression test passes: no behavioural change to that endpoint