Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions packages/cloudflare/src/flush.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ type FlushLock = {
readonly finalize: () => Promise<void>;
};

type FlushLockRegistry = {
readonly locks: Set<FlushLockInternal>;
};

type FlushLockInternal = FlushLock & {
readonly acquire: () => void;
readonly release: () => void;
};

const flushLockRegistries = new WeakMap<ExecutionContext['waitUntil'], FlushLockRegistry>();

/**
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
* to monitor pending tasks, and provides a flusher function to ensure all tasks
Expand All @@ -16,27 +27,69 @@ type FlushLock = {
* @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined.
*/
export function makeFlushLock(context: ExecutionContext): FlushLock {
const registry = getOrCreateFlushLockRegistry(context);
let resolveAllDone: () => void = () => undefined;
const allDone = new Promise<void>(res => {
resolveAllDone = res;
});
let pending = 0;

const lock: FlushLockInternal = {
ready: allDone,
acquire: () => {
pending++;
},
release: () => {
if (--pending === 0) {
registry.locks.delete(lock);
resolveAllDone();
}
},
finalize: () => {
if (pending === 0) {
registry.locks.delete(lock);
resolveAllDone();
}
return allDone;
},
};

registry.locks.add(lock);
return Object.freeze(lock);
}

function getOrCreateFlushLockRegistry(context: ExecutionContext): FlushLockRegistry {
// eslint-disable-next-line @typescript-eslint/unbound-method
const waitUntil = context.waitUntil;
const existingRegistry = flushLockRegistries.get(waitUntil);

if (existingRegistry) {
return existingRegistry;
}

const registry: FlushLockRegistry = { locks: new Set() };
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
context.waitUntil = promise => {
pending++;
const instrumentedWaitUntil: typeof context.waitUntil = promise => {
// Snapshot active locks so locks created after this call do not wait for earlier waitUntil work.
const locks = [...registry.locks];

for (const lock of locks) {
lock.acquire();
}

return originalWaitUntil(
promise.finally(() => {
if (--pending === 0) resolveAllDone();
for (const lock of locks) {
lock.release();
}
}),
);
};
return Object.freeze({
ready: allDone,
finalize: () => {
if (pending === 0) resolveAllDone();
return allDone;
},
});

flushLockRegistries.set(instrumentedWaitUntil, registry);
context.waitUntil = instrumentedWaitUntil;

return registry;
}

/**
Expand Down
47 changes: 47 additions & 0 deletions packages/cloudflare/test/flush.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,53 @@ describe('Flush buffer test', () => {
await Promise.all(waitUntilPromises);
await expect(lock.ready).resolves.toBeUndefined();
});

it('does not grow the waitUntil wrapper stack on repeated flush lock creation', async () => {
const waitUntilPromises: Promise<void>[] = [];
const context: ExecutionContext = {
waitUntil: vi.fn(promise => {
waitUntilPromises.push(promise);
}),
passThroughOnException: vi.fn(),
};

for (let i = 0; i < 20_000; i++) {
makeFlushLock(context);
}

expect(() => context.waitUntil(Promise.resolve())).not.toThrow();
await Promise.all(waitUntilPromises);
});

it('creates a fresh flush lock when waitUntil was already instrumented', async () => {
const waitUntilPromises: Promise<void>[] = [];
const context: ExecutionContext = {
waitUntil: vi.fn(promise => {
waitUntilPromises.push(promise);
}),
passThroughOnException: vi.fn(),
};

const firstLock = makeFlushLock(context);
await firstLock.finalize();

let resolveWaitUntil!: () => void;
const secondTask = new Promise<void>(resolve => {
resolveWaitUntil = resolve;
});
const secondLock = makeFlushLock(context);

context.waitUntil(secondTask);
void secondLock.finalize();

await Promise.resolve();
expect(waitUntilPromises).toHaveLength(1);
await expect(Promise.race([secondLock.ready, Promise.resolve('pending')])).resolves.toBe('pending');

resolveWaitUntil();
await Promise.all(waitUntilPromises);
await expect(secondLock.ready).resolves.toBeUndefined();
});
});

describe('flushAndDispose', () => {
Expand Down
Loading