From ce8919faa2fdddae6ea90060dd0aaa41f207c4d9 Mon Sep 17 00:00:00 2001 From: Shubham Ranjan Date: Wed, 11 Mar 2026 01:09:34 +0530 Subject: [PATCH 1/3] SOLR-18098: Fix replication failure for files with exact MB sizes --- .../org/apache/solr/handler/IndexFetcher.java | 21 +- .../IndexFetcherPacketProtocolTest.java | 366 ++++++++++++++++++ 2 files changed, 381 insertions(+), 6 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index d83c581a2c41..3695dd0bb712 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -1752,14 +1752,28 @@ private int fetchPackets(FastInputStream fis) throws Exception { aborted = true; throw new ReplicationHandlerException("User aborted replication"); } - long checkSumServer = -1; fis.readFully(intbytes); // read the size of the packet int packetSize = readInt(intbytes); + + // EOF marker: size=0 with no trailing data (no checksum follows) + if (packetSize <= 0 && fis.peek() == -1) { + continue; + } + + // read the checksum (all data packets have checksums, including zero-length ones) + long checkSumServer = -1; + if (checksum != null) { + fis.readFully(longbytes); + checkSumServer = readLong(longbytes); + } + + // zero-length data packet: checksum already consumed, skip to next packet if (packetSize <= 0) { continue; } + // TODO consider recoding the remaining logic to not use/need buf[]; instead use the // internal buffer of fis if (buf.length < packetSize) { @@ -1767,11 +1781,6 @@ private int fetchPackets(FastInputStream fis) throws Exception { // that too buf = new byte[packetSize]; } - if (checksum != null) { - // read the checksum - fis.readFully(longbytes); - checkSumServer = readLong(longbytes); - } // then read the packet of bytes fis.readFully(buf, 0, packetSize); // compare the checksum as sent from the leader diff --git a/solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java b/solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java new file mode 100644 index 000000000000..e8dcb8bd34ed --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.FastInputStream; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.DirectoryFactory; +import org.apache.solr.handler.admin.api.CoreReplication; +import org.apache.solr.handler.admin.api.ReplicationAPIBase; +import org.apache.solr.request.SolrQueryRequestBase; +import org.apache.solr.response.SolrQueryResponse; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests the replication packet protocol between DirectoryFileStream (sender) and + * IndexFetcher.FileFetcher.fetchPackets (receiver). + * + *

The packet protocol is: + * + *

+ * + *

These tests verify correct handling of: + * + *

+ */ +public class IndexFetcherPacketProtocolTest extends SolrTestCaseJ4 { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final int PACKET_SZ = ReplicationAPIBase.PACKET_SZ; + private static final int NO_CONTENT = 1; + + @BeforeClass + public static void beforeClass() throws Exception { + System.setProperty("solr.security.allow.urls.enabled", "false"); + initCore("solrconfig.xml", "schema.xml"); + } + + @AfterClass + public static void afterClass() throws Exception { + System.clearProperty("solr.security.allow.urls.enabled"); + } + + // Tests for files that are exact multiples of PACKET_SZ + @Test + public void testExactlyOnePacketSize() throws Exception { + assertFetchPacketsSuccess(PACKET_SZ, "test-1mb.bin"); + } + + @Test + public void testExactlyTwoPacketSizes() throws Exception { + assertFetchPacketsSuccess(2 * PACKET_SZ, "test-2mb.bin"); + } + + @Test + public void testExactlyThreePacketSizes() throws Exception { + assertFetchPacketsSuccess(3 * PACKET_SZ, "test-3mb.bin"); + } + + @Test + public void testExactly63PacketSizes() throws Exception { + assertFetchPacketsSuccess(63 * PACKET_SZ, "test-63mb.bin"); + } + + // Tests for files that are not exact multiples of PACKET_SZ + @Test + public void testEmptyFile() throws Exception { + assertFetchPacketsSuccess(0, "test-empty.bin"); + } + + @Test + public void testSingleByte() throws Exception { + assertFetchPacketsSuccess(1, "test-1byte.bin"); + } + + @Test + public void testSmallFile100Bytes() throws Exception { + assertFetchPacketsSuccess(100, "test-100bytes.bin"); + } + + @Test + public void testSmallFile100KB() throws Exception { + assertFetchPacketsSuccess(100 * 1024, "test-100kb.bin"); + } + + @Test + public void testHalfPacketSize() throws Exception { + assertFetchPacketsSuccess(PACKET_SZ / 2, "test-512kb.bin"); + } + + @Test + public void testJustUnderOnePacketSize() throws Exception { + assertFetchPacketsSuccess(PACKET_SZ - 1, "test-under-1mb.bin"); + } + + @Test + public void testJustOverOnePacketSize() throws Exception { + assertFetchPacketsSuccess(PACKET_SZ + 1, "test-over-1mb.bin"); + } + + @Test + public void testOneAndHalfPacketSizes() throws Exception { + assertFetchPacketsSuccess(PACKET_SZ + PACKET_SZ / 2, "test-1.5mb.bin"); + } + + @Test + public void testJustUnderTwoPacketSizes() throws Exception { + assertFetchPacketsSuccess(2 * PACKET_SZ - 1, "test-under-2mb.bin"); + } + + @Test + public void testJustOverTwoPacketSizes() throws Exception { + assertFetchPacketsSuccess(2 * PACKET_SZ + 1, "test-over-2mb.bin"); + } + + @Test + public void testArbitrarySize() throws Exception { + assertFetchPacketsSuccess(PACKET_SZ + 500000, "test-arbitrary.bin"); + } + + // Additional tests for comprehensive coverage + + @Test + public void testMultipleExactPacketSizeFiles() throws Exception { + // Ensures zero-length packet handling works correctly in succession + for (int i = 1; i <= 5; i++) { + assertFetchPacketsSuccess(i * PACKET_SZ, "test-" + i + "mb-successive.bin"); + } + } + + @Test + public void testChecksumMismatchReturnsError() throws Exception { + int fileSize = 1024; + String fileName = "test-checksum-mismatch.bin"; + + byte[] expectedContent = createDeterministicContent(fileSize); + byte[] streamBytes = serializeFileToPacketStream(expectedContent, fileName); + + // Corrupt the checksum bytes (bytes 4-11 after packet size) + // Stream format: int(size) + long(checksum) + data + if (streamBytes.length > 12) { + streamBytes[5] ^= 0xFF; // Flip bits in checksum + } + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + int result = invokeFetchPackets(streamBytes, output, fileName, fileSize); + + assertEquals("fetchPackets should return 1 for checksum mismatch", 1, result); + } + + @Test + public void testLargeFileTriggersBufferBehavior() throws Exception { + // Test with file larger than initial buffer allocation + // Initial buffer is min(fileSize, PACKET_SZ), so for 5MB file, buffer starts at 1MB + // This ensures the code handles multi-packet transfers correctly + assertFetchPacketsSuccess(5 * PACKET_SZ + 12345, "test-large-5mb.bin"); + } + + private void assertFetchPacketsSuccess(int fileSize, String fileName) throws Exception { + log.info("Testing file transfer: {} ({} bytes)", fileName, fileSize); + + byte[] expectedContent = createDeterministicContent(fileSize); + byte[] streamBytes = serializeFileToPacketStream(expectedContent, fileName); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + int result = invokeFetchPackets(streamBytes, output, fileName, fileSize); + + int expectedResult = (fileSize == 0) ? NO_CONTENT : 0; + assertEquals("fetchPackets return code mismatch", expectedResult, result); + assertEquals("Output size should match input size", fileSize, output.size()); + assertArrayEquals( + "Output content should match expected for " + fileName, + expectedContent, + output.toByteArray()); + + log.info("Successfully verified transfer of {} ({} bytes)", fileName, fileSize); + } + + private byte[] createDeterministicContent(int size) { + byte[] content = new byte[size]; + for (int i = 0; i < size; i++) { + content[i] = (byte) (i % 256); + } + return content; + } + + /** Writes file content to a directory and serializes it using the replication packet protocol. */ + private byte[] serializeFileToPacketStream(byte[] content, String fileName) throws Exception { + DirectoryFactory directoryFactory = h.getCore().getDirectoryFactory(); + Directory dir = + directoryFactory.get( + h.getCore().getNewIndexDir(), + DirectoryFactory.DirContext.DEFAULT, + h.getCore().getSolrConfig().indexConfig.lockType); + + try { + if (content.length > 0) { + try (IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT)) { + out.writeBytes(content, content.length); + } + } + + CoreReplication replicationAPI = + new CoreReplication( + h.getCore(), + new SolrQueryRequestBase(h.getCore(), new ModifiableSolrParams()) {}, + new SolrQueryResponse()); + + Method doFetchFileMethod = + ReplicationAPIBase.class.getDeclaredMethod( + "doFetchFile", + String.class, + String.class, + String.class, + String.class, + boolean.class, + boolean.class, + double.class, + Long.class); + doFetchFileMethod.setAccessible(true); + + Object stream = + doFetchFileMethod.invoke( + replicationAPI, fileName, "file", null, null, false, true, 0.0, null); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Method writeMethod = stream.getClass().getMethod("write", java.io.OutputStream.class); + writeMethod.invoke(stream, baos); + return baos.toByteArray(); + } finally { + if (content.length > 0) { + try { + dir.deleteFile(fileName); + } catch (Exception e) { + // ignored + } + } + directoryFactory.release(dir); + } + } + + /** + * Deserializes packet stream bytes using FileFetcher.fetchPackets and returns the result code. + */ + private int invokeFetchPackets( + byte[] streamBytes, ByteArrayOutputStream output, String fileName, long expectedSize) + throws Exception { + + Class fileFetcherClass = null; + for (Class innerClass : IndexFetcher.class.getDeclaredClasses()) { + if (innerClass.getSimpleName().equals("FileFetcher")) { + fileFetcherClass = innerClass; + break; + } + } + assertNotNull("FileFetcher inner class should exist", fileFetcherClass); + + IndexFetcher indexFetcher = createIndexFetcher(); + + try { + Map fileDetails = new HashMap<>(); + fileDetails.put("name", fileName); + fileDetails.put("size", expectedSize); + + Object mockFileInterface = createMockFileInterface(output); + + Constructor constructor = + fileFetcherClass.getDeclaredConstructor( + IndexFetcher.class, + getFileInterfaceClass(), + Map.class, + String.class, + String.class, + long.class); + constructor.setAccessible(true); + + Object fileFetcher = + constructor.newInstance( + indexFetcher, mockFileInterface, fileDetails, fileName, "file", 0L); + + Method fetchPacketsMethod = + fileFetcherClass.getDeclaredMethod("fetchPackets", FastInputStream.class); + fetchPacketsMethod.setAccessible(true); + + FastInputStream fis = new FastInputStream(new ByteArrayInputStream(streamBytes)); + return (Integer) fetchPacketsMethod.invoke(fileFetcher, fis); + } finally { + indexFetcher.destroy(); + } + } + + private IndexFetcher createIndexFetcher() throws Exception { + NamedList initArgs = new NamedList<>(); + initArgs.add("leaderUrl", "http://localhost:8983/solr/collection1"); + return new IndexFetcher(initArgs, null, h.getCore()); + } + + private Class getFileInterfaceClass() throws Exception { + for (Class innerClass : IndexFetcher.class.getDeclaredClasses()) { + if (innerClass.getSimpleName().equals("FileInterface")) { + return innerClass; + } + } + throw new AssertionError("FileInterface not found"); + } + + /** Creates a mock FileInterface that captures written bytes to the provided output stream. */ + private Object createMockFileInterface(ByteArrayOutputStream output) throws Exception { + Class fileInterfaceClass = getFileInterfaceClass(); + + return java.lang.reflect.Proxy.newProxyInstance( + fileInterfaceClass.getClassLoader(), + new Class[] {fileInterfaceClass}, + (proxy, method, args) -> { + switch (method.getName()) { + case "write": + byte[] buf = (byte[]) args[0]; + int packetSize = (Integer) args[1]; + output.write(buf, 0, packetSize); + return null; + case "sync": + case "close": + case "delete": + return null; + default: + throw new UnsupportedOperationException("Unexpected method: " + method.getName()); + } + }); + } +} From c72ea7a76258864d95703f0c8083404a97643c6e Mon Sep 17 00:00:00 2001 From: Shubham Ranjan Date: Tue, 17 Mar 2026 00:05:41 +0530 Subject: [PATCH 2/3] add(SOLR-18098): changelog --- .../SOLR-18098-fix-replication-exact-mb-sizes.yml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 changelog/unreleased/SOLR-18098-fix-replication-exact-mb-sizes.yml diff --git a/changelog/unreleased/SOLR-18098-fix-replication-exact-mb-sizes.yml b/changelog/unreleased/SOLR-18098-fix-replication-exact-mb-sizes.yml new file mode 100644 index 000000000000..9729678a6517 --- /dev/null +++ b/changelog/unreleased/SOLR-18098-fix-replication-exact-mb-sizes.yml @@ -0,0 +1,7 @@ +title: Fix replication failure for files with exact MB sizes +type: fixed +authors: + - name: Shubham Ranjan +links: + - name: SOLR-18098 + url: https://issues.apache.org/jira/browse/SOLR-18098 From aef7e095b584fa3e8e1d12a76e1f98bdd3e78c5c Mon Sep 17 00:00:00 2001 From: Shubham Ranjan Date: Tue, 17 Mar 2026 01:03:12 +0530 Subject: [PATCH 3/3] fix(SOLR-18098): replace setAccessible with MethodHandles.privateLookupIn in tests --- .../IndexFetcherPacketProtocolTest.java | 98 ++++++++++++------- 1 file changed, 65 insertions(+), 33 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java b/solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java index e8dcb8bd34ed..c26aa738b5b0 100644 --- a/solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java +++ b/solr/core/src/test/org/apache/solr/handler/IndexFetcherPacketProtocolTest.java @@ -18,8 +18,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; -import java.lang.reflect.Constructor; +import java.lang.invoke.MethodType; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; @@ -242,22 +243,35 @@ private byte[] serializeFileToPacketStream(byte[] content, String fileName) thro new SolrQueryRequestBase(h.getCore(), new ModifiableSolrParams()) {}, new SolrQueryResponse()); - Method doFetchFileMethod = - ReplicationAPIBase.class.getDeclaredMethod( + MethodHandles.Lookup lookup = + MethodHandles.privateLookupIn(ReplicationAPIBase.class, MethodHandles.lookup()); + Class directoryFileStreamClass = + Class.forName(ReplicationAPIBase.class.getName() + "$DirectoryFileStream"); + MethodHandle doFetchFileHandle = + lookup.findVirtual( + ReplicationAPIBase.class, "doFetchFile", - String.class, - String.class, - String.class, - String.class, - boolean.class, - boolean.class, - double.class, - Long.class); - doFetchFileMethod.setAccessible(true); - - Object stream = - doFetchFileMethod.invoke( - replicationAPI, fileName, "file", null, null, false, true, 0.0, null); + MethodType.methodType( + directoryFileStreamClass, + String.class, + String.class, + String.class, + String.class, + boolean.class, + boolean.class, + double.class, + Long.class)); + + Object stream; + try { + stream = + doFetchFileHandle.invoke( + replicationAPI, fileName, "file", null, null, false, true, 0.0, null); + } catch (Exception e) { + throw e; + } catch (Throwable t) { + throw new RuntimeException(t); + } ByteArrayOutputStream baos = new ByteArrayOutputStream(); Method writeMethod = stream.getClass().getMethod("write", java.io.OutputStream.class); @@ -300,26 +314,44 @@ private int invokeFetchPackets( Object mockFileInterface = createMockFileInterface(output); - Constructor constructor = - fileFetcherClass.getDeclaredConstructor( - IndexFetcher.class, - getFileInterfaceClass(), - Map.class, - String.class, - String.class, - long.class); - constructor.setAccessible(true); - - Object fileFetcher = - constructor.newInstance( - indexFetcher, mockFileInterface, fileDetails, fileName, "file", 0L); + MethodHandles.Lookup lookup = + MethodHandles.privateLookupIn(fileFetcherClass, MethodHandles.lookup()); + MethodHandle ctorHandle = + lookup.findConstructor( + fileFetcherClass, + MethodType.methodType( + void.class, + IndexFetcher.class, + getFileInterfaceClass(), + Map.class, + String.class, + String.class, + long.class)); + + Object fileFetcher; + try { + fileFetcher = + ctorHandle.invoke(indexFetcher, mockFileInterface, fileDetails, fileName, "file", 0L); + } catch (Exception e) { + throw e; + } catch (Throwable t) { + throw new RuntimeException(t); + } - Method fetchPacketsMethod = - fileFetcherClass.getDeclaredMethod("fetchPackets", FastInputStream.class); - fetchPacketsMethod.setAccessible(true); + MethodHandle fetchPacketsHandle = + lookup.findVirtual( + fileFetcherClass, + "fetchPackets", + MethodType.methodType(int.class, FastInputStream.class)); FastInputStream fis = new FastInputStream(new ByteArrayInputStream(streamBytes)); - return (Integer) fetchPacketsMethod.invoke(fileFetcher, fis); + try { + return (int) fetchPacketsHandle.invoke(fileFetcher, fis); + } catch (Exception e) { + throw e; + } catch (Throwable t) { + throw new RuntimeException(t); + } } finally { indexFetcher.destroy(); }