Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,38 @@ public void testUpgrade() {
}
}

@Test
public void testClusteringIncrementalModeDefault() throws Exception {
    // With no explicit 'clustering.incremental.mode' option configured,
    // the manager is expected to resolve the mode to GLOBAL_SORT.
    Map<String, String> tableOptions = new HashMap<>();
    FileStoreTable storeTable = createTable(tableOptions, Collections.emptyList());
    IncrementalClusterManager clusterManager = new IncrementalClusterManager(storeTable);
    CoreOptions.ClusteringIncrementalMode resolved = clusterManager.clusteringIncrementalMode();
    assertThat(resolved).isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT);
}

@Test
public void testClusteringIncrementalModeLocalSort() throws Exception {
    // Setting 'clustering.incremental.mode' to "local-sort" must surface
    // through the manager as LOCAL_SORT.
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(), "local-sort");
    FileStoreTable storeTable = createTable(tableOptions, Collections.emptyList());
    IncrementalClusterManager clusterManager = new IncrementalClusterManager(storeTable);
    CoreOptions.ClusteringIncrementalMode resolved = clusterManager.clusteringIncrementalMode();
    assertThat(resolved).isEqualTo(CoreOptions.ClusteringIncrementalMode.LOCAL_SORT);
}

@Test
public void testClusteringIncrementalModeGlobalSort() throws Exception {
    // An explicit "global-sort" setting must resolve to GLOBAL_SORT,
    // matching the default behavior.
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(), "global-sort");
    FileStoreTable storeTable = createTable(tableOptions, Collections.emptyList());
    IncrementalClusterManager clusterManager = new IncrementalClusterManager(storeTable);
    CoreOptions.ClusteringIncrementalMode resolved = clusterManager.clusteringIncrementalMode();
    assertThat(resolved).isEqualTo(CoreOptions.ClusteringIncrementalMode.GLOBAL_SORT);
}

@Test
public void testHistoryPartitionAutoClustering() throws Exception {
Map<String, String> options = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,9 @@ private void clusterIncrementalUnAwareBucketTable(
incrementalClusterManager.clusterCurve(),
incrementalClusterManager.clusterKeys());

CoreOptions.ClusteringIncrementalMode mode =
incrementalClusterManager.clusteringIncrementalMode();

Dataset<Row> datasetForWrite =
partitionSplits.values().stream()
.map(Pair::getKey)
Expand All @@ -678,7 +681,12 @@ private void clusterIncrementalUnAwareBucketTable(
ScanPlanHelper$.MODULE$.createNewScanPlan(
splits.toArray(new DataSplit[0]),
relation));
return sorter.sort(dataset);
// Use sortLocal() for LOCAL_SORT, sort() for GLOBAL_SORT
if (mode == CoreOptions.ClusteringIncrementalMode.LOCAL_SORT) {
return sorter.sortLocal(dataset);
} else {
return sorter.sort(dataset);
}
})
.reduce(Dataset::union)
.orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public Dataset<Row> sort(Dataset<Row> df) {
return sortedDF.drop(H_COLUMN);
}

@Override
public Dataset<Row> sortLocal(Dataset<Row> df) {
    // Partition-local variant of sort(): attach the Hilbert index column,
    // order rows within each existing Spark partition (no shuffle), then
    // strip the temporary index column before returning.
    Dataset<Row> indexed = df.withColumn(H_COLUMN, hilbertValue(df));
    return indexed.sortWithinPartitions(indexed.col(H_COLUMN)).drop(H_COLUMN);
}

private Column hilbertValue(Dataset<Row> df) {
SparkHilbertUDF hilbertUDF = new SparkHilbertUDF();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,10 @@ public Dataset<Row> sort(Dataset<Row> input) {
Column[] sortColumns = orderColNames.stream().map(input::col).toArray(Column[]::new);
return input.repartitionByRange(sortColumns).sortWithinPartitions(sortColumns);
}

@Override
public Dataset<Row> sortLocal(Dataset<Row> input) {
    // Unlike sort(), no repartitionByRange is performed here: each existing
    // partition is ordered independently by the configured order columns.
    Column[] localSortColumns =
            orderColNames.stream().map(input::col).toArray(Column[]::new);
    return input.sortWithinPartitions(localSortColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ private void checkColNames() {

public abstract Dataset<Row> sort(Dataset<Row> input);

public abstract Dataset<Row> sortLocal(Dataset<Row> input);

public static TableSorter getSorter(
FileStoreTable table, OrderType orderType, List<String> orderColumns) {
switch (orderType) {
Expand All @@ -77,6 +79,11 @@ public static TableSorter getSorter(
public Dataset<Row> sort(Dataset<Row> input) {
return input;
}

@Override
public Dataset<Row> sortLocal(Dataset<Row> input) {
return input;
}
};
default:
throw new IllegalArgumentException("cannot match order type: " + orderType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ public Dataset<Row> sort(Dataset<Row> df) {
return sortedDF.drop(Z_COLUMN);
}

@Override
public Dataset<Row> sortLocal(Dataset<Row> df) {
    // Partition-local variant of sort(): attach the Z-order index column,
    // sort inside each partition without a shuffle, then remove the
    // temporary index column.
    Dataset<Row> indexed = df.withColumn(Z_COLUMN, zValue(df));
    return indexed.sortWithinPartitions(indexed.col(Z_COLUMN)).drop(Z_COLUMN);
}

private Column zValue(Dataset<Row> df) {
SparkZOrderUDF zOrderUDF =
new SparkZOrderUDF(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,187 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
}
}

// End-to-end check of 'clustering.incremental.mode' = 'local-sort':
// after sys.compact, every multi-row output file must be physically
// sorted by the clustering column 'a'. The exact INSERT order below is
// load-bearing — the post-compact checkAnswer expectations depend on it.
test("Paimon Procedure: incremental clustering with local-sort mode") {
withTable("T") {
spark.sql(s"""
|CREATE TABLE T (a INT, b INT, c STRING)
|TBLPROPERTIES (
| 'bucket'='-1',
| 'num-levels'='4',
| 'num-sorted-run.compaction-trigger'='2',
| 'clustering.columns'='a',
| 'clustering.incremental'='true',
| 'clustering.incremental.mode'='local-sort'
|)
|""".stripMargin)

// Insert data in multiple batches to create multiple files
spark.sql("INSERT INTO T VALUES (5, 1, 'a')")
spark.sql("INSERT INTO T VALUES (3, 2, 'b')")
spark.sql("INSERT INTO T VALUES (1, 3, 'c')")
spark.sql("INSERT INTO T VALUES (2, 3, 'd')")
// COALESCE(1) hint forces the three rows into a single data file.
spark.sql(
"INSERT INTO T SELECT /*+ COALESCE(1) */ col1, col2, col3 FROM VALUES (8, 3, 'e'), (1, 4, 'f'), (2, 5, 'g') AS t(col1, col2, col3);")

// Sanity check: all seven rows are readable before compaction.
checkAnswer(
spark.sql("SELECT a, b, c FROM T"),
Seq(
Row(5, 1, "a"),
Row(3, 2, "b"),
Row(1, 3, "c"),
Row(2, 3, "d"),
Row(8, 3, "e"),
Row(1, 4, "f"),
Row(2, 5, "g"))
)

// Five INSERTs above -> five data files expected.
val filesBeforeCompact = spark.sql("SELECT count(*) FROM `T$files`").collect()(0).getLong(0)
Assertions.assertThat(filesBeforeCompact).isEqualTo(5)

// Run compaction to trigger incremental clustering
checkAnswer(spark.sql("CALL sys.compact(table => 'T')"), Row(true) :: Nil)

// Verify data integrity
checkAnswer(
spark.sql("SELECT a, b, c FROM T"),
Seq(
Row(5, 1, "a"),
Row(3, 2, "b"),
Row(1, 3, "c"),
Row(2, 3, "d"),
Row(1, 4, "f"),
Row(2, 5, "g"),
Row(8, 3, "e"))
)

// Verify files are clustered (level >= 1)
val files = spark.sql("SELECT level FROM `T$files`").collect()
files.foreach(row => Assertions.assertThat(row.getInt(0)).isGreaterThanOrEqualTo(1))
val filesAfterCompact = spark.sql("SELECT count(*) FROM `T$files`").collect()(0).getLong(0)
Assertions.assertThat(filesAfterCompact).isLessThanOrEqualTo(filesBeforeCompact)

// Verify local-sort effect: within each file, rows are physically sorted by column 'a'.
// file_path in T$files is an absolute path (bucketPath + "/" + fileName), so we can
// read each file directly as parquet (bypassing Paimon's scan) to check physical row order.
val fileRows =
spark.sql("SELECT file_path, record_count FROM `T$files`").collect()
fileRows.foreach {
row =>
val filePath = row.getString(0)
val recordCount = row.getLong(1)
if (recordCount > 1) {
// For multi-row files, verify rows are physically sorted by 'a'
val aValues =
spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0))
// Pairwise non-decreasing check over adjacent rows in file order.
for (i <- 1 until aValues.length) {
Assertions
.assertThat(aValues(i))
.as(
s"File $filePath: row $i (a=${aValues(i)}) should be >= row ${i - 1} (a=${aValues(i - 1)})")
.isGreaterThanOrEqualTo(aValues(i - 1))
}
}
}

val table = loadTable("T").asInstanceOf[FileStoreTable]
checkSnapshot(table)
}
}

// Contrasts the two incremental clustering modes on identical data:
// global-sort range-shuffles everything into one fully-ordered file,
// while local-sort only orders rows inside each existing partition,
// producing several files whose key ranges may overlap.
test("Paimon Procedure: incremental clustering local-sort vs global-sort") {
withTable("T_local", "T_global") {
// Create table with local-sort
spark.sql(s"""
|CREATE TABLE T_local (a INT, b INT)
|TBLPROPERTIES (
| 'bucket'='-1',
| 'num-levels'='4',
| 'num-sorted-run.compaction-trigger'='2',
| 'clustering.columns'='a',
| 'clustering.incremental'='true',
| 'clustering.incremental.mode'='local-sort'
|)
|""".stripMargin)

// Create table with global-sort (default)
spark.sql(s"""
|CREATE TABLE T_global (a INT, b INT)
|TBLPROPERTIES (
| 'bucket'='-1',
| 'num-levels'='4',
| 'num-sorted-run.compaction-trigger'='2',
| 'clustering.columns'='a',
| 'clustering.incremental'='true',
| 'clustering.incremental.mode'='global-sort'
|)
|""".stripMargin)

// Insert same data into both tables
for (table <- Seq("T_local", "T_global")) {
spark.sql(s"INSERT INTO $table VALUES (5, 1), (3, 2)")
spark.sql(s"INSERT INTO $table VALUES (8, 3), (1, 4)")
spark.sql(s"INSERT INTO $table VALUES (7, 5), (2, 6)")
spark.sql(s"INSERT INTO $table VALUES (4, 7), (6, 8)")
}

// Run compact on both
spark.sql("CALL sys.compact(table => 'T_local')")
spark.sql("CALL sys.compact(table => 'T_global')")

// Both should have same data
checkAnswer(
spark.sql("SELECT a, b FROM T_local ORDER BY a"),
spark.sql("SELECT a, b FROM T_global ORDER BY a")
)

val localFileRows =
spark.sql("SELECT file_path, record_count FROM `T_local$files`").collect()
val globalFileRows =
spark.sql("SELECT file_path, record_count FROM `T_global$files`").collect()

// Global-sort uses repartitionByRange which shuffles all data into 1 sorted file.
// Reading multiple files in parallel gives non-deterministic cross-file ordering,
// so we must verify physical ordering by reading each parquet file directly.
Assertions.assertThat(globalFileRows.length).isEqualTo(1)
val globalAValues =
spark.read
.format("parquet")
.load(globalFileRows(0).getString(0))
.select("a")
.collect()
.map(_.getInt(0))
// The single global file must contain all eight 'a' values fully sorted.
Assertions.assertThat(globalAValues).isEqualTo(Array(1, 2, 3, 4, 5, 6, 7, 8))

// Local-sort uses sortWithinPartitions only (no range shuffle), so multiple output
// files are produced. Each file is individually sorted by 'a', but ranges may overlap.
Assertions.assertThat(localFileRows.length.toLong).isGreaterThan(globalFileRows.length)
var localFilesWithMultiRows = 0
localFileRows.foreach {
row =>
val filePath = row.getString(0)
val recordCount = row.getLong(1)
if (recordCount > 1) {
localFilesWithMultiRows += 1
val aValues =
spark.read.format("parquet").load(filePath).select("a").collect().map(_.getInt(0))
for (i <- 1 until aValues.length) {
Assertions
.assertThat(aValues(i))
.as(
s"local-sort file $filePath: a[$i]=${aValues(i)} should be >= a[${i - 1}]=${aValues(i - 1)}")
.isGreaterThanOrEqualTo(aValues(i - 1))
}
}
}
// Guard against a vacuous pass: at least one multi-row file must exist
// so the per-file ordering loop above actually checked something.
Assertions.assertThat(localFilesWithMultiRows).isGreaterThan(0)

val localFiles = spark.sql("SELECT count(*) FROM `T_local$files`").collect()(0).getLong(0)
val globalFiles = spark.sql("SELECT count(*) FROM `T_global$files`").collect()(0).getLong(0)
Assertions.assertThat(localFiles).isGreaterThanOrEqualTo(globalFiles)
Assertions.assertThat(globalFiles).isEqualTo(1)
}
}

def checkSnapshot(table: FileStoreTable): Unit = {
Assertions
.assertThat(table.latestSnapshot().get().commitKind().toString)
Expand Down