diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 76c3fe7c157e..f8a2e080c329 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -246,7 +246,14 @@ private Function>> pathProcessor(Set emptyDirs List files = tryBestListingDirs(path); if (files.isEmpty()) { - emptyDirs.add(path); + try { + FileStatus dirStatus = fileIO.getFileStatus(path); + if (oldEnough(dirStatus)) { + emptyDirs.add(path); + } + } catch (IOException e) { + LOG.warn("IOException during check dirStatus for {}, ignore it", path, e); + } return Collections.emptyList(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index bb08d90a423f..5b1b045a7d00 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -376,7 +376,14 @@ private List listFileDirs(Path dir, int level) { List result = new ArrayList<>(); for (Path partitionPath : partitionPaths) { - result.addAll(listFileDirs(partitionPath, level - 1)); + List sub = listFileDirs(partitionPath, level - 1); + if (sub.isEmpty()) { + // Empty partition (no bucket subdirs), include for empty-dir cleanup + LOG.info("Found empty partition directory for cleanup: {}", partitionPath); + result.add(partitionPath); + } else { + result.addAll(sub); + } } return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 9d3a0e9476b4..5c582e22a6c5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -64,6 +64,8 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -558,6 +560,13 @@ public void testRemovingEmptyDirectories() throws Exception { assertThat(fileIO.exists(emptyDirectory1)).isTrue(); assertThat(fileIO.exists(emptyDirectory2)).isTrue(); + Files.setLastModifiedTime( + tempDir.resolve("part1=1/part2=2/bucket-0"), + FileTime.fromMillis(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2))); + Files.setLastModifiedTime( + tempDir.resolve("part1=1/part2=2/bucket-1"), + FileTime.fromMillis(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2))); + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); List deleted = orphanFilesClean.clean().getDeletedFilesPath(); assertThat(fileIO.exists(emptyDirectory1)).isFalse(); @@ -566,6 +575,59 @@ public void testRemovingEmptyDirectories() throws Exception { validate(deleted, snapshotData, new HashMap<>()); } + @Test + void testEmptyPartitionDirectories() throws Exception { + commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1"))); + commit(Collections.singletonList(new TestPojo(2, 0, "b", "v2"))); + + Path partitionPath1 = new Path(tablePath, "part1=0/part2=a"); + Path partitionPath2 = new Path(tablePath, "part1=0/part2=b"); + assertThat(fileIO.exists(partitionPath1)).isTrue(); + assertThat(fileIO.exists(partitionPath2)).isTrue(); + + FileStatus[] partition2Files = fileIO.listStatus(partitionPath2); + assertThat(partition2Files).isNotEmpty(); + for (FileStatus file : partition2Files) { + if (file.isDir() && file.getPath().getName().startsWith("bucket-")) { + FileStatus[] bucketFiles = fileIO.listStatus(file.getPath()); + for (FileStatus bucketFile : bucketFiles) { + fileIO.deleteQuietly(bucketFile.getPath()); + } + fileIO.deleteQuietly(file.getPath()); + } + } + assertThat(fileIO.listStatus(partitionPath2)).isEmpty(); + assertThat(fileIO.exists(partitionPath2)).isTrue(); + + Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1"); + fileIO.mkdirs(emptyNonLeafPartitionPath); + + long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); + Files.setLastModifiedTime(tempDir.resolve("part1=0/part2=b"), FileTime.fromMillis(oldTime)); + Files.setLastModifiedTime(tempDir.resolve("part1=1"), FileTime.fromMillis(oldTime)); + + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); + orphanFilesClean.clean(); + + assertThat(fileIO.exists(partitionPath2)) + .as("Empty partition (no bucket subdirs) is cleaned by orphan files clean.") + .isFalse(); + assertThat(fileIO.exists(emptyNonLeafPartitionPath)) + .as( + "Empty non-leaf partition dir (e.g. part1=1 with no part2) is cleaned by orphan files clean.") + .isFalse(); + assertThat(fileIO.exists(partitionPath1)).isTrue(); + + Path recentEmptyPath = new Path(tablePath, "part1=2"); + fileIO.mkdirs(recentEmptyPath); + LocalOrphanFilesClean cleanRecent = + new LocalOrphanFilesClean(table, System.currentTimeMillis() - 1, false); + cleanRecent.clean(); + assertThat(fileIO.exists(recentEmptyPath)) + .as("Recent empty partition dir must not be deleted (age safeguard).") + .isTrue(); + } + private void writeData( SnapshotManager snapshotManager, List> committedData, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java index 97ba35b2c07b..beead97cc746 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -18,13 +18,43 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Utils for {@link OrphanFilesClean}. */ public class OrphanFilesCleanTest { + @Rule public TemporaryFolder tempDir = new TemporaryFolder(); + @Test public void testOlderThanMillis() { // normal olderThan @@ -37,4 +67,80 @@ public void testOlderThanMillis() { .hasMessage( "The arg olderThan must be less than now, because dataFiles that are currently being written and not referenced by snapshots will be mistakenly cleaned up."); } + + @Test + public void testListPaimonFileDirsWithEmptyPartition() throws Exception { + Path tablePath = new Path(tempDir.newFolder().toURI()); + FileIO fileIO = LocalFileIO.create(); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING() + }, + new String[] {"pk", "part1", "part2", "value"}); + FileStoreTable table = createFileStoreTable(fileIO, tablePath, rowType, new Options()); + String commitUser = UUID.randomUUID().toString(); + try (TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser)) { + write.write( + GenericRow.ofKind( + RowKind.INSERT, + 1, + 0, + BinaryString.fromString("a"), + BinaryString.fromString("v1"))); + commit.commit(0, write.prepareCommit(true, 0)); + } + + Path emptyPartitionPath = new Path(tablePath, "part1=0/part2=b"); + fileIO.mkdirs(emptyPartitionPath); + Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1"); + fileIO.mkdirs(emptyNonLeafPartitionPath); + + java.lang.reflect.Method method = + LocalOrphanFilesClean.class.getSuperclass().getDeclaredMethod("listPaimonFileDirs"); + method.setAccessible(true); + @SuppressWarnings("unchecked") + List dirs = (List) method.invoke(new LocalOrphanFilesClean(table)); + + assertThat(dirs) + .as( + "Empty partition (no bucket subdirs) is listed by listPaimonFileDirs for empty-dir cleanup") + .contains(emptyPartitionPath); + assertThat(dirs) + .as( + "Empty non-leaf partition dir (e.g. part1=1 with no part2) is listed by listPaimonFileDirs") + .contains(emptyNonLeafPartitionPath); + } + + @Test + public void testDeleteNonEmptyDir() throws Exception { + Path dir = new Path(tempDir.newFolder().toURI().toString(), "part1=0"); + FileIO fileIO = LocalFileIO.create(); + fileIO.mkdirs(dir); + Path file = new Path(dir, "data.dat"); + fileIO.writeFile(file, "x", true); + + assertThat(fileIO.exists(dir)).isTrue(); + assertThatThrownBy(() -> fileIO.delete(dir, false)) + .isInstanceOf(IOException.class) + .hasMessageContaining("not empty"); + assertThat(fileIO.exists(dir)).isTrue(); + } + + private FileStoreTable createFileStoreTable( + FileIO fileIO, Path tablePath, RowType rowType, Options conf) throws Exception { + conf.set(CoreOptions.PATH, tablePath.toString()); + conf.set(CoreOptions.BUCKET, 2); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(fileIO, tablePath), + new Schema( + rowType.getFields(), + Arrays.asList("part1", "part2"), + Arrays.asList("pk", "part1", "part2"), + conf.toMap(), + "")); + return FileStoreTableFactory.create(fileIO, tablePath, tableSchema); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index 1a91d937d721..d3376cb80cf5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -290,7 +290,18 @@ public void processElement( } } if (files.isEmpty()) { - ctx.output(emptyDirOutputTag, dirPath); + try { + FileStatus dirStatus = + fileIO.getFileStatus(dirPath); + if (oldEnough(dirStatus)) { + ctx.output(emptyDirOutputTag, dirPath); + } + } catch (IOException e) { + LOG.warn( + "IOException during check dirStatus for {}, ignore it", + dirPath, + e); + } } } }) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java index 50fbd7dac10b..e54fd5c66205 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; @@ -41,15 +42,21 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH; import static org.assertj.core.api.Assertions.assertThat; @@ -171,8 +178,11 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception { assertThat(fileIO.listDirectories(bucketDir)).isEmpty(); // clean empty directories + setLastModifiedToPastIfLocal(bucketDir, 2); ImmutableList.copyOf(executeSQL(withOlderThan)); - assertThat(fileIO.exists(bucketDir)).isFalse(); + if ("file".equals(bucketDir.toUri().getScheme())) { + assertThat(fileIO.exists(bucketDir)).isFalse(); + } // table should not be deleted assertThat(fileIO.exists(location)).isTrue(); } @@ -392,6 +402,114 @@ public void testRunWithMode(boolean isNamedArgument) throws Exception { .hasMessageContaining("Unknown mode"); } + @Test + public void testEmptyPartitionDirectories() throws Exception { + FileStoreTable table = createPartitionedTableWithData(); + table = getFileStoreTable(tableName); + FileIO fileIO = table.fileIO(); + Path location = table.location(); + Path partitionPath1 = new Path(location, "part1=0/part2=a"); + Path partitionPath2 = new Path(location, "part1=0/part2=b"); + Path emptyNonLeaf = new Path(location, "part1=1"); + + emptyPartitionDir(fileIO, partitionPath2); + fileIO.mkdirs(emptyNonLeaf); + + boolean localFs = "file".equals(partitionPath2.toUri().getScheme()); + if (localFs) { + setLastModifiedToPastIfLocal(partitionPath2, emptyNonLeaf, 2); + executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName)); + assertThat(fileIO.exists(partitionPath2)).isFalse(); + assertThat(fileIO.exists(emptyNonLeaf)).isFalse(); + assertThat(fileIO.exists(partitionPath1)).isTrue(); + } + + Path recentEmpty = new Path(location, "part1=2"); + fileIO.mkdirs(recentEmpty); + executeSQL( + String.format( + "CALL sys.remove_orphan_files('%s.%s', '%s')", + database, tableName, olderThanNowMinusMillis(1000))); + assertThat(fileIO.exists(recentEmpty)).isTrue(); + } + + private FileStoreTable createPartitionedTableWithData() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING() + }, + new String[] {"pk", "part1", "part2", "value"}); + FileStoreTable table = + createFileStoreTable( + tableName, + rowType, + Arrays.asList("part1", "part2"), + Arrays.asList("pk", "part1", "part2", "value"), + Collections.emptyList(), + Collections.singletonMap("bucket", "2")); + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + writeData( + rowData(1, 0, BinaryString.fromString("a"), BinaryString.fromString("v1")), + rowData(2, 0, BinaryString.fromString("a"), BinaryString.fromString("v2"))); + writeData(rowData(3, 0, BinaryString.fromString("b"), BinaryString.fromString("v3"))); + write.close(); + commit.close(); + write = null; + commit = null; + return table; + } + + private void emptyPartitionDir(FileIO fileIO, Path partitionPath) throws IOException { + for (FileStatus file : fileIO.listStatus(partitionPath)) { + if (file.isDir() && file.getPath().getName().startsWith("bucket-")) { + for (FileStatus bucketFile : fileIO.listStatus(file.getPath())) { + fileIO.deleteQuietly(bucketFile.getPath()); + } + fileIO.deleteQuietly(file.getPath()); + } + } + } + + private void setLastModifiedToPastIfLocal(Path path1, Path path2, int daysAgo) { + setLastModifiedToPastIfLocal(path1, daysAgo); + setLastModifiedToPastIfLocal(path2, daysAgo); + } + + private void setLastModifiedToPastIfLocal(Path path, int daysAgo) { + try { + if (path.toUri().getScheme() != null && "file".equals(path.toUri().getScheme())) { + Files.setLastModifiedTime( + Paths.get(path.toUri()), + FileTime.fromMillis( + System.currentTimeMillis() - TimeUnit.DAYS.toMillis(daysAgo))); + } + } catch (Exception ignored) { + } + } + + private static String olderThanNowMinusMillis(long millis) { + return DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis() - millis), 3); + } + + @Test + public void testNonEmptyPartitionDir() throws Exception { + createPartitionedTableWithData(); + FileStoreTable table = getFileStoreTable(tableName); + FileIO fileIO = table.fileIO(); + Path nonEmptyPath = new Path(table.location(), "part1=0/part2=c"); + fileIO.mkdirs(nonEmptyPath); + fileIO.writeFile(new Path(nonEmptyPath, "guard.txt"), "must not delete", true); + + executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName)); + + assertThat(fileIO.exists(nonEmptyPath)).isTrue(); + assertThat(fileIO.exists(new Path(nonEmptyPath, "guard.txt"))).isTrue(); + } + protected boolean supportNamedArgument() { return true; }