Skip to content

Support V2 Format for Iceberg Ingestion#19534

Open
a2l007 wants to merge 2 commits into
apache:masterfrom
a2l007:iceberg-v2
Open

Support V2 Format for Iceberg Ingestion#19534
a2l007 wants to merge 2 commits into
apache:masterfrom
a2l007:iceberg-v2

Conversation

@a2l007
Copy link
Copy Markdown
Contributor

@a2l007 a2l007 commented Jun 1, 2026

Fixes #19190

Description

This PR fixes Iceberg delete semantics which was previously broken for v2 tables. No spec changes are required. The same ingestion spec works for V1 and V2 tables. Delete detection is automatic at planning time.

In the V2 path, each FileScanTask is encoded as an IcebergFileTaskInputSource , containing the data file path, format, size, delete file metadata (paths, content type, equality field IDs), and the table schema captured as JSON at planning time.

IcebergNativeRecordReader processes each split:

  1. Delete metadata is loaded upfront. For each associated delete file is processed based on whether it is a position delete file or an equality delete file.
    - Position delete files: Rows to skip are read and filtered to only positions matching the current data file path.
    - Equality delete files : Rows of key tuples are read per equality delete file.
  2. The data file is streamed lazily using Iceberg's Parquet.read() builder with GenericParquetReaders.
  3. Deletes are applied per record: each record's position is checked against the position delete set; its field values are checked against each equality delete set. Matching records are skipped.
  4. Surviving records are converted to MapBasedInputRow via IcebergRecordConverter

FileIO is resolved per path: local paths use WarehouseFileIO, non-local paths (HDFS, S3) fall back to catalog.loadTable().io() using the namespace and table name passed through the split.

Release note

The druid-iceberg-extensions input source now correctly handles Iceberg V2 tables that contain row-level delete files. Previously, position deletes and equality deletes were silently ignored, causing deleted rows to appear in ingested segments. Druid now auto-detects delete files at planning time and applies them during ingestion with no spec changes required.


Key changed/added classes in this PR
  • IcebergCatalog
  • IcebergInputSource

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 1
P3 0
Total 2

Reviewed 8 of 8 changed files.


This is an automated review by Codex GPT-5.5

{
private static final Logger log = new Logger(IcebergNativeRecordReader.class);

private static final WarehouseFileIO WAREHOUSE_FILE_IO = new WarehouseFileIO();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Undefined FileIO class breaks compilation

The new reader instantiates WarehouseFileIO, but no class with that name is defined in this package or elsewhere in the repo, and there is no import that could resolve it. Any build that compiles this extension will fail before the V2 ingestion path can run. Use an existing Iceberg FileIO implementation or add the missing implementation.


// Handle residual filter based on mode
if (detectedResidual != null) {
if (detectedResidual == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Residual-filter check is inverted

This condition now enters the residual-handling branch when detectedResidual is null, even though null means no residual was found. With ResidualFilterMode.FAIL, valid scans that have no residual, such as partition-pruned scans, will throw a residual-filter error, while scans that do have a residual skip the warning/failure path. Restore the check to run only when detectedResidual != null.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Proposal: Iceberg V2 Format Support in Druid Iceberg Extension

2 participants