From df20f6bb9d02c26bfb9309c96f7ee94cb9b71000 Mon Sep 17 00:00:00 2001 From: Jordan Epstein Date: Wed, 13 May 2026 13:16:20 -0500 Subject: [PATCH] feat: detect Iceberg V2 writes and emit fall-back reasons --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + .../scala/org/apache/comet/CometConf.scala | 12 + .../user-guide/latest/iceberg-writes.md | 89 ++++++ docs/source/user-guide/latest/index.rst | 1 + .../comet/iceberg/IcebergReflection.scala | 98 ++++++ .../apache/comet/rules/CometExecRule.scala | 11 +- .../operator/CometIcebergNativeWrite.scala | 272 +++++++++++++++++ .../CometIcebergWriteDetectionSuite.scala | 282 ++++++++++++++++++ 9 files changed, 766 insertions(+), 1 deletion(-) create mode 100644 docs/source/user-guide/latest/iceberg-writes.md create mode 100644 spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeWrite.scala create mode 100644 spark/src/test/scala/org/apache/comet/CometIcebergWriteDetectionSuite.scala diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index dd5377ef54..5802bd5cf9 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -335,6 +335,7 @@ jobs: org.apache.comet.exec.CometNativeReaderSuite org.apache.comet.CometIcebergNativeSuite org.apache.comet.CometIcebergRewriteActionSuite + org.apache.comet.CometIcebergWriteDetectionSuite org.apache.comet.iceberg.IcebergReflectionSuite - name: "csv" value: | diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 8abaa1c776..558b9bcf76 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -183,6 +183,7 @@ jobs: org.apache.comet.exec.CometNativeReaderSuite org.apache.comet.CometIcebergNativeSuite org.apache.comet.CometIcebergRewriteActionSuite + org.apache.comet.CometIcebergWriteDetectionSuite org.apache.comet.iceberg.IcebergReflectionSuite - name: "csv" value: | diff --git 
a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9b376837f7..cdb0cb9983 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -139,6 +139,18 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_ICEBERG_NATIVE_WRITE_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.write.icebergNative.enabled") + .category(CATEGORY_TESTING) + .doc( + "Whether to enable native Iceberg writes through iceberg-rust. When enabled, " + + "Iceberg V2 writes (INSERT INTO, INSERT OVERWRITE, RewriteDataFiles, copy-on-write " + + "MERGE/UPDATE/DELETE) execute through native iceberg-rust writers instead of the " + + "JVM-side Iceberg writer. Falls back to Spark when table " + + "properties or operation shape are out of scope.") + .booleanConf + .createWithDefault(false) + val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] = conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit") .category(CATEGORY_SCAN) diff --git a/docs/source/user-guide/latest/iceberg-writes.md b/docs/source/user-guide/latest/iceberg-writes.md new file mode 100644 index 0000000000..ca52ba11a9 --- /dev/null +++ b/docs/source/user-guide/latest/iceberg-writes.md @@ -0,0 +1,89 @@ + + +# Accelerating Apache Iceberg Writes using Comet + +Comet routes Iceberg writes through the `iceberg-rust` Parquet writer when the table and operation +are within scope, and falls back to Spark's standard Iceberg writer otherwise. The feature is +disabled by default; enable it with `spark.comet.write.icebergNative.enabled=true`. + +Commit, conflict detection, isolation levels, and snapshot management remain in the JVM-side +Iceberg `Transaction`. Comet only owns the per-task Arrow→Parquet file writing. + +The restrictions documented below reflect what iceberg-rust and parquet-rs implement at the time +of writing. 
As those libraries gain features, individual fall-back triggers may be relaxed in +future Comet releases. + +## Operation-level fallback + +Comet falls back to Spark for every Iceberg write outside this set: + +| Operation | Status | +| ---------------------------------------------------------- | ------- | +| `INSERT INTO` (V2) | Native | +| `INSERT OVERWRITE` (static + dynamic, V2) | Native | +| `RewriteDataFiles` (binPack / sort / zOrder, V2) | Native | +| Copy-on-write `MERGE` / `UPDATE` / `DELETE` (V2) | Native | +| Merge-on-read `MERGE` / `UPDATE` / `DELETE` | Fallback | +| `RewritePositionDeleteFiles` | Fallback | +| Streaming writes | Fallback | +| ORC / Avro data files | Fallback | +| Format-version 3 tables | Fallback | + +iceberg-rust does not implement a positional delete writer, so Merge-on-Read DML (which emits +positional deletes by default through Spark's Iceberg DSv2 path) and `RewritePositionDeleteFiles` +fall back. iceberg-rust does ship an `EqualityDeleteFileWriterBuilder`, but Spark's DSv2 DML path +does not produce equality deletes, so it is not exercised here. Format-version 3 requires row +lineage and introduces deletion vectors, variant types, and revised manifest rules that iceberg-rust +does not produce. + +## Table-property fallback triggers + +If any of the following are set on the target table, the write falls back to Spark. The conditions +are evaluated once per write; partial acceleration of a single statement is never attempted. 
+ +| Property | Triggers fallback when | Why | +| -------------------------------------------------------------- | ------------------------------ | --- | +| `write.format.default` | not `parquet` | only Parquet is implemented in iceberg-rust's writer stack | +| `write.object-storage.enabled` | `true` | the hashed-prefix object-storage location generator is not implemented in iceberg-rust | +| `write.location-provider.impl` | set | custom Java `LocationProvider` classes cannot run in Rust | +| `format-version` | `>= 3` | format-version 3 requires row lineage, which is not supported in iceberg-rust | +| `encryption.*` (any key) | set | iceberg-rust's `ParquetWriter` does not wire Parquet modular encryption through to parquet-rs (parquet-rs supports it, but the integration layer does not) | +| `write.metadata.metrics.default` | mentions `counts` | Iceberg's `counts` mode emits null/value counts without min/max bounds; parquet-rs has no writer mode that produces counts without also producing bounds (when statistics are enabled, parquet-rs writes both) | +| `write.metadata.metrics.column.<column>` | `counts` | same as above, applied per column | +| `write.parquet.bloom-filter-max-bytes` | set | parquet-rs has no global byte cap on bloom filters; an explicit limit would diverge from iceberg-java | +| `write.parquet.bloom-filter-enabled.column.<column>` | `true` | parquet-rs has no equivalent of iceberg-java's 1 MiB default cap, so non-trivial NDV columns would produce larger filters than iceberg-java writes | +| `write.parquet.row-group-check-min-record-count` | set | the row-group sizing cadence differs between parquet-rs and parquet-mr (see Known divergences) | +| `write.parquet.row-group-check-max-record-count` | set | same as above | +| `write.metadata.metrics.max-inferred-column-defaults` | smaller than the schema's projected field-ID count (top-level plus nested fields) | iceberg-java applies `MetricsModes.None` to columns past this index; Comet's translation does not currently emit the matching per-column overrides in 
iceberg-rust | + +## Known divergences (no fallback, behaviour differs) + +The native and JVM writers produce Parquet files that read identically through Iceberg but are not +byte-for-byte identical: + +- **Row group boundaries**. parquet-rs checks row-group close after every record batch; parquet-mr + checks every 100–10,000 rows. With `write.parquet.row-group-size-bytes` honoured by both writers, + the target row-group size matches, but exact split row counts differ slightly and per-row-group + statistics granularity differs. +- **`created_by` metadata**. Comet stamps `Apache Iceberg (Comet <version>)` into the Parquet + `created_by` field so Comet-written files are identifiable. iceberg-java writes + `parquet-mr version X.Y.Z (...)`. +- **Aborted tasks**. iceberg-rust does not delete partial data files written by a failed task on + abort; rely on Iceberg's `remove_orphan_files` maintenance procedure to reclaim them. diff --git a/docs/source/user-guide/latest/index.rst b/docs/source/user-guide/latest/index.rst index 314a0a51bd..1633909125 100644 --- a/docs/source/user-guide/latest/index.rst +++ b/docs/source/user-guide/latest/index.rst @@ -49,4 +49,5 @@ to read more. 
Tuning Guide Metrics Guide Iceberg Guide + Iceberg Writes Kubernetes Guide diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index e85eac2c40..93b5f83c78 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -48,6 +48,9 @@ object IcebergReflection extends Logging { val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate" val SPARK_BATCH_QUERY_SCAN = "org.apache.iceberg.spark.source.SparkBatchQueryScan" val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan" + val SPARK_WRITE = "org.apache.iceberg.spark.source.SparkWrite" + val TABLE_PROPERTIES = "org.apache.iceberg.TableProperties" + val TYPE_UTIL = "org.apache.iceberg.types.TypeUtil" } /** @@ -648,6 +651,101 @@ object IcebergReflection extends Logging { unsupportedTypes.toList } + + /** + * Loads `name` through `loadClass`, returning `None` when that raises + * `ClassNotFoundException` (for example when Iceberg is not on the classpath). This method + * caches nothing itself; the `lazy val`s that call it memoise each lookup, so a given class is + * probed at most once. + */ + private def tryLoadClass(name: String): Option[Class[_]] = + try Some(loadClass(name)) + catch { case _: ClassNotFoundException => None } + + private lazy val sparkWriteClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.SPARK_WRITE) + private lazy val tablePropertiesClassOpt: Option[Class[_]] = + tryLoadClass(ClassNames.TABLE_PROPERTIES) + + /** + * Returns true when `write` is an Iceberg `SparkWrite` (or subclass). `Class.isInstance` itself + * never throws; the only failure mode is Iceberg being absent from the classpath, in which case + * `sparkWriteClassOpt` is `None`. + * + * Verified against `SparkWrite` in Iceberg 1.5.2 (spark-3.4 profile), 1.8.1 (spark-3.5), and + * 1.10.0 (spark-4.x) -- all expose the same class name under `org.apache.iceberg.spark.source`. 
+ */ + def isIcebergSparkWrite(write: Any): Boolean = + sparkWriteClassOpt.exists(_.isInstance(write)) + + /** + * Reads the private `table` field from a `SparkWrite` instance. + * + * `SparkWrite.table` is a `private final Table` set by the constructor; verified consistent + * across Iceberg 1.5.2 / 1.8.1 / 1.10.0 for the v3.4, v3.5, and v4.0 Spark integrations. No + * public accessor exists on the V2 API path, so reflection with `setAccessible(true)` is + * required. + */ + def getTableFromSparkWrite(sparkWrite: Any): Option[Any] = { + sparkWriteClassOpt.flatMap { cls => + try { + val field = cls.getDeclaredField("table") + field.setAccessible(true) + Option(field.get(sparkWrite)) + } catch { + case e: Exception => + logError( + "Iceberg reflection failure: Failed to get table from SparkWrite: " + + e.getMessage) + None + } + } + } + + /** + * Reads a static `String` constant from Iceberg's `org.apache.iceberg.TableProperties` by field + * name (e.g. `"DEFAULT_FILE_FORMAT"` -> `"write.format.default"`). Throws when Iceberg is + * absent or the constant has been renamed/removed; both cases indicate a version we have not + * vetted. Verified present since Iceberg 1.5.2 for every constant Comet uses. + */ + def tablePropertyConstant(fieldName: String): String = + readTablePropertiesField(fieldName).asInstanceOf[String] + + /** + * Reads a static `int` constant from Iceberg's `org.apache.iceberg.TableProperties` (e.g. one + * of the `*_DEFAULT` numeric defaults). Same failure semantics as `tablePropertyConstant`. 
+ */ + def tablePropertyIntConstant(fieldName: String): Int = + readTablePropertiesField(fieldName).asInstanceOf[Integer].intValue() + + private def readTablePropertiesField(fieldName: String): Any = { + val cls = tablePropertiesClassOpt.getOrElse( + throw new IllegalStateException(s"${ClassNames.TABLE_PROPERTIES} is not on the classpath")) + try cls.getField(fieldName).get(null) + catch { + case e: NoSuchFieldException => + throw new IllegalStateException( + s"${ClassNames.TABLE_PROPERTIES}.$fieldName not found " + + "(unsupported Iceberg version?)", + e) + } + } + + private lazy val typeUtilClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.TYPE_UTIL) + private lazy val schemaClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.SCHEMA) + + /** + * Returns the number of "projected" field IDs in the schema -- the same count Iceberg-Java uses + * to decide when `write.metadata.metrics.max-inferred-column-defaults` kicks in + * (`TypeUtil.getProjectedIds(Schema)`). Counts top-level + nested fields. 
+ */ + def getProjectedFieldIdCount(schema: Any): Option[Int] = { + // Reflection can fail on an Iceberg version that lacks `getProjectedIds(Schema)` or when + // `schema` is not a `Schema`. Log and return `None`, as `getTableFromSparkWrite` does, + // instead of letting the reflective exception abort query planning. + typeUtilClassOpt.flatMap { typeUtilClass => + schemaClassOpt.flatMap { schemaClass => + try { + val method = typeUtilClass.getMethod("getProjectedIds", schemaClass) + val ids = method.invoke(null, schema.asInstanceOf[AnyRef]).asInstanceOf[java.util.Set[_]] + // `Option(...)` turns an unexpected null result into `None` rather than an NPE. + Option(ids).map(_.size()) + } catch { + case e: Exception => + logError( + "Iceberg reflection failure: Failed to count projected field ids: " + + e.getMessage) + None + } + } + } + } } /** diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 72c2bea9e4..8afc67279d 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, V2CommandExec} +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, V2CommandExec, V2ExistingTableWriteExec} import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.execution.datasources.v2.json.JsonScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan @@ -50,6 +50,7 @@ import org.apache.spark.sql.types._ import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo} import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST} import org.apache.comet.CometSparkSessionExtensions._ +import org.apache.comet.iceberg.IcebergReflection import org.apache.comet.rules.CometExecRule.allExecs import org.apache.comet.serde._ import org.apache.comet.serde.operator._ @@ -288,6 +289,14 @@ case class CometExecRule(session: SparkSession) case op: DataWritingCommandExec => convertToComet(op, CometDataWritingCommand).getOrElse(op) + // 
Spark's V2 existing-table write operators -- `AppendDataExec`, + // `OverwriteByExpressionExec`, `OverwritePartitionsDynamicExec`, `ReplaceDataExec` -- + // are shared across DSv2 connectors (Iceberg, Delta, Hudi, etc.). Gating on the underlying + // `Write` being an Iceberg `SparkWrite` keeps non-Iceberg DSv2 writes out of the serde, + // so unrelated plans aren't tagged with confusing fall-back reasons. + case op: V2ExistingTableWriteExec if IcebergReflection.isIcebergSparkWrite(op.write) => + convertToComet(op, CometIcebergNativeWrite).getOrElse(op) + // For AQE broadcast stage on a Comet broadcast exchange case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => convertToComet(s, CometExchangeSink).getOrElse(s) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeWrite.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeWrite.scala new file mode 100644 index 0000000000..fd1cb7ef2c --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeWrite.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.serde.operator + +import java.util.Locale + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.comet.CometNativeExec +import org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec + +import org.apache.comet.{CometConf, ConfigEntry} +import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.iceberg.IcebergReflection +import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported} +import org.apache.comet.serde.OperatorOuterClass.Operator + +/** + * Detection-and-fall-back scaffolding for Iceberg V2 writes. Matches the four V2 write operators + * (`AppendDataExec`, `OverwriteByExpressionExec`, `OverwritePartitionsDynamicExec`, + * `ReplaceDataExec`), inspects the underlying Iceberg `SparkWrite`, and falls back to Spark for + * any table-property value or table-metadata state that the planned native writer cannot + * faithfully reproduce. When every requirement is met the serde still returns `Incompatible` and + * `convert` returns `None`; the native path will be filled in by a follow-up commit. + */ +object CometIcebergNativeWrite extends CometOperatorSerde[V2ExistingTableWriteExec] with Logging { + + override def enabledConfig: Option[ConfigEntry[Boolean]] = + Some(CometConf.COMET_ICEBERG_NATIVE_WRITE_ENABLED) + + override def requiresNativeChildren: Boolean = true + + /** + * Property names pulled from Iceberg's own `TableProperties` class via reflection. Centralising + * them here means the suite asserting on these triggers references the same strings the rule + * checks, instead of duplicating literals that could drift from Iceberg's canonical names. + * + * Resolved lazily -- first access happens inside `getSupportLevel` after the write has already + * been confirmed to be an Iceberg `SparkWrite`, so we know Iceberg is on the classpath. 
+ */ + object PropertyKeys { + lazy val DefaultFileFormat: String = + IcebergReflection.tablePropertyConstant("DEFAULT_FILE_FORMAT") + lazy val ObjectStoreEnabled: String = + IcebergReflection.tablePropertyConstant("OBJECT_STORE_ENABLED") + lazy val WriteLocationProviderImpl: String = + IcebergReflection.tablePropertyConstant("WRITE_LOCATION_PROVIDER_IMPL") + lazy val DefaultWriteMetricsMode: String = + IcebergReflection.tablePropertyConstant("DEFAULT_WRITE_METRICS_MODE") + lazy val MetricsModeColumnPrefix: String = + IcebergReflection.tablePropertyConstant("METRICS_MODE_COLUMN_CONF_PREFIX") + lazy val ParquetBloomFilterMaxBytes: String = + IcebergReflection.tablePropertyConstant("PARQUET_BLOOM_FILTER_MAX_BYTES") + lazy val ParquetRowGroupCheckMinRecordCount: String = + IcebergReflection.tablePropertyConstant("PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT") + lazy val ParquetRowGroupCheckMaxRecordCount: String = + IcebergReflection.tablePropertyConstant("PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT") + lazy val MetricsMaxInferredColumnDefaults: String = + IcebergReflection.tablePropertyConstant("METRICS_MAX_INFERRED_COLUMN_DEFAULTS") + + /** Default value Iceberg applies when `write.format.default` is unset. */ + lazy val DefaultFileFormatValue: String = + IcebergReflection.tablePropertyConstant("DEFAULT_FILE_FORMAT_DEFAULT") + + /** + * Default value Iceberg applies when `write.metadata.metrics.max-inferred-column-defaults` is + * unset. + */ + lazy val MetricsMaxInferredColumnDefaultsValue: Int = + IcebergReflection.tablePropertyIntConstant("METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT") + + lazy val BloomFilterColumnEnabledPrefix: String = + IcebergReflection.tablePropertyConstant("PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX") + } + + // V3 is rejected because iceberg-rust's writer can't yet emit the features V3 introduces. 
+ // The headline feature is row lineage: Iceberg PR apache/iceberg#12593 ("Enable row lineage + // for all v3 tables") landed before 1.10.0, so every V3 table written by Iceberg today + // requires `_row_id` / `_last_updated_sequence_number` columns and per-snapshot `firstRowId` + // stamping that iceberg-rust does not produce. V3 also brings variant types, default column + // values, deletion vectors, and revised manifest/sequence-number rules, none of which the + // Rust writer handles. A short 1.8.x window allowed opting out of row lineage on V3 tables; + // we treat that as out of scope here because the other V3 changes still rule out the path. + private val MinUnsupportedFormatVersion = 3 + + /** + * One check that an Iceberg write must pass to be eligible for native execution. Returns + * `Some(reason)` when the write should fall back to Spark and `None` when the check passes. + * + * Both the table and its full property map are passed in so requirements can read either (e.g. + * format-version comes from `TableMetadata`, codec choice comes from properties). 
+ */ + private type Requirement = (Any, Map[String, String]) => Option[String] + + // Checked in declaration order; `getSupportLevel` reports only the first failing requirement. + // Lazy so the `PropertyKeys` lookups run on first use rather than at object initialisation. + private lazy val writeRequirements: Seq[Requirement] = Seq( + requireFormatParquet, + requirePropertyAbsentOrNotTrue( + PropertyKeys.ObjectStoreEnabled, + "iceberg-rust has no hashed-prefix object-storage location generator"), + requirePropertyAbsent( + PropertyKeys.WriteLocationProviderImpl, + "custom location providers are not supported"), + requireFormatVersionAtMostTwo, + requireNoEncryptionPrefix, + requireMetricsModeIsNotCounts, + requireNoPerColumnCountsMetricsMode, + requirePropertyAbsent( + PropertyKeys.ParquetBloomFilterMaxBytes, + "parquet-rs has no global bloom-filter byte cap; explicit value would diverge"), + requireNoBloomFilterColumnsEnabled, + requirePropertyAbsent( + PropertyKeys.ParquetRowGroupCheckMinRecordCount, + "parquet-rs uses a different row-group sizing cadence; explicit value would diverge"), + requirePropertyAbsent( + PropertyKeys.ParquetRowGroupCheckMaxRecordCount, + "parquet-rs uses a different row-group sizing cadence; explicit value would diverge"), + requireSchemaWithinMaxInferredColumnDefaults) + + /** + * Decides whether `op` may run natively. Returns `Unsupported` with a reason when the write is + * not an Iceberg `SparkWrite`, when the table or its properties cannot be read, or when a + * requirement in `writeRequirements` fails (first failure only). Otherwise returns + * `Incompatible`, because native conversion is not implemented yet. + */ + override def getSupportLevel(op: V2ExistingTableWriteExec): SupportLevel = { + val write = op.write + if (!IcebergReflection.isIcebergSparkWrite(write)) { + Unsupported(Some("Not an Iceberg V2 write")) + } else { + IcebergReflection.getTableFromSparkWrite(write) match { + case None => + Unsupported(Some("Could not extract Iceberg Table from SparkWrite")) + case Some(table) => + // Fail closed: with an empty stand-in map every property-based requirement would pass + // vacuously, so an unreadable property map must fall back instead of looking eligible. + IcebergReflection.getTableProperties(table).map(_.asScala.toMap) match { + case None => + Unsupported(Some("Could not read Iceberg table properties")) + case Some(properties) => + // The iterator keeps evaluation lazy: requirements after the first failure never run. + writeRequirements.iterator + .map(_(table, properties)) + .collectFirst { case Some(reason) => reason } match { + case Some(reason) => Unsupported(Some(reason)) + case None => + Incompatible( + Some( + "Iceberg native write conversion is not yet implemented; " + + "operator will fall back to Spark")) + } + } + } + } + } + + // 
-- Requirement implementations ------------------------------------------ + + /** Falls back unless `write.format.default` (or Iceberg's default for it) is `parquet`. */ + private val requireFormatParquet: Requirement = (_, props) => { + val key = PropertyKeys.DefaultFileFormat + val default = PropertyKeys.DefaultFileFormatValue + val v = props.getOrElse(key, default).toLowerCase(Locale.ROOT) + if (v == "parquet") None else Some(s"$key=$v (only parquet is supported)") + } + + /** Builds a requirement that fails only when `key` is present with value `true` (any case). */ + private def requirePropertyAbsentOrNotTrue(key: String, reason: String): Requirement = + (_, props) => + if (props.get(key).exists(_.equalsIgnoreCase("true"))) Some(s"$key=true ($reason)") + else None + + /** Builds a requirement that fails whenever `key` is present, whatever its value. */ + private def requirePropertyAbsent(key: String, reason: String): Requirement = + (_, props) => if (props.contains(key)) Some(s"$key is set ($reason)") else None + + /** + * Falls back for format-version 3 and above, and also when the version cannot be read: an + * unknown version cannot be shown to be within the supported range, so the check fails closed. + */ + private val requireFormatVersionAtMostTwo: Requirement = (table, _) => + IcebergReflection.getFormatVersion(table) match { + case Some(v) if v >= MinUnsupportedFormatVersion => + Some( + s"format-version=$v introduces features the Rust writer cannot emit " + + "(row lineage, variant types, deletion vectors, revised manifest rules)") + case Some(_) => None + case None => + Some("format-version could not be determined from the Iceberg table metadata") + } + + // Declared ahead of its only user so the requirement never depends on member initialisation + // order. + private val EncryptionPropertyPrefix = "encryption." + + /** Falls back when any property key starts with `encryption.`; reports the first one found. */ + private val requireNoEncryptionPrefix: Requirement = (_, props) => + props.keys.find(_.startsWith(EncryptionPropertyPrefix)).map { key => + s"$key is set (Parquet modular encryption not supported)" + } 
+ + /** Metrics-mode name that the two metrics requirements below look for in property values. */ + private val CountsMetricsMode = "counts" + + /** + * Falls back when the table-wide default metrics mode, lower-cased, contains `counts`. This is + * a substring match, unlike the exact match used for per-column modes. + */ + private val requireMetricsModeIsNotCounts: Requirement = (_, props) => { + val key = PropertyKeys.DefaultWriteMetricsMode + props + .get(key) + .filter(_.toLowerCase(Locale.ROOT).contains(CountsMetricsMode)) + .map(v => + s"$key=$v mentions '$CountsMetricsMode' (Parquet has no counts-without-bounds mode)") + } + + /** + * Falls back when any property under the per-column metrics-mode prefix has a value equal to + * `counts`, ignoring case. Only the first matching key is reported. + */ + private val requireNoPerColumnCountsMetricsMode: Requirement = (_, props) => { + val prefix = PropertyKeys.MetricsModeColumnPrefix + props + .find { case (k, v) => + k.startsWith(prefix) && v.toLowerCase(Locale.ROOT) == CountsMetricsMode + } + .map { case (k, _) => s"$k=$CountsMetricsMode (counts-only mode not supported)" } + } + + /** + * Falls back whenever any column has `write.parquet.bloom-filter-enabled.column.<col>=true`. + * parquet-rs has no equivalent of iceberg-java's 1 MiB total bloom-filter byte cap, so any + * non-trivial NDV column would produce bloom filters larger than iceberg-java would write. + * Iceberg-java leaves bloom filters disabled by default, so the default path is unaffected. + */ + private val requireNoBloomFilterColumnsEnabled: Requirement = (_, props) => { + val prefix = PropertyKeys.BloomFilterColumnEnabledPrefix + props + .find { case (k, v) => k.startsWith(prefix) && v.equalsIgnoreCase("true") } + .map { case (k, _) => + s"$k=true (iceberg-rust cannot enforce iceberg-java's bloom-filter byte cap)" + } + } + + /** + * Falls back when the schema has more projected field IDs than + * `write.metadata.metrics.max-inferred-column-defaults` (default 100). Iceberg-java applies + * `MetricsModes.None` to columns beyond that index when no per-column metrics mode is set; + * mimicking that on the Rust side would require per-column stats wiring we don't yet have. 
+ */ + private val requireSchemaWithinMaxInferredColumnDefaults: Requirement = (table, props) => { + val key = PropertyKeys.MetricsMaxInferredColumnDefaults + val limit = parseIntProperty(props, key, PropertyKeys.MetricsMaxInferredColumnDefaultsValue) + // If either the schema or the field-ID count is unavailable, the comprehension yields `None` + // and this requirement passes. + for { + schema <- IcebergReflection.getSchema(table) + count <- IcebergReflection.getProjectedFieldIdCount(schema) + reason <- { + if (count > limit) { + Some( + s"schema has $count projected field IDs which exceeds $key=$limit " + + "(iceberg-java applies MetricsModes.None to columns beyond this index)") + } else { + None + } + } + } yield reason + } + + /** + * Reads `key` from `props` as an `Int`. Returns `default` when the key is absent or when its + * trimmed value does not parse as an integer. + */ + private def parseIntProperty(props: Map[String, String], key: String, default: Int): Int = + props.get(key).flatMap(s => scala.util.Try(s.trim.toInt).toOption).getOrElse(default) + + /** + * Tags `op` with a not-yet-implemented message via `withInfo` and returns `None`, so no native + * operator is produced. + */ + override def convert( + op: V2ExistingTableWriteExec, + builder: Operator.Builder, + childOp: Operator*): Option[OperatorOuterClass.Operator] = { + withInfo(op, "Iceberg native write conversion is not yet implemented") + None + } + + /** Always throws `UnsupportedOperationException`: `convert` never yields a native operator. */ + override def createExec(nativeOp: Operator, op: V2ExistingTableWriteExec): CometNativeExec = + throw new UnsupportedOperationException( + "CometIcebergNativeWrite.createExec must not be called: convert returns None in v1") + +} diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergWriteDetectionSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergWriteDetectionSuite.scala new file mode 100644 index 0000000000..1f76b810e6 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometIcebergWriteDetectionSuite.scala @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet

import java.io.File

import scala.collection.mutable

import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.{CommandExecutionMode, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, V2ExistingTableWriteExec}

import org.apache.comet.serde.operator.CometIcebergNativeWrite

/**
 * Detection-only suite for the Iceberg native write rule. Runs writes against a Hadoop-catalog
 * Iceberg table, captures the planned (not executed) plan, and asserts the rule:
 *
 *   - recognises the three "external rewrite" V2 write operators (`AppendDataExec`,
 *     `OverwriteByExpressionExec`, `OverwritePartitionsDynamicExec`) and emits a fall-back
 *     reason. `ReplaceDataExec` (CoW MERGE/UPDATE/DELETE) shares the same
 *     `V2ExistingTableWriteExec` supertype as those three and is dispatched identically; see the
 *     inline note further down for why its detection test is deferred to the end-to-end suite.
 *   - falls back with a property-specific reason whenever a table property is set to a value
 *     iceberg-rust cannot match. Each such property is registered as its own test so that one
 *     failing trigger does not hide the result of the others.
 *
 * No native write conversion happens in this commit -- `CometIcebergNativeWrite.convert` returns
 * `None`, so the actual write still runs through Spark/Iceberg's JVM writer.
 */
class CometIcebergWriteDetectionSuite extends CometTestBase with CometIcebergTestBase {

  private val catalog = "test_cat"

  /** Fall-back reason the positive-detection tests expect on every recognised write operator. */
  private val notYetImplementedReason = "Iceberg native write conversion is not yet implemented"

  // -- Positive detection -----------------------------------------------------

  test("AppendDataExec is matched by CometIcebergNativeWrite") {
    assume(icebergAvailable, "Iceberg not available in classpath")
    withTempIcebergDir { warehouseDir =>
      withIcebergComet(warehouseDir) {
        val table = s"$catalog.db.append_detect"
        try {
          spark.sql(s"CREATE TABLE $table (id INT, value DOUBLE) USING iceberg")
          val plan = compileExecutedPlan(s"INSERT INTO $table VALUES (1, 1.0), (2, 2.0)")
          assertOperator[AppendDataExec](plan)
          assertFallbackReason(plan, notYetImplementedReason)
        } finally {
          spark.sql(s"DROP TABLE IF EXISTS $table")
        }
      }
    }
  }

  test("OverwriteByExpressionExec is matched by CometIcebergNativeWrite") {
    assume(icebergAvailable, "Iceberg not available in classpath")
    withTempIcebergDir { warehouseDir =>
      withIcebergComet(warehouseDir) {
        val table = s"$catalog.db.overwrite_by_expr_detect"
        try {
          spark.sql(s"CREATE TABLE $table (id INT, value DOUBLE) USING iceberg")
          spark.sql(s"INSERT INTO $table VALUES (1, 1.0)")
          val plan = compileExecutedPlan(s"INSERT OVERWRITE $table VALUES (2, 2.0), (3, 3.0)")
          assertOperator[OverwriteByExpressionExec](plan)
          assertFallbackReason(plan, notYetImplementedReason)
        } finally {
          spark.sql(s"DROP TABLE IF EXISTS $table")
        }
      }
    }
  }

  test("OverwritePartitionsDynamicExec is matched by CometIcebergNativeWrite") {
    assume(icebergAvailable, "Iceberg not available in classpath")
    withTempIcebergDir { warehouseDir =>
      withIcebergComet(warehouseDir) {
        // Default partitionOverwriteMode is STATIC, which Spark turns into
        // OverwriteByExpressionExec. We need DYNAMIC for OverwritePartitionsDynamicExec.
        withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "DYNAMIC") {
          val table = s"$catalog.db.overwrite_dynamic_detect"
          try {
            spark.sql(s"""CREATE TABLE $table (id INT, value DOUBLE, dt STRING)
                 |USING iceberg PARTITIONED BY (dt)""".stripMargin)
            spark.sql(s"INSERT INTO $table VALUES (1, 1.0, '2025-01-01')")
            val plan = compileExecutedPlan(s"""INSERT OVERWRITE TABLE $table
                 |SELECT 2 AS id, 2.0 AS value, '2025-01-02' AS dt""".stripMargin)
            assertOperator[OverwritePartitionsDynamicExec](plan)
            assertFallbackReason(plan, notYetImplementedReason)
          } finally {
            spark.sql(s"DROP TABLE IF EXISTS $table")
          }
        }
      }
    }
  }

  // Note: ReplaceDataExec coverage (CoW MERGE/UPDATE/DELETE) is deferred to the end-to-end
  // suite that lands with the native write path. ReplaceDataExec extends V2ExistingTableWriteExec
  // so the CometExecRule case here catches it just like the three operators above; the
  // detection-only test is awkward to write because:
  //   - With CommandExecutionMode.SKIP the row-level rewrite that produces ReplaceData never
  //     fires (it is wired through Iceberg's command-execution path, not the analyzer), so the
  //     planned tree stays as DeleteFromTable/DeleteFromTableExec.
  //   - With ALL mode (eager command execution) the SparkPlan tree is wrapped in
  //     CommandResult by the time the test reads queryExecution.executedPlan, so the original
  //     ReplaceDataExec instance (and any withInfo tags on it) is no longer reachable.
  // The integration suite in the follow-up commit asserts on actual on-disk outputs and does
  // not need to round-trip the SparkPlan instance.

  // -- Fall-back triggers ----------------------------------------------------

  /**
   * Table-property settings that must each disqualify a native write, paired with the substring
   * expected in the emitted fall-back reason. The `PropertyKeys` wildcard import is confined to
   * this initializer so the key constants do not leak into the rest of the suite.
   */
  private val fallBackTriggers: Seq[FallBackCase] = {
    import CometIcebergNativeWrite.PropertyKeys._

    Seq(
      FallBackCase(
        name = "non-parquet format",
        tableProps = Map(DefaultFileFormat -> "orc"),
        reasonContains = DefaultFileFormat),
      FallBackCase(
        name = "object-storage enabled",
        tableProps = Map(ObjectStoreEnabled -> "true"),
        reasonContains = ObjectStoreEnabled),
      FallBackCase(
        name = "custom location provider",
        tableProps = Map(WriteLocationProviderImpl -> "com.example.MyLocationProvider"),
        reasonContains = WriteLocationProviderImpl),
      FallBackCase(
        name = "encryption property set",
        tableProps = Map("encryption.kms-impl" -> "com.example.MyKms"),
        reasonContains = "encryption."),
      FallBackCase(
        name = "metrics default is counts",
        tableProps = Map(DefaultWriteMetricsMode -> "counts"),
        reasonContains = DefaultWriteMetricsMode),
      FallBackCase(
        name = "per-column counts metrics mode",
        tableProps = Map(s"${MetricsModeColumnPrefix}value" -> "counts"),
        reasonContains = s"${MetricsModeColumnPrefix}value"),
      FallBackCase(
        name = "parquet bloom-filter-max-bytes set",
        tableProps = Map(ParquetBloomFilterMaxBytes -> "1048576"),
        reasonContains = ParquetBloomFilterMaxBytes),
      FallBackCase(
        name = "parquet row-group-check min set",
        tableProps = Map(ParquetRowGroupCheckMinRecordCount -> "100"),
        reasonContains = ParquetRowGroupCheckMinRecordCount),
      FallBackCase(
        name = "parquet row-group-check max set",
        tableProps = Map(ParquetRowGroupCheckMaxRecordCount -> "10000"),
        reasonContains = ParquetRowGroupCheckMaxRecordCount),
      FallBackCase(
        name = "any column has bloom filter enabled",
        tableProps = Map(s"${BloomFilterColumnEnabledPrefix}id" -> "true"),
        reasonContains = s"${BloomFilterColumnEnabledPrefix}id=true"))
  }

  // Register one test per trigger: a property whose fall-back reason regresses is then reported
  // under its own name instead of aborting the triggers that come after it.
  fallBackTriggers.foreach { tc =>
    test(s"table property fall-back: ${tc.name}") {
      assume(icebergAvailable, "Iceberg not available in classpath")
      runFallBackCase(tc)
    }
  }

  test("schema with too many projected field IDs falls back") {
    assume(icebergAvailable, "Iceberg not available in classpath")
    withTempIcebergDir { warehouseDir =>
      withIcebergComet(warehouseDir) {
        val table = s"$catalog.db.too_many_columns"
        try {
          // 5 columns is enough to verify the gate when we cap max-inferred at 4.
          spark.sql(s"""CREATE TABLE $table (
               | c1 INT, c2 INT, c3 INT, c4 INT, c5 INT
               |) USING iceberg
               |TBLPROPERTIES ('write.metadata.metrics.max-inferred-column-defaults' = '4')
               |""".stripMargin)
          val plan = compileExecutedPlan(s"INSERT INTO $table VALUES (1, 2, 3, 4, 5)")
          assertFallbackReason(plan, "exceeds")
          assertFallbackReason(plan, "max-inferred-column-defaults")
        } finally {
          spark.sql(s"DROP TABLE IF EXISTS $table")
        }
      }
    }
  }

  // -- Helpers ---------------------------------------------------------------

  /**
   * One fall-back scenario.
   *
   * @param name
   *   human-readable label; used in the test name, the table name and failure messages
   * @param tableProps
   *   table properties to set in the `CREATE TABLE` statement
   * @param reasonContains
   *   substring that at least one emitted fall-back reason must contain
   */
  private case class FallBackCase(
      name: String,
      tableProps: Map[String, String],
      reasonContains: String)

  /**
   * Creates a two-column Iceberg table carrying `tc.tableProps`, plans (without executing) an
   * `INSERT INTO` against it, and asserts that the plan carries a fall-back reason containing
   * `tc.reasonContains`. The table is dropped afterwards even when the assertion fails.
   */
  private def runFallBackCase(tc: FallBackCase): Unit = {
    withTempIcebergDir { warehouseDir =>
      withIcebergComet(warehouseDir) {
        val table = s"$catalog.db.fallback_${slug(tc.name)}"
        try {
          val propsClause =
            if (tc.tableProps.isEmpty) ""
            else
              tc.tableProps
                .map { case (k, v) => s"'$k' = '$v'" }
                .mkString("TBLPROPERTIES (", ", ", ")")
          spark.sql(s"""CREATE TABLE $table (id INT, value DOUBLE) USING iceberg $propsClause""")
          val plan = compileExecutedPlan(s"INSERT INTO $table VALUES (1, 1.0)")
          assertFallbackReason(plan, tc.reasonContains, failureContext = s"trigger: ${tc.name}")
        } finally {
          spark.sql(s"DROP TABLE IF EXISTS $table")
        }
      }
    }
  }

  /**
   * Turns a free-form label into an identifier fragment: lower-cases it and replaces every run of
   * characters outside `[a-z0-9]` with a single underscore. Lower-casing uses `Locale.ROOT` so
   * the generated table names do not depend on the JVM's default locale.
   */
  private def slug(s: String): String =
    s.toLowerCase(java.util.Locale.ROOT).replaceAll("[^a-z0-9]+", "_")

  /**
   * Compiles a SQL string to its executed `SparkPlan` without running the write. V2 write
   * operators are command-typed in Spark, so the default `executePlan` mode eagerly executes them
   * when `executedPlan` is touched. Passing `CommandExecutionMode.SKIP` keeps the plan
   * post-Comet-rules but unexecuted, which is what the detection assertions need.
   */
  private def compileExecutedPlan(sql: String): SparkPlan = {
    val logical = spark.sessionState.sqlParser.parsePlan(sql)
    spark.sessionState.executePlan(logical, CommandExecutionMode.SKIP).executedPlan
  }

  /**
   * Asserts that `plan` contains at least one node whose runtime class is (a subclass of) `T`.
   * On failure the message includes the full plan string.
   */
  private def assertOperator[T <: V2ExistingTableWriteExec: scala.reflect.ClassTag](
      plan: SparkPlan): Unit = {
    val clazz = implicitly[scala.reflect.ClassTag[T]].runtimeClass
    // collectFirst stops at the first hit; only presence matters here.
    val found = plan.collectFirst { case op if clazz.isInstance(op) => op }
    assert(
      found.isDefined,
      s"Expected plan to contain ${clazz.getSimpleName} but found:\n${plan.toString()}")
  }

  /**
   * Asserts that some fall-back reason tagged anywhere in `plan` contains `substring`.
   *
   * @param failureContext
   *   optional label appended to the failure message to identify the scenario
   */
  private def assertFallbackReason(
      plan: SparkPlan,
      substring: String,
      failureContext: String = ""): Unit = {
    val reasons = collectFallbackReasons(plan)
    val ctx = if (failureContext.isEmpty) "" else s" ($failureContext)"
    assert(
      reasons.exists(_.contains(substring)),
      s"Expected a fall-back reason containing '$substring'$ctx; got:\n " +
        reasons.mkString("\n ") +
        s"\nPlan:\n${plan.toString()}")
  }

  /**
   * Gathers every string stored under the `CometExplainInfo.EXTENSION_INFO` tag on `plan` and on
   * all nodes reachable from it through both `children` and `innerChildren`.
   */
  private def collectFallbackReasons(plan: SparkPlan): Set[String] = {
    val out = mutable.Set[String]()
    def walk(node: TreeNode[_]): Unit = {
      node.getTagValue(CometExplainInfo.EXTENSION_INFO).foreach(out ++= _)
      // innerChildren are visited too: tagged nodes may hang off a node without being in
      // its regular children list.
      node.innerChildren.foreach { case c: TreeNode[_] => walk(c); case _ => }
      node.children.foreach { case c: TreeNode[_] => walk(c); case _ => }
    }
    walk(plan)
    out.toSet
  }

  /**
   * Runs `body` with the `test_cat` catalog configured as a Hadoop-type Iceberg `SparkCatalog`
   * whose warehouse is `warehouseDir`, and with Comet, Comet exec, and the Comet Iceberg native
   * and native-write flags all switched on.
   */
  private def withIcebergComet(warehouseDir: File)(body: => Unit): Unit =
    withSQLConf(
      s"spark.sql.catalog.$catalog" -> "org.apache.iceberg.spark.SparkCatalog",
      s"spark.sql.catalog.$catalog.type" -> "hadoop",
      s"spark.sql.catalog.$catalog.warehouse" -> warehouseDir.getAbsolutePath,
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_WRITE_ENABLED.key -> "true")(body)
}