diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 3d02ede61743..46e221998e9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -63,6 +63,7 @@ import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR; +import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG; @@ -145,6 +146,21 @@ public static void validateTableSchema(TableSchema schema) { ChangelogProducer.LOOKUP)); } + if (options.fullCompactionDeltaCommits() != null + && changelogProducer == ChangelogProducer.LOOKUP) { + throw new UnsupportedOperationException( + String.format( + "'%s' is not supported with '%s'='%s'. " + + "Use '%s'='%s' to get periodic full compaction with changelog generation, " + + "or remove '%s'.", + FULL_COMPACTION_DELTA_COMMITS.key(), + CHANGELOG_PRODUCER.key(), + ChangelogProducer.LOOKUP, + CHANGELOG_PRODUCER.key(), + ChangelogProducer.FULL_COMPACTION, + FULL_COMPACTION_DELTA_COMMITS.key())); + } + checkArgument( options.snapshotNumRetainMin() > 0, SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index c3a79d91fdf1..f8ae460af8d0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -553,4 +553,29 @@ public void testMergeOnReadRequiresDvEnabled() { .hasMessageContaining( "deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true"); } + + @Test + public void testFullCompactionDeltaCommitsWithLookupChangelogProducer() { + Map options = new HashMap<>(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key()) + .hasMessageContaining("lookup") + .hasMessageContaining("full-compaction"); + + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + + options.clear(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + + options.clear(); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "input"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + } }