diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh
index 570aeabb2b..084936475d 100755
--- a/dev/ensure-jars-have-correct-contents.sh
+++ b/dev/ensure-jars-have-correct-contents.sh
@@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
 allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
 allowed_expr+="|^org/apache/spark/CometPlugin.class$"
 allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
+allowed_expr+="|^org/apache/spark/CometSource.*$" # metrics Source object and its nested anonymous classes
 allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
 allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
 allowed_expr+="|^scala-collection-compat.properties$"
diff --git a/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
new file mode 100644
index 0000000000..e8907d8264
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import org.apache.spark.CometSource
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+class CometMetricsListener extends QueryExecutionListener { // installed by CometDriverPlugin.registerCometMetrics
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+    val stats = CometCoverageStats.forPlan(qe.executedPlan) // count native vs. fallback operators in the executed plan
+    CometSource.recordStats(stats) // fold the per-query counts into the global CometSource counters
+  }
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} // failed queries contribute no metrics
+}
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index f47428e801..d30a1fe788 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -192,6 +192,25 @@ class CometCoverageStats {
   }
 }
 
+object CometCoverageStats {
+
+  /**
+   * Compute coverage stats for a plan without generating the explain string.
+   */
+  def forPlan(plan: SparkPlan): CometCoverageStats = {
+    val stats = new CometCoverageStats()
+    val explainInfo = new ExtendedExplainInfo()
+    explainInfo.generateTreeString(
+      CometExplainInfo.getActualPlan(plan),
+      0,
+      Seq(),
+      0,
+      new StringBuilder(), // rendered tree text is discarded; only `stats` is populated
+      stats)
+    stats
+  }
+}
+
 object CometExplainInfo {
 
   val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
diff --git a/spark/src/main/scala/org/apache/spark/CometSource.scala b/spark/src/main/scala/org/apache/spark/CometSource.scala
new file mode 100644
index 0000000000..95d7523616
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/CometSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.metrics.source.Source
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.comet.CometCoverageStats
+
+/**
+ * Exposes the following metrics (populated from CometCoverageStats):
+ *   - operators.native: total operators executed natively
+ *   - operators.spark: total operators that fell back to Spark
+ *   - queries.planned: total queries processed
+ *   - transitions: total Spark-to-Comet transitions
+ *   - acceleration.ratio: native / (native + spark)
+ */
+object CometSource extends Source {
+  override val sourceName = "comet"
+  override val metricRegistry = new MetricRegistry()
+
+  val NATIVE_OPERATORS: Counter =
+    metricRegistry.counter(MetricRegistry.name("operators", "native"))
+  val SPARK_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "spark"))
+  val QUERIES_PLANNED: Counter = metricRegistry.counter(MetricRegistry.name("queries", "planned"))
+  val TRANSITIONS: Counter = metricRegistry.counter(MetricRegistry.name("transitions"))
+
+  metricRegistry.register(
+    MetricRegistry.name("acceleration", "ratio"),
+    new Gauge[Double] {
+      override def getValue: Double = {
+        val native = NATIVE_OPERATORS.getCount
+        val total = native + SPARK_OPERATORS.getCount
+        if (total > 0) native.toDouble / total else 0.0 // 0.0 before any operators have been recorded
+      }
+    })
+
+  def recordStats(stats: CometCoverageStats): Unit = { // called from CometMetricsListener.onSuccess
+    NATIVE_OPERATORS.inc(stats.cometOperators)
+    SPARK_OPERATORS.inc(stats.sparkOperators)
+    TRANSITIONS.inc(stats.transitions)
+    QUERIES_PLANNED.inc()
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 2529f08cfb..e03433e7b2 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
     // register CometSparkSessionExtensions if it isn't already registered
     CometDriverPlugin.registerCometSessionExtension(sc.conf)
 
+    // Register Comet metrics source and the query-execution listener that feeds it
+    CometDriverPlugin.registerCometMetrics(sc)
+
     if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
       val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
         sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
@@ -101,6 +104,25 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
 }
 
 object CometDriverPlugin extends Logging {
+  def registerCometMetrics(sc: SparkContext): Unit = { // register CometSource with the MetricsSystem and install CometMetricsListener
+    sc.env.metricsSystem.registerSource(CometSource)
+
+    val listenerKey = "spark.sql.queryExecutionListeners" // NOTE(review): static SQL conf — assumed set before any SparkSession is created; confirm
+    val listenerClass = "org.apache.comet.CometMetricsListener"
+    val listeners = sc.conf.get(listenerKey, "")
+    if (listeners.isEmpty) {
+      logInfo(s"Setting $listenerKey=$listenerClass")
+      sc.conf.set(listenerKey, listenerClass)
+    } else {
+      val currentListeners = listeners.split(",").map(_.trim) // avoid registering the listener twice
+      if (!currentListeners.contains(listenerClass)) {
+        val newValue = s"$listeners,$listenerClass"
+        logInfo(s"Setting $listenerKey=$newValue")
+        sc.conf.set(listenerKey, newValue)
+      }
+    }
+  }
+
   def registerCometSessionExtension(conf: SparkConf): Unit = {
     val extensionKey = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
     val extensionClass = classOf[CometSparkSessionExtensions].getName
diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
index c493b22f79..cab263854f 100644
--- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -19,7 +19,9 @@
 
 package org.apache.spark
 
-import org.apache.spark.sql.CometTestBase
+import java.io.File
+
+import org.apache.spark.sql.{CometTestBase, SaveMode}
 import org.apache.spark.sql.internal.StaticSQLConf
 
 class CometPluginsSuite extends CometTestBase {
@@ -77,6 +79,43 @@
     }
   }
 
+  test("CometSource metrics are recorded") {
+    val nativeBefore = CometSource.NATIVE_OPERATORS.getCount // counters are global, so compare against a baseline
+    val queriesBefore = CometSource.QUERIES_PLANNED.getCount
+
+    withTempPath { dir =>
+      val path = new File(dir, "test.parquet").toString
+      spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
+      spark.read.parquet(path).filter("id > 500").collect()
+    }
+    spark.sparkContext.listenerBus.waitUntilEmpty() // make sure onSuccess callbacks have fired before asserting
+    assert(
+      CometSource.QUERIES_PLANNED.getCount > queriesBefore,
+      "queries.planned should increment after query")
+    assert(
+      CometSource.NATIVE_OPERATORS.getCount > nativeBefore,
+      "operators.native should increment for native execution")
+  }
+
+  test("metrics not double counted with AQE") {
+    withSQLConf("spark.sql.adaptive.enabled" -> "true") {
+      withTempPath { dir =>
+        val path = new File(dir, "test.parquet").toString
+        spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
+
+        spark.sparkContext.listenerBus.waitUntilEmpty() // drain events from the write before sampling the counter
+        val queriesBefore = CometSource.QUERIES_PLANNED.getCount
+        spark.read.parquet(path).filter("id > 100").collect()
+        spark.read.parquet(path).filter("id > 200").collect()
+        spark.sparkContext.listenerBus.waitUntilEmpty()
+        val queriesAfter = CometSource.QUERIES_PLANNED.getCount
+        assert(
+          queriesAfter == queriesBefore + 2, // AQE re-planning must not fire the listener more than once per query
+          s"Expected 2 queries, got ${queriesAfter - queriesBefore}")
+      }
+    }
+  }
+
   test("Default Comet memory overhead") {
     val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
     val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")