Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteDetectionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
- name: "csv"
value: |
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteDetectionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
- name: "csv"
value: |
Expand Down
12 changes: 12 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_ICEBERG_NATIVE_WRITE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.write.icebergNative.enabled")
.category(CATEGORY_TESTING)
.doc(
"Whether to enable native Iceberg writes through iceberg-rust. When enabled, " +
"Iceberg V2 writes (INSERT INTO, INSERT OVERWRITE, RewriteDataFiles, copy-on-write " +
"MERGE/UPDATE/DELETE) execute through native iceberg-rust writers instead of the " +
"JVM-side Iceberg writer. Falls back to Spark when table " +
"properties or operation shape are out of scope.")
.booleanConf
.createWithDefault(false)

val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
.category(CATEGORY_SCAN)
Expand Down
89 changes: 89 additions & 0 deletions docs/source/user-guide/latest/iceberg-writes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Accelerating Apache Iceberg Writes using Comet

Comet routes Iceberg writes through the `iceberg-rust` Parquet writer when the table and operation
are within scope, and falls back to Spark's standard Iceberg writer otherwise. The feature is
disabled by default; enable it with `spark.comet.write.icebergNative.enabled=true`.

Commit, conflict detection, isolation levels, and snapshot management remain in the JVM-side
Iceberg `Transaction`. Comet only owns the per-task Arrow→Parquet file writing.

The restrictions documented below reflect what iceberg-rust and parquet-rs implement at the time
of writing. As those libraries gain features, individual fall-back triggers may be relaxed in
future Comet releases.

## Operation-level fallback

Comet falls back to Spark for every Iceberg write outside this set:

| Operation | Status |
| ---------------------------------------------------------- | ------- |
| `INSERT INTO` (V2) | Native |
| `INSERT OVERWRITE` (static + dynamic, V2) | Native |
| `RewriteDataFiles` (binPack / sort / zOrder, V2) | Native |
| Copy-on-write `MERGE` / `UPDATE` / `DELETE` (V2) | Native |
| Merge-on-read `MERGE` / `UPDATE` / `DELETE` | Fallback |
| `RewritePositionDeleteFiles` | Fallback |
| Streaming writes | Fallback |
| ORC / Avro data files | Fallback |
| Format-version 3 tables | Fallback |

iceberg-rust does not implement a positional delete writer, so Merge-on-Read DML (which emits
positional deletes by default through Spark's Iceberg DSv2 path) and `RewritePositionDeleteFiles`
fall back. iceberg-rust does ship an `EqualityDeleteFileWriterBuilder`, but Spark's DSv2 DML path
does not produce equality deletes, so it is not exercised here. Format-version 3 requires row
lineage and introduces deletion vectors, variant types, and revised manifest rules that iceberg-rust
does not produce.

## Table-property fallback triggers

If any of the following are set on the target table, the write falls back to Spark. The conditions
are evaluated once per write; partial acceleration of a single statement is never attempted.

| Property | Triggers fallback when | Why |
| -------------------------------------------------------------- | ------------------------------ | --- |
| `write.format.default` | not `parquet` | only Parquet is implemented in iceberg-rust's writer stack |
| `write.object-storage.enabled` | `true` | the hashed-prefix object-storage location generator is not implemented in iceberg-rust |
| `write.location-provider.impl` | set | custom Java `LocationProvider` classes cannot run in Rust |
| `format-version` | `>= 3` | format-version 3 requires row lineage, which is not supported in iceberg-rust |
| `encryption.*` (any key) | set | iceberg-rust's `ParquetWriter` does not wire Parquet modular encryption through to parquet-rs (parquet-rs supports it, but the integration layer does not) |
| `write.metadata.metrics.default` | mentions `counts` | Iceberg's `counts` mode emits null/value counts without min/max bounds; parquet-rs has no writer mode that produces counts without also producing bounds (when statistics are enabled, parquet-rs writes both) |
| `write.metadata.metrics.column.<col>` | `counts` | same as above, applied per column |
| `write.parquet.bloom-filter-max-bytes` | set | parquet-rs has no global byte cap on bloom filters; an explicit limit would diverge from iceberg-java |
| `write.parquet.bloom-filter-enabled.column.<col>` | `true` | parquet-rs has no equivalent of iceberg-java's 1 MiB default cap, so non-trivial NDV columns would produce larger filters than iceberg-java writes |
| `write.parquet.row-group-check-min-record-count` | set | the row-group sizing cadence differs between parquet-rs and parquet-mr (see Known divergences) |
| `write.parquet.row-group-check-max-record-count` | set | same as above |
| `write.metadata.metrics.max-inferred-column-defaults` | smaller than the schema column count | iceberg-java applies `MetricsModes.None` to columns past this index; Comet's translation does not currently emit the matching per-column overrides in iceberg-rust |

## Known divergences (no fallback, behaviour differs)

The native and JVM writers produce Parquet files that read identically through Iceberg but are not
byte-for-byte identical:

- **Row group boundaries**. parquet-rs checks row-group close after every record batch; parquet-mr
checks every 100–10,000 rows. With `write.parquet.row-group-size-bytes` honoured by both writers,
the target row-group size matches, but exact split row counts differ slightly and per-row-group
statistics granularity differs.
- **`created_by` metadata**. Comet stamps `Apache Iceberg <ver> (Comet <ver>)` into the Parquet
`created_by` field so Comet-written files are identifiable. iceberg-java writes
`parquet-mr version X.Y.Z (...)`.
- **Aborted tasks**. iceberg-rust does not delete partial data files written by a failed task on
abort; rely on Iceberg's `expire_orphan_files` maintenance to reclaim them.
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ to read more.
Tuning Guide <tuning>
Metrics Guide <metrics>
Iceberg Guide <iceberg>
Iceberg Writes <iceberg-writes>
Kubernetes Guide <kubernetes>
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ object IcebergReflection extends Logging {
val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate"
val SPARK_BATCH_QUERY_SCAN = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan"
val SPARK_WRITE = "org.apache.iceberg.spark.source.SparkWrite"
val TABLE_PROPERTIES = "org.apache.iceberg.TableProperties"
val TYPE_UTIL = "org.apache.iceberg.types.TypeUtil"
}

/**
Expand Down Expand Up @@ -648,6 +651,101 @@ object IcebergReflection extends Logging {

unsupportedTypes.toList
}

/**
* Probes a class via reflection at most once, then memoises the outcome. Returns `None` when
* Iceberg is not on the classpath so Comet stays buildable in non-Iceberg deployments.
*/
private def tryLoadClass(name: String): Option[Class[_]] =
try Some(loadClass(name))
catch { case _: ClassNotFoundException => None }

private lazy val sparkWriteClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.SPARK_WRITE)
private lazy val tablePropertiesClassOpt: Option[Class[_]] =
tryLoadClass(ClassNames.TABLE_PROPERTIES)

/**
* Returns true when `write` is an Iceberg `SparkWrite` (or subclass). `Class.isInstance` itself
* never throws; the only failure mode is Iceberg being absent from the classpath, in which case
* `sparkWriteClassOpt` is `None`.
*
* Verified against `SparkWrite` in Iceberg 1.5.2 (spark-3.4 profile), 1.8.1 (spark-3.5), and
* 1.10.0 (spark-4.x) -- all expose the same class name under `org.apache.iceberg.spark.source`.
*/
def isIcebergSparkWrite(write: Any): Boolean =
sparkWriteClassOpt.exists(_.isInstance(write))

/**
* Reads the private `table` field from a `SparkWrite` instance.
*
* `SparkWrite.table` is a `private final Table` set by the constructor; verified consistent
* across Iceberg 1.5.2 / 1.8.1 / 1.10.0 for the v3.4, v3.5, and v4.0 Spark integrations. No
* public accessor exists on the V2 API path, so reflection with `setAccessible(true)` is
* required.
*/
def getTableFromSparkWrite(sparkWrite: Any): Option[Any] = {
sparkWriteClassOpt.flatMap { cls =>
try {
val field = cls.getDeclaredField("table")
field.setAccessible(true)
Option(field.get(sparkWrite))
} catch {
case e: Exception =>
logError(
"Iceberg reflection failure: Failed to get table from SparkWrite: " +
e.getMessage)
None
}
}
}

/**
* Reads a static `String` constant from Iceberg's `org.apache.iceberg.TableProperties` by field
* name (e.g. `"DEFAULT_FILE_FORMAT"` -> `"write.format.default"`). Throws when Iceberg is
* absent or the constant has been renamed/removed; both cases indicate a version we have not
* vetted. Verified present since Iceberg 1.5.2 for every constant Comet uses.
*/
def tablePropertyConstant(fieldName: String): String =
readTablePropertiesField(fieldName).asInstanceOf[String]

/**
* Reads a static `int` constant from Iceberg's `org.apache.iceberg.TableProperties` (e.g. one
* of the `*_DEFAULT` numeric defaults). Same failure semantics as `tablePropertyConstant`.
*/
def tablePropertyIntConstant(fieldName: String): Int =
readTablePropertiesField(fieldName).asInstanceOf[Integer].intValue()

private def readTablePropertiesField(fieldName: String): Any = {
val cls = tablePropertiesClassOpt.getOrElse(
throw new IllegalStateException(s"${ClassNames.TABLE_PROPERTIES} is not on the classpath"))
try cls.getField(fieldName).get(null)
catch {
case e: NoSuchFieldException =>
throw new IllegalStateException(
s"${ClassNames.TABLE_PROPERTIES}.$fieldName not found " +
"(unsupported Iceberg version?)",
e)
}
}

private lazy val typeUtilClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.TYPE_UTIL)
private lazy val schemaClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.SCHEMA)

/**
* Returns the number of "projected" field IDs in the schema -- the same count Iceberg-Java uses
* to decide when `write.metadata.metrics.max-inferred-column-defaults` kicks in
* (`TypeUtil.getProjectedIds(Schema)`). Counts top-level + nested fields.
*/
def getProjectedFieldIdCount(schema: Any): Option[Int] = {
for {
typeUtilClass <- typeUtilClassOpt
schemaClass <- schemaClassOpt
} yield {
val method = typeUtilClass.getMethod("getProjectedIds", schemaClass)
val ids = method.invoke(null, schema.asInstanceOf[AnyRef]).asInstanceOf[java.util.Set[_]]
ids.size()
}
}
}

/**
Expand Down
11 changes: 10 additions & 1 deletion spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesExec
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, V2CommandExec}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, V2CommandExec, V2ExistingTableWriteExec}
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
Expand All @@ -50,6 +50,7 @@ import org.apache.spark.sql.types._
import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.iceberg.IcebergReflection
import org.apache.comet.rules.CometExecRule.allExecs
import org.apache.comet.serde._
import org.apache.comet.serde.operator._
Expand Down Expand Up @@ -288,6 +289,14 @@ case class CometExecRule(session: SparkSession)
case op: DataWritingCommandExec =>
convertToComet(op, CometDataWritingCommand).getOrElse(op)

// Spark's V2 existing-table write operators -- `AppendDataExec`,
// `OverwriteByExpressionExec`, `OverwritePartitionsDynamicExec`, `ReplaceDataExec` --
// are shared across DSv2 connectors (Iceberg, Delta, Hudi, etc.). Gating on the underlying
// `Write` being an Iceberg `SparkWrite` keeps non-Iceberg DSv2 writes out of the serde,
// so unrelated plans aren't tagged with confusing fall-back reasons.
case op: V2ExistingTableWriteExec if IcebergReflection.isIcebergSparkWrite(op.write) =>
convertToComet(op, CometIcebergNativeWrite).getOrElse(op)

// For AQE broadcast stage on a Comet broadcast exchange
case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
convertToComet(s, CometExchangeSink).getOrElse(s)
Expand Down
Loading
Loading