From ca36d9113910bd786e95f6d4d27f48d4c27a3c9f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 11 Feb 2026 16:12:54 +0800 Subject: [PATCH] empty dir clean clean code clean code refactor to list empty partition dirs when listPaimonFileDirs revert orphan files clean change in spark update test case after refactor optimize test case to cover emptyNonLeafPartitionPath optimize test case to cover emptyNonLeafPartitionPath add test case about non-empty dir delete add log for empty partition clean clean empty partition dirs with age safeguard --- .../operation/LocalOrphanFilesClean.java | 9 +- .../paimon/operation/OrphanFilesClean.java | 9 +- .../operation/LocalOrphanFilesCleanTest.java | 62 +++++++++ .../operation/OrphanFilesCleanTest.java | 106 ++++++++++++++++ .../flink/orphan/FlinkOrphanFilesClean.java | 13 +- .../RemoveOrphanFilesActionITCaseBase.java | 120 +++++++++++++++++- 6 files changed, 315 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 76c3fe7c157e..f8a2e080c329 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -246,7 +246,14 @@ private Function>> pathProcessor(Set emptyDirs List files = tryBestListingDirs(path); if (files.isEmpty()) { - emptyDirs.add(path); + try { + FileStatus dirStatus = fileIO.getFileStatus(path); + if (oldEnough(dirStatus)) { + emptyDirs.add(path); + } + } catch (IOException e) { + LOG.warn("IOException during check dirStatus for {}, ignore it", path, e); + } return Collections.emptyList(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index bb08d90a423f..5b1b045a7d00 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -376,7 +376,14 @@ private List listFileDirs(Path dir, int level) { List result = new ArrayList<>(); for (Path partitionPath : partitionPaths) { - result.addAll(listFileDirs(partitionPath, level - 1)); + List sub = listFileDirs(partitionPath, level - 1); + if (sub.isEmpty()) { + // Empty partition (no bucket subdirs), include for empty-dir cleanup + LOG.info("Found empty partition directory for cleanup: {}", partitionPath); + result.add(partitionPath); + } else { + result.addAll(sub); + } } return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 9d3a0e9476b4..5c582e22a6c5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -64,6 +64,8 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -558,6 +560,13 @@ public void testRemovingEmptyDirectories() throws Exception { assertThat(fileIO.exists(emptyDirectory1)).isTrue(); assertThat(fileIO.exists(emptyDirectory2)).isTrue(); + Files.setLastModifiedTime( + tempDir.resolve("part1=1/part2=2/bucket-0"), + FileTime.fromMillis(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2))); + Files.setLastModifiedTime( + tempDir.resolve("part1=1/part2=2/bucket-1"), + FileTime.fromMillis(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2))); + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); List deleted = orphanFilesClean.clean().getDeletedFilesPath(); assertThat(fileIO.exists(emptyDirectory1)).isFalse(); @@ -566,6 +575,59 @@ public void testRemovingEmptyDirectories() throws Exception { validate(deleted, snapshotData, new HashMap<>()); } + @Test + void testEmptyPartitionDirectories() throws Exception { + commit(Collections.singletonList(new TestPojo(1, 0, "a", "v1"))); + commit(Collections.singletonList(new TestPojo(2, 0, "b", "v2"))); + + Path partitionPath1 = new Path(tablePath, "part1=0/part2=a"); + Path partitionPath2 = new Path(tablePath, "part1=0/part2=b"); + assertThat(fileIO.exists(partitionPath1)).isTrue(); + assertThat(fileIO.exists(partitionPath2)).isTrue(); + + FileStatus[] partition2Files = fileIO.listStatus(partitionPath2); + assertThat(partition2Files).isNotEmpty(); + for (FileStatus file : partition2Files) { + if (file.isDir() && file.getPath().getName().startsWith("bucket-")) { + FileStatus[] bucketFiles = fileIO.listStatus(file.getPath()); + for (FileStatus bucketFile : bucketFiles) { + fileIO.deleteQuietly(bucketFile.getPath()); + } + fileIO.deleteQuietly(file.getPath()); + } + } + assertThat(fileIO.listStatus(partitionPath2)).isEmpty(); + assertThat(fileIO.exists(partitionPath2)).isTrue(); + + Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1"); + fileIO.mkdirs(emptyNonLeafPartitionPath); + + long oldTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); + Files.setLastModifiedTime(tempDir.resolve("part1=0/part2=b"), FileTime.fromMillis(oldTime)); + Files.setLastModifiedTime(tempDir.resolve("part1=1"), FileTime.fromMillis(oldTime)); + + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); + orphanFilesClean.clean(); + + assertThat(fileIO.exists(partitionPath2)) + .as("Empty partition (no bucket subdirs) is cleaned by orphan files clean.") + .isFalse(); + assertThat(fileIO.exists(emptyNonLeafPartitionPath)) + .as( + "Empty non-leaf partition dir (e.g. part1=1 with no part2) is cleaned by orphan files clean.") + .isFalse(); + assertThat(fileIO.exists(partitionPath1)).isTrue(); + + Path recentEmptyPath = new Path(tablePath, "part1=2"); + fileIO.mkdirs(recentEmptyPath); + LocalOrphanFilesClean cleanRecent = + new LocalOrphanFilesClean(table, System.currentTimeMillis() - 1, false); + cleanRecent.clean(); + assertThat(fileIO.exists(recentEmptyPath)) + .as("Recent empty partition dir must not be deleted (age safeguard).") + .isTrue(); + } + private void writeData( SnapshotManager snapshotManager, List> committedData, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java index 97ba35b2c07b..beead97cc746 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -18,13 +18,43 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Utils for {@link OrphanFilesClean}. */ public class OrphanFilesCleanTest { + @Rule public TemporaryFolder tempDir = new TemporaryFolder(); + @Test public void testOlderThanMillis() { // normal olderThan @@ -37,4 +67,80 @@ public void testOlderThanMillis() { .hasMessage( "The arg olderThan must be less than now, because dataFiles that are currently being written and not referenced by snapshots will be mistakenly cleaned up."); } + + @Test + public void testListPaimonFileDirsWithEmptyPartition() throws Exception { + Path tablePath = new Path(tempDir.newFolder().toURI()); + FileIO fileIO = LocalFileIO.create(); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING() + }, + new String[] {"pk", "part1", "part2", "value"}); + FileStoreTable table = createFileStoreTable(fileIO, tablePath, rowType, new Options()); + String commitUser = UUID.randomUUID().toString(); + try (TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser)) { + write.write( + GenericRow.ofKind( + RowKind.INSERT, + 1, + 0, + BinaryString.fromString("a"), + BinaryString.fromString("v1"))); + commit.commit(0, write.prepareCommit(true, 0)); + } + + Path emptyPartitionPath = new Path(tablePath, "part1=0/part2=b"); + fileIO.mkdirs(emptyPartitionPath); + Path emptyNonLeafPartitionPath = new Path(tablePath, "part1=1"); + fileIO.mkdirs(emptyNonLeafPartitionPath); + + java.lang.reflect.Method method = + LocalOrphanFilesClean.class.getSuperclass().getDeclaredMethod("listPaimonFileDirs"); + method.setAccessible(true); + @SuppressWarnings("unchecked") + List dirs = (List) method.invoke(new LocalOrphanFilesClean(table)); + + assertThat(dirs) + .as( + "Empty partition (no bucket subdirs) is listed by listPaimonFileDirs for empty-dir cleanup") + .contains(emptyPartitionPath); + assertThat(dirs) + .as( + "Empty non-leaf partition dir (e.g. part1=1 with no part2) is listed by listPaimonFileDirs") + .contains(emptyNonLeafPartitionPath); + } + + @Test + public void testDeleteNonEmptyDir() throws Exception { + Path dir = new Path(tempDir.newFolder().toURI().toString(), "part1=0"); + FileIO fileIO = LocalFileIO.create(); + fileIO.mkdirs(dir); + Path file = new Path(dir, "data.dat"); + fileIO.writeFile(file, "x", true); + + assertThat(fileIO.exists(dir)).isTrue(); + assertThatThrownBy(() -> fileIO.delete(dir, false)) + .isInstanceOf(IOException.class) + .hasMessageContaining("not empty"); + assertThat(fileIO.exists(dir)).isTrue(); + } + + private FileStoreTable createFileStoreTable( + FileIO fileIO, Path tablePath, RowType rowType, Options conf) throws Exception { + conf.set(CoreOptions.PATH, tablePath.toString()); + conf.set(CoreOptions.BUCKET, 2); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(fileIO, tablePath), + new Schema( + rowType.getFields(), + Arrays.asList("part1", "part2"), + Arrays.asList("pk", "part1", "part2"), + conf.toMap(), + "")); + return FileStoreTableFactory.create(fileIO, tablePath, tableSchema); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index 1a91d937d721..d3376cb80cf5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -290,7 +290,18 @@ public void processElement( } } if (files.isEmpty()) { - ctx.output(emptyDirOutputTag, dirPath); + try { + FileStatus dirStatus = + fileIO.getFileStatus(dirPath); + if (oldEnough(dirStatus)) { + ctx.output(emptyDirOutputTag, dirPath); + } + } catch (IOException e) { + LOG.warn( + "IOException during check dirStatus for {}, ignore it", + dirPath, + e); + } } } }) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java index 50fbd7dac10b..e54fd5c66205 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; @@ -41,15 +42,21 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH; import static org.assertj.core.api.Assertions.assertThat; @@ -171,8 +178,11 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception { assertThat(fileIO.listDirectories(bucketDir)).isEmpty(); // clean empty directories + setLastModifiedToPastIfLocal(bucketDir, 2); ImmutableList.copyOf(executeSQL(withOlderThan)); - assertThat(fileIO.exists(bucketDir)).isFalse(); + if ("file".equals(bucketDir.toUri().getScheme())) { + assertThat(fileIO.exists(bucketDir)).isFalse(); + } // table should not be deleted assertThat(fileIO.exists(location)).isTrue(); } @@ -392,6 +402,114 @@ public void testRunWithMode(boolean isNamedArgument) throws Exception { .hasMessageContaining("Unknown mode"); } + @Test + public void testEmptyPartitionDirectories() throws Exception { + FileStoreTable table = createPartitionedTableWithData(); + table = getFileStoreTable(tableName); + FileIO fileIO = table.fileIO(); + Path location = table.location(); + Path partitionPath1 = new Path(location, "part1=0/part2=a"); + Path partitionPath2 = new Path(location, "part1=0/part2=b"); + Path emptyNonLeaf = new Path(location, "part1=1"); + + emptyPartitionDir(fileIO, partitionPath2); + fileIO.mkdirs(emptyNonLeaf); + + boolean localFs = "file".equals(partitionPath2.toUri().getScheme()); + if (localFs) { + setLastModifiedToPastIfLocal(partitionPath2, emptyNonLeaf, 2); + executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName)); + assertThat(fileIO.exists(partitionPath2)).isFalse(); + assertThat(fileIO.exists(emptyNonLeaf)).isFalse(); + assertThat(fileIO.exists(partitionPath1)).isTrue(); + } + + Path recentEmpty = new Path(location, "part1=2"); + fileIO.mkdirs(recentEmpty); + executeSQL( + String.format( + "CALL sys.remove_orphan_files('%s.%s', '%s')", + database, tableName, olderThanNowMinusMillis(1000))); + assertThat(fileIO.exists(recentEmpty)).isTrue(); + } + + private FileStoreTable createPartitionedTableWithData() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING() + }, + new String[] {"pk", "part1", "part2", "value"}); + FileStoreTable table = + createFileStoreTable( + tableName, + rowType, + Arrays.asList("part1", "part2"), + Arrays.asList("pk", "part1", "part2", "value"), + Collections.emptyList(), + Collections.singletonMap("bucket", "2")); + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + writeData( + rowData(1, 0, BinaryString.fromString("a"), BinaryString.fromString("v1")), + rowData(2, 0, BinaryString.fromString("a"), BinaryString.fromString("v2"))); + writeData(rowData(3, 0, BinaryString.fromString("b"), BinaryString.fromString("v3"))); + write.close(); + commit.close(); + write = null; + commit = null; + return table; + } + + private void emptyPartitionDir(FileIO fileIO, Path partitionPath) throws IOException { + for (FileStatus file : fileIO.listStatus(partitionPath)) { + if (file.isDir() && file.getPath().getName().startsWith("bucket-")) { + for (FileStatus bucketFile : fileIO.listStatus(file.getPath())) { + fileIO.deleteQuietly(bucketFile.getPath()); + } + fileIO.deleteQuietly(file.getPath()); + } + } + } + + private void setLastModifiedToPastIfLocal(Path path1, Path path2, int daysAgo) { + setLastModifiedToPastIfLocal(path1, daysAgo); + setLastModifiedToPastIfLocal(path2, daysAgo); + } + + private void setLastModifiedToPastIfLocal(Path path, int daysAgo) { + try { + if (path.toUri().getScheme() != null && "file".equals(path.toUri().getScheme())) { + Files.setLastModifiedTime( + Paths.get(path.toUri()), + FileTime.fromMillis( + System.currentTimeMillis() - TimeUnit.DAYS.toMillis(daysAgo))); + } + } catch (Exception ignored) { + } + } + + private static String olderThanNowMinusMillis(long millis) { + return DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis() - millis), 3); + } + + @Test + public void testNonEmptyPartitionDir() throws Exception { + createPartitionedTableWithData(); + FileStoreTable table = getFileStoreTable(tableName); + FileIO fileIO = table.fileIO(); + Path nonEmptyPath = new Path(table.location(), "part1=0/part2=c"); + fileIO.mkdirs(nonEmptyPath); + fileIO.writeFile(new Path(nonEmptyPath, "guard.txt"), "must not delete", true); + + executeSQL(String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName)); + + assertThat(fileIO.exists(nonEmptyPath)).isTrue(); + assertThat(fileIO.exists(new Path(nonEmptyPath, "guard.txt"))).isTrue(); + } + protected boolean supportNamedArgument() { return true; }