diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java index 84d4f1ba428..e6b488f8f69 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java @@ -238,7 +238,7 @@ public void processLogFile(FileSystem fs, Path filePath) throws IOException { if (!logFileReaderOptional.isPresent()) { // This is an empty file, assume processed successfully and return - LOG.warn("Found empty file to process {}", filePath); + LOG.info("Ignoring zero length replication log file {}", filePath); return; } @@ -303,32 +303,31 @@ protected Optional createLogFileReader(FileSystem fs, Path filePa LogFileReader logFileReader = new LogFileReader(); LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath); - boolean isClosed = isFileClosed(fs, filePath); - if (isClosed) { - // As file is closed, ensure that the file has a valid header and trailer - logFileReader.init(logFileReaderContext); - return Optional.of(logFileReader); - } else { - LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath); + try { + // Ensure to recover lease first, in case file was un-closed. If it was already closed, + // recoverLease would return true immediately. recoverLease(fs, filePath); - if (fs.getFileStatus(filePath).getLen() <= 0) { - // Found empty file, returning null LogReader + if (fs.getFileStatus(filePath).getLen() > 0) { + try { + // Acquired the lease, try to create reader with validation both header and trailer + logFileReader.init(logFileReaderContext); + return Optional.of(logFileReader); + } catch (InvalidLogTrailerException invalidLogTrailerException) { + // If trailer is missing or corrupt, create reader without trailer validation + LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException); + logFileReaderContext.setValidateTrailer(false); + logFileReader.init(logFileReaderContext); + return Optional.of(logFileReader); + } + } else { + // Ignore the file and returning empty LogReader. return Optional.empty(); } - try { - // Acquired the lease, try to create reader with validation both header and trailer - logFileReader.init(logFileReaderContext); - return Optional.of(logFileReader); - } catch (InvalidLogTrailerException invalidLogTrailerException) { - // If trailer is missing or corrupt, create reader without trailer validation - LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException); - logFileReaderContext.setValidateTrailer(false); - logFileReader.init(logFileReaderContext); - return Optional.of(logFileReader); - } catch (IOException exception) { - LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception); - throw exception; - } + } catch (IOException exception) { + LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception); + // close the reader to avoid leaking socket connection + closeReader(logFileReader); + throw exception; } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java index 8a985f3ead1..6ebce9718a0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java @@ -182,27 +182,52 @@ public void testCreateLogFileReaderWithNonExistentFile() throws IOException { } /** - * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file. + * Tests that createLogFileReader returns empty for a zero-byte file. */ @Test - public void testCreateLogFileReaderWithInvalidLogFile() throws IOException { + public void testCreateLogFileReaderWithEmptyLogFile() throws IOException { Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI()); localFs.create(invalidFilePath).close(); // Create empty file ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, testHAGroupName); try { - replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); - fail("Should throw IOException for invalid file"); - } catch (IOException e) { - // Should throw some kind of IOException when trying to read header - assertTrue("Should throw IOException", true); + Optional optionalLogFileReader = + replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); + assertFalse("Reader should not be present for empty file", optionalLogFileReader.isPresent()); } finally { - // Delete the invalid file localFs.delete(invalidFilePath); replicationLogProcessor.close(); } } + /** + * Tests that createLogFileReader closes the reader when init() throws IOException. + */ + @Test + public void testCreateLogFileReaderClosesReaderOnInitFailure() throws IOException { + // Write garbage data so the file is non-empty but has no valid log file header + Path invalidFilePath = new Path(testFolder.newFile("init_failure_file").toURI()); + org.apache.hadoop.fs.FSDataOutputStream out = localFs.create(invalidFilePath, true); + out.write("garbage data that is not a valid log file header".getBytes()); + out.close(); + + assertTrue("File should be non-empty", localFs.getFileStatus(invalidFilePath).getLen() > 0); + + ReplicationLogProcessor spyProcessor = + Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName)); + try { + spyProcessor.createLogFileReader(localFs, invalidFilePath); + fail("Should throw IOException for file with invalid content"); + } catch (IOException e) { + // Expected: init() fails because the file does not contain a valid log file header + } finally { + localFs.delete(invalidFilePath); + spyProcessor.close(); + } + + Mockito.verify(spyProcessor, Mockito.times(1)).closeReader(Mockito.any(LogFileReader.class)); + } + /** * Tests the closeReader method with both null and valid LogFileReader instances. */