
[FLINK-39055] [Iceberg] Support default column values in Iceberg sink connector#4277

Merged
lvyanquan merged 8 commits into apache:master from suhwan-cheon:FLINK-39055
Mar 10, 2026

Conversation

@suhwan-cheon
Contributor

@suhwan-cheon suhwan-cheon commented Feb 13, 2026

Summary

  • Iceberg table format version 3 introduces default value support for columns
  • Add default column value support to the Iceberg sink connector
  • Supported types: STRING/VARCHAR, BOOLEAN, TINYINT/SMALLINT/INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL
    • DATE/TIME/TIMESTAMP defaults are not supported: parseDefaultValue() returns null for these types
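The parsing behavior described above can be sketched roughly as follows. This is a minimal, self-contained sketch: the actual `IcebergTypeUtils.parseDefaultValue(...)` in this PR returns Iceberg `Literal`s, and the class name and return type here are simplified assumptions for illustration.

```java
import java.math.BigDecimal;

/** Sketch: map a CDC default-value string to a typed Java value (illustrative only). */
public class DefaultValueSketch {

    /** Returns a typed value for supported types, or null for unsupported ones. */
    public static Object parseDefaultValue(String typeName, String defaultValue) {
        if (defaultValue == null) {
            return null;
        }
        switch (typeName) {
            case "STRING":
            case "VARCHAR":
                return defaultValue;
            case "BOOLEAN":
                return Boolean.parseBoolean(defaultValue);
            case "TINYINT":
            case "SMALLINT":
            case "INTEGER":
                return Integer.parseInt(defaultValue);
            case "BIGINT":
                return Long.parseLong(defaultValue);
            case "FLOAT":
                return Float.parseFloat(defaultValue);
            case "DOUBLE":
                return Double.parseDouble(defaultValue);
            case "DECIMAL":
                return new BigDecimal(defaultValue);
            default:
                // DATE/TIME/TIMESTAMP (and anything else) fall through to null,
                // mirroring the PR's decision not to support temporal defaults.
                return null;
        }
    }
}
```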

Test

In MySQL Source

CREATE TABLE `default_test` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
  `status` varchar(20) COLLATE utf8mb4_general_ci DEFAULT 'active',
  `points` int DEFAULT '0',
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci

In Iceberg metadata file

"fields":[{"id":1,"name":"id","required":true,"type":"int"},{"id":2,"name":"username","required":true,"type":"string"},{"id":3,"name":"status","required":false,"type":"string","write-default":"active"},{"id":4,"name":"points","required":false,"type":"int","write-default":0},{"id":5,"name":"created_at","required":false,"type":"timestamptz"}]

When adding a column with a default value:

{"id":6,"name":"add_col_test","required":false,"type":"string","initial-default":"active","write-default":"active"}
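For context on the two fields above: in the Iceberg v3 spec, `initial-default` is the value surfaced when reading rows written before the column existed, while `write-default` fills the column when a new write omits it. A minimal sketch of those semantics (class and method names are illustrative, not Iceberg's API):

```java
/** Illustrative semantics of Iceberg v3 column defaults (not Iceberg's actual API). */
public class ColumnDefaults {
    final Object initialDefault; // backfills the column when reading pre-existing rows
    final Object writeDefault;   // fills the column when a new write omits it

    ColumnDefaults(Object initialDefault, Object writeDefault) {
        this.initialDefault = initialDefault;
        this.writeDefault = writeDefault;
    }

    /** Value stored for a new row: an explicit value wins over the write-default. */
    Object valueForNewRow(Object explicitValue) {
        return explicitValue != null ? explicitValue : writeDefault;
    }

    /** Value surfaced when reading a row written before the column was added. */
    Object valueForOldRow() {
        return initialDefault;
    }
}
```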

Reference

Type icebergType =
        FlinkSchemaUtil.convert(
                DataTypeUtils.toFlinkDataType(addColumn.getType()).getLogicalType());
updateSchema.addColumn(columnName, icebergType, columnComment);
Contributor Author


Moved up to avoid unnecessary repetition.

Contributor Author


Used NestedField.builder() to verify both schema structure and default values.

@suhwan-cheon suhwan-cheon marked this pull request as draft February 14, 2026 03:21
@suhwan-cheon suhwan-cheon marked this pull request as ready for review February 14, 2026 06:52
@lvyanquan lvyanquan requested a review from Copilot February 24, 2026 09:43
Contributor

Copilot AI left a comment


Pull request overview

Adds support for propagating column default values from Flink CDC schemas into Iceberg table metadata (Iceberg spec v3 defaults), primarily for table creation and ADD COLUMN evolution in the Iceberg sink connector.

Changes:

  • Introduce IcebergTypeUtils.parseDefaultValue(...) to parse CDC default-value expressions into Iceberg Literals for a subset of primitive types.
  • Apply parsed defaults during CREATE TABLE (via updateColumnDefault) and ADD COLUMN (via addColumn(..., default)), including column positioning moves.
  • Add/extend tests to validate parsing behavior and that Iceberg schema equality reflects default values.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java Adds default-value parsing into Iceberg Literals.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java Applies defaults on create/add-column schema updates.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java Updates schema-evolution test assertions to include write-defaults.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java New unit tests for default-value parsing.


@suhwan-cheon suhwan-cheon force-pushed the FLINK-39055 branch 2 times, most recently from 5c5a1ef to e0bda02 Compare February 25, 2026 00:10
@lvyanquan lvyanquan added this to the V3.6.0 milestone Mar 3, 2026
@suhwan-cheon
Contributor Author

suhwan-cheon commented Mar 8, 2026

@lvyanquan
The CI failure in flink-cdc-pipeline-connector-kafka module is unrelated to the changes in this PR.

The failure is caused by a Testcontainers issue where the Kafka Docker container (confluentinc/cp-kafka:7.2.2) failed to start within the timeout period:

ContainerLaunchException: Container startup failed for image confluentinc/cp-kafka:7.2.2
Caused by: Timed out waiting for log output matching '.*\[KafkaServer id=\d+\] started.*'

@lvyanquan
Contributor

> @lvyanquan The CI failure in flink-cdc-pipeline-connector-kafka module is unrelated to the changes in this PR.
>
> The failure is caused by a Testcontainers issue where the Kafka Docker container (confluentinc/cp-kafka:7.2.2) failed to start within the timeout period:
>
> ContainerLaunchException: Container startup failed for image confluentinc/cp-kafka:7.2.2
> Caused by: Timed out waiting for log output matching '.*\[KafkaServer id=\d+\] started.*'

You could rebase master to fix this.

@suhwan-cheon
Contributor Author

> @lvyanquan The CI failure in flink-cdc-pipeline-connector-kafka module is unrelated to the changes in this PR.
> The failure is caused by a Testcontainers issue where the Kafka Docker container (confluentinc/cp-kafka:7.2.2) failed to start within the timeout period:
>
> ContainerLaunchException: Container startup failed for image confluentinc/cp-kafka:7.2.2
> Caused by: Timed out waiting for log output matching '.*\[KafkaServer id=\d+\] started.*'
>
> You could rebase master to fix this.

Thank you, I rebased onto master.
Could you please remove the extra labels? Sorry for the noise; it happened due to a rebase/push issue.

@lvyanquan
Contributor

Looks like the test failure is related to the change here now.

@suhwan-cheon
Contributor Author

> Looks like the test failure is related to the change here now.

Thank you. I resolved it by splitting out a separate test function for table format v3.

Contributor

@lvyanquan lvyanquan left a comment


LGTM.

@lvyanquan lvyanquan merged commit 91ae677 into apache:master Mar 10, 2026
16 checks passed
ThorneANN pushed a commit to ThorneANN/flink-cdc that referenced this pull request Mar 19, 2026
Mrart pushed a commit to Mrart/flink-cdc that referenced this pull request Mar 26, 2026