From 25fade7525daf46229cdefbfce509d4cd44dc00b Mon Sep 17 00:00:00 2001 From: "xiyu.zk" Date: Wed, 8 Apr 2026 17:37:14 +0800 Subject: [PATCH 1/3] [spark] Support partition statistics in SHOW TABLE EXTENDED PARTITION command --- .../spark/PaimonPartitionManagement.scala | 28 +++++++++++++- .../PaimonShowTablePartitionCommand.scala | 18 ++++++++- .../spark/sql/DescribeTableTestBase.scala | 38 +++++++++++++++++++ .../analysis/Spark4ResolutionRules.scala | 13 ++++++- 4 files changed, 92 insertions(+), 5 deletions(-) rename paimon-spark/{paimon-spark3-common => paimon-spark-common}/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala (84%) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index 511e728dbca8..cd5955b0ff39 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -19,7 +19,9 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions +import org.apache.paimon.partition.PartitionStatistics import org.apache.paimon.table.{FileStoreTable, Table} +import org.apache.paimon.table.source.ScanMode import org.apache.paimon.types.RowType import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils} @@ -136,7 +138,31 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement with L } override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = { - Map.empty[String, String].asJava + table match { + case fileStoreTable: FileStoreTable => + val partitionSpec = toPaimonPartitions(Array(ident)).head + val partitionEntries = fileStoreTable + .newSnapshotReader() + .withMode(ScanMode.ALL) + .withPartitionFilter(partitionSpec) + .partitionEntries() + + if (!partitionEntries.isEmpty) { + val entry = partitionEntries.get(0) + Map( + PartitionStatistics.FIELD_RECORD_COUNT -> entry.recordCount().toString, + PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES -> entry.fileSizeInBytes().toString, + PartitionStatistics.FIELD_FILE_COUNT -> entry.fileCount().toString, + PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME -> entry + .lastFileCreationTime() + .toString + ).asJava + } else { + Map.empty[String, String].asJava + } + case _ => + Map.empty[String, String].asJava + } } override def listPartitionIdentifiers( diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala similarity index 84% rename from paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala index ac98a807ca16..5d85c7bd5ee9 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala @@ -18,13 +18,14 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.partition.PartitionStatistics import 
org.apache.paimon.spark.catalyst.Compatibility import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, ToPrettyString} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal} import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog} import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ @@ -87,7 +88,20 @@ case class PaimonShowTablePartitionCommand( val partitionValues = partitions.mkString("[", ", ", "]") results.put("Partition Values", s"$partitionValues") - // TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics" + // Partition Parameters and Partition Statistics + val metadata = partitionTable.loadPartitionMetadata(row) + if (!metadata.isEmpty) { + val metadataMap = metadata.asScala + results.put( + "Partition Parameters", + s"{${metadataMap.map { case (k, v) => s"$k=$v" }.mkString(", ")}}") + + val fileSizeInBytes = + metadataMap.getOrElse(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES, "0").toLong + val recordCount = + metadataMap.getOrElse(PartitionStatistics.FIELD_RECORD_COUNT, "0").toLong + results.put("Partition Statistics", s"$recordCount rows, $fileSizeInBytes bytes") + } results .map { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala index 63efa3f7e0a6..a757b24dfe21 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala @@ -98,6 +98,44 @@ abstract class DescribeTableTestBase extends PaimonSparkTestBase { ) Assertions.assertTrue( res2.select("information").collect().head.getString(0).contains("Partition Values")) + + val info2 = res2.select("information").collect().head.getString(0) + Assertions.assertTrue( + info2.contains("Partition Parameters"), + s"SHOW TABLE EXTENDED should contain Partition Parameters, but got: $info2") + Assertions.assertTrue( + info2.contains(PartitionStatistics.FIELD_RECORD_COUNT), + s"SHOW TABLE EXTENDED should contain recordCount, but got: $info2") + Assertions.assertTrue( + info2.contains(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES), + s"SHOW TABLE EXTENDED should contain fileSizeInBytes, but got: $info2") + Assertions.assertTrue( + info2.contains(PartitionStatistics.FIELD_FILE_COUNT), + s"SHOW TABLE EXTENDED should contain fileCount, but got: $info2") + Assertions.assertTrue( + info2.contains(PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME), + s"SHOW TABLE EXTENDED should contain lastFileCreationTime, but got: $info2") + // Verify recordCount value: partition (pt1='2024', pt2='11') has 1 row + // Spark 3 uses "key=value" format, Spark 4 uses "key: value" format + Assertions.assertTrue( + info2.contains("Partition Statistics"), + s"SHOW TABLE EXTENDED should contain Partition Statistics, but got: $info2") + Assertions.assertTrue( + info2.contains("recordCount=1") || info2.contains("recordCount: 1"), + s"Partition (pt1='2024', pt2='11') should have recordCount=1, but 
got: $info2") + Assertions.assertTrue( + info2.contains("1 rows"), + s"Partition Statistics should contain '1 rows', but got: $info2") + + val res3 = + spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt = '2024')") + val info3 = res3.select("information").collect().head.getString(0) + Assertions.assertTrue( + info3.contains("recordCount=2") || info3.contains("recordCount: 2"), + s"Partition pt='2024' should have recordCount=2, but got: $info3") + Assertions.assertTrue( + info3.contains("2 rows"), + s"Partition Statistics should contain '2 rows', but got: $info3") } } } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala index 461cbd0c938a..0202af197a8d 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala @@ -18,10 +18,19 @@ package org.apache.paimon.spark.catalyst.analysis +import org.apache.paimon.spark.commands.PaimonShowTablePartitionCommand + import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTablePartition} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { + case s @ ShowTablePartition(rt: ResolvedTable, _, _) => + val resolvedSpec = + PaimonResolvePartitionSpec.resolve(rt.catalog, rt.identifier, s.partitionSpec) + PaimonShowTablePartitionCommand(s.output, rt.catalog, rt.identifier, resolvedSpec) + } } From c27ef5d7b28420a1db95d9640a2f6bef38ce373b Mon Sep 17 00:00:00 2001 From: "xiyu.zk" Date: Thu, 9 Apr 2026 16:31:22 +0800 Subject: [PATCH 2/3] [spark] Support partition statistics in SHOW TABLE EXTENDED PARTITION command --- .../paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala index 0202af197a8d..23db4e716ddf 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala @@ -21,10 +21,9 @@ package org.apache.paimon.spark.catalyst.analysis import org.apache.paimon.spark.commands.PaimonShowTablePartitionCommand import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTablePartition} import org.apache.spark.sql.catalyst.rules.Rule -import 
org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { From 2b3407572273b368f8ced29602061b00a095367d Mon Sep 17 00:00:00 2001 From: "xiyu.zk" Date: Thu, 9 Apr 2026 22:49:47 +0800 Subject: [PATCH 3/3] [spark] Support partition statistics in SHOW TABLE EXTENDED PARTITION command --- .../spark/sql/DescribeTableTestBase.scala | 42 +++++-------------- 1 file changed, 10 insertions(+), 32 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala index a757b24dfe21..31cec5fda672 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala @@ -100,42 +100,20 @@ abstract class DescribeTableTestBase extends PaimonSparkTestBase { res2.select("information").collect().head.getString(0).contains("Partition Values")) val info2 = res2.select("information").collect().head.getString(0) - Assertions.assertTrue( - info2.contains("Partition Parameters"), - s"SHOW TABLE EXTENDED should contain Partition Parameters, but got: $info2") - Assertions.assertTrue( - info2.contains(PartitionStatistics.FIELD_RECORD_COUNT), - s"SHOW TABLE EXTENDED should contain recordCount, but got: $info2") - Assertions.assertTrue( - info2.contains(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES), - s"SHOW TABLE EXTENDED should contain fileSizeInBytes, but got: $info2") - Assertions.assertTrue( - info2.contains(PartitionStatistics.FIELD_FILE_COUNT), - s"SHOW TABLE EXTENDED should contain fileCount, but got: $info2") - Assertions.assertTrue( - info2.contains(PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME), - s"SHOW TABLE EXTENDED should contain lastFileCreationTime, but got: $info2") - // Verify recordCount value: partition (pt1='2024', pt2='11') has 1 row - // Spark 3 uses "key=value" format, Spark 4 uses "key: value" format - Assertions.assertTrue( - info2.contains("Partition Statistics"), - s"SHOW TABLE EXTENDED should contain Partition Statistics, but got: $info2") - Assertions.assertTrue( - info2.contains("recordCount=1") || info2.contains("recordCount: 1"), - s"Partition (pt1='2024', pt2='11') should have recordCount=1, but got: $info2") - Assertions.assertTrue( - info2.contains("1 rows"), - s"Partition Statistics should contain '1 rows', but got: $info2") + Assertions.assertTrue(info2.contains("Partition Parameters")) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_RECORD_COUNT)) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES)) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_COUNT)) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME)) + Assertions.assertTrue(info2.contains("Partition Statistics")) + Assertions.assertTrue(info2.contains("recordCount=1")) + Assertions.assertTrue(info2.contains("1 rows")) val res3 = spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt = '2024')") val info3 = res3.select("information").collect().head.getString(0) - Assertions.assertTrue( - info3.contains("recordCount=2") || info3.contains("recordCount: 2"), - s"Partition pt='2024' should have recordCount=2, but got: $info3") - 
Assertions.assertTrue( - info3.contains("2 rows"), - s"Partition Statistics should contain '2 rows', but got: $info3") + Assertions.assertTrue(info3.contains("recordCount=2")) + Assertions.assertTrue(info3.contains("2 rows")) } } }
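
To see the change end to end, here is a minimal spark-shell sketch. It assumes a session whose current catalog is backed by Paimon (for example the setup used by PaimonSparkTestBase) and an existing partitioned table; the database, table, and partition value below (test_db, s2, pt = '2024') are placeholders borrowed from the test case, not fixed names. The metadata keys (recordCount, fileSizeInBytes, fileCount, lastFileCreationTime) and the "<n> rows, <m> bytes" summary are the ones produced by PaimonPartitionManagement.loadPartitionMetadata and PaimonShowTablePartitionCommand in this series.

// spark-shell sketch; `spark` is the SparkSession provided by the shell.
// Placeholder identifiers: adjust test_db, s2 and the partition spec to your own table.
val info = spark
  .sql("SHOW TABLE EXTENDED IN test_db LIKE 's2' PARTITION (pt = '2024')")
  .select("information")
  .collect()
  .head
  .getString(0)

// Besides "Partition Values", the information string now carries a
// "Partition Parameters" entry ({recordCount=..., fileSizeInBytes=..., fileCount=..., lastFileCreationTime=...})
// and a "Partition Statistics" entry ("<recordCount> rows, <fileSizeInBytes> bytes").
println(info)

This mirrors the assertions added to DescribeTableTestBase; the concrete numbers depend on the data written to the partition, so the sketch only prints the string instead of asserting exact values.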