feat: bulk event ingestion (POST /track/batch) for offline-first SDKs#374

Open
mraj602-tohands wants to merge 11 commits into
Openpanel-dev:mainfrom
mraj602-tohands:feat/track-batch-ingestion
@mraj602-tohands mraj602-tohands commented May 18, 2026

Closes #373.

Summary

Adds POST /track/batch — accepts up to 2000 events / 10 MB per request and back-dates each event to its own __timestamp (up to 5 days in the past). Built so offline-capable SDKs (mobile, IoT, edge) can drain a local buffer in one round-trip and have those events land in the right historical session, not all stamped with arrival time.

The PR is split into 9 focused commits so each architectural decision is reviewable in isolation. They build on each other:

| # | Commit | What it does |
|---|--------|--------------|
| 1 | feat: add POST /track/batch ingestion endpoint | Route, controller, validation envelope, 202 response shape with per-row rejected[] |
| 2 | fix: vi.mock hoisting + lint cleanup | Test-only fixup for the new test file |
| 3 | feat: bump batch limit to 2000 events / 10 MB | Mixpanel-parity caps |
| 4 | feat: deterministic session_id for all events at API edge | Make __timestamp drive session bucketing |
| 5 | feat: reject events with __timestamp older than 5 days | Hard floor for historical events |
| 6 | feat: split live vs historical worker dispatch | Worker stops touching live state for historical events |
| 7 | feat: Redis lock for session_start dedup | Collapse duplicate session_start emissions across batches/workers |
| 8 | fix: tests use Date.now()-relative timestamps + correct referrerName | Test-only fixup after adding the 5-day floor |
| 9 | fix(test): reset getJob spy in lock-not-acquired test | Test-only fixup for vitest mock leakage |

Why each architectural choice

Per-row rejection instead of whole-batch failure (commit 1)

A 2000-event batch where one row is malformed shouldn't fail the other 1999. The controller safeParses each row through zTrackHandlerPayload; failures become { index, reason: 'validation' | 'internal', error } entries in rejected[]. Caller gets 202 with { accepted, rejected[] } and can re-send only the bad indices.

alias is rejected per-row with reason: 'validation', matching the existing single-event /track 400 for alias. Same behaviour, surfaced consistently in both shapes.
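A minimal sketch of the per-row collection pattern, for reviewers who want to see the shape without opening the controller. `validateRow`, `processBatch` and the `Row` type are hypothetical stand-ins; the PR itself parses rows with zTrackHandlerPayload.safeParse, and the real dispatch is asynchronous.

```typescript
// Hypothetical row shape; the real payload type is ITrackHandlerPayload.
type Row = { type: string; payload?: unknown };
type Rejected = { index: number; reason: 'validation' | 'internal'; error: string };
type BatchResponse = { accepted: number; rejected: Rejected[] };

// Stand-in for schema.safeParse: returns an error message or null, never throws.
function validateRow(row: unknown): string | null {
  if (typeof row !== 'object' || row === null) return 'row must be an object';
  const type = (row as { type?: unknown }).type;
  if (typeof type !== 'string') return 'missing type';
  if (type === 'alias') return 'alias is not supported';
  return null;
}

// Validate and dispatch each row independently so one bad row never fails the batch.
function processBatch(rows: unknown[], dispatch: (row: Row) => void): BatchResponse {
  const rejected: Rejected[] = [];
  let accepted = 0;
  rows.forEach((row, index) => {
    const validationError = validateRow(row);
    if (validationError !== null) {
      rejected.push({ index, reason: 'validation', error: validationError });
      return;
    }
    try {
      dispatch(row as Row);
      accepted += 1;
    } catch (err) {
      // Anything thrown after validation passed is reported as an internal failure.
      const message = err instanceof Error ? err.message : String(err);
      rejected.push({ index, reason: 'internal', error: message });
    }
  });
  return { accepted, rejected };
}
```

Because every rejected entry carries its original index, a caller can rebuild a retry batch from only the failed rows.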

Salts + geo fetched once per request (commit 1)

buildSharedRequestContext does the salts query and request-IP geo lookup once. buildEventContext then uses those for every event. Only events that override __ip trigger a second geo lookup. For a typical 2000-event batch this is the difference between 1 PG round-trip + 1 geo lookup and 2000 of each.

2000 events / 10 MB caps (commit 3)

Mixpanel /import uses the same numbers. Going higher means a single bad request can OOM a Fastify worker. Going lower forces SDKs to chunk excessively for the IoT use case (a device with a week of 1-min readings has 10080 events; 2000-per-batch = 6 round-trips, which is fine; 1000-per-batch = 11 round-trips, noticeably worse on a flaky cell network).

The 10 MB body cap is enforced via Fastify's existing limit; no new mechanism needed.

Deterministic session_id from __timestamp (commit 4)

This is the load-bearing change for the IoT use case. Before this PR, when a client sends __deviceId (override path), the server returned sessionId: '' and let the worker re-derive a session at processing time. The worker uses Date.now() for that derivation. Result: a 6-hour-old buffered event lands in whatever session is currently active for that device, not the session it belonged to when captured.

After this PR:

  • getDeviceId accepts an eventMs parameter (the event's own timestamp, not wall-clock now)
  • The deterministic 30-min session bucket is keyed on eventMs
  • The __deviceId override path no longer short-circuits to empty sessionId
  • Every event leaves the API with both deviceId and sessionId populated

A batch covering 5 days of buffered readings produces 240 distinct sessions (one per 30-min bucket per device), each with session_start back-dated to the actual capture time. Dashboards show the activity in the right hour of the right day.
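To illustrate the bucketing, here is a rough sketch that assumes buckets are aligned to the Unix epoch and that the session key is a plain string join. `sessionBucket` and `sessionKey` are hypothetical names; the real getSessionId may hash its inputs and salt them differently.

```typescript
const SESSION_TIMEOUT_MS = 30 * 60 * 1000;

// Index of the 30-minute window containing eventMs (assumes epoch-aligned windows).
function sessionBucket(eventMs: number): number {
  return Math.floor(eventMs / SESSION_TIMEOUT_MS);
}

// Hypothetical key format: same project, device and bucket always yield the same key.
function sessionKey(projectId: string, deviceId: string, eventMs: number): string {
  return `${projectId}:${deviceId}:${sessionBucket(eventMs)}`;
}

// Two events 30 seconds apart that straddle a bucket edge land in different sessions.
const edgeMs = 1000 * SESSION_TIMEOUT_MS;
const before = sessionKey('proj', 'device-1', edgeMs - 15_000);
const after = sessionKey('proj', 'device-1', edgeMs + 15_000);
console.log(before === after); // false
```

The key depends only on the event's own timestamp, so replaying the same buffered event in a later batch yields the same session.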

Hard 5-day floor (commit 5)

Same as Mixpanel's /import contract. The deterministic session bucket has finite memory in Redis (TTL = 30 min), and emitting session_start rows for events older than the analytics tool will reasonably backfill is wasteful. 5 days is the longest plausible offline window for the use cases this is designed to support (weekly sync devices are an edge case worth a separate discussion).

In single-event /track this returns 400. In batch it surfaces as a per-row { reason: 'validation', error: 'event timestamp older than 5 days' }.
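The timestamp rules (5-day floor, 1-minute future tolerance) can be sketched as below. This is an illustration under assumptions, not the PR's getTimestamp: `resolveEventMs` and `TimestampError` are hypothetical names, and the choice to accept an event that is exactly 5 days old is inferred from the "more than 5 days" wording.

```typescript
const MAX_AGE_MS = 5 * 24 * 60 * 60 * 1000;
const FUTURE_TOLERANCE_MS = 60 * 1000;

// Hypothetical error type standing in for the HttpError(400) thrown by the API.
class TimestampError extends Error {
  readonly status = 400;
}

// Returns the timestamp the event should be stored with, or throws if it is too old.
function resolveEventMs(clientMs: number | undefined, nowMs: number): number {
  if (clientMs === undefined) return nowMs;
  // Too far in the future: fall back to server time instead of rejecting.
  if (clientMs > nowMs + FUTURE_TOLERANCE_MS) return nowMs;
  // Strictly older than the window: reject.
  if (nowMs - clientMs > MAX_AGE_MS) {
    throw new TimestampError('event timestamp older than 5 days');
  }
  return clientMs;
}
```

In the batch handler the thrown error would be caught per row and reported in rejected[]; in single-event /track it propagates as the 400 response.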

Three-way worker dispatch: server / historical / live (commit 6)

Before: the worker had two paths — server events (enrich from sessionBuffer if a profileId is supplied) and everything else (extend live session or emit session_start + schedule sessionEnd).

After: a third "historical" path. An event is historical when Date.now() - createdAt > SESSION_TIMEOUT (30 min) and not server-side. Historical events:

  • Write the event row with the API-computed deviceId / sessionId
  • Do not call extendSessionEndJob (would push the live timer for whatever current session exists)
  • Do not call createSessionEndJob (would schedule a 30-min close timer for a 6-hour-old session — meaningless)
  • Do emit one session_start per bucket via the lock (commit 7)

This keeps live session state and historical backfill in completely separate code paths. A device that's actively online while a batch of historical events is being processed for it doesn't get its current session disturbed.
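The dispatch decision itself is small. A sketch, assuming the worker has an `isServer` flag and the event's `createdAtMs` to hand (both field names are hypothetical here):

```typescript
const SESSION_TIMEOUT_MS = 30 * 60 * 1000;

type WorkerPath = 'server' | 'historical' | 'live';

// Server-side events win first; otherwise age relative to SESSION_TIMEOUT decides.
function classifyEvent(
  event: { isServer: boolean; createdAtMs: number },
  nowMs: number,
): WorkerPath {
  if (event.isServer) return 'server';
  if (nowMs - event.createdAtMs > SESSION_TIMEOUT_MS) return 'historical';
  return 'live';
}
```

Only the 'live' result is allowed to touch sessionEnd jobs. A 'historical' result writes the event and at most one session_start for its bucket.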

Redis lock for session_start dedup (commit 7)

Two scenarios where duplicate session_start rows leak in without a lock:

  1. Live race: N events for the same (projectId, deviceId) arrive at the API in parallel. All see no Redis sessionEnd key. All queue with no active session. The worker — even with sequential per-device processing — has all N events trying to emit session_start before any of them has finished writing.

  2. Historical batch: a single batch contains 50 events spanning the same 30-min bucket. Each one independently tries to emit session_start.

The lock key is session_start:{projectId}:{sessionId}, with TTL = SESSION_TIMEOUT (30 min, by which time the bucket itself has rolled over). The first caller acquires the lock and emits session_start; later callers see the existing key and skip. In the live branch, the lock-not-acquired path still calls createSessionEndJob (idempotent on jobId) so the session still closes cleanly; the historical branch never schedules a sessionEnd.

Keyed on sessionId, not deviceId, so two events from the same device but different historical buckets each get their own session_start. That's correct behaviour — they ARE different sessions.
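The lock semantics can be sketched with an in-memory stand-in. This is not the PR's acquireSessionStartLock: `InMemoryLock` is a hypothetical class that mimics a Redis "set if absent, with expiry" operation, so the dedup behaviour can be seen without a Redis instance.

```typescript
const SESSION_TIMEOUT_MS = 30 * 60 * 1000;

// In-memory stand-in for a Redis set-if-absent with TTL (an assumption for illustration).
class InMemoryLock {
  private expiries: Record<string, number> = {};

  // Returns true only for the first caller within the TTL window.
  acquire(key: string, ttlMs: number, nowMs: number): boolean {
    const expiresAt = this.expiries[key];
    if (expiresAt !== undefined && expiresAt > nowMs) return false;
    this.expiries[key] = nowMs + ttlMs;
    return true;
  }
}

function sessionStartLockKey(projectId: string, sessionId: string): string {
  return `session_start:${projectId}:${sessionId}`;
}

// Counts how many session_start rows would be emitted for a list of events.
function countSessionStarts(
  lock: InMemoryLock,
  events: { projectId: string; sessionId: string }[],
  nowMs: number,
): number {
  let emitted = 0;
  for (const event of events) {
    const key = sessionStartLockKey(event.projectId, event.sessionId);
    if (lock.acquire(key, SESSION_TIMEOUT_MS, nowMs)) emitted += 1;
  }
  return emitted;
}
```

Fifty events sharing one sessionId produce a single emission, while events from the same device in two different buckets (two sessionIds) produce two.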

What this PR does NOT do

  • No insert_id / dedup. Two identical batches sent twice produce two copies of every event. Worker session_start is deduped via the Redis lock, but events themselves are not. This is deliberate so the dedup design (Mixpanel insert_id vs Segment messageId, and the Redis vs CH-side lookup tradeoff) can be discussed in isolation. Listed as out-of-scope in #373.
  • No SDK changes. API-only. SDKs continue to send single events; the batch endpoint is for new offline-buffer integrations and migration-time replays.
  • No request compression. Fastify can add gzip handling separately if it becomes a real bottleneck.

Production verification

I deployed this against a self-hosted instance and ran ~80 assertions across 24 scenarios. All passing on the deployed image, with ClickHouse cross-checks for session_id distribution, session_start counts, and bucket boundaries:

  • IoT 7-day backlog (168 events, 1/hour): 119 accepted, 119 distinct buckets, 119 session_starts at the historical timestamps. 49 rejected by the 5-day floor.
  • Cross-bucket boundary precision (events 30s apart spanning a bucket edge): 2 sessions, 2 session_starts, correct split.
  • Multi-device household (3 devices, shared IP): 3 separate sessions, no cross-talk.
  • Anonymous → identify → identified mid-batch: anon events use device-as-profile, then flip to real profileId, all share one session.
  • Concurrent batches same device (3 parallel HTTP requests, 6 events total): 1 session_start total — Redis lock dedups across requests/workers.
  • Multi-batch same bucket (sequential HTTP requests in same 30-min window): 1 session_start across requests.
  • Shared kiosk (1 device, 2 profiles): 2 sessions, attributed correctly to each profile period.
  • Same-millisecond ties (20 events with identical timestamp): 20 events, 1 session, 1 session_start.
  • 5-day boundary precision (4d59m vs 5d1m): one accepted, one rejected.
  • Future timestamps: ≤1min preserved, >1min silently clamped to now (matches existing /track).
  • IoT flush (100 events spanning ~30min sent in one request): 2 sessions because they crossed a bucket edge — correct.
  • Per-event __ip override: distinct geo per event in CH (US/IN/RU) within one shared session.

Test fixtures for these scenarios are in apps/api/src/routes/track-batch.router.test.ts. The worker-side splits are covered in apps/worker/src/jobs/events.incoming-events.test.ts.

Test plan

  • pnpm vitest run passes (existing tests + new test files)
  • Single-event /track regression test passes — no behavioural change to that endpoint
  • Manual prod verification on self-hosted instance with full ClickHouse cross-checks (24 scenarios above)
  • Reviewer acceptance of the 9-commit structure (or request to squash)

Summary by CodeRabbit

  • New Features

    • Batch event ingestion (up to 2,000 events) with per-item validation and 202 response containing accepted/rejected results.
    • Event timestamps now drive deterministic session bucketing; timestamps are validated and events older than 5 days are rejected.
  • Bug Fixes / Reliability

    • Prevent duplicate session-starts via deduplication lock; improved handling for server-side and historical events.
  • Tests

    • New integration and regression tests covering batch flows, per-item validation, timestamp rules, and session behaviors.

mraj602 and others added 9 commits May 18, 2026 23:51

feat: add POST /track/batch ingestion endpoint
Accept up to 1000 ITrackHandlerPayload events in a single request and
dispatch each through the same per-event pipeline as POST /track.
Per-event validation failures are returned in `rejected[]` with their
index and reason so callers can fix and retry only the bad rows; the
whole batch never fails on a single malformed item.

- New zTrackBatchBody schema (envelope-only validation; per-item
  parsing runs in the handler so we can collect errors without
  short-circuiting)
- Refactor buildContext into buildSharedRequestContext +
  buildEventContext so the batch handler fetches salts and request-IP
  geo once and reuses them across all events. Single-event /track
  keeps identical behavior.
- Extract dispatchEvent so /track and /track/batch share the per-type
  switch; the handler stays the only place that knows about the
  unsupported `alias` type.
- duplicateHook (the 100ms body-hash dedup) moved to be /track-only.
  Hashing a 1000-item array is meaningless — the hook would either
  drop the entire retry on hash collision, or no-op on retries that
  mutate __timestamp.

Response shape (always 202 when envelope + auth pass):
  { accepted: number,
    rejected: [{ index, reason: 'validation' | 'internal', error }] }

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

fix: vi.mock hoisting + lint cleanup
- Wrap shared spies in `vi.hoisted(...)` so they exist when the
  hoisted vi.mock factories run. Without this, CI failed with
  `Cannot access 'upsertProfileMock' before initialization` (TDZ
  inside the factory).
- Drop the post-schema defensive length check in batchHandler — the
  route schema enforces `.min(1).max(1000)` before the handler sees
  the body, so the runtime guard was unreachable.
- Rework dispatchEvent's exhaustiveness check to actually use the
  never-typed local in the thrown message. The previous `_exhaustive`
  was unused and would trip Biome's unused-variable rule.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

feat: bump batch limit to 2000 events / 10 MB
Updates the constant, the route's bodyLimit, the OpenAPI description,
and the over-the-cap test fixture.

Note: this only changes transport limits. Real mobile/offline use of
the batch endpoint is still limited by pre-existing session-derivation
behavior (15-min isTimestampFromThePast threshold, session_start row
inflation under concurrent device events). Those will be addressed in
follow-up commits.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

feat: deterministic session_id for all events at API edge
Plumb the event's __timestamp through getDeviceId/getInfoFromSession
so the deterministic 30-min session bucket is computed from the
event's own time, not Date.now(). Critical for /track/batch where one
request can carry events spanning days.

Two behavior changes:

1. __deviceId override path no longer short-circuits to sessionId: ''.
   It now still hits the live-session Redis lookup (keyed on the
   supplied deviceId) and falls back to the deterministic bucket
   computed at the event's __timestamp. SDKs that supply a stable
   client-side device id (mobile, server) get sessionful events.

2. The deterministic getSessionId fallback in getInfoFromSession now
   receives eventMs explicitly. Historical/buffered events bucket
   into the wall-clock window they actually happened in, so events
   arriving in different batches with the same __timestamp window
   collapse to one session_id (merge on read).

Live-traffic behavior is unchanged: when a Redis sessionEnd key
exists, we still return that session's id from the live mechanism.
Only the fallback path changed.

The isTimestampFromThePast field on the queue payload stays in place
for now; the worker still reads it. A follow-up commit removes both
producer and consumer in one step so neither side is broken in
isolation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

feat: reject events with __timestamp older than 5 days
Events whose __timestamp is more than 5 days in the past throw
HttpError(400) from getTimestamp, which the existing batchHandler
error path routes to a per-row { reason: 'validation', error: '...' }
entry. Single-event /track returns 400 to the caller via the
standard handler error path.

Window: hard-coded 5 days. Not per-project configurable.

Future-timestamp tolerance unchanged: clientTimestamp > now + 1 min
still falls back to server time.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

feat: split live vs historical worker dispatch
Stop blanking deviceId/sessionId for events that aren't currently
live. With the API now computing a deterministic session_id for every
event using its __timestamp, the worker can trust those values
instead of replacing them with empty strings.

The worker now branches on three cases:

1. Server-side event (uaInfo.isServer): enrich from sessionBuffer when
   a profileId resolves to an existing browser session; otherwise keep
   the API-computed identity. Never schedules sessionEnd.
2. Historical event (not server, __timestamp older than
   SESSION_TIMEOUT): write the event with the API-computed identity.
   No sessionEnd scheduling, no extendSessionEndJob — historical
   events must not mutate live session state.
3. Live event (default browser SDK traffic): unchanged. Emit
   session_start when no Redis sessionEnd key exists, schedule the
   30-min sessionEnd job, or extend it if already present.

Also drops the isTimestampFromThePast field from the queue payload
and all its producers. The "is this event currently live?" decision
is now local to the worker and uses the same SESSION_TIMEOUT constant
the sessionEnd job already runs on (no new tunable).

Documented compromise: historical sessions never get a session_end
row written in CH. Their is_bounce / duration are null/0. Acceptable
trade-off; alternative is a much bigger change involving lazy
session-row creation on bucket close.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

feat: Redis lock for session_start dedup
Re-introduce the (projectId, sessionId)-keyed lock that was removed
on the bet that groupmq's per-groupId serialization would prevent
duplicates. That bet holds for single-event live traffic but fails
for batch ingestion: when N events for the same device land in
/track/batch, all N see no Redis sessionEnd key during the API's
parallel dispatch and queue with session: undefined, causing the
worker to emit N session_start rows.

Live verification on production showed the symptom: a 1000-event
batch across 25 devices produced 1000 session_start rows instead
of 25.

The lock:
- Key: session_start:{projectId}:{sessionId}
- TTL: SESSION_TIMEOUT (30 min) — same constant the live mechanism
  already uses, no new tunable
- Keyed on sessionId not deviceId so historical events from the
  same device but different 30-min buckets each get their own
  session_start

Both the live and historical worker branches now gate session_start
emission on lock acquisition. When the lock is not acquired:
- Live branch: still schedule sessionEnd (idempotent on jobId —
  no-op if already present), still write the event row
- Historical branch: still write the event row, skip session_start

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

fix: tests use Date.now()-relative timestamps + correct referrerName
Three CI failures, all in tests:

1. Worker server-events test expected `referrerName: undefined` but
   the actual baseEvent now resolves it to `''` (empty string from the
   utmReferrer/referrer fallback chain). Align the expectation.

2 + 3. API bucketing tests used hard-coded Date.UTC(2026, 4, 1, ...)
   timestamps. That date is more than 5 days before "now" when CI
   runs, so the new 5-day rejection rejects both events and the
   tests see accepted: 0 instead of 2. Switch to Date.now()-relative
   timestamps:
   - "different buckets" test: anchor to (now - 2h) and use a 1h gap
   - "same bucket" test: anchor to the start of a bucket that closed
     1h ago and use a 5-min gap (both inside one wall-clock window
     and inside the 5-day acceptance window)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

fix(test): reset getJob spy in lock-not-acquired test
vi.clearAllMocks resets call history but keeps mockResolvedValue
implementations. A prior test set sessionsQueue.getJob to return a
live job, which leaked into the lock-not-acquired test and routed
control to the extendSessionEndJob branch instead of the lock check.
That in turn left a mockResolvedValueOnce(false) on getLock unconsumed,
which then broke the next test's session_start emission.

Force getJob to undefined explicitly so each test starts from a clean
"no active session" state.
@coderabbitai
Contributor

coderabbitai Bot commented May 18, 2026

📝 Walkthrough

Walkthrough

This PR implements POST /track/batch for bulk event ingestion with per-item validation and bounded concurrency, derives device/session IDs deterministically from each event's timestamp (eventMs), removes the old past-flag from queue payloads, and prevents duplicate session_start emissions using a Redis lock strategy in the worker.

Changes

Batch event ingestion and session bucketing

| Layer | File(s) | Summary |
|-------|---------|---------|
| Validation schemas and queue payload types | packages/validation/src/track.validation.ts, packages/queue/src/queues.ts | Add TRACK_BATCH_MAX_EVENTS and zTrackBatchBody (1–2000 events); remove isTimestampFromThePast from EventsQueuePayloadIncomingEvent payload event shape. |
| Timestamp validation and request context refactoring | apps/api/src/controllers/track.controller.ts | Refactor getTimestamp to reject events older than 5 days (400) and allow 1-minute future tolerance; introduce shared-request context and per-event buildEventContext to reuse geo/salts across batch items. |
| Deterministic session ID generation from event timestamp | apps/api/src/utils/ids.ts, apps/api/src/controllers/track.controller.ts | Require and propagate eventMs into getDeviceId / getInfoFromSession so session IDs are bucketed by event timestamp rather than request arrival time; handle __deviceId overrides without dropping sessionId. |
| Per-type event dispatch and single-event handler update | apps/api/src/controllers/track.controller.ts | Introduce dispatchEvent to centralize routing for track, identify, increment, decrement, group, assign_group, replay; update single-event handler to build context once and return computed deviceId/sessionId. |
| Batch endpoint routing and HTTP handler | apps/api/src/routes/track.router.ts, apps/api/src/controllers/track.controller.ts | Add TRACK_BATCH_BODY_LIMIT_BYTES (10 MB), move duplicateHook to single-event route, add POST /track/batch with zTrackBatchBody validation and body limit; implement batchHandler to parse and dispatch each item with per-item error classification and return 202 with { accepted, rejected[] }. |
| Event group controller timestamp handling | apps/api/src/controllers/event.controller.ts | Update postEvent to extract only timestamp from validation and include eventMs in the queued payload; remove the deprecated past-flag from queued event shape. |
| Batch endpoint integration tests | apps/api/src/routes/track-batch.router.test.ts | Add Vitest suite with hoisted mocks and lifecycle setup; validate auth/envelope constraints, happy-path per-event dispatch (queue for track, profile upsert for identify), per-item validation continuation, alias rejection, sessionId derivation with device overrides, deterministic bucketing by __timestamp, and old-timestamp rejections. |
| Redis-backed session start deduplication | apps/worker/src/jobs/events.incoming-event.ts | Add acquireSessionStartLock (Redis) with TTL = SESSION_TIMEOUT; compute isLiveEvent from uaInfo and event recency; emit historical session_start only when lock acquired (best-effort) and gate live session_start emission with the lock, scheduling session_end when appropriate without throwing on lock loss. |
| Worker tests: lock behavior and historical event handling | apps/worker/src/jobs/events.incoming-events.test.ts | Mock Redis lock acquisition, remove past-flag from fixtures, adjust server-event identity expectations, and add regression tests verifying lock-not-acquired and historical-event behaviors. |

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant BatchRoute as POST /track/batch
  participant TrackController as batchHandler
  participant Dispatch as dispatchEvent
  participant QueueDB as Queue/DB
  participant Worker as Worker

  Client->>BatchRoute: POST /track/batch (auth + events[])
  BatchRoute->>TrackController: validate + process
  TrackController->>TrackController: buildSharedRequestContext (geo, salts)
  loop each event in batch
    TrackController->>Dispatch: buildEventContext (eventMs from __timestamp)
    Dispatch->>Dispatch: getDeviceId(eventMs) => deterministic sessionId
    Dispatch->>QueueDB: dispatch by type (track→queue, identify→profile upsert)
  end
  TrackController->>Client: 202 { accepted, rejected[] }
  QueueDB->>Worker: incoming-event job
  Worker->>Worker: isLiveEvent = !uaInfo.isServer && recent(createdAt)
  alt isLiveEvent
    Worker->>Worker: acquireSessionStartLock(projectId, sessionId)
    alt lock acquired
      Worker->>QueueDB: emit session_start
    else lock not acquired
      Worker->>QueueDB: schedule session_end (idempotent) and log
    end
  else historical
    Worker->>Worker: acquireSessionStartLock (best-effort)
    alt lock acquired
      Worker->>QueueDB: emit historical session_start (best-effort)
    end
  end
  Worker->>QueueDB: createEventAndNotify

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

🐰 I hopped through batches, timestamps in paw,

events once lost now find where they belong.
Sessions line up by the moments they saw,
Redis keeps watch so starts don't double-song.
One request, many stories — hop along!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|------------|--------|-------------|------------|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 29.41% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
|------------|--------|-------------|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main change: adding bulk event ingestion via a POST /track/batch endpoint for offline-first SDKs. |
| Linked Issues check | ✅ Passed | All requirements from issue #373 are met: 2000-event limit with 10MB body cap [#373], per-item validation without batch failure [#373], historical timestamp support for deterministic session derivation [#373], Redis-based session_start dedup [#373], isolation of historical events from live session state [#373], and 5-day acceptance window with rejection per-row [#373]. |
| Out of Scope Changes check | ✅ Passed | All changes directly support batch ingestion scope. Controller/router/validation additions implement the endpoint; worker changes enforce historical-event isolation; queue/IDs updates support deterministic session bucketing from event timestamps; removals of isTimestampFromThePast are scope-aligned cleanups. |

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.


Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
apps/api/src/utils/ids.ts (1)

32-37: ⚡ Quick win

Avoid duplicate Redis lookup when overrideDeviceId is set.

This path passes the same ID as both current and previous, which causes duplicate hget calls in getInfoFromSession. Make previousDeviceId optional (or skip second fetch when equal).

Suggested change
-    return getInfoFromSession({
-      projectId,
-      currentDeviceId: overrideDeviceId,
-      previousDeviceId: overrideDeviceId,
-      eventMs,
-    });
+    return getInfoFromSession({
+      projectId,
+      currentDeviceId: overrideDeviceId,
+      eventMs,
+    });
 async function getInfoFromSession({
   projectId,
   currentDeviceId,
   previousDeviceId,
   eventMs,
 }: {
   projectId: string;
   currentDeviceId: string;
-  previousDeviceId: string;
+  previousDeviceId?: string;
   eventMs: number;
 }): Promise<DeviceIdResult> {
   try {
     const multi = getRedisCache().multi();
     multi.hget(
       `bull:sessions:sessionEnd:${projectId}:${currentDeviceId}`,
       'data'
     );
-    multi.hget(
-      `bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`,
-      'data'
-    );
+    if (previousDeviceId && previousDeviceId !== currentDeviceId) {
+      multi.hget(
+        `bull:sessions:sessionEnd:${projectId}:${previousDeviceId}`,
+        'data'
+      );
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/api/src/utils/ids.ts` around lines 32 - 37, The call is passing
overrideDeviceId as both currentDeviceId and previousDeviceId causing duplicate
Redis hget calls; make previousDeviceId optional and modify getInfoFromSession
to skip the second fetch when previousDeviceId is undefined or equal to
currentDeviceId (i.e., only perform the hget for previousDeviceId if
previousDeviceId && previousDeviceId !== currentDeviceId), or alternatively stop
passing previousDeviceId from the caller when it's the same as currentDeviceId
so the function's existing optional behavior can be used; update
getInfoFromSession signature/usage accordingly and ensure any Redis hget for
previousDeviceId is guarded by that condition.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@apps/api/src/controllers/track.controller.ts`:
- Around line 531-568: The current Promise.all(events.map(...)) launches all
event pipelines concurrently (up to ~2000), causing load spikes; replace that
full-parallel map with a bounded-concurrency executor so per-item work (the
logic using zTrackHandlerPayload, buildEventContext, dispatchEvent, and
request.log) runs only N at a time. For example, import or implement a p-limit
style limiter, create const limit = pLimit(CONCURRENCY) (or a simple semaphore),
then map events to limit(() => /* the existing async handler body that returns
BatchItemResult and uses parsed, buildEventContext, dispatchEvent, request.log
*/) and await Promise.all(limitedPromises) so results keep their original index
and status semantics; ensure error handling and returned shape ({ index, status,
reason, error }) are preserved exactly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 50b23ca2-f41b-4739-8511-83a3ffb87243

📥 Commits

Reviewing files that changed from the base of the PR and between 445a864 and 44b297d.

📒 Files selected for processing (9)
  • apps/api/src/controllers/event.controller.ts
  • apps/api/src/controllers/track.controller.ts
  • apps/api/src/routes/track-batch.router.test.ts
  • apps/api/src/routes/track.router.ts
  • apps/api/src/utils/ids.ts
  • apps/worker/src/jobs/events.incoming-event.ts
  • apps/worker/src/jobs/events.incoming-events.test.ts
  • packages/queue/src/queues.ts
  • packages/validation/src/track.validation.ts
💤 Files with no reviewable changes (1)
  • packages/queue/src/queues.ts

Comment thread apps/api/src/controllers/track.controller.ts Outdated
Process /track/batch events in chunks of 50 instead of an unbounded
Promise.all over up to 2000 events. Each event hits Redis (session
lookup + groupmq add) and may trigger a geo lookup if __ip is
overridden, so an unbounded fan-out can pressure Redis pool size and
the geo provider's rate budget on smaller self-hosted instances. 50
keeps the pipeline saturated without thundering-herd behaviour.

Throughput in our prod testing was already fine (2000 events / ~1.3s)
without bounding, but the unbounded version is fragile under burst
traffic for self-hosters. Per-index result alignment is preserved via
the existing `index` field on BatchItemResult.
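
The bounded fan-out described in this commit message could look roughly like the sketch below. It is an illustration under stated assumptions, not the code in e5bcc3b: `processInChunks`, the `handle` callback, and the exact shape of `BatchItemResult` beyond its `index` field are hypothetical.

```typescript
// Hypothetical result shape; only the `index` field is named in the commit message.
type BatchItemResult =
  | { index: number; ok: true }
  | { index: number; ok: false; reason: string };

const CHUNK_SIZE = 50;

// Runs `handle` over `items` with at most `chunkSize` calls in flight,
// recording each outcome under the item's position in the original input.
async function processInChunks<T>(
  items: readonly T[],
  handle: (item: T, index: number) => Promise<void>,
  chunkSize: number = CHUNK_SIZE,
): Promise<BatchItemResult[]> {
  const results: BatchItemResult[] = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    const chunk = items.slice(start, start + chunkSize);
    // Promise.all keeps the chunk's own order, so results stay index-aligned.
    const chunkResults = await Promise.all(
      chunk.map(async (item, offset): Promise<BatchItemResult> => {
        const index = start + offset;
        try {
          await handle(item, index);
          return { index, ok: true };
        } catch {
          // A failing item becomes a per-row rejection instead of failing the batch.
          return { index, ok: false, reason: 'internal' };
        }
      }),
    );
    results.push(...chunkResults);
  }
  return results;
}
```

Because each chunk is awaited before the next one starts, a 2000-event batch issues 40 sequential waves of at most 50 concurrent operations rather than 2000 at once.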
@mraj602-tohands
Author

Thanks @coderabbitai for the review.

Inline comment (concurrency): addressed in e5bcc3b — see the threaded reply for rationale and trade-off discussion.

Pre-merge: docstring coverage 35%: leaving as-is. Most of the missing docstrings are on test fixtures and small internal helpers where a docstring would just restate the next line of code. The load-bearing functions (batchHandler, getTimestamp, acquireSessionStartLock, dispatchEvent, the worker's three-way split, buildSharedRequestContext, buildEventContext) all have explanatory comments where the why is non-obvious. Chasing 80% would add noise without adding signal — happy to revisit if a maintainer prefers otherwise.

Possibly related PRs (#231): looked at it — it touches the queue payload type and ingestion path in adjacent ways but doesn't conflict. If/when #231 lands first I'd reconcile during rebase; if this PR lands first that one would need to update for the removed isTimestampFromThePast field. Either order is fine.

Branch HEAD is now e5bcc3b0.

@coderabbitai
Contributor

coderabbitai Bot commented May 18, 2026

@mraj602-tohands: Thanks for the thorough follow-up!

  • Concurrency (e5bcc3b) — Noted, I'll re-examine that commit in the updated review.
  • Docstring coverage — The rationale is sound. Keeping prose comments on the load-bearing functions (batchHandler, getTimestamp, acquireSessionStartLock, the three-way worker split, etc.) while skipping boilerplate on fixtures and one-liner helpers is a reasonable call. No objection.
  • PR #231 compatibility — The reconciliation plan is clear in both merge orders. The removed isTimestampFromThePast field is the only coupling point, and it's straightforward to handle during rebase.

Triggering a fresh review against e5bcc3b0 now.

(´• ω •`) 🐇

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Adds a 200-event batch with bad rows scattered across chunk
boundaries (indices 0, 50, 99, 100, 149, 199 — first/last of each
50-event chunk plus a chunk-internal one). Locks down the contract
that BatchItemResult.index aligns with the input index regardless of
the chunked processing order, which would catch any future
off-by-one in the slicing or out-of-order accumulation.
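
As a rough illustration of the contract this test locks down, the sketch below builds a 200-row fixture with bad rows at the listed indices and checks that a chunked pass reports the same indices back. `buildBatch` and `rejectedIndices` are hypothetical stand-ins, and treating a row as "bad" when `payload.name` is missing is an assumption; the real test goes through the endpoint.

```typescript
const CHUNK_SIZE = 50;
const BAD_INDICES = [0, 50, 99, 100, 149, 199] as const;

type Row = { type: 'track'; payload: { name?: string } };

// Hypothetical fixture builder: rows at `badIndices` lack `payload.name`.
function buildBatch(total: number, badIndices: readonly number[]): Row[] {
  const bad = new Set(badIndices);
  return Array.from({ length: total }, (_, i): Row => ({
    type: 'track',
    payload: bad.has(i) ? {} : { name: `event_${i}` },
  }));
}

// Stand-in for the endpoint: validates chunk by chunk and reports each
// rejected row by its index in the original input, not within the chunk.
function rejectedIndices(rows: readonly Row[]): number[] {
  const rejected: number[] = [];
  for (let start = 0; start < rows.length; start += CHUNK_SIZE) {
    rows.slice(start, start + CHUNK_SIZE).forEach((row, offset) => {
      if (!row.payload.name) {
        rejected.push(start + offset);
      }
    });
  }
  return rejected;
}
```

An off-by-one in the slice bounds, or reporting `offset` instead of `start + offset`, would change the returned indices and fail the comparison against `BAD_INDICES`.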
Contributor

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (2)
apps/api/src/routes/track-batch.router.test.ts (2)

226-261: ⚡ Quick win

Cover the per-item internal rejection path.

This section only exercises validation failures. The batch contract also returns reason: 'internal' when dispatch fails, so a regression in queueAdd/upsertProfile error handling would currently go uncaught.

🧪 Suggested regression test
+  it('marks downstream dispatch failures as per-item internal rejections', async () => {
+    queueAdd.mockRejectedValueOnce(new Error('queue exploded'));
+
+    const res = await postBatch({
+      events: [validTrack('bad_dispatch'), validTrack('still_ok')],
+    });
+
+    expect(res.statusCode).toBe(202);
+    expect(res.json()).toMatchObject({
+      accepted: 1,
+      rejected: [{ index: 0, reason: 'internal' }],
+    });
+    expect(queueAdd).toHaveBeenCalledTimes(2);
+  });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/api/src/routes/track-batch.router.test.ts` around lines 226 - 261, Add a
test in the same "per-item validation" suite that simulates an internal dispatch
failure by having the mocked queueAdd (or upsertProfile if that path is used)
throw for one event; call postBatch with multiple events (including at least one
that would be accepted) and assert the response is 202, that accepted count
reflects only successfully-dispatched items, that the rejected array includes
the failing event's index with reason 'internal', and that
queueAdd/upsertProfile was called as expected; reference postBatch, queueAdd,
upsertProfile, and validTrack to locate the relevant stubbing and assertions to
implement this regression test.

291-414: ⚡ Quick win

Add coverage for future __timestamp handling.

These timestamp tests cover historical rejection and bucket derivation, but not the rule that a __timestamp more than 1 minute in the future is clamped to now, nor the roughly 1-minute tolerance below that threshold. That leaves a core acceptance rule unguarded.

🧪 Suggested regression test
+  it('clamps timestamps that are more than 1 minute in the future', async () => {
+    const before = Date.now();
+    const res = await postBatch({
+      events: [
+        {
+          type: 'track' as const,
+          payload: {
+            name: 'future_event',
+            properties: {
+              __deviceId: 'future-device',
+              __timestamp: new Date(before + 5 * 60 * 1000).toISOString(),
+            },
+          },
+        },
+      ],
+    });
+
+    expect(res.statusCode).toBe(202);
+    const queuedJob = queueAdd.mock.calls[0]?.[0];
+    expect(queuedJob.data.eventMs).toBeGreaterThanOrEqual(before);
+    expect(queuedJob.data.eventMs).toBeLessThanOrEqual(Date.now());
+  });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/api/src/routes/track-batch.router.test.ts` around lines 291 - 414, Add a
new test that verifies timestamps more than 1 minute in the future are clamped
to now: use postBatch to send two track events for the same device where one has
__timestamp = new Date(Date.now() + 2*60*1000).toISOString() (2 minutes in the
future) and the other uses Date.now(); assert the response shows both accepted,
queueAdd was called twice, extract sessionId from
queueAdd.mock.calls[0]?.[0].data.sessionId and [1] and assert they are equal
(showing the future timestamp was clamped into the same session/bucket), naming
the test something like 'clamps __timestamp >1 minute in future to now'.
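
For reference, the timestamp acceptance rules discussed in this comment (5-day floor, clamp beyond 1 minute in the future) can be sketched as a pure function. This is an assumption-laden illustration, not the PR's `getTimestamp`: the name `resolveTimestamp`, the `'too_old'` reason string, and keeping within-tolerance future timestamps unchanged are all hypothetical.

```typescript
const ONE_MINUTE_MS = 60 * 1000;
const FIVE_DAYS_MS = 5 * 24 * 60 * 60 * 1000;

// Hypothetical decision shape for a single event's __timestamp.
type TimestampDecision =
  | { ok: true; eventMs: number }
  | { ok: false; reason: 'too_old' };

// Decides the effective event time from the client-supplied timestamp.
function resolveTimestamp(clientMs: number, nowMs: number): TimestampDecision {
  // Hard floor: anything older than 5 days is rejected.
  if (clientMs < nowMs - FIVE_DAYS_MS) {
    return { ok: false, reason: 'too_old' };
  }
  // More than 1 minute ahead of the server clock: clamp to now.
  if (clientMs > nowMs + ONE_MINUTE_MS) {
    return { ok: true, eventMs: nowMs };
  }
  // Within tolerance (assumed behaviour): keep the client's value.
  return { ok: true, eventMs: clientMs };
}
```

Passing `nowMs` in explicitly keeps the function deterministic, so a regression test can pin both sides of the 1-minute boundary without depending on the wall clock.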

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 188ee57d-0327-4414-805e-31e58c360005

📥 Commits

Reviewing files that changed from the base of the PR and between e5bcc3b and cabae9b.

📒 Files selected for processing (1)
  • apps/api/src/routes/track-batch.router.test.ts

Development

Successfully merging this pull request may close these issues.

Bulk event ingestion (POST /track/batch) for offline-first SDKs — IoT, mobile, edge
