Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1909,4 +1909,157 @@ describe('ReactFlightDOMNode', () => {
globalThis.eval = previousEval;
}
});

// Shared scenario for the two regression tests below. Both guard against
// the same destination-backpressure bug — emitTextChunk and
// emitTypedArrayChunk each push a [headerChunk, contentChunk] pair into
// completedRegularChunks. Before the fix, a flush that broke between the
// two writes left the content chunk stranded at the head of the queue,
// and the next flush emitted any newly-arrived Import rows ahead of it —
// splicing Import bytes into the position the Flight Client expects to
// read as the row's content.
//
// The scenario embeds `payload` in the model under the key `payload` and
// returns the deserialized model. Each test asserts that result.payload
// round-trips identically to what the Flight Server emitted.
async function runScenarioWithBackpressureBetweenHeaderAndContent(payload) {
function Client1() {
return <span>client1</span>;
}
// Client1's Import row must exceed VIEW_SIZE (4096) so writeStringChunk
// takes its BIG path and calls destination.write directly. That write
// returning false is what triggers the backpressure we want to test.
const Client1Reference = clientExports(
Client1,
1,
'/' + 'a'.repeat(5000),
Promise.resolve(),
);

function Client2() {
return <span>client2</span>;
}
const Client2Reference = clientExports(
Client2,
2,
'/client2.js',
Promise.resolve(),
);

let resolveAsync;
const asyncPromise = new Promise(resolve => {
resolveAsync = resolve;
});

async function AsyncWrapper() {
await asyncPromise;
return <Client2Reference />;
}

const model = {
client: <Client1Reference />,
payload,
async: <AsyncWrapper />,
};

const heldCallbacks = [];
const collectedChunks = [];

// A destination that returns false from every write (highWaterMark: 1 in
// byte mode) and never completes any of them until the test releases the
// stored callback. This gives us deterministic control over when each write
// finishes and when 'drain' fires.
const destination = new Stream.Writable({
highWaterMark: 1,
write(chunk, encoding, callback) {
collectedChunks.push(
Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding),
);
heldCallbacks.push(callback);
},
});

const finished = new Promise((resolve, reject) => {
destination.on('finish', resolve);
destination.on('error', reject);
});

// First flush: Client1's huge Import row hits backpressure, so the flush
// loop reaches the payload row's [headerChunk, contentChunk] pair while
// destinationHasCapacity is already false. Before the fix, this would
// have encoded just the header into currentView, broken the loop, and
// let completeWriting flush the header as its own write — stranding
// the content chunk at the front of completedRegularChunks.
const {pipe} = await serverAct(() =>
ReactServerDOMServer.renderToPipeableStream(model, webpackMap),
);
await serverAct(() => {
pipe(destination);
});

// While the destination is still paused, push Client2's Import row into
// completedImportChunks. No flush runs (request.destination is null after
// the first flush's backpressure break).
await serverAct(() => {
resolveAsync();
});

// Release callbacks one at a time. The drain that empties the writable
// buffer triggers flushCompletedChunks; before the fix, this is where
// Client2's newly-queued Import row would have been emitted ahead of
// the still-orphaned payload content chunk.
while (heldCallbacks.length > 0) {
await serverAct(() => {
const cb = heldCallbacks.shift();
cb();
});
}

await finished;

const readable = new Stream.Readable({read() {}});
for (let i = 0; i < collectedChunks.length; i++) {
readable.push(collectedChunks[i]);
}
readable.push(null);

const response = ReactServerDOMClient.createFromNodeStream(readable, {
moduleMap: null,
moduleLoading: null,
});
return await response;
}

it("keeps a Text row's header and content chunks adjacent when a flush hits backpressure between them", async () => {
// length >= 1024 makes the Flight Server outline this as a Text row via
// serializeLargeTextString, which is where emitTextChunk pushes its
// [headerChunk, textChunk] pair.
const largeText = 'x'.repeat(2048);

const result =
await runScenarioWithBackpressureBetweenHeaderAndContent(largeText);

// Before the fix, the Flight Client would have framed Client2's Import
// row bytes as text-row content, making result.payload `<id>:I[...]...`
// garbage rather than the x's the Flight Server emitted.
expect(result.payload).toBe(largeText);
});

it("keeps a TypedArray row's header and content chunks adjacent when a flush hits backpressure between them", async () => {
// emitTypedArrayChunk pushes the same [headerChunk, contentChunk] pair as
// emitTextChunk. Before the fix, a flush break after the header would have
// stranded the content chunk in exactly the same way.
const binaryData = new Uint8Array(1024);
for (let i = 0; i < binaryData.length; i++) {
binaryData[i] = i % 256;
}

const result =
await runScenarioWithBackpressureBetweenHeaderAndContent(binaryData);

// Before the fix, the typed array's bytes would have been replaced by
// Client2's Import row bytes followed by whatever happened to land in
// the next 1024-byte window.
expect(result.payload).toEqual(binaryData);
});
});
76 changes: 59 additions & 17 deletions packages/react-server/src/ReactFlightServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,12 @@ export type Request = {
pingedTasks: Array<Task>,
completedImportChunks: Array<Chunk>,
completedHintChunks: Array<Chunk>,
completedRegularChunks: Array<Chunk | BinaryChunk>,
// Some rows (Text, TypedArray) are pushed as a [headerChunk, contentChunk]
// tuple so flushCompletedChunks can write the pair atomically and never
// strand the content chunk on a backpressure break.
completedRegularChunks: Array<
Chunk | BinaryChunk | Array<Chunk | BinaryChunk>,
>,
completedErrorChunks: Array<Chunk>,
writtenSymbols: Map<symbol, number>,
writtenClientReferences: Map<ClientReferenceKey, number>,
Expand All @@ -594,7 +599,8 @@ export type Request = {
abortTime: number,
// DEV-only
pendingDebugChunks: number,
completedDebugChunks: Array<Chunk | BinaryChunk>,
// See completedRegularChunks for why some entries are tuples.
completedDebugChunks: Array<Chunk | BinaryChunk | Array<Chunk | BinaryChunk>>,
debugDestination: null | Destination,
environmentName: () => string,
filterStackFrame: (
Expand Down Expand Up @@ -695,7 +701,9 @@ function RequestInstance(
this.pingedTasks = pingedTasks;
this.completedImportChunks = ([]: Array<Chunk>);
this.completedHintChunks = ([]: Array<Chunk>);
this.completedRegularChunks = ([]: Array<Chunk | BinaryChunk>);
this.completedRegularChunks = ([]: Array<
Chunk | BinaryChunk | Array<Chunk | BinaryChunk>,
>);
this.completedErrorChunks = ([]: Array<Chunk>);
this.writtenSymbols = new Map();
this.writtenClientReferences = new Map();
Expand All @@ -711,7 +719,9 @@ function RequestInstance(

if (__DEV__) {
this.pendingDebugChunks = 0;
this.completedDebugChunks = ([]: Array<Chunk>);
this.completedDebugChunks = ([]: Array<
Chunk | BinaryChunk | Array<Chunk | BinaryChunk>,
>);
this.debugDestination = null;
this.environmentName =
environmentName === undefined
Expand Down Expand Up @@ -4691,10 +4701,16 @@ function emitTypedArrayChunk(
const binaryLength = byteLengthOfBinaryChunk(binaryChunk);
const row = id.toString(16) + ':' + tag + binaryLength.toString(16) + ',';
const headerChunk = stringToChunk(row);
// Push the header and binary as a single tuple so flushCompletedChunks can
// write them atomically. Otherwise, if the destination's backpressure flips
// between the two writes, the content chunk would be stranded at the front of
// the queue and the next drain would emit Import or Hint chunks between the
// header and the content — and the Flight Client would frame those
// intervening bytes as this row's content.
if (__DEV__ && debug) {
request.completedDebugChunks.push(headerChunk, binaryChunk);
request.completedDebugChunks.push([headerChunk, binaryChunk]);
} else {
request.completedRegularChunks.push(headerChunk, binaryChunk);
request.completedRegularChunks.push([headerChunk, binaryChunk]);
}
}

Expand All @@ -4719,10 +4735,11 @@ function emitTextChunk(
const binaryLength = byteLengthOfChunk(textChunk);
const row = id.toString(16) + ':T' + binaryLength.toString(16) + ',';
const headerChunk = stringToChunk(row);
// See emitTypedArrayChunk for why the pair is pushed as a tuple.
if (__DEV__ && debug) {
request.completedDebugChunks.push(headerChunk, textChunk);
request.completedDebugChunks.push([headerChunk, textChunk]);
} else {
request.completedRegularChunks.push(headerChunk, textChunk);
request.completedRegularChunks.push([headerChunk, textChunk]);
}
}

Expand Down Expand Up @@ -6014,9 +6031,16 @@ function flushCompletedChunks(request: Request): void {
const debugChunks = request.completedDebugChunks;
let i = 0;
for (; i < debugChunks.length; i++) {
request.pendingDebugChunks--;
const chunk = debugChunks[i];
writeChunkAndReturn(debugDestination, chunk);
const item = debugChunks[i];
if (isArray(item)) {
request.pendingDebugChunks -= item.length;
for (let j = 0; j < item.length; j++) {
writeChunkAndReturn(debugDestination, item[j]);
}
} else {
request.pendingDebugChunks--;
writeChunkAndReturn(debugDestination, item);
}
}
debugChunks.splice(0, i);
} finally {
Expand Down Expand Up @@ -6064,9 +6088,18 @@ function flushCompletedChunks(request: Request): void {
const debugChunks = request.completedDebugChunks;
i = 0;
for (; i < debugChunks.length; i++) {
request.pendingDebugChunks--;
const chunk = debugChunks[i];
const keepWriting: boolean = writeChunkAndReturn(destination, chunk);
const item = debugChunks[i];
let keepWriting: boolean;
if (isArray(item)) {
request.pendingDebugChunks -= item.length;
keepWriting = true;
for (let j = 0; j < item.length; j++) {
keepWriting = writeChunkAndReturn(destination, item[j]);
}
} else {
request.pendingDebugChunks--;
keepWriting = writeChunkAndReturn(destination, item);
}
if (!keepWriting) {
request.destination = null;
i++;
Expand All @@ -6080,9 +6113,18 @@ function flushCompletedChunks(request: Request): void {
const regularChunks = request.completedRegularChunks;
i = 0;
for (; i < regularChunks.length; i++) {
request.pendingChunks--;
const chunk = regularChunks[i];
const keepWriting: boolean = writeChunkAndReturn(destination, chunk);
const item = regularChunks[i];
let keepWriting: boolean;
if (isArray(item)) {
request.pendingChunks -= item.length;
keepWriting = true;
for (let j = 0; j < item.length; j++) {
keepWriting = writeChunkAndReturn(destination, item[j]);
}
} else {
request.pendingChunks--;
keepWriting = writeChunkAndReturn(destination, item);
}
if (!keepWriting) {
request.destination = null;
i++;
Expand Down
Loading