From e887b98373641a91a42222b66dc4398ebd039f59 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Mon, 25 May 2026 12:20:50 +0200 Subject: [PATCH 1/2] test(cloudflare): reproduce flush lock wrapper growth --- packages/cloudflare/test/flush.test.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/cloudflare/test/flush.test.ts b/packages/cloudflare/test/flush.test.ts index 2a2b68aab02d..18727d46af9d 100644 --- a/packages/cloudflare/test/flush.test.ts +++ b/packages/cloudflare/test/flush.test.ts @@ -29,6 +29,19 @@ describe('Flush buffer test', () => { await Promise.all(waitUntilPromises); await expect(lock.ready).resolves.toBeUndefined(); }); + + it('does not grow the waitUntil wrapper stack on repeated flush lock creation', () => { + const context: ExecutionContext = { + waitUntil: vi.fn(), + passThroughOnException: vi.fn(), + }; + + for (let i = 0; i < 20_000; i++) { + makeFlushLock(context); + } + + expect(() => context.waitUntil(Promise.resolve())).not.toThrow(); + }); }); describe('flushAndDispose', () => { From b979984ab275eaeb4bd09e3f318bc7acf70d301c Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Mon, 25 May 2026 12:59:24 +0200 Subject: [PATCH 2/2] fix(cloudflare): avoid repeated flush lock wrapping --- packages/cloudflare/src/flush.ts | 73 ++++++++++++++++++++++---- packages/cloudflare/test/flush.test.ts | 38 +++++++++++++- 2 files changed, 99 insertions(+), 12 deletions(-) diff --git a/packages/cloudflare/src/flush.ts b/packages/cloudflare/src/flush.ts index e7f036971f4a..e68399d468e7 100644 --- a/packages/cloudflare/src/flush.ts +++ b/packages/cloudflare/src/flush.ts @@ -7,6 +7,17 @@ type FlushLock = { readonly finalize: () => Promise; }; +type FlushLockRegistry = { + readonly locks: Set; +}; + +type FlushLockInternal = FlushLock & { + readonly acquire: () => void; + readonly release: () => void; +}; + +const flushLockRegistries = new WeakMap(); + /** * Enhances the given execution context by wrapping its `waitUntil` method with a proxy * to monitor pending tasks, and provides a flusher function to ensure all tasks @@ -16,27 +27,69 @@ type FlushLock = { * @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined. */ export function makeFlushLock(context: ExecutionContext): FlushLock { + const registry = getOrCreateFlushLockRegistry(context); let resolveAllDone: () => void = () => undefined; const allDone = new Promise(res => { resolveAllDone = res; }); let pending = 0; + + const lock: FlushLockInternal = { + ready: allDone, + acquire: () => { + pending++; + }, + release: () => { + if (--pending === 0) { + registry.locks.delete(lock); + resolveAllDone(); + } + }, + finalize: () => { + if (pending === 0) { + registry.locks.delete(lock); + resolveAllDone(); + } + return allDone; + }, + }; + + registry.locks.add(lock); + return Object.freeze(lock); +} + +function getOrCreateFlushLockRegistry(context: ExecutionContext): FlushLockRegistry { + // eslint-disable-next-line @typescript-eslint/unbound-method + const waitUntil = context.waitUntil; + const existingRegistry = flushLockRegistries.get(waitUntil); + + if (existingRegistry) { + return existingRegistry; + } + + const registry: FlushLockRegistry = { locks: new Set() }; const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil; - context.waitUntil = promise => { - pending++; + const instrumentedWaitUntil: typeof context.waitUntil = promise => { + // Snapshot active locks so locks created after this call do not wait for earlier waitUntil work. + const locks = [...registry.locks]; + + for (const lock of locks) { + lock.acquire(); + } + return originalWaitUntil( promise.finally(() => { - if (--pending === 0) resolveAllDone(); + for (const lock of locks) { + lock.release(); + } }), ); }; - return Object.freeze({ - ready: allDone, - finalize: () => { - if (pending === 0) resolveAllDone(); - return allDone; - }, - }); + + flushLockRegistries.set(instrumentedWaitUntil, registry); + context.waitUntil = instrumentedWaitUntil; + + return registry; } /** diff --git a/packages/cloudflare/test/flush.test.ts b/packages/cloudflare/test/flush.test.ts index 18727d46af9d..0226378aac1d 100644 --- a/packages/cloudflare/test/flush.test.ts +++ b/packages/cloudflare/test/flush.test.ts @@ -30,9 +30,12 @@ describe('Flush buffer test', () => { await expect(lock.ready).resolves.toBeUndefined(); }); - it('does not grow the waitUntil wrapper stack on repeated flush lock creation', () => { + it('does not grow the waitUntil wrapper stack on repeated flush lock creation', async () => { + const waitUntilPromises: Promise[] = []; const context: ExecutionContext = { - waitUntil: vi.fn(), + waitUntil: vi.fn(promise => { + waitUntilPromises.push(promise); + }), passThroughOnException: vi.fn(), }; @@ -41,6 +44,37 @@ describe('Flush buffer test', () => { } expect(() => context.waitUntil(Promise.resolve())).not.toThrow(); + await Promise.all(waitUntilPromises); + }); + + it('creates a fresh flush lock when waitUntil was already instrumented', async () => { + const waitUntilPromises: Promise[] = []; + const context: ExecutionContext = { + waitUntil: vi.fn(promise => { + waitUntilPromises.push(promise); + }), + passThroughOnException: vi.fn(), + }; + + const firstLock = makeFlushLock(context); + await firstLock.finalize(); + + let resolveWaitUntil!: () => void; + const secondTask = new Promise(resolve => { + resolveWaitUntil = resolve; + }); + const secondLock = makeFlushLock(context); + + context.waitUntil(secondTask); + void secondLock.finalize(); + + await Promise.resolve(); + expect(waitUntilPromises).toHaveLength(1); + await expect(Promise.race([secondLock.ready, Promise.resolve('pending')])).resolves.toBe('pending'); + + resolveWaitUntil(); + await Promise.all(waitUntilPromises); + await expect(secondLock.ready).resolves.toBeUndefined(); }); });