From b827dd18a826cb1e33aeaced06a05e967a65b3bb Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Sun, 28 Sep 2025 08:12:09 +0900 Subject: [PATCH] fix: cancel a readable stream if a writable stream is closed before a readable stream is closed. --- src/utils.ts | 10 ++++++---- test/utils.test.ts | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 7230bc8..7962eee 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -12,15 +12,17 @@ export function writeFromReadableStreamDefaultReader( writable: Writable, currentReadPromise?: Promise> | undefined ) { - const handleError = () => { - // ignore the error + const cancel = (error?: unknown) => { + reader.cancel(error).catch(() => {}) } - writable.on('error', handleError) + writable.on('close', cancel) + writable.on('error', cancel) ;(currentReadPromise ?? reader.read()).then(flow, handleStreamError) return reader.closed.finally(() => { - writable.off('error', handleError) + writable.off('close', cancel) + writable.off('error', cancel) }) // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/test/utils.test.ts b/test/utils.test.ts index 391a647..f105b89 100644 --- a/test/utils.test.ts +++ b/test/utils.test.ts @@ -78,14 +78,15 @@ describe('buildOutgoingHttpHeaders', () => { }) describe('writeFromReadableStream', () => { - it('should handle client disconnection gracefully without canceling stream', async () => { + it('should handle client disconnection', async () => { let enqueueCalled = false let cancelCalled = false + let enqueueTimeout: NodeJS.Timeout | undefined // Create test ReadableStream const stream = new ReadableStream({ start(controller) { - setTimeout(() => { + enqueueTimeout = setTimeout(() => { try { controller.enqueue(new TextEncoder().encode('test')) enqueueCalled = true @@ -97,6 +98,7 @@ describe('writeFromReadableStream', () => { }, cancel() { cancelCalled = true + clearTimeout(enqueueTimeout) }, }) @@ -110,8 +112,8 @@ describe('writeFromReadableStream', () => { await writeFromReadableStream(stream, writable) - expect(enqueueCalled).toBe(true) // enqueue should succeed - expect(cancelCalled).toBe(false) // cancel should not be called + expect(enqueueCalled).toBe(false) // enqueue should not be called + expect(cancelCalled).toBe(true) // cancel should be called }) })