diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js index 0ac1f84cd396..4ab62b593cfa 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js @@ -1909,4 +1909,157 @@ describe('ReactFlightDOMNode', () => { globalThis.eval = previousEval; } }); + + // Shared scenario for the two regression tests below. Both guard against + // the same destination-backpressure bug — emitTextChunk and + // emitTypedArrayChunk each push a [headerChunk, contentChunk] pair into + // completedRegularChunks. Before the fix, a flush that broke between the + // two writes left the content chunk stranded at the head of the queue, + // and the next flush emitted any newly-arrived Import rows ahead of it — + // splicing Import bytes into the position the Flight Client expects to + // read as the row's content. + // + // The scenario embeds `payload` in the model under the key `payload` and + // returns the deserialized model. Each test asserts that result.payload + // round-trips identically to what the Flight Server emitted. + async function runScenarioWithBackpressureBetweenHeaderAndContent(payload) { + function Client1() { + return client1; + } + // Client1's Import row must exceed VIEW_SIZE (4096) so writeStringChunk + // takes its BIG path and calls destination.write directly. That write + // returning false is what triggers the backpressure we want to test. + const Client1Reference = clientExports( + Client1, + 1, + '/' + 'a'.repeat(5000), + Promise.resolve(), + ); + + function Client2() { + return client2; + } + const Client2Reference = clientExports( + Client2, + 2, + '/client2.js', + Promise.resolve(), + ); + + let resolveAsync; + const asyncPromise = new Promise(resolve => { + resolveAsync = resolve; + }); + + async function AsyncWrapper() { + await asyncPromise; + return ; + } + + const model = { + client: , + payload, + async: , + }; + + const heldCallbacks = []; + const collectedChunks = []; + + // A destination that returns false from every write (highWaterMark: 1 in + // byte mode) and never completes any of them until the test releases the + // stored callback. This gives us deterministic control over when each write + // finishes and when 'drain' fires. + const destination = new Stream.Writable({ + highWaterMark: 1, + write(chunk, encoding, callback) { + collectedChunks.push( + Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding), + ); + heldCallbacks.push(callback); + }, + }); + + const finished = new Promise((resolve, reject) => { + destination.on('finish', resolve); + destination.on('error', reject); + }); + + // First flush: Client1's huge Import row hits backpressure, so the flush + // loop reaches the payload row's [headerChunk, contentChunk] pair while + // destinationHasCapacity is already false. Before the fix, this would + // have encoded just the header into currentView, broken the loop, and + // let completeWriting flush the header as its own write — stranding + // the content chunk at the front of completedRegularChunks. + const {pipe} = await serverAct(() => + ReactServerDOMServer.renderToPipeableStream(model, webpackMap), + ); + await serverAct(() => { + pipe(destination); + }); + + // While the destination is still paused, push Client2's Import row into + // completedImportChunks. No flush runs (request.destination is null after + // the first flush's backpressure break). + await serverAct(() => { + resolveAsync(); + }); + + // Release callbacks one at a time. The drain that empties the writable + // buffer triggers flushCompletedChunks; before the fix, this is where + // Client2's newly-queued Import row would have been emitted ahead of + // the still-orphaned payload content chunk. + while (heldCallbacks.length > 0) { + await serverAct(() => { + const cb = heldCallbacks.shift(); + cb(); + }); + } + + await finished; + + const readable = new Stream.Readable({read() {}}); + for (let i = 0; i < collectedChunks.length; i++) { + readable.push(collectedChunks[i]); + } + readable.push(null); + + const response = ReactServerDOMClient.createFromNodeStream(readable, { + moduleMap: null, + moduleLoading: null, + }); + return await response; + } + + it("keeps a Text row's header and content chunks adjacent when a flush hits backpressure between them", async () => { + // length >= 1024 makes the Flight Server outline this as a Text row via + // serializeLargeTextString, which is where emitTextChunk pushes its + // [headerChunk, textChunk] pair. + const largeText = 'x'.repeat(2048); + + const result = + await runScenarioWithBackpressureBetweenHeaderAndContent(largeText); + + // Before the fix, the Flight Client would have framed Client2's Import + // row bytes as text-row content, making result.payload `:I[...]...` + // garbage rather than the x's the Flight Server emitted. + expect(result.payload).toBe(largeText); + }); + + it("keeps a TypedArray row's header and content chunks adjacent when a flush hits backpressure between them", async () => { + // emitTypedArrayChunk pushes the same [headerChunk, contentChunk] pair as + // emitTextChunk. Before the fix, a flush break after the header would have + // stranded the content chunk in exactly the same way. + const binaryData = new Uint8Array(1024); + for (let i = 0; i < binaryData.length; i++) { + binaryData[i] = i % 256; + } + + const result = + await runScenarioWithBackpressureBetweenHeaderAndContent(binaryData); + + // Before the fix, the typed array's bytes would have been replaced by + // Client2's Import row bytes followed by whatever happened to land in + // the next 1024-byte window. + expect(result.payload).toEqual(binaryData); + }); }); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 7f6376821b9b..5a080fe406ad 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -576,7 +576,12 @@ export type Request = { pingedTasks: Array, completedImportChunks: Array, completedHintChunks: Array, - completedRegularChunks: Array, + // Some rows (Text, TypedArray) are pushed as a [headerChunk, contentChunk] + // tuple so flushCompletedChunks can write the pair atomically and never + // strand the content chunk on a backpressure break. + completedRegularChunks: Array< + Chunk | BinaryChunk | Array, + >, completedErrorChunks: Array, writtenSymbols: Map, writtenClientReferences: Map, @@ -594,7 +599,8 @@ export type Request = { abortTime: number, // DEV-only pendingDebugChunks: number, - completedDebugChunks: Array, + // See completedRegularChunks for why some entries are tuples. + completedDebugChunks: Array>, debugDestination: null | Destination, environmentName: () => string, filterStackFrame: ( @@ -695,7 +701,9 @@ function RequestInstance( this.pingedTasks = pingedTasks; this.completedImportChunks = ([]: Array); this.completedHintChunks = ([]: Array); - this.completedRegularChunks = ([]: Array); + this.completedRegularChunks = ([]: Array< + Chunk | BinaryChunk | Array, + >); this.completedErrorChunks = ([]: Array); this.writtenSymbols = new Map(); this.writtenClientReferences = new Map(); @@ -711,7 +719,9 @@ function RequestInstance( if (__DEV__) { this.pendingDebugChunks = 0; - this.completedDebugChunks = ([]: Array); + this.completedDebugChunks = ([]: Array< + Chunk | BinaryChunk | Array, + >); this.debugDestination = null; this.environmentName = environmentName === undefined @@ -4691,10 +4701,16 @@ function emitTypedArrayChunk( const binaryLength = byteLengthOfBinaryChunk(binaryChunk); const row = id.toString(16) + ':' + tag + binaryLength.toString(16) + ','; const headerChunk = stringToChunk(row); + // Push the header and binary as a single tuple so flushCompletedChunks can + // write them atomically. Otherwise, if the destination's backpressure flips + // between the two writes, the content chunk would be stranded at the front of + // the queue and the next drain would emit Import or Hint chunks between the + // header and the content — and the Flight Client would frame those + // intervening bytes as this row's content. if (__DEV__ && debug) { - request.completedDebugChunks.push(headerChunk, binaryChunk); + request.completedDebugChunks.push([headerChunk, binaryChunk]); } else { - request.completedRegularChunks.push(headerChunk, binaryChunk); + request.completedRegularChunks.push([headerChunk, binaryChunk]); } } @@ -4719,10 +4735,11 @@ function emitTextChunk( const binaryLength = byteLengthOfChunk(textChunk); const row = id.toString(16) + ':T' + binaryLength.toString(16) + ','; const headerChunk = stringToChunk(row); + // See emitTypedArrayChunk for why the pair is pushed as a tuple. if (__DEV__ && debug) { - request.completedDebugChunks.push(headerChunk, textChunk); + request.completedDebugChunks.push([headerChunk, textChunk]); } else { - request.completedRegularChunks.push(headerChunk, textChunk); + request.completedRegularChunks.push([headerChunk, textChunk]); } } @@ -6014,9 +6031,16 @@ function flushCompletedChunks(request: Request): void { const debugChunks = request.completedDebugChunks; let i = 0; for (; i < debugChunks.length; i++) { - request.pendingDebugChunks--; - const chunk = debugChunks[i]; - writeChunkAndReturn(debugDestination, chunk); + const item = debugChunks[i]; + if (isArray(item)) { + request.pendingDebugChunks -= item.length; + for (let j = 0; j < item.length; j++) { + writeChunkAndReturn(debugDestination, item[j]); + } + } else { + request.pendingDebugChunks--; + writeChunkAndReturn(debugDestination, item); + } } debugChunks.splice(0, i); } finally { @@ -6064,9 +6088,18 @@ function flushCompletedChunks(request: Request): void { const debugChunks = request.completedDebugChunks; i = 0; for (; i < debugChunks.length; i++) { - request.pendingDebugChunks--; - const chunk = debugChunks[i]; - const keepWriting: boolean = writeChunkAndReturn(destination, chunk); + const item = debugChunks[i]; + let keepWriting: boolean; + if (isArray(item)) { + request.pendingDebugChunks -= item.length; + keepWriting = true; + for (let j = 0; j < item.length; j++) { + keepWriting = writeChunkAndReturn(destination, item[j]); + } + } else { + request.pendingDebugChunks--; + keepWriting = writeChunkAndReturn(destination, item); + } if (!keepWriting) { request.destination = null; i++; @@ -6080,9 +6113,18 @@ function flushCompletedChunks(request: Request): void { const regularChunks = request.completedRegularChunks; i = 0; for (; i < regularChunks.length; i++) { - request.pendingChunks--; - const chunk = regularChunks[i]; - const keepWriting: boolean = writeChunkAndReturn(destination, chunk); + const item = regularChunks[i]; + let keepWriting: boolean; + if (isArray(item)) { + request.pendingChunks -= item.length; + keepWriting = true; + for (let j = 0; j < item.length; j++) { + keepWriting = writeChunkAndReturn(destination, item[j]); + } + } else { + request.pendingChunks--; + keepWriting = writeChunkAndReturn(destination, item); + } if (!keepWriting) { request.destination = null; i++;