Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void processLogFile(FileSystem fs, Path filePath) throws IOException {

if (!logFileReaderOptional.isPresent()) {
// This is an empty file, assume processed successfully and return
LOG.warn("Found empty file to process {}", filePath);
LOG.info("Ignoring zero length replication log file {}", filePath);
return;
}

Expand Down Expand Up @@ -303,32 +303,31 @@ protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePa
LogFileReader logFileReader = new LogFileReader();
LogFileReaderContext logFileReaderContext =
new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
boolean isClosed = isFileClosed(fs, filePath);
if (isClosed) {
// As file is closed, ensure that the file has a valid header and trailer
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} else {
LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath);
try {
// Ensure to recover lease first, in case file was un-closed. If it was already closed,
// recoverLease would return true immediately.
recoverLease(fs, filePath);
if (fs.getFileStatus(filePath).getLen() <= 0) {
// Found empty file, returning null LogReader
if (fs.getFileStatus(filePath).getLen() > 0) {
try {
// Acquired the lease, try to create reader with validation both header and trailer
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} catch (InvalidLogTrailerException invalidLogTrailerException) {
// If trailer is missing or corrupt, create reader without trailer validation
LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
logFileReaderContext.setValidateTrailer(false);
logFileReader.init(logFileReaderContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 init calls here so we need to handle failures from any of the two and close the reader

return Optional.of(logFileReader);
}
} else {
// Ignore the file and returning empty LogReader.
return Optional.empty();
}
try {
// Acquired the lease, try to create reader with validation both header and trailer
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} catch (InvalidLogTrailerException invalidLogTrailerException) {
// If trailer is missing or corrupt, create reader without trailer validation
LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
logFileReaderContext.setValidateTrailer(false);
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} catch (IOException exception) {
LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
throw exception;
}
} catch (IOException exception) {
LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
// close the reader to avoid leaking socket connection
closeReader(logFileReader);
throw exception;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,27 +182,52 @@ public void testCreateLogFileReaderWithNonExistentFile() throws IOException {
}

/**
* Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
* Tests that createLogFileReader returns empty for a zero-byte file.
*/
@Test
public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
public void testCreateLogFileReaderWithEmptyLogFile() throws IOException {
Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
localFs.create(invalidFilePath).close(); // Create empty file
ReplicationLogProcessor replicationLogProcessor =
new ReplicationLogProcessor(conf, testHAGroupName);
try {
replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
fail("Should throw IOException for invalid file");
} catch (IOException e) {
// Should throw some kind of IOException when trying to read header
assertTrue("Should throw IOException", true);
Optional<LogFileReader> optionalLogFileReader =
replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
assertFalse("Reader should not be present for empty file", optionalLogFileReader.isPresent());
} finally {
// Delete the invalid file
localFs.delete(invalidFilePath);
replicationLogProcessor.close();
}
}

/**
* Tests that createLogFileReader closes the reader when init() throws IOException.
*/
@Test
public void testCreateLogFileReaderClosesReaderOnInitFailure() throws IOException {
// Write garbage data so the file is non-empty but has no valid log file header
Path invalidFilePath = new Path(testFolder.newFile("init_failure_file").toURI());
org.apache.hadoop.fs.FSDataOutputStream out = localFs.create(invalidFilePath, true);
out.write("garbage data that is not a valid log file header".getBytes());
out.close();

assertTrue("File should be non-empty", localFs.getFileStatus(invalidFilePath).getLen() > 0);

ReplicationLogProcessor spyProcessor =
Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
try {
spyProcessor.createLogFileReader(localFs, invalidFilePath);
fail("Should throw IOException for file with invalid content");
} catch (IOException e) {
// Expected: init() fails because the file does not contain a valid log file header
} finally {
localFs.delete(invalidFilePath);
spyProcessor.close();
}

Mockito.verify(spyProcessor, Mockito.times(1)).closeReader(Mockito.any(LogFileReader.class));
}

/**
* Tests the closeReader method with both null and valid LogFileReader instances.
*/
Expand Down