feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366
Draft
schenksj wants to merge 33 commits into
Draft
feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366schenksj wants to merge 33 commits into
schenksj wants to merge 33 commits into
Conversation
Initial scaffolding for the direct Delta integration that replaces the generic contrib SPI proposed in apache#4339. Mirrors Iceberg's pattern: - native/proto/src/proto/operator.proto: typed `DeltaScan delta_scan = 117` variant on `OpStruct`, with the six message definitions (DeltaScanCommon, DeltaScan, DeltaScanTask, DeltaPartitionValue, DeltaScanTaskList, DeltaColumnMapping) inlined next to the IcebergScan group. Field numbers preserved from the contrib-delta-pr2 branch. - native/core/src/execution/planner.rs: unconditional `OpStruct::DeltaScan` dispatcher arm with feature-gated body. Default builds return a clear "rebuild with --features contrib-delta" error; the feature-on arm is a `todo!` stub today and gets filled in as the implementation ports over. - native/core/src/execution/jni_api.rs + planner/operator_registry.rs: extend the existing `OpStruct` match sites so default builds compile exhaustively. - native/core/Cargo.toml: new optional `contrib-delta` feature backed by an optional path dep on `comet-contrib-delta`. Default builds carry zero Delta surface (verified: `cargo check` builds clean without the feature, and the Delta crate is not in the workspace `members` list). - native/Cargo.toml: explicit `exclude = ["../contrib"]` so the workspace doesn't try to absorb the contrib crate (which would fail -- workspace members must live hierarchically under the workspace root). - contrib/delta/native/{Cargo.toml,src/lib.rs}: skeleton crate that re-exports the typed Delta proto messages so contrib-internal code has a stable short alias. Real implementation (kernel-rs log replay, DV filter, column mapping, partition parsing) ports over from contrib-delta-pr2 in follow-up commits. Build verification: cargo check -p datafusion-comet # default: green cargo check -p datafusion-comet --features contrib-delta # green This addresses Parth's review on apache#4339: ~40 lines of core touchpoints all behind a feature gate, no SPI/registry/traits/runtime dispatch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Brings the working delta-kernel-rs integration over from contrib-delta-pr2 without the contrib SPI plumbing Parth flagged on apache#4339. contrib/delta/native/: - jni.rs, scan.rs, engine.rs, error.rs, predicate.rs, dv_filter.rs -- ported verbatim from contrib-delta-pr2 (only crate::proto::* import paths needed adjustment, handled via lib.rs re-export of the typed messages that now live in core's proto crate) - planner.rs -- Delta-specific helpers (build_delta_partitioned_files, parse_delta_partition_scalar with the DATE -> TIMESTAMP_NTZ widening fallback already inlined, ColumnMappingFilterRewriter) exposed as pure-DataFusion functions that core's dispatcher arm composes onto the standard parquet datasource path. NO ContribOperatorPlanner trait, NO ContribPlannerContext, NO ParquetDatasourceParams -- the contrib crate is now a plain library with public functions. - lib.rs -- module decls + a `pub mod proto` re-export of the six typed Delta messages from `datafusion_comet_proto::spark_operator`. No `#[ctor]` and no `register_contrib_planner` call. - Cargo.toml -- standalone (outside the native/ workspace root), no comet-contrib-spi dep, all delta-specific deps stay confined here. native/core/src/execution/planner/contrib_delta_scan.rs (new): - `PhysicalPlanner::plan_delta_scan` -- the `OpStruct::DeltaScan` arm body extracted into its own file (~210 lines, mirrors `OpStruct::IcebergScan` in size and shape). Gated `#[cfg(feature = "contrib-delta")]`; calls core's `init_datasource_exec`, `prepare_object_store_with_configs`, `convert_spark_types_to_arrow_schema` directly + comet-contrib-delta's helpers for the Delta-specific pieces. native/core/src/execution/planner.rs: - `OpStruct::DeltaScan` arm: 6-line dispatcher that calls into `self.plan_delta_scan(...)` under `#[cfg(feature = "contrib-delta")]`. native/core/src/parquet/parquet_exec.rs: - New `ignore_missing_files: bool` arg on `init_datasource_exec`. Threaded through to `IgnoreMissingFileSource` wrapper (ported verbatim from PR2's native/core/src/parquet/missing_file_tolerant.rs) which decorates the final FileSource so its FileOpener swallows object-store NotFound errors as empty streams. Matches Spark's `spark.sql.files.ignoreMissingFiles=true` semantics. All existing call sites updated to pass `false`. Build verification (both checked clean): cargo check -p datafusion-comet # default cargo check -p datafusion-comet --features contrib-delta Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
These five files port verbatim from contrib-delta-pr2 -- they touch only
Spark APIs (via reflection) and standard Scala, none of the rejected SPI
surface:
- DeltaConf.scala Config keys (COMET_DELTA_NATIVE_ENABLED, ...)
- Native.scala JNI bridge for planDeltaScan
- DeltaReflection.scala Reflective access to spark-delta internals
(isDeltaFileFormat, isBatchFileIndex,
extractBatchAddFiles, ...)
- RowTrackingAugmentedFileIndex Wraps a FileIndex to inject row-tracking
metadata columns
- DeltaInputFileBlockHolder Thread-local replacement for
InputFileBlockHolder on the Delta scan path
Plus the regression infrastructure (4.1.0.diff, run-test.sh,
run-regression.sh).
The remaining four files (CometDeltaNativeScan, CometDeltaNativeScanExec,
DeltaScanRuleExtension, DeltaOperatorSerdeExtension, DeltaPlanDataInjector)
each reference the rejected SPI surface (CometOperatorSerde,
CometScanRuleExtension, ContribOp envelope, PlanDataSource, PlanDataInjector).
Those need rewriting before they can compile against main -- queued as the
next commit on this branch:
- drop the `extends CometOperatorSerde[CometScanExec]` trait bound;
expose `convert(...)` as a static method
- replace ContribOp envelope with the typed OpStruct::DeltaScan
- drop the SPI extension class wrappers; integrate detection directly
into CometScanRule.scala + CometExecRule.scala (Iceberg-style)
- bake DeltaPlanDataInjector logic directly into CometDeltaNativeScanExec
Maven `-Pcontrib-delta` profile, scalastyle wiring, and the SPI rewrite
all land together in the follow-up commit so the contrib compiles
end-to-end against main.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ction bridge
The four SPI-touching files from contrib-delta-pr2 rewritten to drop the
rejected SPI base classes and use the typed `OpStruct::DeltaScan` proto
variant directly:
- CometDeltaNativeScan.scala no longer `extends CometOperatorSerde`;
plain object with `convert(scan, builder, childOp*)` static method.
All `ContribOp` envelope wrapping replaced with
`builder.setDeltaScan(...)`. DeltaOperator.* imports redirected to
core's `org.apache.comet.serde.OperatorOuterClass`.
- CometDeltaNativeScanExec.scala no longer `with PlanDataSource`;
public accessors (planDataSourceKey, planDataCommonBytes,
planDataPerPartitionBytes) stay so core's CometExecRDD can read them
directly. `nativeOp.getContribOp.getPayload` calls collapse to the
typed `nativeOp.getDeltaScan` accessor.
- DeltaScanRule.scala was `class DeltaScanRuleExtension extends
CometScanRuleExtension`; now a plain `object DeltaScanRule` with a
single static entry point `transformV1IfDelta(plan, session,
scanExec, relation): Option[SparkPlan]`. The private
`CometScanRule.isSchemaSupported` is unreachable from contrib, so
inline the equivalent check (CometScanTypeChecker + fallback-reason
emission).
- The DeltaOperatorSerdeExtension + DeltaPlanDataInjector files are
not ported -- their roles fold into the next commit's CometExecRule
Delta serde dispatch and into CometDeltaNativeScanExec respectively.
Core wiring:
- spark/pom.xml: new `<profile id="contrib-delta">` adds
contrib/delta/src/main/scala/ as a compile source on comet-spark and
pulls in `io.delta:delta-spark_2.13:4.1.0` at provided scope.
- CometScanRule.scala: 5-line Delta detection block at the head of
`transformV1Scan`'s HadoopFsRelation case (Iceberg-style; calls into
`DeltaIntegration.transformV1IfDelta` which is a no-op when the
contrib isn't bundled).
- DeltaIntegration.scala (new): reflection bridge that resolves the
contrib's `DeltaScanRule` + `CometDeltaNativeScan` companion objects
by class name. Default builds get `None`; -Pcontrib-delta builds get
a working delegate. No SPI / ServiceLoader / registry.
Build verification:
mvn compile # default: still green
mvn compile -Pcontrib-delta # GREEN -- this is the milestone
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tics Spark's UnsafeRow.getUTF8String wraps bytes via UTF8String.fromAddress with no UTF-8 validation, and cast(BinaryType -> StringType) is a zero-copy reinterpret that leaves arbitrary bytes in a StringType column. Delta's Z-Order uses interleave_bits(...).cast(StringType) for opaque sort keys, which panicked Comet's strict from_utf8(...).unwrap() and cascaded into JVM classloader errors (60+ ServiceConfigurationError tests in the contrib-delta-pr2 regression run). Switch to from_utf8_unchecked since the bytes flow directly into Arrow's StringBuilder::append_value and are never introspected as a &str. Verified on contrib-delta-pr2: OptimizeZOrderScalaSuite "interleaving" 4/4 PASS after this fix. Pure core fix -- independent of the contrib/delta integration. Lands on this branch because it's a prerequisite for the Delta regression to be meaningful (without it the Z-Order panic poisons every following test). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects core's CometExecRule to the contrib's Delta scan serde so the
Delta-marker CometScanExec produced by CometScanRule flows through the
same `convertToComet(scan, handler)` path as Iceberg / NativeScan / etc.
- CometDeltaNativeScan re-extends core's `CometOperatorSerde` trait
(the trait itself is core, not part of the rejected extension SPI;
every Comet operator handler implements it). `getSupportLevel` /
`enabledConfig` / `convert` now properly override.
- DeltaIntegration.scanHandler: a single reflective lookup exposes
the contrib's companion as a `CometOperatorSerde[CometScanExec]`.
Returns None on default builds.
- CometExecRule.transform: new case beside the SCAN_NATIVE_DATAFUSION
one that recognises the Delta scan marker (scanImpl ==
"native_delta_compat") and dispatches via the handler.
Build verification:
mvn compile GREEN
mvn compile -Pcontrib-delta GREEN
Still pending for end-to-end:
- per-partition task-list injection (replaces PR2's DeltaPlanDataInjector
SPI) -- baked into CometExecRDD via another small reflection hook
- live smoke test once the dylib is rebuilt with --features contrib-delta
and bundled into the jar
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects the contrib's per-partition Delta task-list serialisation into
core's existing `PlanDataInjector.injectPlanData` pipeline. Without this
the native side decodes a tasks-empty `DeltaScan` and returns `EmptyExec`
(0 rows) for every Delta scan.
- contrib/delta/.../DeltaPlanDataInjector.scala: implements core's
`PlanDataInjector` trait. `canInject` checks `op.hasDeltaScan` and
rejects already-injected operators (idempotent). `inject` splices the
partition's tasks into the operator's common-only DeltaScan envelope
via `op.toBuilder.setDeltaScan(...)` -- pure typed-proto operations,
no `ContribOp` envelope.
- spark/.../operators.scala: `PlanDataInjector.injectors` Seq now
appends the contrib injector via one reflective Class.forName lookup.
Default builds get None (no contrib classes on classpath) so the
list is unchanged; -Pcontrib-delta builds get the Delta injector.
Build verification:
mvn compile -Pcontrib-delta GREEN
End-to-end Scala+Maven integration is now complete. Remaining work:
- rebuild native dylib with `--features contrib-delta` and bundle
into comet-spark.jar
- run an isolated test (e.g. OptimizeZOrderScalaSuite "interleaving")
to confirm the end-to-end path works
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap Class.forName calls in `// scalastyle:off classforname`, change Option[Class[_]] to Option[Class[AnyRef]] to avoid existential type warnings, reword the doc comment so the verbatim string Class.forName doesn't trip scalastyle's source-pattern check. mvn scalastyle:check -Pcontrib-delta GREEN Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…th file path CometExecIterator was wrapping native Parquet failures (e.g. corrupt-footer errors from kernel-rs reading a broken Delta checkpoint) in `_LEGACY_ERROR_TEMP_2254`, whose message is literally "Data read failed." -- no file path, no useful context. That broke tests that mirror Spark/Delta's standard parquet-failure shape, e.g. SnapshotManagementSuite "should not recover when the current checkpoint is broken" which asserts the resulting SparkException's message contains both the file path and "Encountered error while reading file" -- the format `QueryExecutionErrors.cannotReadFilesError` produces. Switch the wrapping to `cannotReadFilesError(cause, filePath)` via a new helper on ShimSparkErrorConverter (which lives in the spark package and can reach the private InputFileBlockHolder / QueryExecutionErrors). File path is read from InputFileBlockHolder, with an empty-string fallback when the thread-local isn't set; the static phrasing still satisfies the test assertion. Pure core fix -- benefits every native parquet read, not just Delta. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DeltaTable.forPath(spark, path, fsOptions) with a Hadoop custom-fs scheme (e.g. fake://) was being claimed by CometScanRule for V1 parquet scans on the _delta_log/checkpoint.parquet files Delta reads internally. The native side then crashed at executePlan with `Generic URL error: Unable to recognise URL "fake:///..."` since object_store doesn't know the custom scheme. Add a scheme allowlist check (same set already used in the Iceberg branch and the contrib Delta path) at the top of the HadoopFsRelation arm; decline via withInfo when any rootPaths scheme is outside the allowlist so Spark's Hadoop-FS-aware reader handles the scan. Fixes DeltaTableSuite "dropFeatureSupport - with filesystem options" and is also a baseline fix (the same crash reproduces on main per full-20260415-222735.log). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each `plan_delta_scan` JNI call was creating a fresh `DefaultEngine`. Kernel's `DefaultEngine<TokioBackgroundExecutor>` spawns one std::thread per executor that hosts a current_thread tokio runtime, and that runtime's blocking pool (used by kernel for parquet metadata IO and object_store reads) keeps `spawn_blocking` worker threads alive for ~10s after each task. Under regression load (hundreds of Delta scans/minute, each spawning a handful of blocking IO tasks) this accumulates OS threads faster than tokio reaps them, eventually hitting the per-process `ulimit -u` (~1300 on macOS) — visible in the log as `pthread_create EAGAIN` aborts of GenerateIdentityValuesSuite and MergeIntoUnlimitedMergeClausesScalaSuite ~2 hours into the run. Replace the per-call `create_engine` with `get_or_create_engine` that returns an `Arc<DeltaEngine>` from a static cache keyed by `(scheme, authority, DeltaStorageConfig)`. Engines are constructed lazily on first miss per key and reused for the lifetime of the JVM, bounding live OS threads by table-storage diversity rather than by request count. The standalone `create_engine` is kept (behind `#[allow(dead_code)]`) for tests that want a fresh engine. `scan.rs` updated to deref `Arc<DeltaEngine>` to `&dyn Engine` at each kernel call (`builder.build`, `scan.scan_metadata`, `dv.get_row_indexes`). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DataFusion's `make_array_inner` asserts strict element-type equality (down to
nested field nullability) via `MutableArrayData::with_capacities`. Spark's
`CreateArray` is more permissive: when the analyzer doesn't insert coercion
casts, children can share the same surface struct type but disagree on a
nested field's nullability. Delta's CDF write path builds
`array(struct(id, b, _change_type=lit("delete")), struct(id, b, _change_type=col))`
manually -- one arm's `_change_type` is `Utf8` non-nullable (from a literal),
another is `Utf8` nullable -- and Comet's native serde happily emitted a
`make_array` call. Native execution then panicked:
assertion `left == right` failed: Arrays with inconsistent types passed to
MutableArrayData
left: Struct([..., Field { name: "_change_type", data_type: Utf8 }])
right: Struct([..., Field { name: "_change_type", data_type: Utf8, nullable: true }])
Decline in `CometCreateArray` when `children.map(_.dataType).distinct.size > 1`
so the JVM evaluator (which doesn't have this strictness) handles it. Fixes 4
`DescribeDeltaHistorySuite "replaceWhere on data column ... enableCDF=true"`
failures.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… kind Two perf-sweep items from apache#135: apache#7 parse_delta_partition_scalar TZ parse-once. The per-row chrono_tz::Tz::from_str (or fixed-offset parse) was happening inside parse_delta_partition_scalar for every TIMESTAMP partition value, but the session TZ string doesn't change within a scan. Introduce SessionTimezone enum (Tz | Offset | Invalid), parse once in build_delta_partitioned_files, pass the parsed value through. parse_delta_partition_scalar's signature gains &SessionTimezone and keeps session_tz: &str only for the error message. apache#2 PlanDataInjector lookup by op kind. injectPlanData was running `for (injector <- injectors if injector.canInject(op))` against every operator in the tree; for a 50-op plan with 3 injectors that's 150 canInject calls just to find no match on most ops. Add `opStructCase` to the PlanDataInjector trait, build a Map[OpStructCase, PlanDataInjector] once at object init, and look up by op.getOpStructCase before any canInject call. Iceberg/NativeScan/Delta injectors set their own opStructCase. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tion Perf-sweep #1 from apache#135. `DeltaIntegration.transformV1IfDelta` is invoked for every V1 scan in every plan (the bridge is called unconditionally by CometScanRule before the contrib's own Delta-format check). On -Pcontrib-delta builds each call was doing `getField MODULE$` + `getMethod("transformV1IfDelta", ...)` + 4-arg Method.invoke -- a reflection round-trip per scan. Cache the resolved (module, method) binding once per JVM as `transformV1IfDeltaBinding: Option[(AnyRef, Method)]`, single OnceLock-style volatile. Steady-state per-scan cost drops to one volatile read + one Method.invoke. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Perf-sweep apache#5 from apache#135. `isSchemaCometCompatible` was allocating a fresh CometScanTypeChecker(CometDeltaNativeScan.ScanImpl) on every scan. The checker is stateless w.r.t. its scanImpl tag and is safe to share. Promote it to a private val on DeltaScanRule; the per-scan fallback-reasons ListBuffer remains per-call (it's the only mutable input). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…artitioned_files Perf-audit apache#137 finding #1. The inner `partition_schema.fields()` loop was calling `.iter().find()` on `task.partition_values` for every field -- O(width × values) per task. Pre-build a per-task HashMap<&str, &str> once, then O(1) gets. The map is reused across tasks via clear() so the allocation amortises across all DeltaScanTasks in the scan. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SnapshotManagementSuite "should not recover when the current checkpoint is broken..." asserts the wrapped FAILED_READ_FILE.NO_HINT SparkException message contains the file path (e.g. "0001.checkpoint"). de9e0d3 got the error class right but left the path empty because: 1. Comet's native scan path does NOT go through Spark's FileScanRDD, so the standard InputFileBlockHolder thread-local is never populated. 2. ShimSparkErrorConverter.wrapNativeParquetError was reading from InputFileBlockHolder, getting null, and passing "" to cannotReadFilesError -- producing "Encountered error while reading file . " (with the empty path), which the test rejected. Plumb per-partition file paths from CometNativeScanExec (where they're known at planning time) -> CometExecRDD -> CometExecPartition -> CometExecIterator -> wrapNativeParquetError. CometNativeExec.doExecuteColumnar (the actual call site that constructs the iterator for query trees with a scan) collects file paths from any CometNativeScanExec leaves and passes them through the same CometExecRDD parameter. Verified with a /tmp/cometdiag.log file sentinel that the existing logWarning diags were being silently dropped by the test's `quietly { ... }` block, which is why my earlier "the wrap isn't being reached" conclusion was wrong. Test results after fix: SnapshotManagementSuite checkpoint-broken 2/2 PASS (was 0/2 with empty path). The other 3 fix clusters (de9e0d3+effe5f76+56c2b011) continue to pass: replaceWhere CDF 8/8, dropFeatureSupport 1/1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…+ safeguards
Five fixes from the comprehensive code review of contrib-delta-direct:
1. Implement the missing InputFileBlockHolder hook in CometExecRDD.compute.
Several docs referenced `CometExecRDD.setInputFileForDeltaScan` but no such
method existed and nothing called `DeltaInputFileBlockHolder.set`, leaving
Delta's UPDATE/DELETE/MERGE flows (which use `input_file_name()` to find
touched files) silently looking at an empty path. Now set the thread-local
to the partition's first file (one-per-partition is enforced by
DeltaScanRule when input_file_name() is referenced), unset on task
completion. Stale doc references updated to point at the real call site.
2. DV filter ordering safeguards. DeltaDvFilterExec's `current_row_offset`
tracking assumes physical row ordering from the parquet scan. Override
`maintains_input_order() = [true]` and
`benefits_from_input_partitioning() = [false]` so any future optimizer
that wants to insert a RepartitionExec / SortPreservingMergeExec is
forced to bail rather than silently re-order rows.
3. Tighten IgnoreMissingFileSource's `is_not_found` Display fallback. The
prior `msg.contains("not found")` would match unrelated parquet messages
like "row group statistics not found" or "page index not found" and
silently swallow them as missing-file (returning empty results instead
of failing). Restrict to recognised NotFound prefixes from object_store /
S3 / FS error formats.
4. Multi-line regex for native parquet errors in CometExecIterator. Native
parquet errors with embedded newlines (e.g. footer hex dumps) would slip
past the single-line `^Parquet error: .*$` and surface as bare
CometNativeException. Add `(?s)` so `.` spans newlines.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The post-review fixes added/modified scaladoc that broke spotless line-length rules. Apply spotless:apply across the three touched files. Verified with test-compile. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
7 tasks
Adds a TODO note linking the decline-and-fallback to apache/datafusion#22366. Lets a future maintainer find the upstream fix when it lands and remove the workaround. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…log replay Closes the P1 credential-asymmetry gap carried from apache#3932 (commit 461fa4f). Previously the kernel-rs log-replay path's DeltaStorageConfig only honored explicit static keys (`fs.s3a.access.key` / `fs.s3a.secret.key` / `fs.s3a.session.token`) set in core-site.xml. Users running under SimpleAWSCredentialsProvider / TemporaryAWSCredentialsProvider / AssumedRoleCredentialProvider / IAMInstanceCredentialsProvider would see data-file reads authenticate (those go through Comet's existing native `build_credential_provider`) but log replay fail. Resolution happens Scala-side via reflection against `org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderList` -- the same Hadoop credential machinery Spark uses everywhere else. The resolved (access_key, secret_key, session_token) tuple is stuffed into the `storageOptions` map under the standard Hadoop keys before the JNI call. Reflective because hadoop-aws is an optional dep; absence falls through to static-only behavior (any user without S3 stays unaffected). Architecture note: an in-crate cherry-pick of 461fa4f wasn't viable here because the JNI lives in `contrib/delta/native/` -- a standalone Cargo crate that deliberately doesn't depend on core (to keep the arrow-57 / arrow-58 split clean). The Scala-side approach has the same correctness properties and avoids the crate boundary entirely. Method handles cached via @volatile Option[Option[Binding]] -- the augment path runs on every Delta scan; resolving the Class + getMethod chain on each call would be a per-scan reflection round-trip just to find the same handles every time. SNAPSHOT resolution: log replay completes in seconds, well within any reasonable credential TTL. Long-running data reads continue to use Comet's refresh-capable native credential provider. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ad-bearing Expand the comment on the CM-name + checkLatestSchemaOnRead=false guard to explain the specific failure mode (column_mappings from one snapshot vs. parquet physical names from another after a concurrent ALTER TABLE). The guard is conservative but necessary; a future reader of the code shouldn't mistake it for laziness. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…uet field IDs Implements apache#142. Previously declined at DeltaScanRule.scala:271 because the contrib's native path matched parquet columns by name and CM-id mode demands ID-based matching. Comet core's `schema_adapter.rs` already supports field-ID matching via `use_field_id` + `ignore_missing_field_id` flags; this PR wires the Delta contrib through that machinery. Five mechanical changes: 1. Add `parquet.field.id` (Spark's standard StructField metadata key for parquet field IDs) and `delta.columnMapping.id` (Delta's CM-id storage key) as named constants in DeltaReflection. 2. Add `use_field_id` bool to DeltaScanCommon proto (field 17). 3. CometDeltaNativeScan.translateDeltaFieldIdToParquet walks the schema tree recursively (StructType -> nested fields, ArrayType -> element, MapType -> key/value) copying `delta.columnMapping.id` to `parquet.field.id` on every StructField. Spark's `ParquetUtils.hasFieldId` -- which schema2Proto and serializeDataType's StructType arm read -- looks at `parquet.field.id`, so this is what makes the field IDs actually reach the proto. 4. In `convert()`, detect CM-id mode from snapshot metadata and apply the translator to data_schema / required_schema / partition_schema before calling `schema2Proto`. Set `commonBuilder.setUseFieldId(true)` so the native dispatcher passes `use_field_id=true` to `init_datasource_exec`. 5. native/core/src/execution/planner/contrib_delta_scan.rs uses `common.use_field_id` from the proto instead of the hardcoded `false`. The recursive translator handles nested struct / array / map field IDs -- the "complex sub-types" gotcha from earlier CM-name work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n-blocking Expand inline comments on the three remaining DeltaScanRule fallback gates (TahoeLogFileIndexWithCloudFetch, __delta_internal_* synthetic columns, CometScanTypeChecker decline) to document why they're correctness-correct as fallback-only paths and to capture the implementation sketches for any future native-perf work. No behavioral change. Each gate was verified in the recent regression to either never fire (cloud-fetch -- OSS Delta doesn't have the class) or fire on a path Spark's reader handles correctly without test failures (synthetic columns, schema type decline). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pache#144) Native ExecutionPlan wrapper that appends Delta's `__delta_internal_row_index` (UInt64) and `__delta_internal_is_row_deleted` (Int32) columns to scan output batches. Replaces the decline for these synthetic columns where the surrounding plan asks for them (UPDATE/DELETE/MERGE flows). - `synthetic_columns.rs`: new module with DeltaSyntheticColumnsExec. Same physical-order invariant as DeltaDvFilterExec (one file per partition; parquet emits in file row order). Appends columns via a single sweep over the DV-sorted indexes alongside the batch's row range. - proto: add `emit_row_index` (18) and `emit_is_row_deleted` (19) flags on DeltaScanCommon. - contrib_delta_scan.rs: wire three mutually-exclusive wrap modes -- synthetic exec, DV filter exec, or passthrough. NOT YET WIRED Scala-side: when scan.requiredSchema contains these synthetic column names, CometDeltaNativeScan still needs to (a) strip them from the proto schemas (so the native parquet reader doesn't try to read them) and (b) set the proto emit flags. Until that lands the existing decline gate at DeltaScanRule.scala:331-342 stays active. Native module compiles clean. Full linker validation deferred -- disk-space pressure from concurrent regression run blocked the full link cycle. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…olumns Completes apache#144. CometDeltaNativeScan.convert now: - Detects __delta_internal_row_index / __delta_internal_is_row_deleted in scan.requiredSchema - Verifies they form a contiguous SUFFIX of required_schema (so wrapped DeltaSyntheticColumnsExec's appended-at-end output matches Spark's expected layout); declines otherwise - Strips them from the proto required_schema and data_schema so the parquet reader doesn't look for columns that aren't on disk - Filters them out of projection_vector (their -1 sentinel would have been out-of-bounds for native usize) - Sets the proto emit_row_index / emit_is_row_deleted flags so the dispatcher wraps the parquet scan in DeltaSyntheticColumnsExec to append them back DeltaScanRule: removed the decline gate at scanWithMappedSchema. Removed the belt-and-suspenders guard in CometDeltaNativeScan now that the convert path handles synthetics rather than falling back. Combined with the native exec from 2cb9188, this lets UPDATE/DELETE/MERGE flows that materialise the DV deletion flag stay on the native path instead of falling back to Spark's Delta reader. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The decline at DeltaScanRule for "$ScanImpl does not support Parquet field ID matching" was a separate gate from CM-id mode, fired when the user explicitly set spark.sql.parquet.fieldId.read.enabled=true AND scan.requiredSchema carried Spark's standard `parquet.field.id` metadata (non-Delta-id path that nevertheless wants field-ID matching). The same native machinery wired for CM-id (apache#142, commit 7ace165) handles this case unchanged -- `serializeDataType`'s StructType arm reads `ParquetUtils.hasFieldId` for nested types and `schema2Proto` does the same for top-level. The only thing needed was setting `use_field_id=true` on the proto. CometDeltaNativeScan.convert now sets `useFieldIdActive` from EITHER CM-id mode OR (Spark's PARQUET_FIELD_ID_READ_ENABLED + hasFieldIds). Gate removed from DeltaScanRule. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…commit_version)
Unblocks the second gate in DeltaScanRule.applyRowTrackingRewrite, which used
to decline native execution when a row-tracking-enabled table HAD no
materialised column names (rowIdPhysical / rowVerPhysical both empty,
meaning Delta expects synthesis from baseRowId + physical row index).
End-to-end wiring:
- scan.rs: extract baseRowId / defaultRowCommitVersion per scan file from
each ScanMetadata batch's underlying RecordBatch
(`fileConstantValues.baseRowId` / `defaultRowCommitVersion` -- not
exposed by kernel's `ScanFile`). Uses an `RawEntryAcc` context struct
because `visit_scan_files` requires `fn` (not `FnMut`), so the per-batch
row-tracking lookup vec lives in the context.
- jni.rs: thread the extracted values into DeltaScanTask proto fields 6/7
(already present, previously hard-None'd).
- proto: add `emit_row_id` (20) and `emit_row_commit_version` (21) flags
on DeltaScanCommon.
- synthetic_columns.rs: extend DeltaSyntheticColumnsExec to emit the two
new columns (row_id = baseRowId + physical_row_index per file,
row_commit_version = defaultRowCommitVersion constant per file). Nullable
Int64 columns; null-valued when the file has no row tracking.
- contrib_delta_scan.rs: force per-file FileGroups when emit_row_id /
emit_row_commit_version is on (the per-partition row offset counter
doesn't reset across files within a FileGroup, so baseRowId arithmetic
requires 1:1 file-to-partition mapping just like the DV case).
- CometDeltaNativeScan: detect row_id / row_commit_version in
scan.requiredSchema, add to synthetic-column suffix check + strip from
proto schemas + projection_vector, set emit flags.
- DeltaScanRule.applyRowTrackingRewrite: stop declining the no-materialised
case; return None (no rewrite needed) so nativeDeltaScan proceeds and
CometDeltaNativeScan.convert sets the synthesis path.
Also unblocks the related field-id-matching gate when
spark.sql.parquet.fieldId.read.enabled is true (commit ee9f9e4) -- the
same use_field_id machinery handles both CM-id and non-CM-id paths.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With native synthesis of `__delta_internal_is_row_deleted` wired in apache#144, the `outputHasIsRowDeleted` branch of `scanBelowFallsBackForDvs` no longer needs to force a decline. CometDeltaNativeScan.convert detects the column in scan.requiredSchema and routes through DeltaSyntheticColumnsExec to append it -- the surrounding Delta projection that filters on the column runs against the synthesised output without falling back to Spark. Only `batchFallback` (TahoeBatchFileIndex with DV-bearing AddFiles) still forces decline because our native path can't extract DV info from pre-materialised batch indexes -- separate issue. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`scanBelowFallsBackForDvs` used to force a decline when the scan was a `TahoeBatchFileIndex` with DV-bearing AddFiles. That branch was structurally checking "scan has a batch index" rather than the precise failure mode -- but our native path already handles batch indexes with DVs via `buildTaskListFromAddFiles` + `deletedRowIndexesByPath` in `CometDeltaNativeScan.convert`. The convert path declines internally only when DV materialisation itself fails (the "DV file missing / unsupported version / read error" decline at convert line 479-484), which is the precise failure mode that warrants a fallback. Combined with apache#144 (synthetic-column emission) from P7u, the predicate now returns `false` unconditionally -- `stripDeltaDvWrappers` always proceeds to strip the Delta DV-filter wrapper and let CometDeltaNativeScan handle DVs via its own DeltaDvFilterExec wrap. DvProtectedTag stays in place (now effectively dead code) -- removing it would touch more files; can drop in a follow-up cleanup. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Tables with `delta.enableRowTracking=false` have no AddFile.baseRowId / defaultRowCommitVersion. The native synthesis path (DeltaSyntheticColumnsExec) already handles this -- it emits NULL row_id and row_commit_version columns when the per-file base_row_id Option is None. That matches Delta's own behaviour: the column is queryable but reads as null. Drop the decline gate in applyRowTrackingRewrite and let nativeDeltaScan proceed; CometDeltaNativeScan.convert detects the columns and sets the emit flags. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…via reorder `DeltaSyntheticColumnsExec` appends synthetic columns to the END of its input batch's columns. When Delta's required_schema has them in any other order (interspersed with parquet columns), our prior code declined to avoid producing rows whose layout didn't match Spark's expected output. Replace the decline with a reorder. CometDeltaNativeScan.convert now computes `finalOutputIndices`: for each position in the user-visible required_schema, the index into the wrapped-exec output schema (parquet output cols followed by appended synthetics in canonical order row_index, is_row_deleted, row_id, row_commit_version). When synthetics already form a suffix the vec is empty and no reorder is needed. Native dispatcher reads `final_output_indices` from the proto and, when non-empty, wraps the synthetic exec in a final ProjectionExec that reorders the columns to match. Empty = synthetics are a suffix, no reorder. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Delta's `checkLatestSchemaOnRead` flag controls an at-read-time consistency check between the user's cached DataFrame schema and the latest snapshot. Our native path doesn't do a separate at-read check -- both `column_mappings` and the parquet reads are pinned to the snapshot version we get from `DeltaReflection.extractSnapshotVersion(relation)`, the same cached snapshot Spark/Delta used to build scan.requiredSchema. So we're internally consistent regardless of the flag. The flag's user-visible effect (Delta would or wouldn't error on cached vs latest schema mismatch) only matters for code that performs a separate at-read validation -- which we don't. Disabling it is purely a user choice about Delta's own validation, not a correctness concern for our pinned-version path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Briefing
This PR lands a native Delta Lake scan for Comet. It supersedes #3932 — the
SPI/registry design there was rejected in favor of the Iceberg-style contrib
pattern this PR uses (typed proto variant + ~40 lines of feature-gated core
touchpoints + standalone
contrib/delta/tree). Default builds are entirelyunaware of this code: no SPI lookups, no ServiceLoader scans, no contrib
surface at runtime. Only when the
-Pcontrib-deltaMaven profile (and parallelcontrib-deltaCargo feature) is activated do the contrib classes land on theclasspath and the bridge resolve.
The integration reads Delta metadata via
delta-kernel-rson the driver,encodes the resolved file list (with column mappings, DV info, partition
values) into a typed
OpStruct::DeltaScanproto, and executes via DataFusion'sparquet reader on each executor. Deletion vectors, column mapping (
namemode,not
id), type widening, row tracking via materialised columns, andmulti-task-per-partition packing are all supported. Drop-in for any Delta 4.1
table that doesn't use unsupported features.
Shape
delta_scan = 117native/proto/src/proto/operator.protospark/.../comet/rules/DeltaIntegration.scalaspark/.../comet/rules/CometScanRule.scalaspark/.../comet/rules/CometExecRule.scalaPlanDataInjector.opStructCasespark/.../comet/sql/comet/operators.scalaCometExecRDD,CometNativeScanExec,CometExecIterator,ShimSparkErrorConverterinput_file_name()andFAILED_READ_FILE.NO_HINTwrapping in any native scan, not just Delta)contrib/delta/src/main/scala/...contrib/delta/native/src/*.rs+native/core/src/execution/planner/contrib_delta_scan.rsspark/pom.xml,contrib/delta/native/Cargo.toml,native/core/Cargo.tomlcontrib/delta/dev/run-regression.sh+dev/diffs/delta/4.1.0.diffKey design decisions
Iceberg-style contrib, not SPI. Static helper objects with stable names
(
DeltaScanRule.transformV1IfDelta,CometDeltaNativeScan.MODULE$); a singlereflection bridge in core resolves and caches
Methodhandles once per JVM.No registry, no
ServiceLoader, no extension points beyond what core alreadyexposes. The contrib is just classpath-or-not.
Typed proto, not an envelope.
OpStruct::DeltaScanis a first-classvariant alongside
IcebergScanandNativeScan. Avoids theContribOp { kind, payload }envelope discussed in #3932 — type safety,IDE refactoring, and
PlanDataInjectorcan key byOpStructCasefor O(1)dispatch instead of string match.
Split-mode plan serialization.
CometDeltaNativeScan.convertemits aDeltaScan proto with the
commonblock only (schemas, table root, filters);each partition's
tasksride in a per-partition byte array viaPlanDataInjectorat execution time. Avoids closure-capturing every file inevery partition, which is what makes scans of large Delta tables tractable.
InputFileBlockHolderthread-local hook inCometExecRDD.compute.Comet's native scans bypass Spark's
FileScanRDD, so the standardinput_file_name()thread-local would otherwise be empty for any nativescan (not just Delta). This is a small but load-bearing change: it fixes
both Delta's UPDATE/DELETE/MERGE flows (which use
input_file_name()toidentify touched files) and the
FAILED_READ_FILE.NO_HINTerrorwrapping (which embeds the file path in the message).
Engine cache by
(scheme, authority, DeltaStorageConfig). kernel-rs'sDefaultEngine<TokioBackgroundExecutor>spawns one OS thread per executorthat hosts a tokio runtime with a blocking pool. Without caching, hundreds
of scans/min was leaking threads faster than tokio reaped them, tripping
pthread_create EAGAIN~2h into regression. The cache bounds live threadcount by table-storage diversity instead of by request count.
DV filter ordering safeguards.
DeltaDvFilterExectrackscurrent_row_offsetacross batches, which assumes physical-order input.Overrides
maintains_input_order() = [true]andbenefits_from_input_partitioning() = [false]so any future optimizer thatwants to insert a
RepartitionExecis forced to bail rather than silentlyre-order rows out from under the DV index map.
Decline-and-fallback everywhere, never silently wrong.
id(parquet field-ID resolution not yet wired)fake://etc) on V1 scansCreateArraywith mismatched child types (CDF struct nullability)Each path emits a
withInforeason so explain-fallback surfaces why.No SPI/registry/traits, but one new trait method.
PlanDataInjector.opStructCaseis the one core API addition. It keys theexisting injector map for O(1) dispatch and lets the contrib declare its
op kind without adding any runtime surface.
Review strategy
Suggest reviewing in this order, with different bars:
Core touchpoints (~5 minutes, high bar). The new core surface area
is small and worth careful reading because it ships in the default build:
native/proto/src/proto/operator.proto(one variant added)spark/.../comet/rules/DeltaIntegration.scala(whole file — reflection bridge)CometScanRule.transformV1Scanand the new case inCometExecRule.transformCometExecRDD+CometExecIterator+CometNativeScanExecdiffs(per-partition file paths,
InputFileBlockHolderhook)ShimSparkErrorConverter.wrapNativeParquetErrorspark/.../comet/serde/arrays.scala(CreateArraydecline)Contrib Scala (~30 minutes, contrib bar). Bigger and lives entirely
in
contrib/delta/. Walk in this order:DeltaScanRule.scala— entry point, feature gates, scheme allowlist,input_file_name()detectionCometDeltaNativeScan.scala— split serde, kernel-rs call, taskprune/split/pack, column-mapping fixup
CometDeltaNativeScanExec.scala— exec wrapper, DPP partition pruning,metric reporting
DeltaPlanDataInjector.scala,DeltaInputFileBlockHolder.scalaDeltaReflection.scala,RowTrackingAugmentedFileIndex.scalaContrib Rust (~30 minutes, contrib bar):
contrib/delta/native/src/engine.rs— kernel-rs engine + cachecontrib/delta/native/src/scan.rs—plan_delta_scanentry,DV row-index resolution
contrib/delta/native/src/planner.rs—build_delta_partitioned_files,SessionTimezone,ColumnMappingFilterRewritercontrib/delta/native/src/dv_filter.rs—DeltaDvFilterExeccontrib/delta/native/src/jni.rs—planDeltaScanJNI entrynative/core/src/execution/planner/contrib_delta_scan.rs— thecore-side dispatcher arm
Build / regression infra (~5 minutes):
spark/pom.xml-Pcontrib-deltaprofilenative/core/Cargo.tomlcontrib-deltafeaturecontrib/delta/native/Cargo.toml(standalone, not in workspace —intentional to avoid arrow-57 / arrow-58 cross-contamination)
contrib/delta/dev/run-regression.sh+dev/diffs/delta/4.1.0.diffThe
git log --oneline main..HEADis also a useful walk — commits arelabeled by phase (P7a..P7n) and each commit message documents the
specific concern it addresses.
What's not in this PR (follow-ups)
idmode (requires parquet field-ID resolution inComet's parquet reader)
RowTrackingmaterialisation for tables without materialised columnsTypeWideningcases DataFusion's schema adapter doesn'talready handle
ProjectionExeccolumn-mapping rename pushdown intoParquetSource'sschema adapter (item copy over the script to enable pyspark as well #4 from the in-PR perf sweep)
would block long-lived production drivers using STS)
Test plan
-Pcontrib-delta):mvn -pl spark -am test-compilegreen-Pcontrib-deltabuilds green (Maven + Cargo)DescribeDeltaHistorySuite "replaceWhere on data column"— 8/8DeltaTableHadoopOptionsSuite "dropFeatureSupport - with filesystem options"— 1/1SnapshotManagementSuite "should not recover when the current checkpoint is broken..."— 2/2pthread_create EAGAIN)-Pcontrib-deltabuild paths exercisedUpstream issue
apache/datafusion#22366
— filed for
make_arrayelement-type strictness. TheCometCreateArraydecline in this PR is a caller-side workaround until upstream relaxes.
🤖 Generated with Claude Code