From d9d704fd4d6e6bd98466955d20009c526d2cf478 Mon Sep 17 00:00:00 2001 From: sanshi <1715734693@qq.com> Date: Thu, 2 Apr 2026 21:43:38 +0800 Subject: [PATCH] [spark] Support local-sort mode for incremental clustering --- .../IncrementalClusterManagerTest.java | 32 ++++ .../spark/procedure/CompactProcedure.java | 10 +- .../paimon/spark/sort/HilbertSorter.java | 8 + .../apache/paimon/spark/sort/OrderSorter.java | 6 + .../apache/paimon/spark/sort/TableSorter.java | 7 + .../paimon/spark/sort/ZorderSorter.java | 8 + .../procedure/CompactProcedureTestBase.scala | 181 ++++++++++++++++++ 7 files changed, 251 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java index cf81cdf85fd5..b38cf3af997a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -146,6 +146,38 @@ public void testUpgrade() { } } + @Test + public void testClusteringIncrementalModeDefault() throws Exception { + // Test default mode is GLOBAL_SORT + Map options = new HashMap<>(); + FileStoreTable table = createTable(options, Collections.emptyList()); + IncrementalClusterManager manager = new IncrementalClusterManager(table); + assertThat(manager.clusteringIncrementalMode()) + .isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT); + } + + @Test + public void testClusteringIncrementalModeLocalSort() throws Exception { + // Test local-sort mode + Map options = new HashMap<>(); + options.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(), "local-sort"); + FileStoreTable table = createTable(options, Collections.emptyList()); + IncrementalClusterManager manager = new IncrementalClusterManager(table); + assertThat(manager.clusteringIncrementalMode()) + .isEqualTo(CoreOptions.ClusteringIncrementalMode.LOCAL_SORT); + } + + @Test + public void testClusteringIncrementalModeGlobalSort() throws Exception { + // Test explicit global-sort mode + Map options = new HashMap<>(); + options.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(), "global-sort"); + FileStoreTable table = createTable(options, Collections.emptyList()); + IncrementalClusterManager manager = new IncrementalClusterManager(table); + assertThat(manager.clusteringIncrementalMode()) + .isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT); + } + @Test public void testHistoryPartitionAutoClustering() throws Exception { Map options = new HashMap<>(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 7f8c89dcbacf..9d86f79bc500 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -667,6 +667,9 @@ private void clusterIncrementalUnAwareBucketTable( incrementalClusterManager.clusterCurve(), incrementalClusterManager.clusterKeys()); + CoreOptions.ClusteringIncrementalMode mode = + incrementalClusterManager.clusteringIncrementalMode(); + Dataset datasetForWrite = partitionSplits.values().stream() .map(Pair::getKey) @@ -678,7 +681,12 @@ private void clusterIncrementalUnAwareBucketTable( ScanPlanHelper$.MODULE$.createNewScanPlan( splits.toArray(new DataSplit[0]), relation)); - return sorter.sort(dataset); + // Use sortLocal() for LOCAL_SORT, sort() for GLOBAL_SORT + if (mode == CoreOptions.ClusteringIncrementalMode.LOCAL_SORT) { + return sorter.sortLocal(dataset); + } else { + return sorter.sort(dataset); + } }) .reduce(Dataset::union) .orElse(null); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java index 1f3007713140..681104fb199d 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/HilbertSorter.java @@ -49,6 +49,14 @@ public Dataset sort(Dataset df) { return sortedDF.drop(H_COLUMN); } + @Override + public Dataset sortLocal(Dataset df) { + Column hilbertColumn = hilbertValue(df); + Dataset hilbertValueDF = df.withColumn(H_COLUMN, hilbertColumn); + Dataset sortedDF = hilbertValueDF.sortWithinPartitions(hilbertValueDF.col(H_COLUMN)); + return sortedDF.drop(H_COLUMN); + } + private Column hilbertValue(Dataset df) { SparkHilbertUDF hilbertUDF = new SparkHilbertUDF(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java index e2fc18f6967b..fcee0cf97c11 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/OrderSorter.java @@ -39,4 +39,10 @@ public Dataset sort(Dataset input) { Column[] sortColumns = orderColNames.stream().map(input::col).toArray(Column[]::new); return input.repartitionByRange(sortColumns).sortWithinPartitions(sortColumns); } + + @Override + public Dataset sortLocal(Dataset input) { + Column[] sortColumns = orderColNames.stream().map(input::col).toArray(Column[]::new); + return input.sortWithinPartitions(sortColumns); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java index a96724fad6de..de58381fadae 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java @@ -62,6 +62,8 @@ private void checkColNames() { public abstract Dataset sort(Dataset input); + public abstract Dataset sortLocal(Dataset input); + public static TableSorter getSorter( FileStoreTable table, OrderType orderType, List orderColumns) { switch (orderType) { @@ -77,6 +79,11 @@ public static TableSorter getSorter( public Dataset sort(Dataset input) { return input; } + + @Override + public Dataset sortLocal(Dataset input) { + return input; + } }; default: throw new IllegalArgumentException("cannot match order type: " + orderType); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java index 2a6d1b2ceb9f..32d2e468ac4a 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java @@ -48,6 +48,14 @@ public Dataset sort(Dataset df) { return sortedDF.drop(Z_COLUMN); } + @Override + public Dataset sortLocal(Dataset df) { + Column zColumn = zValue(df); + Dataset zValueDF = df.withColumn(Z_COLUMN, zColumn); + Dataset sortedDF = zValueDF.sortWithinPartitions(zValueDF.col(Z_COLUMN)); + return sortedDF.drop(Z_COLUMN); + } + private Column zValue(Dataset df) { SparkZOrderUDF zOrderUDF = new SparkZOrderUDF( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index a44158d83207..7d5ef18582a3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -1339,6 +1339,187 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT } } + test("Paimon Procedure: incremental clustering with local-sort mode") { + withTable("T") { + spark.sql(s""" + |CREATE TABLE T (a INT, b INT, c STRING) + |TBLPROPERTIES ( + | 'bucket'='-1', + | 'num-levels'='4', + | 'num-sorted-run.compaction-trigger'='2', + | 'clustering.columns'='a', + | 'clustering.incremental'='true', + | 'clustering.incremental.mode'='local-sort' + |) + |""".stripMargin) + + // Insert data in multiple batches to create multiple files + spark.sql("INSERT INTO T VALUES (5, 1, 'a')") + spark.sql("INSERT INTO T VALUES (3, 2, 'b')") + spark.sql("INSERT INTO T VALUES (1, 3, 'c')") + spark.sql("INSERT INTO T VALUES (2, 3, 'd')") + spark.sql( + "INSERT INTO T SELECT /*+ COALESCE(1) */ col1, col2, col3 FROM VALUES (8, 3, 'e'), (1, 4, 'f'), (2, 5, 'g') AS t(col1, col2, col3);") + + checkAnswer( + spark.sql("SELECT a, b, c FROM T"), + Seq( + Row(5, 1, "a"), + Row(3, 2, "b"), + Row(1, 3, "c"), + Row(2, 3, "d"), + Row(8, 3, "e"), + Row(1, 4, "f"), + Row(2, 5, "g")) + ) + + val filesBeforeCompact = spark.sql("SELECT count(*) FROM `T$files`").collect()(0).getLong(0) + Assertions.assertThat(filesBeforeCompact).isEqualTo(5) + + // Run compaction to trigger incremental clustering + checkAnswer(spark.sql("CALL sys.compact(table => 'T')"), Row(true) :: Nil) + + // Verify data integrity + checkAnswer( + spark.sql("SELECT a, b, c FROM T"), + Seq( + Row(5, 1, "a"), + Row(3, 2, "b"), + Row(1, 3, "c"), + Row(2, 3, "d"), + Row(1, 4, "f"), + Row(2, 5, "g"), + Row(8, 3, "e")) + ) + + // Verify files are clustered (level >= 1) + val files = spark.sql("SELECT level FROM `T$files`").collect() + files.foreach(row => Assertions.assertThat(row.getInt(0)).isGreaterThanOrEqualTo(1)) + val filesAfterCompact = spark.sql("SELECT count(*) FROM `T$files`").collect()(0).getLong(0) + Assertions.assertThat(filesAfterCompact).isLessThanOrEqualTo(filesBeforeCompact) + + // Verify local-sort effect: within each file, rows are physically sorted by column 'a'. + // file_path in T$files is an absolute path (bucketPath + "/" + fileName), so we can + // read each file directly as parquet (bypassing Paimon's scan) to check physical row order. + val fileRows = + spark.sql("SELECT file_path, record_count FROM `T$files`").collect() + fileRows.foreach { + row => + val filePath = row.getString(0) + val recordCount = row.getLong(1) + if (recordCount > 1) { + // For multi-row files, verify rows are physically sorted by 'a' + val aValues = + spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0)) + for (i <- 1 until aValues.length) { + Assertions + .assertThat(aValues(i)) + .as( + s"File $filePath: row $i (a=${aValues(i)}) should be >= row ${i - 1} (a=${aValues(i - 1)})") + .isGreaterThanOrEqualTo(aValues(i - 1)) + } + } + } + + val table = loadTable("T").asInstanceOf[FileStoreTable] + checkSnapshot(table) + } + } + + test("Paimon Procedure: incremental clustering local-sort vs global-sort") { + withTable("T_local", "T_global") { + // Create table with local-sort + spark.sql(s""" + |CREATE TABLE T_local (a INT, b INT) + |TBLPROPERTIES ( + | 'bucket'='-1', + | 'num-levels'='4', + | 'num-sorted-run.compaction-trigger'='2', + | 'clustering.columns'='a', + | 'clustering.incremental'='true', + | 'clustering.incremental.mode'='local-sort' + |) + |""".stripMargin) + + // Create table with global-sort (default) + spark.sql(s""" + |CREATE TABLE T_global (a INT, b INT) + |TBLPROPERTIES ( + | 'bucket'='-1', + | 'num-levels'='4', + | 'num-sorted-run.compaction-trigger'='2', + | 'clustering.columns'='a', + | 'clustering.incremental'='true', + | 'clustering.incremental.mode'='global-sort' + |) + |""".stripMargin) + + // Insert same data into both tables + for (table <- Seq("T_local", "T_global")) { + spark.sql(s"INSERT INTO $table VALUES (5, 1), (3, 2)") + spark.sql(s"INSERT INTO $table VALUES (8, 3), (1, 4)") + spark.sql(s"INSERT INTO $table VALUES (7, 5), (2, 6)") + spark.sql(s"INSERT INTO $table VALUES (4, 7), (6, 8)") + } + + // Run compact on both + spark.sql("CALL sys.compact(table => 'T_local')") + spark.sql("CALL sys.compact(table => 'T_global')") + + // Both should have same data + checkAnswer( + spark.sql("SELECT a, b FROM T_local ORDER BY a"), + spark.sql("SELECT a, b FROM T_global ORDER BY a") + ) + + val localFileRows = + spark.sql("SELECT file_path, record_count FROM `T_local$files`").collect() + val globalFileRows = + spark.sql("SELECT file_path, record_count FROM `T_global$files`").collect() + + // Global-sort uses repartitionByRange which shuffles all data into 1 sorted file. + // Reading multiple files in parallel gives non-deterministic cross-file ordering, + // so we must verify physical ordering by reading each parquet file directly. + Assertions.assertThat(globalFileRows.length).isEqualTo(1) + val globalAValues = + spark.read + .format("parquet") + .load(globalFileRows(0).getString(0)) + .select("a") + .collect() + .map(_.getInt(0)) + Assertions.assertThat(globalAValues).isEqualTo(Array(1, 2, 3, 4, 5, 6, 7, 8)) + + // Local-sort uses sortWithinPartitions only (no range shuffle), so multiple output + // files are produced. Each file is individually sorted by 'a', but ranges may overlap. + Assertions.assertThat(localFileRows.length.toLong).isGreaterThan(globalFileRows.length) + var localFilesWithMultiRows = 0 + localFileRows.foreach { + row => + val filePath = row.getString(0) + val recordCount = row.getLong(1) + if (recordCount > 1) { + localFilesWithMultiRows += 1 + val aValues = + spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0)) + for (i <- 1 until aValues.length) { + Assertions + .assertThat(aValues(i)) + .as( + s"local-sort file $filePath: a[$i]=${aValues(i)} should be >= a[${i - 1}]=${aValues(i - 1)}") + .isGreaterThanOrEqualTo(aValues(i - 1)) + } + } + } + Assertions.assertThat(localFilesWithMultiRows).isGreaterThan(0) + + val localFiles = spark.sql("SELECT count(*) FROM `T_local$files`").collect()(0).getLong(0) + val globalFiles = spark.sql("SELECT count(*) FROM `T_global$files`").collect()(0).getLong(0) + Assertions.assertThat(localFiles).isGreaterThanOrEqualTo(globalFiles) + Assertions.assertThat(globalFiles).isEqualTo(1) + } + } + def checkSnapshot(table: FileStoreTable): Unit = { Assertions .assertThat(table.latestSnapshot().get().commitKind().toString)