[FLINK-39055] [Iceberg] Support default column values in Iceberg sink connector#4277
[FLINK-39055] [Iceberg] Support default column values in Iceberg sink connector#4277lvyanquan merged 8 commits intoapache:masterfrom
Conversation
| FlinkSchemaUtil.convert( | ||
| DataTypeUtils.toFlinkDataType(addColumn.getType()) | ||
| .getLogicalType()); | ||
| updateSchema.addColumn(columnName, icebergType, columnComment); |
There was a problem hiding this comment.
Moved up to avoid unnecessary repetition.
There was a problem hiding this comment.
NestedField.builder() to verify both schema structure and default values
39f2d04 to
23b679e
Compare
There was a problem hiding this comment.
Pull request overview
Adds support for propagating column default values from Flink CDC schemas into Iceberg table metadata (Iceberg spec v3 defaults), primarily for table creation and ADD COLUMN evolution in the Iceberg sink connector.
Changes:
- Introduce
IcebergTypeUtils.parseDefaultValue(...)to parse CDC default-value expressions into IcebergLiterals for a subset of primitive types. - Apply parsed defaults during
CREATE TABLE(viaupdateColumnDefault) andADD COLUMN(viaaddColumn(..., default)), including column positioning moves. - Add/extend tests to validate parsing behavior and that Iceberg schema equality reflects default values.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java | Adds default-value parsing into Iceberg Literals. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java | Applies defaults on create/add-column schema updates. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java | Updates schema-evolution test assertions to include write-defaults. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java | New unit tests for default-value parsing. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
5c5a1ef to
e0bda02
Compare
|
@lvyanquan The failure is caused by a Testcontainers issue where the Kafka Docker container ( |
You could rebase master to fix this. |
4bee50c to
c8e6b35
Compare
c8e6b35 to
dfd837f
Compare
thank you. i rebased master |
|
Looks like the test failure is related to the change here now. |
Thank you. I just solved it by separating the table format v3 test function. |
Summary
Test
In MySQL Source
In Iceberg metadata file
If add default values column
{"id":6,"name":"add_col_test","required":false,"type":"string","initial-default":"active","write-default":"active"}Reference