Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 55 additions & 19 deletions packages/audience/core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,42 +46,51 @@ export class TransportError extends Error {
* `ok: true` means the backend accepted the payload (HTTP 2xx). On
* `ok: false`, `error` is always populated with a structured reason.
*
* Implementations of `HttpSend` MUST NOT reject failures travel
* Implementations of `HttpSend` MUST NOT reject; failures travel
* through this result type. Callers (notably `MessageQueue.flushUnload`)
* rely on this contract for fire-and-forget paths.
*
* `retryAfterMs` is set when the server returned 429 with a parseable
* `Retry-After` header. The queue uses this to override the exponential
* backoff schedule with the server-supplied delay.
*/
export interface TransportResult {
ok: boolean;
error?: TransportError;
retryAfterMs?: number;
}

/**
* Stable, machine-readable code identifying the kind of audience SDK
* failure. Studios can branch on this in their `onError` handler.
*
* - `'FLUSH_FAILED'`POST to `/v1/audience/messages` returned non-2xx.
* - `'CONSENT_SYNC_FAILED'`PUT to `/v1/audience/tracking-consent` returned non-2xx.
* - `'NETWORK_ERROR'`fetch rejected before a response was received
* - `'FLUSH_FAILED'`:POST to `/v1/audience/messages` returned non-2xx.
* - `'CONSENT_SYNC_FAILED'`:PUT to `/v1/audience/tracking-consent` returned non-2xx.
* - `'NETWORK_ERROR'`:fetch rejected before a response was received
* (network failure, CORS, DNS, etc.).
* - `'VALIDATION_REJECTED'` — backend returned 2xx but the body reported
* `rejected > 0`. Terminal: retrying won't help, the
* messages were dropped from the queue. Inspect
* `responseBody` for the per-message detail when the
* backend provides it.
* - `'VALIDATION_REJECTED'`:backend returned 2xx but the body reported
* `rejected > 0`, or the server returned a 4xx (non-429)
* indicating the payload was malformed or unauthorised.
* Terminal: retrying won't help, the messages were dropped
* from the queue.
* - `'RATE_LIMITED'`:server returned 429. The batch is retained and will
* be retried after the backoff window (honoring
* `Retry-After` when present).
*/
export type AudienceErrorCode =
| 'FLUSH_FAILED'
| 'CONSENT_SYNC_FAILED'
| 'NETWORK_ERROR'
| 'VALIDATION_REJECTED';
| 'VALIDATION_REJECTED'
| 'RATE_LIMITED';

/**
* Public error type passed to the SDK's `onError` callback. Wraps the
* low-level {@link TransportError} and adds a closed `code` union plus a
* human-readable `message`.
*
* Lives in `@imtbl/audience-core` so every surface (web, pixel, unity,
* unreal) reports failures through the same shape no per-package
* unreal) reports failures through the same shape, with no per-package
* error class, no duplicated mapping logic.
*
* Is an instance of `Error` so it can be thrown, logged, or passed to
Expand Down Expand Up @@ -126,7 +135,7 @@ export class AudienceError extends Error {
* own copy of `status === 0 → NETWORK_ERROR` mapping logic.
*
* @param err The transport-level failure.
* @param source Which subsystem hit the error selects the error code
* @param source Which subsystem hit the error; selects the error code
* and shapes the human message.
* @param count For `'flush'` failures, the number of messages in the
* batch. Used in the human-readable message; ignored for
Expand All @@ -137,7 +146,7 @@ export function toAudienceError(
source: 'flush' | 'consent',
count?: number,
): AudienceError {
// Network failure no HTTP response received.
// Network failure: no HTTP response received.
if (err.status === 0) {
return new AudienceError({
code: 'NETWORK_ERROR',
Expand All @@ -150,8 +159,8 @@ export function toAudienceError(
});
}

// 2xx response with backend-rejected messages. Terminal, do not retry
// the only way ok:false comes back with a 2xx status is when httpSend
// 2xx response with backend-rejected messages. Terminal, do not retry.
// The only way ok:false comes back with a 2xx status is when httpSend
// detected `rejected > 0` in the parsed response body.
if (err.status >= 200 && err.status < 300) {
const body = err.body as { accepted?: number; rejected?: number } | undefined;
Expand All @@ -166,7 +175,34 @@ export function toAudienceError(
});
}

// Generic HTTP failure (4xx / 5xx).
// 429: rate limited, retryable. Batch is kept; queue applies backoff.
if (err.status === 429) {
return new AudienceError({
code: 'RATE_LIMITED',
message: source === 'flush'
? 'Flush rate limited (429)'
: 'Consent sync rate limited (429)',
status: err.status,
endpoint: err.endpoint,
responseBody: err.body,
});
}

// 4xx (non-429): server deterministically rejected the payload. Terminal:
// a bad publishable key or malformed body won't be fixed by retrying.
if (err.status >= 400 && err.status < 500) {
return new AudienceError({
code: source === 'flush' ? 'VALIDATION_REJECTED' : 'CONSENT_SYNC_FAILED',
message: source === 'flush'
? `Flush rejected with status ${err.status}`
: `Consent sync failed with status ${err.status}`,
status: err.status,
endpoint: err.endpoint,
responseBody: err.body,
});
}

// 5xx or other non-2xx/4xx: server unhealthy. Keep batch, apply backoff.
return new AudienceError({
code: source === 'flush' ? 'FLUSH_FAILED' : 'CONSENT_SYNC_FAILED',
message: source === 'flush'
Expand All @@ -183,13 +219,13 @@ export function toAudienceError(
* Invoke a studio-supplied `onError` callback, swallowing any exception
* it throws.
*
* Used by {@link MessageQueue} and {@link createConsentManager} both
* Used by {@link MessageQueue} and {@link createConsentManager}; both
* must not wedge their internal state machines on a badly-written handler.
* Centralised here to keep the swallow-and-continue semantics identical
* across every audience surface and avoid duplicating the try/catch at
* each call site.
*
* Intentionally not re-exported from `index.ts` this is an internal
* Intentionally not re-exported from `index.ts`; this is an internal
* helper, not public API.
*/
export function invokeOnError(
Expand All @@ -200,6 +236,6 @@ export function invokeOnError(
try {
onError(err);
} catch {
// Swallow handler must not crash the state machine.
// Swallow; handler must not crash the state machine.
}
}
180 changes: 177 additions & 3 deletions packages/audience/core/src/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ describe('MessageQueue', () => {
expect(send).not.toHaveBeenCalled();

queue.enqueue(makeMessage('2'));
// flush is async await the microtask
// flush is async, await the microtask
await Promise.resolve();
expect(send).toHaveBeenCalledTimes(1);
});
Expand Down Expand Up @@ -326,6 +326,180 @@ describe('MessageQueue', () => {
});
});

describe('exponential backoff', () => {
it('skips flush while inside the backoff window', async () => {
const start = 1_000_000;
jest.setSystemTime(start);
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(failResult);
const queue = createQueue(send);

queue.enqueue(makeMessage('1'));

// First flush: records failure, sets backoff to start+5000
await queue.flush();
expect(send).toHaveBeenCalledTimes(1);

// Still inside backoff window: flush is a no-op
jest.setSystemTime(start + 4_999);
await queue.flush();
expect(send).toHaveBeenCalledTimes(1);

// Past backoff window: flush proceeds
jest.setSystemTime(start + 5_001);
await queue.flush();
expect(send).toHaveBeenCalledTimes(2);
});

it('escalates backoff: 5s → 10s → 20s → 40s → 60s', async () => {
// Each step: trigger a failure, assert blocked before window, assert unblocked after.
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(failResult);
const queue = createQueue(send);
queue.enqueue(makeMessage('1'));

let now = 1_000_000;
let calls = 0;

const step = async (delay: number) => {
jest.setSystemTime(now);
await queue.flush();
calls++;
expect(send).toHaveBeenCalledTimes(calls);

jest.setSystemTime(now + delay - 1);
await queue.flush();
expect(send).toHaveBeenCalledTimes(calls); // still blocked

now += delay + 1;
jest.setSystemTime(now);
};

await step(5_000);
await step(10_000);
await step(20_000);
await step(40_000);
await step(60_000);
});

it('resets backoff on a successful flush', async () => {
const start = 1_000_000;
jest.setSystemTime(start);
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>()
.mockResolvedValueOnce(failResult)
.mockResolvedValue(okResult);
const queue = createQueue(send);

queue.enqueue(makeMessage('1'));
queue.enqueue(makeMessage('2'));

// First flush fails; backoff starts
await queue.flush();
expect(send).toHaveBeenCalledTimes(1);

// Advance past the 5s window; second flush succeeds, backoff resets
jest.setSystemTime(start + 5_001);
await queue.flush();
expect(send).toHaveBeenCalledTimes(2);

// Should be able to flush immediately after reset
queue.enqueue(makeMessage('3'));
await queue.flush();
expect(send).toHaveBeenCalledTimes(3);
});

it('uses Retry-After delay when server supplies it on 429', async () => {
const start = 1_000_000;
jest.setSystemTime(start);
const rateLimitResult: TransportResult = {
ok: false,
error: new TransportError({ status: 429, endpoint: 'https://api.immutable.com/v1/audience/messages' }),
retryAfterMs: 30_000,
};
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>()
.mockResolvedValueOnce(rateLimitResult)
.mockResolvedValue(okResult);
const queue = createQueue(send);

queue.enqueue(makeMessage('1'));

await queue.flush();
expect(send).toHaveBeenCalledTimes(1);

// 29s: still inside Retry-After window
jest.setSystemTime(start + 29_000);
await queue.flush();
expect(send).toHaveBeenCalledTimes(1);

// Past the window
jest.setSystemTime(start + 30_001);
await queue.flush();
expect(send).toHaveBeenCalledTimes(2);
expect(queue.length).toBe(0);
});

it('fires RATE_LIMITED via onError on 429 and keeps the batch', async () => {
const onError = jest.fn();
const rateLimitResult: TransportResult = {
ok: false,
error: new TransportError({ status: 429, endpoint: 'https://api.immutable.com/v1/audience/messages' }),
};
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(rateLimitResult);
const queue = createQueue(send, { onError });

queue.enqueue(makeMessage('1'));
await queue.flush();

expect(queue.length).toBe(1);
expect(onError).toHaveBeenCalledTimes(1);
expect(onError.mock.calls[0][0].code).toBe('RATE_LIMITED');
expect(onError.mock.calls[0][0].status).toBe(429);
});
});

describe('4xx drop', () => {
it('drops batch and fires VALIDATION_REJECTED on non-retryable 4xx', async () => {
const onError = jest.fn();
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue({
ok: false,
error: new TransportError({
status: 401,
endpoint: 'https://api.immutable.com/v1/audience/messages',
body: 'Unauthorized',
}),
});
const queue = createQueue(send, { onError });

queue.enqueue(makeMessage('1'));
queue.enqueue(makeMessage('2'));
await queue.flush();

expect(queue.length).toBe(0);
expect(onError).toHaveBeenCalledTimes(1);
const err = onError.mock.calls[0][0];
expect(err.code).toBe('VALIDATION_REJECTED');
expect(err.status).toBe(401);
});

it('drops batch and fires VALIDATION_REJECTED on 400', async () => {
const onError = jest.fn();
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue({
ok: false,
error: new TransportError({
status: 400,
endpoint: 'https://api.immutable.com/v1/audience/messages',
body: { error: 'bad request' },
}),
});
const queue = createQueue(send, { onError });

queue.enqueue(makeMessage('1'));
await queue.flush();

expect(queue.length).toBe(0);
expect(onError.mock.calls[0][0].code).toBe('VALIDATION_REJECTED');
expect(onError.mock.calls[0][0].status).toBe(400);
});
});

describe('page-unload flush (keepalive)', () => {
it('flushes via keepalive fetch on visibilitychange to hidden', () => {
const send = jest.fn<ReturnType<HttpSend>, Parameters<HttpSend>>().mockResolvedValue(okResult);
Expand Down Expand Up @@ -417,7 +591,7 @@ describe('page-unload flush (keepalive)', () => {
);
expect(queue.length).toBe(0);

// Listeners removed no double flush
// Listeners removed - no double flush
queue.enqueue(makeMessage('3'));
window.dispatchEvent(new Event('pagehide'));
expect(send).toHaveBeenCalledTimes(1);
Expand All @@ -435,7 +609,7 @@ describe('page-unload flush (keepalive)', () => {
// Start an async flush (sets flushing = true)
const pending = queue.flush();

// pagehide fires while async flush is in flight unload flush should be skipped
// pagehide fires while async flush is in flight; unload flush should be skipped
window.dispatchEvent(new Event('pagehide'));
// Only 1 call (the async flush), no keepalive call
expect(send).toHaveBeenCalledTimes(1);
Expand Down
Loading
Loading