From b641967558286a745497bd3a0d45b3ec2bbf778c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 7 May 2026 10:41:20 +0800 Subject: [PATCH 1/7] split scan partition by conf --- .../BucketOffsetsRetrieverImpl.java | 19 ++++-- .../apache/fluss/spark/SparkFlussConf.scala | 10 +++ .../apache/fluss/spark/read/FlussBatch.scala | 61 +++++++++++++++---- .../fluss/spark/SparkLogTableReadTest.scala | 9 +++ 4 files changed, 84 insertions(+), 15 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java index e868a84cc1..037790f0be 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java @@ -37,10 +37,17 @@ public class BucketOffsetsRetrieverImpl implements OffsetsInitializer.BucketOffsetsRetriever { private final Admin flussAdmin; private final TablePath tablePath; + private final Boolean fetchEarliestOffset; public BucketOffsetsRetrieverImpl(Admin flussAdmin, TablePath tablePath) { + this(flussAdmin, tablePath, false); + } + + public BucketOffsetsRetrieverImpl( + Admin flussAdmin, TablePath tablePath, Boolean fetchEarliestOffset) { this.flussAdmin = flussAdmin; this.tablePath = tablePath; + this.fetchEarliestOffset = fetchEarliestOffset; } @Override @@ -52,11 +59,15 @@ public Map latestOffsets( @Override public Map earliestOffsets( @Nullable String partitionName, Collection buckets) { - Map bucketWithOffset = new HashMap<>(buckets.size()); - for (Integer bucket : buckets) { - bucketWithOffset.put(bucket, EARLIEST_OFFSET); + if (!fetchEarliestOffset) { + Map bucketWithOffset = new HashMap<>(buckets.size()); + for (Integer bucket : buckets) { + bucketWithOffset.put(bucket, EARLIEST_OFFSET); + } + return bucketWithOffset; + } else { + return listOffsets(partitionName, buckets, new OffsetSpec.EarliestSpec()); } - return bucketWithOffset; } @Override diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala index 28fb633b52..00d6400f64 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala @@ -50,4 +50,14 @@ object SparkFlussConf { .durationType() .defaultValue(Duration.ofMillis(10000L)) .withDescription("The timeout for log scanner to poll records.") + + val SCAN_MAX_RECORDS_PER_PARTITION: ConfigOption[java.lang.Long] = + ConfigBuilder + .key("scan.max.records.per.partition") + .longType() + .noDefaultValue() + .withDescription( + "The maximum number of records per Spark input partition when reading a log table. " + + "When set, each Fluss bucket whose offset range exceeds this value will be split " + + "into multiple partitions. Disabled by default (one partition per bucket).") } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala index 87f2fdad0f..92f7a31f5c 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala @@ -26,6 +26,7 @@ import org.apache.fluss.config.Configuration import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath} import org.apache.fluss.predicate.Predicate import org.apache.fluss.spark.utils.SparkPartitionPredicate +import org.apache.fluss.spark.SparkFlussConf import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory} import org.apache.spark.sql.types.StructType @@ -127,26 +128,64 @@ class FlussAppendBatch( } override def planInputPartitions(): Array[InputPartition] = { - val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin, tablePath) + val maxRecordsPerPartition: Option[Long] = { + val opt = flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION) + if (opt.isPresent) Some(opt.get().longValue()) else None + } + + val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match { + case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true) + case None => new BucketOffsetsRetrieverImpl(admin, tablePath) + } val buckets = (0 until tableInfo.getNumBuckets).toSeq + def splitOffsetRange( + tableBucket: TableBucket, + startOffset: Long, + stopOffset: Long, + maxRecords: Long): Seq[InputPartition] = { + if ( + startOffset < 0 || stopOffset <= startOffset || stopOffset <= (startOffset + maxRecords) + ) { + return Seq( + FlussAppendInputPartition(tableBucket, startOffset, stopOffset) + .asInstanceOf[InputPartition]) + } + val rangeSize = stopOffset - startOffset + val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt + val step = (rangeSize + numSplits - 1) / numSplits + + Iterator + .from(0) + .take(numSplits) + .map(i => startOffset + i * step) + .map { + from => + FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset)) + .asInstanceOf[InputPartition] + } + .toSeq + } + def createPartitions( partitionId: Option[Long], startBucketOffsets: Map[Integer, Long], stoppingBucketOffsets: Map[Integer, Long]): Array[InputPartition] = { - buckets.map { + buckets.flatMap { bucketId => - val (startBucketOffset, stoppingBucketOffset) = + val (startOffset, stopOffset) = (startBucketOffsets(bucketId), stoppingBucketOffsets(bucketId)) - partitionId match { - case Some(partitionId) => - val tableBucket = new TableBucket(tableInfo.getTableId, partitionId, bucketId) - FlussAppendInputPartition(tableBucket, startBucketOffset, stoppingBucketOffset) - .asInstanceOf[InputPartition] + val tableBucket = partitionId match { + case Some(pid) => new TableBucket(tableInfo.getTableId, pid, bucketId) + case None => new TableBucket(tableInfo.getTableId, bucketId) + } + maxRecordsPerPartition match { + case Some(maxRecs) => + splitOffsetRange(tableBucket, startOffset, stopOffset, maxRecs) case None => - val tableBucket = new TableBucket(tableInfo.getTableId, bucketId) - FlussAppendInputPartition(tableBucket, startBucketOffset, stoppingBucketOffset) - .asInstanceOf[InputPartition] + Seq( + FlussAppendInputPartition(tableBucket, startOffset, stopOffset) + .asInstanceOf[InputPartition]) } }.toArray } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index 42b0aa62d0..e09e82f434 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -603,4 +603,13 @@ class SparkLogTableReadTest extends FlussSparkTestBase { assert(numRowsRead == 5L, s"Expected 5 rows read, got $numRowsRead") } } + + test("Spark Read: split partition by config") { + withSampleTable { + withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> "2") { + val query = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId") + checkAnswer(query, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil) + } + } + } } From 390e9d5b905031a659a65f729999d74acef49d0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Wed, 13 May 2026 18:45:57 +0800 Subject: [PATCH 2/7] fix comments --- .../BucketOffsetsRetrieverImpl.java | 4 +- .../apache/fluss/spark/read/FlussBatch.scala | 16 ++--- .../fluss/spark/SparkLogTableReadTest.scala | 59 ++++++++++++++++++- 3 files changed, 63 insertions(+), 16 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java index 037790f0be..2abd998828 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java @@ -37,14 +37,14 @@ public class BucketOffsetsRetrieverImpl implements OffsetsInitializer.BucketOffsetsRetriever { private final Admin flussAdmin; private final TablePath tablePath; - private final Boolean fetchEarliestOffset; + private final boolean fetchEarliestOffset; public BucketOffsetsRetrieverImpl(Admin flussAdmin, TablePath tablePath) { this(flussAdmin, tablePath, false); } public BucketOffsetsRetrieverImpl( - Admin flussAdmin, TablePath tablePath, Boolean fetchEarliestOffset) { + Admin flussAdmin, TablePath tablePath, boolean fetchEarliestOffset) { this.flussAdmin = flussAdmin; this.tablePath = tablePath; this.fetchEarliestOffset = fetchEarliestOffset; diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala index 92f7a31f5c..4f925ac3f8 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala @@ -147,9 +147,7 @@ class FlussAppendBatch( if ( startOffset < 0 || stopOffset <= startOffset || stopOffset <= (startOffset + maxRecords) ) { - return Seq( - FlussAppendInputPartition(tableBucket, startOffset, stopOffset) - .asInstanceOf[InputPartition]) + return Seq(FlussAppendInputPartition(tableBucket, startOffset, stopOffset)) } val rangeSize = stopOffset - startOffset val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt @@ -160,9 +158,7 @@ class FlussAppendBatch( .take(numSplits) .map(i => startOffset + i * step) .map { - from => - FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset)) - .asInstanceOf[InputPartition] + from => FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset)) } .toSeq } @@ -180,12 +176,10 @@ class FlussAppendBatch( case None => new TableBucket(tableInfo.getTableId, bucketId) } maxRecordsPerPartition match { - case Some(maxRecs) => + case Some(maxRecs) if maxRecs > 0 => splitOffsetRange(tableBucket, startOffset, stopOffset, maxRecs) - case None => - Seq( - FlussAppendInputPartition(tableBucket, startOffset, stopOffset) - .asInstanceOf[InputPartition]) + case _ => + Seq(FlussAppendInputPartition(tableBucket, startOffset, stopOffset)) } }.toArray } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index e09e82f434..4ccc58f928 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -21,8 +21,8 @@ import org.apache.fluss.spark.read.{FlussMetrics, FlussScan} import org.apache.fluss.spark.read.FlussAppendScan import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.Row import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation} import org.assertj.core.api.Assertions.assertThat @@ -607,9 +607,62 @@ class SparkLogTableReadTest extends FlussSparkTestBase { test("Spark Read: split partition by config") { withSampleTable { withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> "2") { - val query = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId") - checkAnswer(query, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil) + val df = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId") + checkAnswer(df, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil) + + val partitions = getInputPartitions(df) + assertThat(partitions.length).isEqualTo(3) + } + } + + withTable("t_partition") { + sql( + s""" + |CREATE TABLE $DEFAULT_DATABASE.t_partition (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING) + |PARTITIONED BY (dt) + |""".stripMargin + ) + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_partition VALUES + |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"), + |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, "addr4", "2026-01-02"), + |(1000L, 25L, 605, "addr5", "2026-01-03") + |""".stripMargin) + Seq((0, 3), (1, 5), (2, 3)).foreach { + case (maxRecords, expectedPartitions) => + withClue(s"maxRecords = $maxRecords, expectedPartitions = $expectedPartitions") { + withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> maxRecords.toString) { + val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_partition ORDER BY orderId") + checkAnswer( + df, + Row(600L, 21L, 601, "addr1", "2026-01-01") :: + Row(700L, 22L, 602, "addr2", "2026-01-01") :: + Row(800L, 23L, 603, "addr3", "2026-01-02") :: + Row(900L, 24L, 604, "addr4", "2026-01-02") :: + Row(1000L, 25L, 605, "addr5", "2026-01-03") :: Nil + ) + + val partitions = getInputPartitions(df) + assertThat(partitions.length).isEqualTo(expectedPartitions) + } + } + } + } + } + + private def getInputPartitions(df: DataFrame): Array[InputPartition] = { + // Try executedPlan first (after AQE), then optimizedPlan + val fromExecutedPlan = df.queryExecution.executedPlan.collect { + case b: BatchScanExec => b.inputPartitions.toArray + } + if (fromExecutedPlan.nonEmpty) { + fromExecutedPlan.head + } else { + val scans = df.queryExecution.optimizedPlan.collect { + case DataSourceV2ScanRelation(_, scan: FlussAppendScan, _, _, _) => scan } + scans.headOption.map(_.toBatch.planInputPartitions()).getOrElse(Array.empty[InputPartition]) } } } From 1a0e567aa056bed2e0696fc7081670e2befd67d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Wed, 13 May 2026 18:52:59 +0800 Subject: [PATCH 3/7] fix rebase --- .../src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala index 4f925ac3f8..b69878c981 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala @@ -25,8 +25,8 @@ import org.apache.fluss.client.table.scanner.log.LogScanner import org.apache.fluss.config.Configuration import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath} import org.apache.fluss.predicate.Predicate -import org.apache.fluss.spark.utils.SparkPartitionPredicate import org.apache.fluss.spark.SparkFlussConf +import org.apache.fluss.spark.utils.SparkPartitionPredicate import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory} import org.apache.spark.sql.types.StructType From a2de14366ae67ba59294368e88f2e2d13f565a4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 14 May 2026 10:25:21 +0800 Subject: [PATCH 4/7] trigger CI From 90e76a7bbf92a17674fc9030e63d21c4124cb1c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Fri, 15 May 2026 17:34:18 +0800 Subject: [PATCH 5/7] fix comments --- .../apache/fluss/spark/SparkFlussConf.scala | 2 +- .../apache/fluss/spark/read/FlussBatch.scala | 8 +++---- .../fluss/spark/SparkLogTableReadTest.scala | 22 +++++++++---------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala index 00d6400f64..aac6a698da 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala @@ -53,7 +53,7 @@ object SparkFlussConf { val SCAN_MAX_RECORDS_PER_PARTITION: ConfigOption[java.lang.Long] = ConfigBuilder - .key("scan.max.records.per.partition") + .key("scan.maxRecordsPerPartition") .longType() .noDefaultValue() .withDescription( diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala index b69878c981..d876abae59 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala @@ -129,13 +129,13 @@ class FlussAppendBatch( override def planInputPartitions(): Array[InputPartition] = { val maxRecordsPerPartition: Option[Long] = { - val opt = flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION) - if (opt.isPresent) Some(opt.get().longValue()) else None + val value = flussConfig.getLong(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION, 0) + if (value > 0) Some(value) else None } val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match { case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true) - case None => new BucketOffsetsRetrieverImpl(admin, tablePath) + case _ => new BucketOffsetsRetrieverImpl(admin, tablePath) } val buckets = (0 until tableInfo.getNumBuckets).toSeq @@ -176,7 +176,7 @@ class FlussAppendBatch( case None => new TableBucket(tableInfo.getTableId, bucketId) } maxRecordsPerPartition match { - case Some(maxRecs) if maxRecs > 0 => + case Some(maxRecs) => splitOffsetRange(tableBucket, startOffset, stopOffset, maxRecs) case _ => Seq(FlussAppendInputPartition(tableBucket, startOffset, stopOffset)) diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index 4ccc58f928..34c6fd3aa5 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -23,6 +23,7 @@ import org.apache.fluss.spark.read.FlussAppendScan import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation} import org.assertj.core.api.Assertions.assertThat @@ -651,18 +652,15 @@ class SparkLogTableReadTest extends FlussSparkTestBase { } } - private def getInputPartitions(df: DataFrame): Array[InputPartition] = { - // Try executedPlan first (after AQE), then optimizedPlan - val fromExecutedPlan = df.queryExecution.executedPlan.collect { - case b: BatchScanExec => b.inputPartitions.toArray - } - if (fromExecutedPlan.nonEmpty) { - fromExecutedPlan.head - } else { - val scans = df.queryExecution.optimizedPlan.collect { - case DataSourceV2ScanRelation(_, scan: FlussAppendScan, _, _, _) => scan - } - scans.headOption.map(_.toBatch.planInputPartitions()).getOrElse(Array.empty[InputPartition]) + private def getInputPartitions(df: DataFrame): Seq[InputPartition] = { + df.queryExecution.executedPlan match { + case aeq: AdaptiveSparkPlanExec => + aeq.inputPlan.collect { case b: BatchScanExec => b.inputPartitions }.flatten + case e => + e.collect { + case b: BatchScanExec => b.inputPartitions + case _ => Seq.empty[InputPartition] + }.flatten } } } From 35c1b64582dbbf05f4cc5e2c52f388a45d0f553e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Fri, 15 May 2026 18:17:29 +0800 Subject: [PATCH 6/7] fix conf name --- .../org/apache/fluss/spark/SparkLogTableReadTest.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index 34c6fd3aa5..853fc60b58 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -607,7 +607,8 @@ class SparkLogTableReadTest extends FlussSparkTestBase { test("Spark Read: split partition by config") { withSampleTable { - withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> "2") { + withSQLConf( + s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}.${SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION}" -> "2") { val df = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId") checkAnswer(df, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil) @@ -633,7 +634,8 @@ class SparkLogTableReadTest extends FlussSparkTestBase { Seq((0, 3), (1, 5), (2, 3)).foreach { case (maxRecords, expectedPartitions) => withClue(s"maxRecords = $maxRecords, expectedPartitions = $expectedPartitions") { - withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> maxRecords.toString) { + withSQLConf( + s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}.${SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION}" -> maxRecords.toString) { val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_partition ORDER BY orderId") checkAnswer( df, From 72ee21e87cbd67c9e9fa4a65dae571a18aacc930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Sun, 17 May 2026 16:16:16 +0800 Subject: [PATCH 7/7] fix ut --- .../org/apache/fluss/spark/SparkLogTableReadTest.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index 853fc60b58..578ce8be35 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -17,8 +17,7 @@ package org.apache.fluss.spark -import org.apache.fluss.spark.read.{FlussMetrics, FlussScan} -import org.apache.fluss.spark.read.FlussAppendScan +import org.apache.fluss.spark.read.{FlussAppendScan, FlussMetrics, FlussScan} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.connector.expressions.filter.Predicate @@ -608,7 +607,8 @@ class SparkLogTableReadTest extends FlussSparkTestBase { test("Spark Read: split partition by config") { withSampleTable { withSQLConf( - s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}.${SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION}" -> "2") { + s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION.key()}" + -> "2") { val df = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId") checkAnswer(df, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil) @@ -635,7 +635,8 @@ class SparkLogTableReadTest extends FlussSparkTestBase { case (maxRecords, expectedPartitions) => withClue(s"maxRecords = $maxRecords, expectedPartitions = $expectedPartitions") { withSQLConf( - s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}.${SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION}" -> maxRecords.toString) { + s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION.key()}" + -> maxRecords.toString) { val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_partition ORDER BY orderId") checkAnswer( df,