diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js
index 0ac1f84cd396..4ab62b593cfa 100644
--- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js
+++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js
@@ -1909,4 +1909,157 @@ describe('ReactFlightDOMNode', () => {
globalThis.eval = previousEval;
}
});
+
+ // Shared scenario for the two regression tests below. Both guard against
+ // the same destination-backpressure bug — emitTextChunk and
+ // emitTypedArrayChunk each push a [headerChunk, contentChunk] pair into
+ // completedRegularChunks. Before the fix, a flush that broke between the
+ // two writes left the content chunk stranded at the head of the queue,
+ // and the next flush emitted any newly-arrived Import rows ahead of it —
+ // splicing Import bytes into the position the Flight Client expects to
+ // read as the row's content.
+ //
+ // The scenario embeds `payload` in the model under the key `payload` and
+ // returns the deserialized model. Each test asserts that result.payload
+ // round-trips identically to what the Flight Server emitted.
+ async function runScenarioWithBackpressureBetweenHeaderAndContent(payload) {
+ function Client1() {
+ return client1;
+ }
+ // Client1's Import row must exceed VIEW_SIZE (4096) so writeStringChunk
+ // takes its BIG path and calls destination.write directly. That write
+ // returning false is what triggers the backpressure we want to test.
+ const Client1Reference = clientExports(
+ Client1,
+ 1,
+ '/' + 'a'.repeat(5000),
+ Promise.resolve(),
+ );
+
+ function Client2() {
+ return client2;
+ }
+ const Client2Reference = clientExports(
+ Client2,
+ 2,
+ '/client2.js',
+ Promise.resolve(),
+ );
+
+ let resolveAsync;
+ const asyncPromise = new Promise(resolve => {
+ resolveAsync = resolve;
+ });
+
+ async function AsyncWrapper() {
+ await asyncPromise;
+ return ;
+ }
+
+ const model = {
+ client: ,
+ payload,
+ async: ,
+ };
+
+ const heldCallbacks = [];
+ const collectedChunks = [];
+
+ // A destination that returns false from every write (highWaterMark: 1 in
+ // byte mode) and never completes any of them until the test releases the
+ // stored callback. This gives us deterministic control over when each write
+ // finishes and when 'drain' fires.
+ const destination = new Stream.Writable({
+ highWaterMark: 1,
+ write(chunk, encoding, callback) {
+ collectedChunks.push(
+ Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding),
+ );
+ heldCallbacks.push(callback);
+ },
+ });
+
+ const finished = new Promise((resolve, reject) => {
+ destination.on('finish', resolve);
+ destination.on('error', reject);
+ });
+
+ // First flush: Client1's huge Import row hits backpressure, so the flush
+ // loop reaches the payload row's [headerChunk, contentChunk] pair while
+ // destinationHasCapacity is already false. Before the fix, this would
+ // have encoded just the header into currentView, broken the loop, and
+ // let completeWriting flush the header as its own write — stranding
+ // the content chunk at the front of completedRegularChunks.
+ const {pipe} = await serverAct(() =>
+ ReactServerDOMServer.renderToPipeableStream(model, webpackMap),
+ );
+ await serverAct(() => {
+ pipe(destination);
+ });
+
+ // While the destination is still paused, push Client2's Import row into
+ // completedImportChunks. No flush runs (request.destination is null after
+ // the first flush's backpressure break).
+ await serverAct(() => {
+ resolveAsync();
+ });
+
+ // Release callbacks one at a time. The drain that empties the writable
+ // buffer triggers flushCompletedChunks; before the fix, this is where
+ // Client2's newly-queued Import row would have been emitted ahead of
+ // the still-orphaned payload content chunk.
+ while (heldCallbacks.length > 0) {
+ await serverAct(() => {
+ const cb = heldCallbacks.shift();
+ cb();
+ });
+ }
+
+ await finished;
+
+ const readable = new Stream.Readable({read() {}});
+ for (let i = 0; i < collectedChunks.length; i++) {
+ readable.push(collectedChunks[i]);
+ }
+ readable.push(null);
+
+ const response = ReactServerDOMClient.createFromNodeStream(readable, {
+ moduleMap: null,
+ moduleLoading: null,
+ });
+ return await response;
+ }
+
+ it("keeps a Text row's header and content chunks adjacent when a flush hits backpressure between them", async () => {
+ // length >= 1024 makes the Flight Server outline this as a Text row via
+ // serializeLargeTextString, which is where emitTextChunk pushes its
+ // [headerChunk, textChunk] pair.
+ const largeText = 'x'.repeat(2048);
+
+ const result =
+ await runScenarioWithBackpressureBetweenHeaderAndContent(largeText);
+
+ // Before the fix, the Flight Client would have framed Client2's Import
+ // row bytes as text-row content, making result.payload `:I[...]...`
+ // garbage rather than the x's the Flight Server emitted.
+ expect(result.payload).toBe(largeText);
+ });
+
+ it("keeps a TypedArray row's header and content chunks adjacent when a flush hits backpressure between them", async () => {
+ // emitTypedArrayChunk pushes the same [headerChunk, contentChunk] pair as
+ // emitTextChunk. Before the fix, a flush break after the header would have
+ // stranded the content chunk in exactly the same way.
+ const binaryData = new Uint8Array(1024);
+ for (let i = 0; i < binaryData.length; i++) {
+ binaryData[i] = i % 256;
+ }
+
+ const result =
+ await runScenarioWithBackpressureBetweenHeaderAndContent(binaryData);
+
+ // Before the fix, the typed array's bytes would have been replaced by
+ // Client2's Import row bytes followed by whatever happened to land in
+ // the next 1024-byte window.
+ expect(result.payload).toEqual(binaryData);
+ });
});
diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js
index 7f6376821b9b..5a080fe406ad 100644
--- a/packages/react-server/src/ReactFlightServer.js
+++ b/packages/react-server/src/ReactFlightServer.js
@@ -576,7 +576,12 @@ export type Request = {
pingedTasks: Array,
completedImportChunks: Array,
completedHintChunks: Array,
- completedRegularChunks: Array,
+ // Some rows (Text, TypedArray) are pushed as a [headerChunk, contentChunk]
+ // tuple so flushCompletedChunks can write the pair atomically and never
+ // strand the content chunk on a backpressure break.
+ completedRegularChunks: Array<
+ Chunk | BinaryChunk | Array,
+ >,
completedErrorChunks: Array,
writtenSymbols: Map,
writtenClientReferences: Map,
@@ -594,7 +599,8 @@ export type Request = {
abortTime: number,
// DEV-only
pendingDebugChunks: number,
- completedDebugChunks: Array,
+ // See completedRegularChunks for why some entries are tuples.
+ completedDebugChunks: Array>,
debugDestination: null | Destination,
environmentName: () => string,
filterStackFrame: (
@@ -695,7 +701,9 @@ function RequestInstance(
this.pingedTasks = pingedTasks;
this.completedImportChunks = ([]: Array);
this.completedHintChunks = ([]: Array);
- this.completedRegularChunks = ([]: Array);
+ this.completedRegularChunks = ([]: Array<
+ Chunk | BinaryChunk | Array,
+ >);
this.completedErrorChunks = ([]: Array);
this.writtenSymbols = new Map();
this.writtenClientReferences = new Map();
@@ -711,7 +719,9 @@ function RequestInstance(
if (__DEV__) {
this.pendingDebugChunks = 0;
- this.completedDebugChunks = ([]: Array);
+ this.completedDebugChunks = ([]: Array<
+ Chunk | BinaryChunk | Array,
+ >);
this.debugDestination = null;
this.environmentName =
environmentName === undefined
@@ -4691,10 +4701,16 @@ function emitTypedArrayChunk(
const binaryLength = byteLengthOfBinaryChunk(binaryChunk);
const row = id.toString(16) + ':' + tag + binaryLength.toString(16) + ',';
const headerChunk = stringToChunk(row);
+ // Push the header and binary as a single tuple so flushCompletedChunks can
+ // write them atomically. Otherwise, if the destination's backpressure flips
+ // between the two writes, the content chunk would be stranded at the front of
+ // the queue and the next drain would emit Import or Hint chunks between the
+ // header and the content — and the Flight Client would frame those
+ // intervening bytes as this row's content.
if (__DEV__ && debug) {
- request.completedDebugChunks.push(headerChunk, binaryChunk);
+ request.completedDebugChunks.push([headerChunk, binaryChunk]);
} else {
- request.completedRegularChunks.push(headerChunk, binaryChunk);
+ request.completedRegularChunks.push([headerChunk, binaryChunk]);
}
}
@@ -4719,10 +4735,11 @@ function emitTextChunk(
const binaryLength = byteLengthOfChunk(textChunk);
const row = id.toString(16) + ':T' + binaryLength.toString(16) + ',';
const headerChunk = stringToChunk(row);
+ // See emitTypedArrayChunk for why the pair is pushed as a tuple.
if (__DEV__ && debug) {
- request.completedDebugChunks.push(headerChunk, textChunk);
+ request.completedDebugChunks.push([headerChunk, textChunk]);
} else {
- request.completedRegularChunks.push(headerChunk, textChunk);
+ request.completedRegularChunks.push([headerChunk, textChunk]);
}
}
@@ -6014,9 +6031,16 @@ function flushCompletedChunks(request: Request): void {
const debugChunks = request.completedDebugChunks;
let i = 0;
for (; i < debugChunks.length; i++) {
- request.pendingDebugChunks--;
- const chunk = debugChunks[i];
- writeChunkAndReturn(debugDestination, chunk);
+ const item = debugChunks[i];
+ if (isArray(item)) {
+ request.pendingDebugChunks -= item.length;
+ for (let j = 0; j < item.length; j++) {
+ writeChunkAndReturn(debugDestination, item[j]);
+ }
+ } else {
+ request.pendingDebugChunks--;
+ writeChunkAndReturn(debugDestination, item);
+ }
}
debugChunks.splice(0, i);
} finally {
@@ -6064,9 +6088,18 @@ function flushCompletedChunks(request: Request): void {
const debugChunks = request.completedDebugChunks;
i = 0;
for (; i < debugChunks.length; i++) {
- request.pendingDebugChunks--;
- const chunk = debugChunks[i];
- const keepWriting: boolean = writeChunkAndReturn(destination, chunk);
+ const item = debugChunks[i];
+ let keepWriting: boolean;
+ if (isArray(item)) {
+ request.pendingDebugChunks -= item.length;
+ keepWriting = true;
+ for (let j = 0; j < item.length; j++) {
+ keepWriting = writeChunkAndReturn(destination, item[j]);
+ }
+ } else {
+ request.pendingDebugChunks--;
+ keepWriting = writeChunkAndReturn(destination, item);
+ }
if (!keepWriting) {
request.destination = null;
i++;
@@ -6080,9 +6113,18 @@ function flushCompletedChunks(request: Request): void {
const regularChunks = request.completedRegularChunks;
i = 0;
for (; i < regularChunks.length; i++) {
- request.pendingChunks--;
- const chunk = regularChunks[i];
- const keepWriting: boolean = writeChunkAndReturn(destination, chunk);
+ const item = regularChunks[i];
+ let keepWriting: boolean;
+ if (isArray(item)) {
+ request.pendingChunks -= item.length;
+ keepWriting = true;
+ for (let j = 0; j < item.length; j++) {
+ keepWriting = writeChunkAndReturn(destination, item[j]);
+ }
+ } else {
+ request.pendingChunks--;
+ keepWriting = writeChunkAndReturn(destination, item);
+ }
if (!keepWriting) {
request.destination = null;
i++;