diff --git a/dev-packages/e2e-tests/test-applications/cloudflare-workers/tests/memory.test.ts b/dev-packages/e2e-tests/test-applications/cloudflare-workers/tests/memory.test.ts index 740961b3083f..d3b6483ad682 100644 --- a/dev-packages/e2e-tests/test-applications/cloudflare-workers/tests/memory.test.ts +++ b/dev-packages/e2e-tests/test-applications/cloudflare-workers/tests/memory.test.ts @@ -3,28 +3,76 @@ import { expect, test } from '@playwright/test'; import { INSPECTOR_PORT } from '../playwright.config'; test.describe('Worker V8 isolate memory tests', () => { - test('worker memory is reclaimed after GC', async ({ baseURL }) => { + test('worker memory is stable across request batches', async ({ baseURL }) => { const profiler = new MemoryProfiler({ port: INSPECTOR_PORT }); // Warm up: make initial requests and let the runtime settle - for (let i = 0; i < 5; i++) { + for (let i = 0; i < 20; i++) { await fetch(baseURL!); } await profiler.connect(); - const baselineSnapshot = await profiler.takeHeapSnapshot(); + // First batch + for (let i = 0; i < 50; i++) { + const res = await fetch(baseURL!); + expect(res.status).toBe(200); + await res.text(); + } + + const afterFirstBatch = await profiler.takeHeapSnapshot(); + // Second batch for (let i = 0; i < 50; i++) { const res = await fetch(baseURL!); expect(res.status).toBe(200); await res.text(); } - const finalSnapshot = await profiler.takeHeapSnapshot(); - const result = profiler.compareSnapshots(baselineSnapshot, finalSnapshot); + const afterSecondBatch = await profiler.takeHeapSnapshot(); + + // Compare batches to detect per-request leaks (excludes warm-up effects) + const result = profiler.compareSnapshots(afterFirstBatch, afterSecondBatch); + + expect(result.nodeGrowthPercent).toBeLessThan(0.15); + + await profiler.close(); + }); + + test('durable object memory is stable across request batches', async ({ baseURL }) => { + const profiler = new MemoryProfiler({ port: INSPECTOR_PORT }); + + // Warm up: let JIT compile, caches fill, and DO instance stabilize + for (let i = 0; i < 30; i++) { + await fetch(`${baseURL}/pass-to-object/storage/put`); + } + + await profiler.connect(); + + // First batch of requests to the same DO + for (let i = 0; i < 50; i++) { + const res = await fetch(`${baseURL}/pass-to-object/storage/put`); + expect(res.status).toBe(200); + await res.text(); + } + + const afterFirstBatch = await profiler.takeHeapSnapshot(); + + // Second batch of requests to the same DO + for (let i = 0; i < 50; i++) { + const res = await fetch(`${baseURL}/pass-to-object/storage/put`); + expect(res.status).toBe(200); + await res.text(); + } + + const afterSecondBatch = await profiler.takeHeapSnapshot(); + + // Compare batches to detect per-request leaks (excludes warm-up effects) + // Before fix: makeFlushLock re-wrapped waitUntil on each request = leak + // After fix: growth should be minimal + const result = profiler.compareSnapshots(afterFirstBatch, afterSecondBatch); - expect(result.nodeGrowthPercent).toBeLessThan(1); + expect(result.nodeGrowthPercent).toBeLessThan(0.15); await profiler.close(); }); diff --git a/packages/cloudflare/src/flush.ts b/packages/cloudflare/src/flush.ts index e7f036971f4a..e68399d468e7 100644 --- a/packages/cloudflare/src/flush.ts +++ b/packages/cloudflare/src/flush.ts @@ -7,6 +7,17 @@ type FlushLock = { readonly finalize: () => Promise; }; +type FlushLockRegistry = { + readonly locks: Set; +}; + +type FlushLockInternal = FlushLock & { + readonly acquire: () => void; + readonly release: () => void; +}; + +const flushLockRegistries = new WeakMap(); + /** * Enhances the given execution context by wrapping its `waitUntil` method with a proxy * to monitor pending tasks, and provides a flusher function to ensure all tasks @@ -16,27 +27,69 @@ type FlushLock = { * @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined. */ export function makeFlushLock(context: ExecutionContext): FlushLock { + const registry = getOrCreateFlushLockRegistry(context); let resolveAllDone: () => void = () => undefined; const allDone = new Promise(res => { resolveAllDone = res; }); let pending = 0; + + const lock: FlushLockInternal = { + ready: allDone, + acquire: () => { + pending++; + }, + release: () => { + if (--pending === 0) { + registry.locks.delete(lock); + resolveAllDone(); + } + }, + finalize: () => { + if (pending === 0) { + registry.locks.delete(lock); + resolveAllDone(); + } + return allDone; + }, + }; + + registry.locks.add(lock); + return Object.freeze(lock); +} + +function getOrCreateFlushLockRegistry(context: ExecutionContext): FlushLockRegistry { + // eslint-disable-next-line @typescript-eslint/unbound-method + const waitUntil = context.waitUntil; + const existingRegistry = flushLockRegistries.get(waitUntil); + + if (existingRegistry) { + return existingRegistry; + } + + const registry: FlushLockRegistry = { locks: new Set() }; const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil; - context.waitUntil = promise => { - pending++; + const instrumentedWaitUntil: typeof context.waitUntil = promise => { + // Snapshot active locks so locks created after this call do not wait for earlier waitUntil work. + const locks = [...registry.locks]; + + for (const lock of locks) { + lock.acquire(); + } + return originalWaitUntil( promise.finally(() => { - if (--pending === 0) resolveAllDone(); + for (const lock of locks) { + lock.release(); + } }), ); }; - return Object.freeze({ - ready: allDone, - finalize: () => { - if (pending === 0) resolveAllDone(); - return allDone; - }, - }); + + flushLockRegistries.set(instrumentedWaitUntil, registry); + context.waitUntil = instrumentedWaitUntil; + + return registry; } /** diff --git a/packages/cloudflare/test/flush.test.ts b/packages/cloudflare/test/flush.test.ts index 2a2b68aab02d..0226378aac1d 100644 --- a/packages/cloudflare/test/flush.test.ts +++ b/packages/cloudflare/test/flush.test.ts @@ -29,6 +29,53 @@ describe('Flush buffer test', () => { await Promise.all(waitUntilPromises); await expect(lock.ready).resolves.toBeUndefined(); }); + + it('does not grow the waitUntil wrapper stack on repeated flush lock creation', async () => { + const waitUntilPromises: Promise[] = []; + const context: ExecutionContext = { + waitUntil: vi.fn(promise => { + waitUntilPromises.push(promise); + }), + passThroughOnException: vi.fn(), + }; + + for (let i = 0; i < 20_000; i++) { + makeFlushLock(context); + } + + expect(() => context.waitUntil(Promise.resolve())).not.toThrow(); + await Promise.all(waitUntilPromises); + }); + + it('creates a fresh flush lock when waitUntil was already instrumented', async () => { + const waitUntilPromises: Promise[] = []; + const context: ExecutionContext = { + waitUntil: vi.fn(promise => { + waitUntilPromises.push(promise); + }), + passThroughOnException: vi.fn(), + }; + + const firstLock = makeFlushLock(context); + await firstLock.finalize(); + + let resolveWaitUntil!: () => void; + const secondTask = new Promise(resolve => { + resolveWaitUntil = resolve; + }); + const secondLock = makeFlushLock(context); + + context.waitUntil(secondTask); + void secondLock.finalize(); + + await Promise.resolve(); + expect(waitUntilPromises).toHaveLength(1); + await expect(Promise.race([secondLock.ready, Promise.resolve('pending')])).resolves.toBe('pending'); + + resolveWaitUntil(); + await Promise.all(waitUntilPromises); + await expect(secondLock.ready).resolves.toBeUndefined(); + }); }); describe('flushAndDispose', () => {